SatyrLee
文章98
标签32
分类15

一言

文章归档

MODIS 遥感产品数据处理及转换

MODIS 遥感产品数据处理及转换

封面图 ID:135140813

0x00 缘起

目前想要复现 Convective potential and fuel availability complement near-surface weather in regulating global wildfire activity 中关于野火燃烧研究内容。使用的数据产品是 MODIS 中的 1 公里分辨率野火产品数据,产品号为 MOD14A1.061,时间跨度约为 2000 年 2 月~现在。数据为分片的 HDF 文件。由此引出将多个 HDF 分片进行时空聚合,转换为更为通用的 NetCDF 需求,具体实现思路如下:

  1. 批量下载 HDF 文件,并将其转换为 NC 格式
  2. NC 按时间进行整理,并进行合并
  3. 针对本文研究的复现,由于技术极为简单,本次不予讨论

0x01 Easy Way

感谢学弟提供的网站,不得不说 NASA APPEEARS 真的不错,提供包括 NetCDFGeoTIFF 在内的多种HDF 格式文件。

可以直接查询数据产品是否在其中,本次研究的内容不在其中,因此直接放弃。

补充:后来帮助别人下载了相关的数据,对应产品是 MOD13A2.061,1km 16 天 NDVI 数据。最大处理容量约为 3/4 个北半球,时间范围为 2000 ~ 2020 年。以供参考。

0x02 诶卧槽 HDF 怎么那么坏啊

层级数据格式(Hierarchical Data Format:HDF)是设计用来存储和组织大量数据的一组文件格式(HDF4,HDF5)。它最初开发于美国国家超级计算应用中心,现在由非营利社团 HDF Group 支持,其任务是确保 HDF5 技术的持续开发和存储在 HDF 中数据的持续可访问性。
伴随着这个目标,HDF 库和相关工具可在自由的类 BSD 许可证下获得用于一般使用。HDF 被很多商业和非商业软件平台所支持,包括 Java、MATLAB、Scilab、Octave、Mathematica、IDL、Python、R、Fortran 和 Julia。可免费获得的 HDF 发行中包括了库,命令行实用程序,测试包源代码,Java 接口,和基于 Java 的 HDF 查看器(HDFView)。
当前版本是 HDF5,在设计和 API 上与主要的遗留版本 HDF4 有显著区别。
维基百科

我的评价是依托,在可用性上比不上 NetCDF 一点,GIS 支持依托,找了一些转换工具(参考文献中)也是依托,这些里面基本没有可用的工具。现代推荐工具是 GDALxarray 等,但它们对 MODIS HDF 的支持仍有限。

曾经的思路是将其转换为 GeoTIFF,然后将其拼接成 NC。直到发现了新的软件 Hydrax。本质上是一个内容分享服务器,转换只是附送的。作为服务器,也更容易通过发送 HTTP 请求来获取,反而降低了工作负荷。

Hyrax 采用 docker 部署,特别注意的是,需要自行设定 /usr/share/hyrax 挂载目录作为数据目录。

最后用一个小脚本获取转换后的 nc4 链接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
# hyrax_bulk_from_dap.py
# 用于从 Hyrax 目录页批量抓取 .dap.nc4(或 .nc)Fileout 链接并下载
import re
import time
import html
import queue
import threading
import requests
from pathlib import Path
from urllib.parse import urljoin

# ====== 配置 ======
BASE_DIR_URL = "http://127.0.0.1:8080/opendap/MODIS/MOD14A1/" # 你的 Hyrax 目录页
OUT_DIR = Path("./downloads") # 下载输出目录
OUTPUT_FORMAT = "nc4" # "nc4"=NetCDF-4;改成 "nc" 可要 NetCDF-3

# 如需鉴权(Basic):
AUTH = None
# from requests.auth import HTTPBasicAuth
# AUTH = HTTPBasicAuth("user","pass")

HEADERS = {"User-Agent":"hyrax-fonc-bulk/1.0"}
N_THREADS = 6

# 指定变量子集(留空表示全量)。按需改成 MOD14A1 的变量名,如 FireMask, MaxFRP, QA 等
VARS = ["FireMask", "MaxFRP", "QA"] # [] 表示全量

# 针对维度的切片(可选),格式 "start:step:stop(含)";示例按 MOD14A1 的 [Number_of_Days][YDim][XDim]
SLICE_BY_DIM = {
# "Number_of_Days": "0:1:7", # 取 8 天合成的第 0~7 天
# "YDim": "0:1:1199",
# "XDim": "0:1:1199",
}

# 匹配 granule 文件名的正则(按需)
HDF_RE = re.compile(r"\.h(df|5)$", re.I)

# ====== 实现 ======
def fetch_text(url):
r = requests.get(url, headers=HEADERS, auth=AUTH, timeout=30)
r.raise_for_status()
return r.text

def parse_links_from_dir(html_text, base_url):
# 从 Hyrax 目录页找子项链接(粗略 <a href="...">)
links = []
for m in re.finditer(r'<a\s+href="([^"]+)">', html_text, flags=re.I):
href = html.unescape(m.group(1))
if href.startswith("?") or href.startswith("#"):
continue
links.append(urljoin(base_url, href))
return links

def build_constraint():
proj = ""
if VARS:
proj = ",".join(VARS)
# 拼维度切片
dims = []
for k, v in SLICE_BY_DIM.items():
# DAP4 共享维度切片语法示例:row=[start:step:stop]
# 这里直接用数组切槽写法亦可,例如 var[0:1:10]
dims.append(f"{k}=[{v}]")
dims_ce = "&".join(dims) if dims else ""
return proj, dims_ce

def to_fileout(url):
"""
将 granule 的 dataset 基址转为 Fileout URL:
DAP4 NetCDF-4 Fileout: dataset + '.nc4' + [?constraint]
DAP2 NetCDF-3 Fileout: dataset + '.nc' + [?constraint]
"""
suffix = ".nc4" if OUTPUT_FORMAT == "nc4" else ".nc"
proj, dims_ce = build_constraint()
query = ""
if proj and dims_ce:
query = f"?{proj}&{dims_ce}"
elif proj:
query = f"?{proj}"
elif dims_ce:
query = f"?{dims_ce}"
return f"{url}{suffix}{query}"

def is_granule_href(href):
# 目录项有的会进入下一层目录;有的直接是 granule(HDF/HDF5)
return bool(HDF_RE.search(href))

def producer(dir_url, q):
html_text = fetch_text(dir_url)
links = parse_links_from_dir(html_text, dir_url)
for href in links:
if href.endswith("/") and "up" not in href.lower():
# 可能是子目录,递归
try:
producer(href, q)
except Exception:
pass
else:
if is_granule_href(href):
q.put(href)

def consumer(q, out_dir: Path):
s = requests.Session()
while True:
try:
granule = q.get(timeout=3)
except queue.Empty:
return
try:
fileout = to_fileout(granule)
# 输出文件名
name = granule.rstrip("/").split("/")[-1]
stem = HDF_RE.sub("", name) # 去掉后缀
out = out_dir / f"{stem}.{OUTPUT_FORMAT}"
if out.exists():
print("[SKIP]", out.name)
continue
print("[GET ]", fileout)
with s.get(fileout, headers=HEADERS, auth=AUTH, timeout=600, stream=True) as r:
r.raise_for_status()
out_dir.mkdir(parents=True, exist_ok=True)
with open(out, "wb") as f:
for chunk in r.iter_content(chunk_size=1<<20):
if chunk:
f.write(chunk)
print("[DONE]", out.name)
except Exception as e:
print("[ERR ]", e)
finally:
q.task_done()
time.sleep(0.1)

def main():
q = queue.Queue(maxsize=512)
t_prod = threading.Thread(target=producer, args=(BASE_DIR_URL, q), daemon=True)
t_prod.start()
workers = [threading.Thread(target=consumer, args=(q, OUT_DIR), daemon=True)
for _ in range(N_THREADS)]
for t in workers:
t.start()
t_prod.join()
q.join()

if __name__ == "__main__":
main()

0x03 缝合及转换

在这里需要进行时空聚合,直接采用较为简单的代码拼接即可完成,但是需要注意写入 CF 描述以让 GIS 正确读取,并且设定合理的压缩参数以减少不必要的空间占用。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
# aggregate_nc_to_monthly_cf.py
# 从按天/多天的 nc 支持,聚合成按“月”的时间维,补 CF 元数据,设置压缩
from pathlib import Path
import numpy as np
import xarray as xr
import pandas as pd

IN_DIR = Path("./downloads") # 前面抓下来的日/8日 granule(已是 .nc 或 .nc4)
OUT_DIR = Path("./monthly_cf") # 输出月聚合 CF 合规结果
VARS = ["FireMask", "MaxFRP", "QA"] # 要聚合的变量;也可留空自动推断
CHUNK = {"time": 1, "y": 600, "x": 600} # dask chunk 可按内存微调
COMP = dict(zlib=True, complevel=4) # NetCDF-3/4 压缩设置
ENCODING = {
"time": dict(units="days since 1970-01-01", calendar="standard"),
}

def guess_vars(ds: xr.Dataset):
if VARS:
return [v for v in VARS if v in ds.data_vars]
# 自动排除坐标与掩膜型坐标
bad = set(ds.coords) | {k for k, v in ds.variables.items() if getattr(v, "cf_role", "")}
return [v for v in ds.data_vars if v not in bad]

def cf_attrs(ds: xr.Dataset) -> xr.Dataset:
# 最小化 CF:lat/lon 单位、标准名,grid_mapping,变量的 long_name、_FillValue 等
if "lat" in ds and "lon" in ds:
ds["lat"] = ds["lat"].assign_attrs(standard_name="latitude", units="degrees_north")
ds["lon"] = ds["lon"].assign_attrs(standard_name="longitude", units="degrees_east")
if "time" in ds:
ds["time"] = xr.decode_cf(ds[["time"]]).time # 解码成 datetime64
ds["time"].attrs.update(dict(standard_name="time"))
for v in guess_vars(ds):
ds[v].attrs.setdefault("long_name", v)
if "_FillValue" not in ds[v].encoding:
ds[v].encoding["_FillValue"] = np.nan
return ds

def month_key(ts: pd.Timestamp) -> str:
return f"{ts.year:04d}-{ts.month:02d}"

def load_all():
files = sorted(IN_DIR.glob("*.nc*"))
out = []
for f in files:
try:
ds = xr.open_dataset(f, chunks=CHUNK, decode_times=True)
ds = cf_attrs(ds)
# 有的产品会出现时间重合;去重
if "time" in ds:
_, idx = np.unique(pd.to_datetime(ds.time.values), return_index=True)
ds = ds.isel(time=np.sort(idx))
out.append(ds)
except Exception as e:
print("[SKIP]", f.name, e)
return out

def groupby_month(dsets):
# 将多文件 concat 后按月份 groupby
big = xr.concat([d for d in dsets], dim="time", data_vars="minimal", coords="minimal", compat="override")
big = big.sortby("time")
groups = {}
for t in pd.to_datetime(big.time.values):
k = month_key(pd.Timestamp(t))
groups.setdefault(k, []).append(t)
return big, groups

def write_monthly(big, groups):
OUT_DIR.mkdir(parents=True, exist_ok=True)
for k, times in groups.items():
part = big.sel(time=times)
# 写出前可再规范编码(压缩)
encoding = {v: COMP | dict(chunksizes=None) for v in guess_vars(part)}
encoding.update(ENCODING)
of = OUT_DIR / f"MOD14A1_{k}.nc"
print("[WRITE]", of.name, dict(n_time=part.sizes.get("time", 0)))
part.to_netcdf(of, encoding=encoding, engine="netcdf4")

def main():
dsets = load_all()
if not dsets:
print("No input .nc files under", IN_DIR)
return
big, groups = groupby_month(dsets)
write_monthly(big, groups)
print("Done")

if __name__ == "__main__":
main()

需要注意的是该产品存在时间重合现象,需要额外的代码进行处理,以避免时间步长信息冲突问题。

事先声明:本文处理方法并不符合最佳实践。合理的方式还是按照曲线网格(curvilinear grid)处理以保留更高精度。

本文处理的原因是出于数据兼容考虑,其他数据均采用规则网格。而通过观察研究区发现,曲线网格也基本保持规则网格分布,因此多了一步转换。

GPT 的代码写的很烂,于是采用了较为愚笨的方法:将其按照步长转换为 GeoTIFF,随后再拼接回 NetCDF,脚本如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python3
# convert_via_tifs_no_ref.py
# 将(近似规则格点的)变量逐步导出为 GeoTIFF,再拼回单个 NetCDF
from pathlib import Path
import json
import rasterio
import numpy as np
import xarray as xr
from rasterio.transform import from_origin

IN_DIR = Path("./downloads") # 原始 nc/nc4
TMP_TIF = Path("./tif_tmp") # 中间 GeoTIFF 目录
OUT_NC = Path("./regridded") # 输出回写 NetCDF
VAR = "FireMask" # 选择要转换的变量
RES = 0.008333333333 # 约 1km (经度方向在赤道附近);按需改
CRS = "EPSG:4326"

def export_each_time_to_tif(nc_path: Path):
ds = xr.open_dataset(nc_path, decode_times=True)
if VAR not in ds:
print("[SKIP] no var", VAR, "in", nc_path.name)
return []
da = ds[VAR]
lat = ds.get("lat")
lon = ds.get("lon")
if lat is None or lon is None:
raise ValueError("需要 lat/lon 坐标以构造仿射变换")

# 估算像元大小与左上角(假定近似规则网格)
y = np.asarray(lat)
x = np.asarray(lon)
dy = float(np.nanmedian(np.diff(y, axis= -2))) if y.ndim==2 else float(np.nanmedian(np.diff(y)))
dx = float(np.nanmedian(np.diff(x, axis= -1))) if x.ndim==2 else float(np.nanmedian(np.diff(x)))
# 左上角:以最小经度、最大纬度为锚
y0 = float(np.nanmax(y))
x0 = float(np.nanmin(x))
transform = from_origin(x0, y0, dx, abs(dy))

times = np.asarray(da.time.values)
out_files = []
TMP_TIF.mkdir(parents=True, exist_ok=True)

for i, t in enumerate(times):
arr = np.asarray(da.isel(time=i).values, dtype=np.float32)
arr = np.where(np.isfinite(arr), arr, np.nan)
h, w = arr.shape
meta = {
"driver": "GTiff",
"width": w, "height": h, "count": 1,
"dtype": "float32",
"crs": CRS, "transform": transform,
"tiled": True, "compress": "LZW"
}
of = TMP_TIF / f"{nc_path.stem}_{i:03d}.tif"
with rasterio.open(of, "w", **meta) as dst:
dst.write(arr, 1)
dst.update_tags(TIME=np.datetime_as_string(t, unit="s"))
out_files.append(of)
ds.close()
return out_files

def mosaic_back_to_nc(tifs, out_nc: Path):
# 简单拼接(假定各 tif 共享同一格网)
sample = rasterio.open(tifs[0])
transform = sample.transform
w, h = sample.width, sample.height
crs = sample.crs
sample.close()

stacks = []
times = []
for tf in tifs:
with rasterio.open(tf) as src:
stacks.append(src.read(1).astype(np.float32))
times.append(np.datetime64(src.tags().get("TIME")))
data = np.stack(stacks, axis=0)
y = np.arange(h) * (-transform.e) + transform.f - 0.5*abs(transform.e)
x = np.arange(w) * (transform.a) + transform.c + 0.5*transform.a

ds = xr.Dataset(
{VAR: (("time","y","x"), data)},
coords=dict(
time=("time", np.array(times, dtype="datetime64[ns]")),
y=("y", y, {"units":"degrees_north","standard_name":"latitude"}),
x=("x", x, {"units":"degrees_east","standard_name":"longitude"}),
),
attrs=dict(title=f"{VAR} regridded via GeoTIFF pipeline")
)
enc = {VAR: dict(zlib=True, complevel=4)}
OUT_NC.mkdir(parents=True, exist_ok=True)
ds.to_netcdf(out_nc, encoding=enc, engine="netcdf4")
print("[WRITE]", out_nc)

def main():
OUT_NC.mkdir(parents=True, exist_ok=True)
for f in sorted(IN_DIR.glob("*.nc*")):
tifs = export_each_time_to_tif(f)
if not tifs:
continue
out_nc = OUT_NC / f"{f.stem}_{VAR}_reg.nc"
mosaic_back_to_nc(tifs, out_nc)

if __name__ == "__main__":
main()

至此,初步处理已经完成。

0x04 TL;DR

  • 大部分数据可以通过 NASA APPEEARS 获取处理好的文件。
  • Hydrax 已经被证明是一种稍显麻烦但极为有效的 NC 转换工具。
  • 对于数据需要合理拼合,去除重复步长,如有需要可将曲线网格转换为规则网格。

0x05 参考文献

本文作者:SatyrLee
本文链接:http://www.naive514.top/posts/5333926e/
版权声明:本文采用 CC BY-NC-SA 4.0 CN 协议进行许可