1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| import xarray as xr import numpy as np
def wrap_longitude( ds: xr.Dataset, *, to: Literal["-180_180", "0_360"] = "-180_180", coord_priority: Optional[Tuple[str, ...]] = ("lon","longitude","x"), sort: bool = True, drop_edge_duplicate: bool = True ) -> xr.Dataset: """ Wrap the longitude coordinate without copying data by default: - to="-180_180": new_lon = ((lon + 180) % 360) - 180 - to="0_360": new_lon = lon % 360 Then (optionally) sort by longitude and drop edge duplicates (0/360 or -180/180). Works for any lon coordinate name among ('lon','longitude','x'). """ lon_name = None for cand in (coord_priority or ()): if cand in ds.coords: lon_name = cand; break if lon_name is None: _, lon_name = _detect_lat_lon(ds) if lon_name is None: raise ValueError("Longitude coordinate not found.")
if to == "-180_180": lat_name, _ = _detect_lat_lon(ds) if lat_name is None: raise ValueError("Latitude coordinate not found for reprojection.") lons = ds[lon_name].values lats = ds[lat_name].values lon_res = np.abs(lons[1] - lons[0]) target_lons = np.arange(-180, 180 + lon_res/2, lon_res) new_coords = {lat_name: lats, lon_name: target_lons} ds2 = xr.Dataset(coords=new_coords) for var in ds.data_vars: arr = ds[var].values new_shape = (len(lats), len(target_lons)) new_data = np.full(new_shape, np.nan, dtype=arr.dtype) mask1 = lons <= 180 if np.any(mask1): idxs = np.searchsorted(target_lons, lons[mask1]) new_data[:, idxs] = arr[:, mask1] mask2 = lons > 180 if np.any(mask2): lons2 = lons[mask2] - 360 idxs2 = np.searchsorted(target_lons, lons2) new_data[:, idxs2] = arr[:, mask2] ds2[var] = xr.DataArray(new_data, coords=new_coords, dims=(lat_name, lon_name)) elif to == "0_360": new_lon = (ds[lon_name] % 360) ds2 = ds.assign_coords({lon_name: new_lon}) else: raise ValueError(f"Unsupported target: {to}")
if sort: ds2 = ds2.sortby(lon_name)
if drop_edge_duplicate: vals = ds2[lon_name].values unique_vals = _drop_duplicate_coords(vals) if unique_vals.size < vals.size: ds2 = ds2.sel({lon_name: unique_vals})
return ds2
|