Source code for sea_ice_observations

from __future__ import annotations
import os
import xarray   as xr
import pandas   as pd
import numpy    as np
from pathlib    import Path
from typing     import Iterable, List, Optional, Tuple
__all__ = ["SeaIceObservations"]
[docs] class SeaIceObservations: """ Observational sea-ice I/O and derived diagnostics for AFIM workflows. This class centralises reading, standardisation, and metric generation for several observational products used throughout AFIM/SeaIceToolbox analyses: - NSIDC (e.g., G02202 v4) sea-ice concentration: * local file discovery and loading over time windows (daily or monthly layouts), * masking of flagged concentration values, * computation of hemispheric sea-ice area (SIA) and extent (SIE) time series. - AF2020 (Fraser et al. 2020) landfast sea-ice: * reading FIA CSV time series and constructing DOY climatologies, * regridding original AF2020 raster products to the CICE T-grid with xESMF, * convenience regional fast-ice area time series from AF2020-like gridded datasets. - ESA-CCI / AWI CryoSat/Envisat/Sentinel sea-ice thickness (SIT): * robust path discovery across multiple institutional directory layouts, * indexing and de-duplication (one file per day or month), * L2P swath → daily gridded SIT via pyresample neighbour queries with optional Gaussian weighting and explicit uncertainty propagation, * L3 gridded monthly SIT assembly (no resampling), with optional SIC thresholding, QC flag masking, and hemispheric weighted means. Expected configuration and dependencies ------------------------------------- This class is intended to be used inside the AFIM stack and expects several attributes to exist on `self`: Required attributes (typical) - logger : logging.Logger Used for progress and warnings. - dt0_str, dtN_str : str Default analysis window in "YYYY-MM-DD" format. - hemisphere_dict : dict Must include 'abbreviation' (e.g., "SH" or "NH"). - NSIDC_dict : dict Paths and dataset conventions for NSIDC loading and metrics. Typical keys: * "D_original" * "G02202_v4_file_format" * "file_versions" (dict ver -> "YYYY-MM-DD" start date, or None) * "SIC_name" (e.g., "cdr_seaice_conc") * "cdr_seaice_conc_flags" (list of flag integer codes) * "P_cell_area" (cell area file path) * "projection_string" (Proj4 string) - AF_FI_dict : dict Paths and naming for AF2020. Typical keys: * "P_AF2020_csv" * "D_AF2020_db_org" * "AF_reG_weights" * "variable_name", "lat_coord_name", "lon_coord_name", "time_coord_name" * "threshold_value" * "P_reG_reg_weights" (optional) - CICE_dict : dict CICE grid conventions and coordinate names (for mapping masks/regridding): * 'spatial_dims', 'three_dims' * 'lon_coord_name', 'lat_coord_name' * 'tcoord_names' (lon, lat) * 'time_dim' * 'P_reG_reg_weights' (optional) - Sea_Ice_Obs_dict : dict Root directories for institutions: * 'ESA-CCI', 'AWI' - icon_thresh : float Sea-ice concentration threshold for extent masking (fraction). - SIC_scale : float Unit scaling between dataset SIC and area weighting. - normalise_longitudes(lon, to=...) : callable Longitude wrapping helper used across multiple routines. - define_cice_grid(...) : callable Must be available for AF2020 conservative regridding to the CICE T-grid. - load_cice_grid(...) / define_cice_grid(...) conventions Must be consistent with your xESMF usage. Notes on time and units ----------------------- - All internal timestamp comparisons for file filtering are performed in UTC. - NSIDC SIA/SIE are computed using an explicit cell-area product and your configured concentration scaling. - SIT uncertainty propagation uses standard-error style aggregation: SE_cell = sqrt(sum(w^2 * unc^2)) / sum(w) SE_hemi = sqrt(sum(W^2 * SE_cell^2)) / sum(W) where W is cos(latitude) and w is neighbour weight. Implementation note (RTD/autodoc) -------------------------------- Keep docstrings on public methods comprehensive and consistent with your JSON keys. Methods with a leading underscore are treated as internal helpers. """ def __init__(self, **kwargs): """ Initialise the observations helper. Parameters ---------- **kwargs Optional configuration injected into `self` by the surrounding workflow. Notes ----- - The shown implementation is a no-op; in production you typically assign kwargs onto `self` or inherit from a base class that handles configuration. """ return #################################################################################################### # NSIDC #################################################################################################### def _get_nsidc_sic_name(self, ds, monthly_files=False): """ Return the NSIDC concentration variable present in ds. """ cand = [] sic_cfg = self.NSIDC_dict.get("SIC_name", "cdr_seaice_conc") if isinstance(sic_cfg, dict): if monthly_files: cand.extend([sic_cfg.get("monthly"), sic_cfg.get("daily")]) else: cand.extend([sic_cfg.get("daily"), sic_cfg.get("monthly")]) else: cand.append(sic_cfg) # robust fallbacks across V4/V6 raw files cand.extend(["cdr_seaice_conc", "cdr_seaice_conc_monthly"]) for name in cand: if name and name in ds.data_vars: return name raise KeyError(f"Could not find an NSIDC sea-ice concentration variable in {list(ds.data_vars)}")
[docs] def load_local_NSIDC(self, dt0_str=None, dtN_str=None, local_directory=None, monthly_files=False, chunks=None, engine="h5netcdf"): """ Load NSIDC sea-ice concentration NetCDF files from a local directory tree. Optimisations relative to the original version: - one directory scan instead of thousands of Path.exists() calls - combine="nested" + concat_dim="time" instead of combine="by_coords" - drop unused variables during open - avoid per-file warning spam - optional manual time assignment after open """ from datetime import datetime def preprocess(ds): # promote time before concatenation if "time" in ds.variables and "tdim" in ds.dims: ds = ds.set_coords("time") ds = ds.swap_dims({"tdim": "time"}) ds = ds.drop_vars("tdim", errors="ignore") # keep only what is actually needed keep_data = ["cdr_seaice_conc"] drop_data = [v for v in ds.data_vars if v not in keep_data] if drop_data: ds = ds.drop_vars(drop_data) # ensure coords are retained for c in ("time", "xgrid", "ygrid"): if c in ds.variables: ds = ds.set_coords(c) return ds f_fmt = self.NSIDC_dict["G02202_v4_file_format"] dt0_str = dt0_str if dt0_str is not None else self.dt0_str dtN_str = dtN_str if dtN_str is not None else self.dtN_str freq_str = "monthly" if monthly_files else "daily" D_local = (Path(local_directory) if local_directory else Path(self.NSIDC_dict["D_original"], self.NSIDC_dict["version"], self.hemisphere, freq_str)) dt0 = datetime.strptime(dt0_str, "%Y-%m-%d") dtN = datetime.strptime(dtN_str, "%Y-%m-%d") f_vers_parsed = [(ver, datetime.strptime(date_str, "%Y-%m-%d") if date_str else datetime.min) for ver, date_str in self.NSIDC_dict["file_versions"].items()] f_vers_parsed.sort(key=lambda x: x[1]) date_range = pd.date_range(start=dt0, end=dtN, freq="MS" if monthly_files else "D") # one metadata scan instead of N Path.exists() calls with os.scandir(D_local) as it: available = {entry.name: Path(entry.path) for entry in it if entry.is_file() and entry.name.endswith(".nc")} P_NSIDCs = [] times = [] missing = [] for dt_ in date_range: f_version = next((ver for ver, d in reversed(f_vers_parsed) if dt_ >= d), f_vers_parsed[0][0]) F_ = f_fmt.format(freq=freq_str, hem=self.hemisphere_dict["abbreviation"].lower(), date=dt_.strftime("%Y%m%d"), ver=f_version) P_ = available.get(F_) if P_ is not None: P_NSIDCs.append(str(P_)) times.append(pd.Timestamp(dt_)) else: missing.append(F_) if missing: self.logger.warning("Missing %d NSIDC files in %s; first few missing: %s", len(missing), D_local, ", ".join(missing[:5])) if not P_NSIDCs: raise FileNotFoundError(f"No NSIDC files found in {D_local} between {dt0_str} and {dtN_str}") self.logger.info("Opening %d NSIDC files from %s (last file: %s)", len(P_NSIDCs), D_local, Path(P_NSIDCs[-1]).name) ds = xr.open_mfdataset(P_NSIDCs, engine = engine, # try "h5netcdf"; fall back to "netcdf4" if needed combine = "nested", concat_dim = "time", preprocess = preprocess, parallel = False, # often faster for many small files decode_times = False, # assign known times ourselves below coords = "minimal", data_vars = "minimal", compat = "override", join = "override", combine_attrs = "override", drop_variables = ["nsidc_bt_seaice_conc", "nsidc_nt_seaice_conc", "qa_of_cdr_seaice_conc", "spatial_interpolation_flag", "stdev_of_cdr_seaice_conc", "temporal_interpolation_flag", "projection"]) # overwrite numeric/cftime-ish time with known timestamps from filenames/date loop ds = ds.assign_coords(time=("time", pd.DatetimeIndex(times))) # rechunk after concatenation; otherwise time chunks often remain size 1 if chunks is None: chunks = {"time": 365} return ds.chunk(chunks)
[docs] def NSIDC_coordinate_transformation(self, ds): """ Add 2D lon/lat coordinates to an NSIDC dataset from projected x/y coordinates. Works with 1D xgrid/ygrid or 2D projected coordinates. """ import numpy as np import xarray as xr from pyproj import CRS, Transformer x_name = "xgrid" if "xgrid" in ds.variables else self.NSIDC_dict.get("x_coord", "xgrid") y_name = "ygrid" if "ygrid" in ds.variables else self.NSIDC_dict.get("y_coord", "ygrid") if x_name not in ds.variables or y_name not in ds.variables: raise KeyError(f"Could not find projected coordinate variables '{x_name}' and '{y_name}'") x = ds[x_name].values y = ds[y_name].values # CRS from file if available, else from config if "projection" in ds.variables and "proj4text" in ds["projection"].attrs: proj4 = ds["projection"].attrs["proj4text"] else: proj_cfg = self.NSIDC_dict["projection_string"] proj4 = proj_cfg[self.hemisphere] if isinstance(proj_cfg, dict) else proj_cfg crs_nsidc = CRS.from_user_input(proj4) crs_geod = crs_nsidc.geodetic_crs if crs_nsidc.geodetic_crs is not None else CRS.from_epsg(4326) transformer = Transformer.from_crs(crs_nsidc, crs_geod, always_xy=True) if x.ndim == 1 and y.ndim == 1: xx, yy = np.meshgrid(x, y) lon, lat = transformer.transform(xx, yy) ds = ds.assign_coords( lon=(("y", "x"), lon), lat=(("y", "x"), lat), ) elif x.ndim == 2 and y.ndim == 2 and x.shape == y.shape: lon, lat = transformer.transform(x, y) ds = ds.assign_coords( lon=(ds[y_name].dims, lon), lat=(ds[y_name].dims, lat), ) else: raise ValueError("Unsupported xgrid/ygrid layout; expected 1D xgrid/ygrid or matching 2D arrays.") return ds
[docs] def compute_NSIDC_cell_area_from_grid(self, ds, P_out=None, overwrite=False): """ Derive geodetic cell area [m^2] from NSIDC projected x/y grid and file CRS. Parameters ---------- ds : xr.Dataset NSIDC dataset containing 1D xgrid/ygrid and projection metadata. P_out : str or Path, optional If given, write the derived cell-area field to NetCDF for reuse. overwrite : bool, default False Overwrite existing cached NetCDF if present. Returns ------- xr.DataArray cell_area(y, x) in m^2 """ from pyproj import CRS, Transformer, Geod if P_out is not None: P_out = Path(P_out) if P_out.exists() and not overwrite: A = xr.open_dataset(P_out) if "cell_area" in A: return A["cell_area"] x_name = "xgrid" if "xgrid" in ds.variables else self.NSIDC_dict.get("x_coord", "xgrid") y_name = "ygrid" if "ygrid" in ds.variables else self.NSIDC_dict.get("y_coord", "ygrid") if x_name not in ds.variables or y_name not in ds.variables: raise KeyError(f"Dataset must contain '{x_name}' and '{y_name}'") x = ds[x_name].values y = ds[y_name].values if x.ndim != 1 or y.ndim != 1: raise ValueError("This helper currently expects 1D xgrid and ygrid") # CRS from file metadata if available if "projection" in ds.variables and "proj4text" in ds["projection"].attrs: proj4 = ds["projection"].attrs["proj4text"] else: proj_cfg = self.NSIDC_dict["projection_string"] proj4 = proj_cfg[self.hemisphere] if isinstance(proj_cfg, dict) else proj_cfg crs_nsidc = CRS.from_user_input(proj4) crs_geod = crs_nsidc.geodetic_crs if crs_nsidc.geodetic_crs is not None else CRS.from_epsg(4326) transformer = Transformer.from_crs(crs_nsidc, crs_geod, always_xy=True) # Use native ellipsoid if present if "projection" in ds.variables: a = ds["projection"].attrs.get("semimajor_radius", None) b = ds["projection"].attrs.get("semiminor_radius", None) else: a = b = None if a is None or b is None: # fall back to CRS ellipsoid ellps = crs_nsidc.ellipsoid a = ellps.semi_major_metre b = ellps.semi_minor_metre geod = Geod(a=a, b=b) def centers_to_edges(c): c = np.asarray(c, dtype=np.float64) if c.size < 2: raise ValueError("Need at least two coordinates to infer edges") dc = np.diff(c) edges = np.empty(c.size + 1, dtype=np.float64) edges[1:-1] = 0.5 * (c[:-1] + c[1:]) edges[0] = c[0] - 0.5 * dc[0] edges[-1] = c[-1] + 0.5 * dc[-1] return edges x_edge = centers_to_edges(x) y_edge = centers_to_edges(y) # Corner mesh xx_edge, yy_edge = np.meshgrid(x_edge, y_edge) lon_edge, lat_edge = transformer.transform(xx_edge, yy_edge) ny = y.size nx = x.size area = np.empty((ny, nx), dtype=np.float64) # Geodesic polygon area per cell for j in range(ny): for i in range(nx): lons = [ lon_edge[j, i], lon_edge[j, i+1], lon_edge[j+1, i+1], lon_edge[j+1, i], ] lats = [ lat_edge[j, i], lat_edge[j, i+1], lat_edge[j+1, i+1], lat_edge[j+1, i], ] poly_area, _ = geod.polygon_area_perimeter(lons, lats) area[j, i] = abs(poly_area) area_da = xr.DataArray( area, dims=("y", "x"), coords={ "y": ds["y"] if "y" in ds.coords else np.arange(ny), "x": ds["x"] if "x" in ds.coords else np.arange(nx), x_name: ("x", x), y_name: ("y", y), }, name="cell_area", attrs={ "long_name": "NSIDC grid-cell geodetic area derived from projection and x/y grid", "units": "m2", "source": "derived from xgrid/ygrid + polar stereographic CRS", }, ) if P_out is not None: P_out.parent.mkdir(parents=True, exist_ok=True) area_da.to_dataset().to_netcdf(P_out) return area_da
[docs] def get_NSIDC_cell_area(self, ds=None, P_cell_area=None, P_fallback=None, overwrite=False): """ Return NSIDC cell area [m^2]. Priority: 1. official cell-area file if present 2. cached derived file if present 3. derive from ds x/y grid + projection and optionally cache """ if P_cell_area is None: P_cfg = self.NSIDC_dict.get("P_cell_area", None) if isinstance(P_cfg, dict): P_cell_area = P_cfg.get(self.hemisphere, None) else: P_cell_area = P_cfg if P_cell_area and os.path.exists(P_cell_area): A = xr.open_dataset(P_cell_area) if "cell_area" in A: return A["cell_area"] # mild fallback if variable name differs for v in A.data_vars: if "area" in v.lower(): return A[v] raise KeyError(f"No cell-area variable found in {P_cell_area}") if ds is None: raise FileNotFoundError("Official NSIDC cell-area file not found and no dataset supplied for fallback derivation.") return self.compute_NSIDC_cell_area_from_grid(ds, P_out=P_fallback, overwrite=overwrite)
[docs] def compute_NSIDC_metrics(self, dt0_str = None, dtN_str = None, local_load_dir = None, monthly_files = False, P_zarr = None, overwrite = False, P_cell_area = None, derive_area_if_missing = True, P_area_fallback = None): """ Compute and persist NSIDC hemispheric sea-ice area (SIA) and extent (SIE). If the official NSIDC cell-area ancillary is unavailable, optionally derive grid-cell area from xgrid/ygrid and the native projection. """ dt0_str = dt0_str if dt0_str is not None else self.dt0_str dtN_str = dtN_str if dtN_str is not None else self.dtN_str freq_str = "monthly" if monthly_files else "daily" D_local = Path(local_load_dir) if local_load_dir is not None else Path(self.NSIDC_dict["D_original"], self.NSIDC_dict["version"], self.hemisphere, freq_str) P_zarr = Path(P_zarr) if P_zarr is not None else Path(D_local, "zarr") if os.path.exists(P_zarr) and not overwrite: self.logger.info(f"**loading** previously created NSIDC time series zarr file {P_zarr}") return xr.open_zarr(P_zarr, consolidated=True) NSIDC = self.load_local_NSIDC(dt0_str = dt0_str, dtN_str = dtN_str, local_directory = D_local, monthly_files = monthly_files) SIC_name = self._get_nsidc_sic_name(NSIDC, monthly_files=monthly_files) aice = NSIDC[SIC_name] # Robust masking after CF decoding: # valid concentration should lie in [0, 1]; flags become > 1 after scale_factor aice = aice.where((aice >= 0.0) & (aice <= 1.0)) if P_area_fallback is None: P_area_fallback = Path(D_local, f"NSIDC_cell_area_{self.hemisphere}_derived.nc") if derive_area_if_missing: area = self.get_NSIDC_cell_area(ds=NSIDC, P_cell_area = P_cell_area, P_fallback = P_area_fallback, overwrite = overwrite) else: if P_cell_area is None: P_cfg = self.NSIDC_dict.get("P_cell_area", None) P_cell_area = P_cfg.get(self.hemisphere, None) if isinstance(P_cfg, dict) else P_cfg if P_cell_area is None or not os.path.exists(P_cell_area): raise FileNotFoundError("NSIDC cell-area file not found and derive_area_if_missing=False") area = xr.open_dataset(P_cell_area)["cell_area"] # convert to your preferred scale area = area / self.SIC_scale # 15% threshold mask for extent and thresholded area mask = aice > self.icon_thresh SIA = (aice.where(mask) * area).sum(dim=["y", "x"], skipna=True) SIE = area.where(mask).sum(dim=["y", "x"], skipna=True) NSIDC_ts = xr.Dataset(data_vars = {"SIA": ("time", SIA.values), "SIE": ("time", SIE.values)}, coords = {"time": ("time", SIA.time.values)}, attrs = {"source": "NSIDC G02202", "hemisphere": self.hemisphere, "area_method": "official ancillary" if (P_cell_area and os.path.exists(P_cell_area)) else "derived from xgrid/ygrid + projection", "ice_threshold": float(self.icon_thresh)}) self.logger.info(f"**writing** NSIDC time series zarr file {P_zarr}") NSIDC_ts.to_zarr(P_zarr, mode="w") return NSIDC_ts
#################################################################################################### ## AF2020 ####################################################################################################
[docs] def load_AF2020_FIA_summary(self, repeat_climatology: bool = True, start : str = "1994-01-01", end : str = "1999-12-31") -> xr.Dataset: """ Load AF2020 landfast sea-ice area (FIA) time series and build a DOY climatology. This method reads the AF2020 FIA CSV, standardises sector names, produces a circumpolar daily time series, and computes a day-of-year (DOY) climatology. Optionally, the climatology is repeated over an arbitrary target time range for direct comparison against model series. Parameters ---------- repeat_climatology : bool, default True If True, returns a repeated climatology series (`FIA_clim_repeat`) over the specified [start, end] window. If False, returns only the raw FIA series and DOY climatology. start, end : str, default ("1994-01-01", "1999-12-31") Date window used when repeating the climatology (daily frequency). Returns ------- xr.Dataset Variables: - FIA_obs(time, region) : observed FIA time series (typically "circumpolar") - FIA_clim(doy, region) : DOY climatology - FIA_clim_repeat(time, region) : climatology mapped to [start, end] (optional) Raises ------ FileNotFoundError If the CSV path does not exist. ValueError If expected columns required for climatology construction are missing. Notes ----- - Units are converted from km² to 10³ km² by dividing by 1000. - Leap-day handling: this implementation drops DOY 366 when repeating unless explicitly handled; ensure consistency with your model calendar. """ from scipy.interpolate import interp1d csv_path = self.AF_FI_dict['P_AF2020_csv'] df = pd.read_csv(csv_path) regions = ["circumpolar", "indian_ocean", "western_pacific", "ross_sea", "amundsen_sea", "weddell_sea"] df = df.rename(columns={'Circumpolar': 'circumpolar', 'IOsector' : 'indian_ocean', 'WPOsector' : 'western_pacific', 'RSsector' : 'ross_sea', 'BASsector' : 'amundsen_sea', 'WSsector' : 'weddell_sea'}) df = df.dropna(subset=["Year", "Month_start", "Day_of_month_start"]) df["date"] = pd.to_datetime({"year": df["Year"].astype(int), "month": df["Month_start"].astype(int), "day": df["Day_of_month_start"].astype(int)}, errors="coerce") df = df.dropna(subset=["date"]) df["doy"] = df["date"].dt.dayofyear df = df.set_index("date") df = df[regions + ["doy"]] fia_obs = df[regions] / 1000.0 # km² → 1000 km² fia_obs_xr = fia_obs.to_xarray().to_array("region").transpose("date", "region") if 'doy' not in df.columns or 'circumpolar' not in df.columns: raise ValueError("Expected 'doy' and 'circumpolar' columns for climatology generation.") doy_vals = np.arange(1, 366) x = df.groupby("doy")["circumpolar"].mean().dropna().index.values y = df.groupby("doy")["circumpolar"].mean().dropna().values / 1000.0 interp_func = interp1d(x, y, kind='linear', fill_value="extrapolate") clim_interp = pd.DataFrame({"circumpolar": interp_func(doy_vals)}, index=doy_vals) fia_clim_doy = xr.DataArray(clim_interp["circumpolar"].values[:, None], # <-- Add [:, None] to make it 2D coords={"doy": doy_vals, "region": ["circumpolar"]}, dims=["doy", "region"]) if repeat_climatology: model_dates = pd.date_range(start, end, freq="D") doy_map = model_dates.dayofyear valid_mask = doy_map < 366 model_dates = model_dates[valid_mask] doy_map = doy_map[valid_mask] repeated = fia_clim_doy.sel(doy=xr.DataArray(doy_map, dims="time")).values fia_clim_re = xr.DataArray(repeated, coords = {"time": model_dates, "region": ["circumpolar"]}, dims = ["time", "region"]) else: fia_clim_re = None ds = xr.Dataset({"FIA_obs": fia_obs_xr.sel(region="circumpolar"), "FIA_clim": fia_clim_doy}) if fia_clim_re is not None: ds["FIA_clim_repeat"] = fia_clim_re self.logger.info(f"AF2020 FIA data loaded from {csv_path}") return ds
[docs] def AF2020_interpolate_FIA_clim(self, csv_df, full_doy=np.arange(1, 366)): """ Interpolate an AF2020 DOY climatology to a complete 1..365 DOY axis. Parameters ---------- csv_df : pandas.DataFrame DataFrame containing at least: - 'DOY_start' : day-of-year index values - 'Circumpolar' : FIA values (km²) full_doy : array-like, default np.arange(1, 366) Target DOY coordinate to interpolate onto. Returns ------- xr.DataArray Interpolated climatology with coordinate: - doy : day-of-year Values are in 10³ km². """ from scipy.interpolate import interp1d df = csv_df.copy() if 'Circumpolar' not in df.columns or 'DOY_start' not in df.columns: raise ValueError("Expected columns 'DOY_start' and 'circumpolar' in CSV.") x = df['DOY_start'].values y = df['Circumpolar'].values/1e3 if len(x) != len(y): raise ValueError("x and y arrays must be equal in length.") interp_func = interp1d(x, y, kind='linear', fill_value="extrapolate") return xr.DataArray(interp_func(full_doy), coords=[("doy", full_doy)])
[docs] def AF2020_clim_to_model_time(self, model: xr.DataArray, clim: xr.DataArray) -> xr.DataArray: """ Expand a DOY-based climatology to match a model time axis. Parameters ---------- model : xr.DataArray Model time series containing a `time` coordinate (datetime-like). clim : xr.DataArray Climatology indexed by `doy` (and optionally `region`). Returns ------- xr.DataArray Climatology values mapped onto `model.time` using each date's day-of-year. Notes ----- - Leap days: if the observational climatology does not define DOY=366, callers should drop Feb 29 from the model calendar or define a mapping strategy (e.g., nearest/pad) consistent with the analysis. """ # Compute day-of-year for each model date (drop Feb 29 if not in obs) doy = xr.DataArray(model['time'].dt.dayofyear, coords={"time": model.time}, dims="time") # Match each model day to a climatological value matched = clim.sel(doy=doy, method="nearest") # or method="pad" matched.coords["time"] = model.time # Align with model time return matched
[docs] def build_AF2020_grid_corners(self, lat_c, lon_c): """ Construct (ny+1, nx+1) corner arrays from (ny, nx) center arrays. This helper builds approximate corner coordinates by padding center fields and averaging surrounding values. Corner arrays are commonly required for conservative remapping in xESMF. Parameters ---------- lat_c, lon_c : np.ndarray 2D arrays (ny, nx) of center latitude/longitude in degrees. Returns ------- (lat_b, lon_b) : tuple[np.ndarray, np.ndarray] 2D arrays (ny+1, nx+1) of corner latitude/longitude. Notes ----- - This approach assumes a locally smooth grid. For strongly curvilinear grids, corner construction should ideally use native corner metadata when available. - Output longitudes are wrapped to [0, 360). """ ny, nx = lat_c.shape lat_padded = np.pad(lat_c, ((1, 1), (1, 1)), mode='edge') lon_padded = np.pad(lon_c, ((1, 1), (1, 1)), mode='edge') lat_b = 0.25 * (lat_padded[0:-1, 0:-1] + lat_padded[0:-1, 1:] + lat_padded[1:, 0:-1] + lat_padded[1:, 1:]) lon_b = 0.25 * (lon_padded[0:-1, 0:-1] + lon_padded[0:-1, 1:] + lon_padded[1:, 0:-1] + lon_padded[1:, 1:]) lon_b = ((lon_b + 360) % 360) return lat_b, lon_b
def _load_AF2020_FI_org_and_save_zarr(self, P_zarr): """ Load original AF2020 FastIce NetCDF rasters, compute persistence, regrid to CICE T-grid, and write Zarr. Workflow -------- 1) Open all AF2020 "FastIce_70_*.nc" rasters (mosaic-window products). 2) Build an xESMF conservative regridder from the AF2020 grid to the CICE T-grid, reusing a persistent weights file when available. 3) Convert the AF2020 categorical time-series variable to a binary fast-ice mask using a threshold (>=4), then compute a time-mean persistence fraction. 4) Regrid persistence to the CICE grid and save a compact dataset to Zarr. Parameters ---------- P_zarr : pathlib.Path Destination Zarr store path. Returns ------- xr.Dataset Dataset containing: - FI(nj, ni) : fast-ice persistence fraction on CICE T-grid - FI_t_alt(t_FI_obs) : auxiliary date strings from the original product plus lon/lat coordinates on the CICE T-grid. Notes ----- - Weight file path is taken from `self.AF_FI_dict["AF_reG_weights"]`. - Target grid is built via `self.define_cice_grid(grid_type="t", ...)`. - Conservative regridding requires valid grid corners on both source and target. """ import xesmf as xe D_obs = Path(self.AF_FI_dict['D_AF2020_db_org']) P_orgs = sorted(D_obs.glob("FastIce_70_*.nc")) self.logger.info(f"Loading original AF2020 NetCDF files: {P_orgs}") FI_obs = xr.open_mfdataset(P_orgs, engine='netcdf4', combine='by_coords') FI_obs = FI_obs.chunk({'time': 32}) lat_c = FI_obs.latitude lon_c = FI_obs.longitude lat_b, lon_b = self.build_AF2020_grid_corners(lat_c.values, lon_c.values) G_obs = xr.Dataset({"lat" : (("Y", "X"), lat_c), "lon" : (("Y", "X"), lon_c), "lat_b" : (("Y_b", "X_b"), lat_b), "lon_b" : (("Y_b", "X_b"), lon_b)}) G_t = self.define_cice_grid(grid_type='t', mask=False, build_grid_corners=True) P_weights = self.AF_FI_dict["AF_reG_weights"] reuse_weights = os.path.exists(P_weights) self.logger.info(f"Regridding observational data using weights: {P_weights} (reuse={reuse_weights})") reG_obs = xe.Regridder(G_obs, G_t, method = "conservative", periodic = False, ignore_degenerate = True, reuse_weights = reuse_weights, filename = P_weights) fast_ice_mask = xr.where(FI_obs['Fast_Ice_Time_series'] >= 4, 1.0, 0.0) FI_frac = fast_ice_mask.sum(dim='time') / fast_ice_mask['time'].size FI_frac = FI_frac.where(FI_frac > 0) # set 0s to NaN (optional: mask ocean) FI_frac_reG = reG_obs(FI_frac) ds_out = xr.Dataset(data_vars = {"FI" : FI_frac_reG, "FI_t_alt" : ("t_FI_obs", FI_obs.date_alt, {"long_name" : FI_obs.date_alt.attrs.get("long_name", ""), "description" : FI_obs.date_alt.attrs.get("description", "")})}, coords = {"t_FI_obs" : ("t_FI_obs", FI_obs.time, {"description": "Start date of 15- or 20-day image mosaic window.", "units": "days since 2000-1-1 0:0:0"}), "lon" : (("nj", "ni"), G_t['lon'].values, {"units": "degrees_east"}), "lat" : (("nj", "ni"), G_t['lat'].values, {"units": "degrees_north"})}) self.logger.info(f"Saving regridded dataset to Zarr: {P_zarr}") ds_out.to_zarr(P_zarr, mode="w", consolidated=True) return ds_out
[docs] def AF2020_regional_fast_ice_area(self, ds: xr.Dataset, region = (52.5, 97.5, -70.5, -64), lon_name = "longitude", lat_name = "latitude", area_name = "area", fast_name = "Fast_Ice_Time_series", threshold = 4, lon_wrap = "0-360"): """ Compute a regional fast-ice area time series from a gridded observation product. This routine is designed for AF2020-like datasets that provide: - curvilinear or projected 2-D lon/lat fields, and - a per-cell area field (m²), and - a fast-ice indicator variable over time. The region is defined by a geographic bounding box: region = (lon_min, lon_max, lat_min, lat_max) Longitude seam-crossing is supported (e.g., lon_min=350, lon_max=20 in 0–360 convention). The computation is performed as: FIA(t) = Σ_{cells in region} [ fast(t,cell) * area(cell) ] where `fast(t,cell)` is a boolean mask defined by `ds[fast_name] >= threshold`. Parameters ---------- ds : xarray.Dataset Dataset containing at minimum: - ds[lon_name] : 2-D longitude field with a time dimension (time, Y, X) or broadcastable; only time=0 is used for the static grid. - ds[lat_name] : 2-D latitude field with a time dimension (time, Y, X) or broadcastable; only time=0 is used for the static grid. - ds[area_name] : 2-D grid-cell area in m² with a time dimension (time, Y, X) or broadcastable; only time=0 is used. - ds[fast_name] : fast-ice indicator variable with a time dimension. region : tuple[float, float, float, float], default (52.5, 97.5, -70.5, -64) Geographic region bounds as (lon_min, lon_max, lat_min, lat_max), in degrees. lon_name : str, default "longitude" Name of the longitude variable/coordinate in `ds`. lat_name : str, default "latitude" Name of the latitude variable/coordinate in `ds`. area_name : str, default "area" Name of the per-cell area variable/coordinate in `ds` (assumed m²). fast_name : str, default "Fast_Ice_Time_series" Name of the fast-ice indicator variable in `ds`. threshold : float, default 4 Threshold applied to `ds[fast_name]` to produce a boolean fast-ice mask. Cells with `ds[fast_name] >= threshold` are treated as fast ice. lon_wrap : {"0-360", "-180-180"}, default "0-360" Longitude convention used for the region definition and for wrapping `lon2d`. Returns ------- xarray.DataArray 1-D time series of regional fast-ice area in **1e3 km²** (i.e., thousands of km²), named `"FIA_1e3-km2"`. The time coordinate is inherited from `ds[fast_name]`. Notes ----- - Lon/lat/area are read from `time=0` only to avoid unnecessary time broadcasting. - The regional mask is optionally cropped to the minimal bounding index range to reduce computation. - If `region` crosses the longitude seam (lon_min > lon_max), the longitude mask is built using an OR condition to include both sides. Examples -------- >>> fia = obs.AF2020_regional_fast_ice_area(ds, region=(350, 20, -75, -65), lon_wrap="0-360") >>> fia.plot() """ lon_min, lon_max, lat_min, lat_max = region # --- 1) Use static lon/lat & area from first time slice (avoid time broadcast) lon2d = ds[lon_name].isel(time=0) lat2d = ds[lat_name].isel(time=0) area2d = ds[area_name].isel(time=0) # assumed m^2 # --- 2) Normalize longitudes to match region convention def wrap(lon): if lon_wrap == "0-360": return ((lon % 360) + 360) % 360 else: return ((lon + 180.0) % 360.0) - 180.0 lon2d = wrap(lon2d) if lon_wrap == "0-360": lon_min, lon_max = lon_min % 360, lon_max % 360 else: lon_min = ((lon_min + 180) % 360) - 180 lon_max = ((lon_max + 180) % 360) - 180 # --- 3) Build region mask (handle seam-crossing gracefully) if lon_min <= lon_max: mask_lon = (lon2d >= lon_min) & (lon2d <= lon_max) else: mask_lon = (lon2d >= lon_min) | (lon2d <= lon_max) # across seam mask_lat = (lat2d >= lat_min) & (lat2d <= lat_max) region_mask = mask_lon & mask_lat # (Y, X) # --- 4) Convert area to 1000-km^2 area_km2 = area2d / 1e9 # OPTIONAL: crop to bounding indices to reduce work jdim, idim = lat2d.dims # expect ("Y","X") j_any = region_mask.any(dim=idim) i_any = region_mask.any(dim=jdim) if bool(j_any.any()) and bool(i_any.any()): j_idx = np.where(j_any.values)[0] i_idx = np.where(i_any.values)[0] j0, j1 = int(j_idx.min()), int(j_idx.max()) + 1 i0, i1 = int(i_idx.min()), int(i_idx.max()) + 1 region_mask = region_mask.isel({jdim: slice(j0, j1), idim: slice(i0, i1)}) area_km2 = area_km2.isel({jdim: slice(j0, j1), idim: slice(i0, i1)}) # --- 5) Fast-ice presence per time (binary), then area sum over region fast = (ds[fast_name] >= threshold) # align cropped mask/area to fast dims (time, Y, X) region_mask3 = region_mask.broadcast_like(fast.isel(time=0)) area_km2_3 = area_km2.broadcast_like(fast.isel(time=0)) # area at each time = sum( fast * region_mask * area ) fia_ts = (fast.where(region_mask3) * area_km2_3).sum(dim=(jdim, idim)) fia_ts.name = "FIA_1e3-km2" fia_ts.attrs["description"] = f"Fast-ice area (1e3-km^2) in region {region}, threshold>={threshold}" return fia_ts
#################################################################################################### ## CCI SIT ####################################################################################################
[docs] def build_sea_ice_satellite_paths(self, dt0_str : str, dtN_str : str, hemisphere : Optional[str] = None, institutions : Optional[Iterable[str]] = ("ESA","AWI"), sensors : Optional[Iterable[str]] = None, # ["cryosat2","envisat","sentinel3a","sentinel3b"] levels : Optional[Iterable[str]] = None, # ESA: ["L2P","L3C"]; AWI: ["l2p_release","l3cp_release"] versions : Optional[Iterable[str]] = None, # ESA only; and only "v2.0" or "v3.0" root_esa : Optional[str] = None, root_awi : Optional[str] = None) -> List[Path]: """ Build a de-duplicated, time-filtered list of NetCDF files for satellite sea-ice products across ESA and/or AWI directory layouts. Time filtering: - Returns files whose date/month lies within the closed interval [dt0_str, dtN_str], interpreted in UTC. - Daily products (e.g., ESA L2P) are filtered by date parsed from the filename (YYYYMMDD). Monthly products (e.g., ESA L3C or AWI monthly layout) are filtered by overlap between the (year, month) and [dt0, dtN]. Supported directory patterns (walked automatically): ESA “flat” layout: <root_esa>/<L2P|L3C>/<sensor>/<hemisphere>/*.nc (hemisphere directory may be 'NH'/'SH' or lowercase) - L2P: filenames contain YYYYMMDD - L3C: filenames often contain YYYYMM ESA versioned layout: <root_esa>/<L2P|L3C>/<sensor>/<version>/<HEMISPHERE>/<YYYY>/<MM>/*.nc (HEMISPHERE can be 'NH'/'SH' or lowercase; <MM> subdirs may be absent) AWI release layout: <root_awi>/<l2p_release|l3cp_release>/<hemisphere>/<sensor>/<YYYY>[/<MM>]/*.nc Parameters ---------- dt0_str, dtN_str Start and end of the desired time window (inclusive). Any pandas-parsable string is accepted (e.g., "2002-01-01", "2012-12-31"). Interpreted as UTC. hemisphere Hemisphere selector. If None, uses `self.hemisphere_dict['abbreviation']`. Case is normalized; both 'sh'/'nh' and 'SH'/'NH' are supported in dir scans. institutions Iterable of institutions to include (case-insensitive). Valid values include "ESA" and/or "AWI". If None or empty, both are searched. sensors Optional iterable of sensor names to filter (case-insensitive, matched to directory names under the institution layout). levels Optional iterable of level strings. ESA: "L2P", "L3C". AWI: "l2p_release", "l3cp_release". If "l2p"/"l3c" are provided for AWI they are mapped to "l2p_release"/"l3cp_release". versions ESA only: one or more version directory names (e.g., "v2.0", "v3.0"). Case-insensitive. Ignored for AWI. root_esa, root_awi Root directories for ESA and AWI datasets. Returns ------- List[Path] Sorted unique list of file paths within the requested time window that satisfy the filters. Notes ----- - Filename parsing assumes dates follow YYYYMMDD (daily) or YYYYMM (monthly) within 2000–2099. Files without such tokens are kept only when they live in a year/month directory that overlaps [dt0, dtN]. - If `institutions` is None or empty, both ESA and AWI are scanned. - All timestamps are treated as UTC to avoid tz-comparison issues. Examples -------- >>> paths = self.build_sea_ice_satellite_paths( ... "2002-01-01", "2002-12-31", ... hemisphere="sh", ... institutions=["ESA", "AWI"], ... sensors=["cryosat2", "envisat"], ... levels=["L2P","L3C","l2p_release"] ... ) >>> len(paths), paths[:3] """ dt0_str = dt0_str if dt0_str is not None else self.dt0_str dtN_str = dtN_str if dtN_str is not None else self.dtN_str t0 = pd.Timestamp(dt0_str, tz="UTC") t1 = pd.Timestamp(dtN_str, tz="UTC") if t1 < t0: raise ValueError("dtN_str date must be >= dt0_str date") hem = hemisphere if hemisphere is not None else self.hemisphere_dict["abbreviation"] hem_upper = str(hem).upper() hem_lower = str(hem).lower() hem_dirs_esa = [hem_upper, hem_lower] # try both cases for ESA root_esa = root_esa if root_esa is not None else self.Sea_Ice_Obs_dict['ESA-CCI'] root_awi = root_awi if root_awi is not None else self.Sea_Ice_Obs_dict['AWI'] root_esa = Path(root_esa) root_awi = Path(root_awi) # filters (case-normalized) insts = set(s.upper() for s in (self._norm_list(institutions) or [])) sensor_set = set(s.lower() for s in (self._norm_list(sensors) or [])) if sensors else None version_set = set(v.lower() for v in (self._norm_list(versions) or [])) if versions else None level_set = set(l.lower() for l in (self._norm_list(levels) or [])) if levels else None out: List[Path] = [] # ---------------- ESA ---------------- if not insts or "ESA" in insts: esa_levels = ["L2P", "L3C"] if not level_set else ([L.upper() for L in level_set if L.upper() in ("L2P", "L3C")] or ["L2P", "L3C"]) for L in esa_levels: Ldir = Path(root_esa,L) if not Ldir.exists(): continue # <root>/<L>/<sensor>/... for sdir in sorted(p for p in Ldir.iterdir() if p.is_dir()): sensor = sdir.name.lower() if sensor_set and sensor not in sensor_set: continue # A) flat: .../L2P|L3C/<sensor>/<hem>/*.nc (hem 'nh'/'sh' any case) for hem_dir in hem_dirs_esa: flat_dir = Path(sdir,hem_dir) if flat_dir.exists(): for fp in flat_dir.glob("*.nc"): if L == "L2P": ts = self._parse_yyyymmdd(fp.name) if ts is None or ts < t0 or ts > t1: continue else: # L3C ym = self._parse_yyyymm(fp.name) if ym and not self._month_overlap(ym[0], ym[1], t0, t1): continue out.append(fp) # B) versioned: .../L2P|L3C/<sensor>/<ver>/<HEM>/YYYY/MM/*.nc for vdir in sorted(p for p in sdir.iterdir() if p.is_dir()): vname = vdir.name.lower() # skip if this 'vdir' is actually the hemisphere dir from flat layout if vname in {"nh", "sh"}: continue if version_set and vname not in version_set: continue for hem_dir in hem_dirs_esa: hdir = Path(vdir,hem_dir) if not hdir.exists(): continue for ydir in sorted(hdir.glob("[12][0-9][0-9][0-9]")): y = int(ydir.name) mdirs = list(sorted(ydir.glob("[01][0-9]"))) if mdirs: for mdir in mdirs: m = int(mdir.name) if not self._month_overlap(y, m, t0, t1): continue for fp in sorted(mdir.glob("*.nc")): if L == "L2P": ts = self._parse_yyyymmdd(fp.name) if ts is None or ts < t0 or ts > t1: continue out.append(fp) else: for fp in sorted(ydir.glob("*.nc")): if L == "L2P": ts = self._parse_yyyymmdd(fp.name) if ts is None or ts < t0 or ts > t1: continue else: ym = self._parse_yyyymm(fp.name) if ym and not self._month_overlap(ym[0], ym[1], t0, t1): continue out.append(fp) # ---------------- AWI ---------------- if not insts or "AWI" in insts: if not level_set: awi_levels: List[str] = ["l2p_release", "l3cp_release"] else: # map friendly ESA-like inputs to AWI release names canonical = [] for L in level_set: l = L.lower() if l in ("l2p_release", "l3cp_release"): canonical.append(l) elif l == "l2p": canonical.append("l2p_release") elif l == "l3c": canonical.append("l3cp_release") awi_levels = sorted(set(canonical)) or ["l2p_release", "l3cp_release"] LHEM = hem_lower # AWI uses lower-case hemisphere dirs for Lraw in awi_levels: Ldir = Path(root_awi,Lraw,LHEM) if not Ldir.exists(): continue for sdir in sorted(p for p in Ldir.iterdir() if p.is_dir()): sensor = sdir.name.lower() if sensor_set and sensor not in sensor_set: continue for ydir in sorted(sdir.glob("[12][0-9][0-9][0-9]")): y = int(ydir.name) mdirs = list(sorted(ydir.glob("[01][0-9]"))) if mdirs: for mdir in mdirs: m = int(mdir.name) if not self._month_overlap(y, m, t0, t1): continue out.extend(sorted(mdir.glob("*.nc"))) else: # year-only dir; filter by yyyymm in filenames when available for fp in sorted(ydir.glob("*.nc")): ym = self._parse_yyyymm(fp.name) if ym and not self._month_overlap(ym[0], ym[1], t0, t1): continue out.append(fp) # de-dup & sort return sorted(set(out))
# ------------------------- map grids (LAEA/EASE-like) ------------------------- # # ------------------------- file path indexing & QA helpers ------------------------- #
[docs] @staticmethod def laea_area_def(hem : str, nx : int, ny : int, dx_km : float, lon0 : float = 0.0): """ Build a Lambert Azimuthal Equal-Area grid centered on the pole. Parameters ---------- hem : {'sh','nh'} Hemisphere. nx, ny : int Grid shape (x, y). dx_km : float Grid spacing in kilometers (projected meters). lon0 : float, default 0.0 Central longitude (°E). ESA products commonly use 0. Returns ------- pyresample.geometry.AreaDefinition """ from pyresample import geometry if geometry is None: raise ImportError("pyresample is required: `pip install pyresample`") hem = str(hem).lower() lat0 = -90.0 if hem == "sh" else 90.0 proj_id = f"laea_{hem}_{int(dx_km)}km_{nx}x{ny}" proj_dict = {"proj" : "laea", "lat_0" : lat0, "lon_0" : lon0, "a" : 6378137.0, "b" : 6356752.314245, "units" : "m", "no_defs" : True} # extent centered on the pole half_w = (nx * dx_km * 1000.0) / 2.0 half_h = (ny * dx_km * 1000.0) / 2.0 area_extent = (-half_w, -half_h, half_w, half_h) area_id = proj_id description = f"LAEA {hem.upper()} {dx_km}km {nx}x{ny} lon0={lon0}" return geometry.AreaDefinition(area_id, description, proj_id, proj_dict, nx, ny, area_extent)
[docs] @staticmethod def ease2_sh_50km(): """Convenience grid roughly matching AWI SH L3C (216×216 @ ~50 km, lon0=0).""" return SeaIceObservations.laea_area_def("sh", nx=216, ny=216, dx_km=50.0, lon0=0.0)
[docs] @staticmethod def ease2_nh_25km(): """Convenience grid for NH (432×432 @ ~25 km, lon0=0).""" return SeaIceObservations.laea_area_def("nh", nx=432, ny=432, dx_km=25.0, lon0=0.0)
[docs] def index_satellite_paths(self, paths: Iterable[Path]) -> pd.DataFrame: """ Build a tidy index of candidate satellite files with parsed dates for QA/QC and selection. This function inspects each file name and classifies it as: - "daily" : contains a YYYYMMDD token (date parsed as that day) - "monthly" : contains a YYYYMM token (date set to first of that month) - "unknown" : no parseable token (date set to NaT) Parameters ---------- paths : iterable of pathlib.Path Input file paths. Returns ------- pandas.DataFrame A stable-sorted table with columns: - path : Path - kind : {"daily","monthly","unknown"} - date : UTC Timestamp (daily) or first-of-month (monthly) or NaT - year, month, day : ints (or None) Notes ----- Sorting is stable and deterministic to make downstream "first"/"last" selection repeatable. """ rows = [] for p in map(Path, paths): name = p.name ts = self._parse_yyyymmdd(name) if ts is not None: rows.append({"path": p, "kind": "daily", "date": ts, "year": ts.year, "month": ts.month, "day": ts.day}) continue ym = self._parse_yyyymm(name) if ym: y, m = ym rows.append({"path": p, "kind": "monthly", "date": pd.Timestamp(year=y, month=m, day=1, tz="UTC"), "year": y, "month": m, "day": None}) else: rows.append({"path": p, "kind": "unknown", "date": pd.NaT, "year": None, "month": None, "day": None}) df = pd.DataFrame(rows) # Stable sort so 'first'/'last' are deterministic df = df.sort_values(by = ["kind", "date", "year", "month", "day", "path"], na_position = "last", kind = "mergesort").reset_index(drop=True) return df
[docs] def dedup_daily_one_file_per_day(self, df: pd.DataFrame, prefer: str = "last") -> pd.DataFrame: """ Select exactly one daily file per UTC day from an indexed path table. Parameters ---------- df : pandas.DataFrame Output of `index_satellite_paths(...)`. prefer : {"first","last"}, default "last" If multiple files map to the same day, select the first/last after sorting. Returns ------- pandas.DataFrame Subset of `df` restricted to kind == "daily" with unique 'date'. Notes ----- - Non-daily entries are discarded. - The returned frame preserves the same columns as the input `df`. """ dfd = df[df["kind"] == "daily"].copy() if dfd.empty: return dfd dfd = dfd.sort_values(["date", "path"]) pick = {"first": "first", "last": "last"}[prefer] dfd = dfd.groupby("date", as_index=False, sort=True).agg(pick) return dfd
[docs] def dedup_monthly_one_file_per_month(self, df: pd.DataFrame, prefer: str = "last") -> pd.DataFrame: """ Select exactly one file per (year, month) from an indexed path table. This is intended for monthly products, but will also accept daily files that encode a YYYYMM token. Parameters ---------- df : pandas.DataFrame Output of `index_satellite_paths(...)`. prefer : {"first","last"}, default "last" If multiple files map to the same (year, month), select the first/last after sorting. Returns ------- pandas.DataFrame Subset with unique (year, month) combinations. Notes ----- Rows without parsed (year, month) are dropped. """ dfm = df[df["kind"].isin(["monthly","daily"])].copy() if dfm.empty: return dfm # For daily entries we keep (year, month) for grouping dfm["yymm"] = list(zip(dfm["year"], dfm["month"])) dfm = dfm.dropna(subset=["yymm"]) dfm = dfm.sort_values(["year","month","path"]) pick = {"first":"first","last":"last"}[prefer] dfm = dfm.groupby(["year","month"], as_index=False, sort=True).agg(pick) return dfm
# ------------------------- LEVEL 2 (DAILY UN-GRIDDED) ------------------------- # def _l2p_weights(self, index_array, # masked or ndarray; shape (k,n_out) OR (n_out,k) dist_array, # masked or ndarray; same shape/orientation valid_input_index, # (Nvalid_in,) valid_output_index, # (n_out,) ints OR bool mask of length ny*nx swath_SIT, # (Nswath,) swath_unc, # (Nswath,) out_shape : Tuple[int, int], # (ny, nx) roi_m : float, sigma_m : Optional[float] = None): """ Combine neighbour swath samples into gridded cell statistics using uniform or Gaussian weights. This helper is designed to consume the outputs of: pyresample.kd_tree.get_neighbour_info(...) and reduce swath SIT and uncertainty into per-grid-cell: - weighted mean SIT - propagated standard error (SE) - weighted mean of per-sample uncertainty - count of valid contributing neighbours Parameters ---------- index_array : array-like (masked or ndarray) Neighbour indices from `get_neighbour_info`. Shape is either (k, n_out) or (n_out, k), where k is the number of neighbours and n_out is the number of valid output cells. dist_array : array-like (masked or ndarray) Neighbour distances (meters) aligned with `index_array`. valid_input_index : array-like of int Indices selecting the valid subset of swath input points used by the KD-tree. valid_output_index : array-like of int or bool Output-cell selector from `get_neighbour_info`. If boolean, length must equal ny*nx; if integer, interpreted as flat indices into the target grid. swath_SIT : array-like of float Swath sea-ice thickness samples corresponding to the full swath arrays (pre-filtering). Units: meters. swath_unc : array-like of float Swath uncertainty samples (per-sample standard error or equivalent). Units: meters. out_shape : tuple[int, int] Target grid shape as (ny, nx). roi_m : float Radius of influence used for neighbour selection (meters). sigma_m : float or None, default None If provided, apply Gaussian weights: w = exp(-d^2 / (2*sigma^2)) Otherwise, use uniform weights for valid neighbours. Returns ------- tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray] (SIT, SIT_unc, u_cell, n_cell) where: - SIT : (ny, nx) float32, weighted mean thickness [m] - SIT_unc : (ny, nx) float32, propagated SE [m] - u_cell : (ny, nx) float32, weighted mean of per-sample uncertainty [m] - n_cell : (ny, nx) int32, number of valid neighbours per cell Notes ----- - Neighbours outside `roi_m` are excluded. - Masked neighbour entries are treated as non-existent. - SE propagation uses: sqrt(sum(w^2 * u^2)) / sum(w). """ ny, nx = out_shape # reduce sources to KD-tree's valid input subset src_s = np.asarray(swath_SIT, dtype=np.float64)[valid_input_index] src_u = np.asarray(swath_unc, dtype=np.float64)[valid_input_index] # normalise valid_output_index → integer flat indices voi = np.asarray(valid_output_index) out_idx = np.flatnonzero(voi) if voi.dtype == bool else voi.astype(np.int64, copy=False) n_out_expected = out_idx.size # fill masked arrays to ndarrays + build "exists" mask idx = np.ma.filled(index_array, 0).astype(np.int64, copy=False) dst = np.ma.filled(dist_array, np.inf).astype(np.float64, copy=False) neigh_exists = ~np.ma.getmaskarray(index_array) # orient to (k, n_out) i.e. neighbours along axis 0 if idx.shape[1] == n_out_expected: pass elif idx.shape[0] == n_out_expected: idx, dst, neigh_exists = idx.T, dst.T, neigh_exists.T else: raise ValueError(f"Neighbour array shape {idx.shape} incompatible with n_out={n_out_expected}") k, n_out = idx.shape if n_out != n_out_expected: raise ValueError(f"Internal n_out mismatch: {n_out} vs {n_out_expected}") # gather neighbour values from reduced swath v_s = np.take(src_s, idx, mode="clip") # (k, n_out) v_u = np.take(src_u, idx, mode="clip") # (k, n_out) # validity within_roi = np.isfinite(dst) & (dst <= roi_m) val_ok = neigh_exists & within_roi & np.isfinite(v_s) & np.isfinite(v_u) # weights if sigma_m is None: w = np.where(val_ok, 1.0, 0.0) else: w = np.where(val_ok, np.exp(-(dst**2) / (2.0 * sigma_m**2)), 0.0) # reductions along neighbour axis wsum = w.sum(axis=0) sitnum = (w * v_s).sum(axis=0) umean = (w * v_u).sum(axis=0) senum = ((w**2) * (v_u**2)).sum(axis=0) n_cnt = val_ok.sum(axis=0).astype("int32") sit_mean = np.divide(sitnum, wsum, out=np.full_like(wsum, np.nan, dtype=float), where=wsum>0) u_mean = np.divide(umean, wsum, out=np.full_like(wsum, np.nan, dtype=float), where=wsum>0) se_mean = np.divide(np.sqrt(senum), wsum, out=np.full_like(wsum, np.nan, dtype=float), where=wsum>0) # scatter back to flat arrays nflat = ny * nx SIT_flat = np.full(nflat, np.nan, dtype="float32") SIT_unc_flat = np.full(nflat, np.nan, dtype="float32") U_cell_flat = np.full(nflat, np.nan, dtype="float32") N_cell_flat = np.zeros(nflat, dtype="int32") SIT_flat[out_idx] = sit_mean.astype("float32", copy=False) SIT_unc_flat[out_idx] = se_mean.astype("float32", copy=False) U_cell_flat[out_idx] = u_mean.astype("float32", copy=False) N_cell_flat[out_idx] = n_cnt return (SIT_flat.reshape(ny, nx), SIT_unc_flat.reshape(ny, nx), U_cell_flat.reshape(ny, nx), N_cell_flat.reshape(ny, nx)) # ------------------------- main driver ------------------------- #
[docs] def l2p_to_daily_grid_pyresample(self, paths : List[Path], hem : Optional[str] = None, area_def = None, roi_km : float = 75.0, neighbours : int = 16, epsilon : float = 0.0, gaussian_sigma_km : Optional[float] = None, lat_cut : Optional[float] = None, time_at_noon : bool = True) -> xr.Dataset: """ Resample ESA/AWI **L2P swath** files to a **daily gridded** product on an equal-area polar grid using pyresample nearest-neighbour sets. The algorithm: 1) Group all valid swath samples by UTC day (per file) using `time` variable. 2) For each day: - Build a SwathDefinition from lon/lat points. - Use `pyresample.kd_tree.get_neighbour_info` to find up to `neighbours` candidates within `roi_km` of each grid cell. - Combine neighbour SIT & uncertainty with uniform or Gaussian weights. 3) Assemble (time, y, x) arrays and compute hemispheric daily means with cos(latitude) weights. Parameters ---------- paths : list of Path L2P NetCDF files. Each must include variables: `sea_ice_thickness`, `sea_ice_thickness_uncertainty`, `lon`, `lat`, `time`. hem : {'sh','nh'}, optional Hemisphere. Defaults to `self.hemisphere_dict['abbreviation']` when present, else 'sh'. area_def : pyresample.geometry.AreaDefinition, optional Target equal-area grid. If None, uses `ease2_sh_50km()` for SH or `ease2_nh_25km()` for NH. roi_km : float, default 75.0 Radius of influence for neighbour search (kilometres). neighbours : int, default 16 Max neighbours per grid cell from KD-tree. epsilon : float, default 0.0 Search tolerance passed to pyresample (trade accuracy vs. speed). gaussian_sigma_km : float or None, default None If set, apply Gaussian weights with this sigma (km); otherwise uniform. lat_cut : float or None, default None Optional extra swath mask: for SH keep `lat <= lat_cut`; for NH keep `lat >= lat_cut`. Useful for excluding marginal seas. time_at_noon : bool, default True If True, time coordinate is day+12:00Z; else midnight. Returns ------- xarray.Dataset Dimensions: time, y, x Coordinates: lon(y,x), lat(y,x), time Data variables: SIT(time,y,x) [m] SIT_unc(time,y,x) [m] propagated standard error u_cell(time,y,x) [m] per-cell uncertainty mean (neighbour-weighted) n_cell(time,y,x) [#] count of contributing neighbours per cell SIT_hem(time) [m] hemispheric mean SIT_hem_unc(time) [m] propagated SE for hemispheric mean SIT_hem_u(time) [m] hemispheric mean of per-cell uncertainty SIT_hem_n(time) [#] total contributing neighbours (sum over grid) Notes ----- - Hemispheric means use cos(lat) weights on the target grid after resampling. - Propagated SE follows: sqrt(sum(w^2 * se^2)) / sum(w). - Files missing required variables are skipped (warning). """ from pyresample import geometry, kd_tree if geometry is None or kd_tree is None: raise ImportError("pyresample is required: `pip install pyresample`") if not paths: raise ValueError("No L2P paths provided") hem = (hem or self.hemisphere_dict.get("abbreviation", "sh")).lower() if area_def is None: area_def = self.ease2_sh_50km() if hem == "sh" else self.ease2_nh_25km() # grid lon/lat for metadata & hemispheric weights lons2d, lats2d = area_def.get_lonlats() ny, nx = area_def.shape # collect swath samples by UTC day by_day: Dict[pd.Timestamp, Dict[str, List[np.ndarray]]] = {} for fp in sorted(map(Path, paths)): try: with xr.open_dataset(fp, decode_times=True, chunks={}) as ds: required = ("sea_ice_thickness", "sea_ice_thickness_uncertainty", "lon", "lat", "time") if not all(v in ds.variables for v in required): # skip quietly if this file is not SIT L2P continue sit_da = self._get_first(ds, ["sea_ice_thickness", "SIT", "sit"]) unc_da = self._get_first(ds, ["sea_ice_thickness_uncertainty", "SIT_unc", "uncertainty"]) lon_da = self._get_first(ds, ["lon", "longitude"]) lat_da = self._get_first(ds, ["lat", "latitude"]) tim_da = self._get_first(ds, ["time", "acquisition_time"]) if any(v is None for v in (sit_da, unc_da, lon_da, lat_da, tim_da)): # Skip quietly if not an SIT L2P file continue sit = sit_da.values.astype("float64", copy=False) unc = unc_da.values.astype("float64", copy=False) lon = lon_da.values.astype("float64", copy=False) lon = self._match_lon_range(lon, lons2d) lat = lat_da.values.astype("float64", copy=False) tt = pd.to_datetime(tim_da.values) except Exception: # unreadable file: skip continue valid = np.isfinite(sit) & np.isfinite(unc) & np.isfinite(lon) & np.isfinite(lat) if lat_cut is not None: valid &= (lat <= lat_cut) if hem == "sh" else (lat >= lat_cut) if not np.any(valid): self.logger.info(f"[skip] {fp.name}: finite(sit)={np.isfinite(sit).sum()} " f"finite(lon)={np.isfinite(lon).sum()} finite(lat)={np.isfinite(lat).sum()}") continue sit, unc, lon, lat, tt = sit[valid], unc[valid], lon[valid], lat[valid], tt[valid] days = pd.to_datetime(tt.floor("D").to_numpy()) for d in np.unique(days): sel = (days == d) if not np.any(sel): continue dkey = pd.Timestamp(d, tz="UTC") rec = by_day.setdefault(dkey, dict(sit=[], unc=[], lon=[], lat=[])) rec["sit"].append(sit[sel]) rec["unc"].append(unc[sel]) rec["lon"].append(lon[sel]) rec["lat"].append(lat[sel]) # empty output if nothing usable if not by_day: return xr.Dataset( coords=dict(time = ("time", np.array([], dtype="datetime64[ns]")), y = ("y", np.arange(ny, dtype=int)), x = ("x", np.arange(nx, dtype=int)), lon = (("y","x"), lons2d), lat = (("y","x"), lats2d))) roi_m = roi_km * 1000.0 sigma_m = None if gaussian_sigma_km is None else gaussian_sigma_km * 1000.0 # allocate outputs days_sorted = sorted(by_day.keys()) T = len(days_sorted) SIT = np.full((T, ny, nx), np.nan, dtype="float32") SIT_unc = np.full((T, ny, nx), np.nan, dtype="float32") U_cell = np.full((T, ny, nx), np.nan, dtype="float32") N_cell = np.zeros((T, ny, nx), dtype="int32") # time coordinate if time_at_noon: tcoord = [pd.Timestamp(d.date()).tz_localize("UTC") + pd.Timedelta(hours=12) for d in days_sorted] else: tcoord = [pd.Timestamp(d.date()).tz_localize("UTC") for d in days_sorted] tcoord = pd.to_datetime(tcoord).tz_localize(None).to_numpy() # resample per day for it, d in enumerate(days_sorted): s = np.concatenate(by_day[d]["sit"]) u = np.concatenate(by_day[d]["unc"]) lo = np.concatenate(by_day[d]["lon"]) la = np.concatenate(by_day[d]["lat"]) swath = geometry.SwathDefinition(lons=lo, lats=la) valid_in_idx, valid_out_idx, index_array, dist_array = kd_tree.get_neighbour_info(source_geo_def = swath, target_geo_def = area_def, radius_of_influence = roi_m, neighbours = neighbours, epsilon = epsilon) if getattr(valid_out_idx, "size", 0) == 0: continue sit2d, se2d, u2d, n2d = self._l2p_weights(index_array = index_array, dist_array = dist_array, valid_input_index = valid_in_idx, valid_output_index = valid_out_idx, swath_SIT = s, swath_unc = u, out_shape = (ny, nx), roi_m = roi_m, sigma_m = sigma_m) SIT[it,:,:] = sit2d SIT_unc[it,:,:] = se2d U_cell[it,:,:] = u2d N_cell[it,:,:] = n2d # hemispheric daily aggregates with cos(lat) weights W = np.cos(np.deg2rad(lats2d)).astype("float64") # (y,x) valid = np.isfinite(SIT) & np.isfinite(W)[None, :, :] S = np.where(valid, SIT, 0.0).astype("float64", copy=False) U = np.where(valid, SIT_unc, 0.0).astype("float64", copy=False) Uc = np.where(valid, U_cell, 0.0).astype("float64", copy=False) W3 = W[None, :, :] wsum = (W3 * valid).sum(axis=(1, 2)) # (time,) sit_num = (S * W3).sum(axis=(1, 2)) ume_num = (Uc * W3).sum(axis=(1, 2)) se_num = ((W3**2) * (U**2)).sum(axis=(1, 2)) SIT_hem = sit_num / np.where(wsum > 0, wsum, np.nan) SIT_hem_u = ume_num / np.where(wsum > 0, wsum, np.nan) SIT_hem_unc = np.sqrt(se_num) / np.where(wsum > 0, wsum, np.nan) SIT_hem_n = N_cell.sum(axis=(1, 2)).astype("int32") ds = xr.Dataset(data_vars = dict(SIT = (("time","y","x"), SIT), SIT_unc = (("time","y","x"), SIT_unc), u_cell = (("time","y","x"), U_cell), n_cell = (("time","y","x"), N_cell), SIT_hem = (("time",), SIT_hem.astype("float32")), SIT_hem_unc = (("time",), SIT_hem_unc.astype("float32")), SIT_hem_u = (("time",), SIT_hem_u.astype("float32")), SIT_hem_n = (("time",), SIT_hem_n),), coords = dict(time = ("time", tcoord), y = ("y", np.arange(ny, dtype=int)), x = ("x", np.arange(nx, dtype=int)), lon = (("y","x"), lons2d.astype("float64")), lat = (("y","x"), lats2d.astype("float64"))), attrs = dict(note = "L2P swath resampled to target grid via pyresample neighbour info; " "Gaussian weighting if gaussian_sigma_km set; SE propagated as sqrt(sum(w^2 u^2))/sum(w).")) ds["SIT"].attrs.update(dict(standard_name="sea_ice_thickness", units="m")) ds["SIT_unc"].attrs.update(dict(standard_name="sea_ice_thickness standard_error", units="m")) return ds
# ------------------------- one-call convenience ------------------------- #
[docs] def make_daily_gridded_SIT(self, dt0_str : str, dtN_str : str, hemisphere : Optional[str] = None, institutions : Optional[Iterable[str]] = ("ESA", "AWI"), sensors : Optional[Iterable[str]] = None, levels : Optional[Iterable[str]] = None, versions : Optional[Iterable[str]] = None, root_esa : Optional[Path] = None, root_awi : Optional[Path] = None, roi_km : float = 75.0, neighbours : int = 16, epsilon : float = 0.0, gaussian_sigma_km : Optional[float] = None, lat_cut : Optional[float] = None, time_at_noon : bool = True, prefer : str = "last", area_def = None) -> xr.Dataset: """ Create a daily gridded sea-ice thickness (SIT) dataset over a time window in one call. This convenience wrapper performs: 1) File discovery via `build_sea_ice_satellite_paths(...)` across ESA and/or AWI layouts. 2) Indexing and de-duplication to one candidate file per UTC day. 3) Swath-to-grid resampling with `l2p_to_daily_grid_pyresample(...)`. Parameters ---------- dt0_str, dtN_str : str Start/end of the time window (inclusive). Any pandas-parsable date string. hemisphere : {"sh","nh"} or None, optional Hemisphere selector; defaults to `self.hemisphere_dict['abbreviation']` when present. institutions : iterable of str or None, default ("ESA","AWI") Institutions to search (case-insensitive). sensors, levels, versions : iterable of str or None, optional Optional filters passed to `build_sea_ice_satellite_paths(...)`. root_esa, root_awi : pathlib.Path or None, optional Root directories. If None, falls back to configured defaults (module globals or class config). roi_km : float, default 75.0 Radius of influence for neighbour search (km). neighbours : int, default 16 Maximum neighbours per target grid cell. epsilon : float, default 0.0 KD-tree epsilon tolerance (speed/accuracy trade-off). gaussian_sigma_km : float or None, optional If set, apply Gaussian distance weights with this sigma (km). If None, uniform weights. lat_cut : float or None, optional Additional latitude filter applied to swath points (hemisphere-aware). time_at_noon : bool, default True If True, output time coordinate is day+12:00Z; otherwise midnight. prefer : {"first","last"}, default "last" If multiple files map to a day, choose first/last after deterministic sorting. area_def : pyresample.geometry.AreaDefinition or None, optional Target grid. If None, uses internal EASE/LAEA-like convenience grids. Returns ------- xarray.Dataset Daily gridded dataset with dimensions (time, y, x) including: - SIT, SIT_unc, u_cell, n_cell (gridded) - SIT_hem, SIT_hem_unc, SIT_hem_u, SIT_hem_n (hemispheric aggregates) See Also -------- build_sea_ice_satellite_paths index_satellite_paths dedup_daily_one_file_per_day l2p_to_daily_grid_pyresample """ # prefer configured roots if not supplied if root_esa is None: root_esa = globals().get("ROOT_ESA_DEF", None) if root_awi is None: root_awi = globals().get("ROOT_AWI_DEF", None) paths = self.build_sea_ice_satellite_paths(dt0_str = dt0_str, dtN_str = dtN_str, hemisphere = hemisphere, institutions = institutions, sensors = sensors, levels = levels, versions = versions, root_esa = root_esa, root_awi = root_awi) df = self.index_satellite_paths(paths) df_daily = self.dedup_daily_one_file_per_day(df, prefer=prefer) daily_paths = df_daily["path"].tolist() return self.l2p_to_daily_grid_pyresample(paths = daily_paths, hem = hemisphere, area_def = area_def, roi_km = roi_km, neighbours = neighbours, epsilon = epsilon, gaussian_sigma_km = gaussian_sigma_km, lat_cut = lat_cut, time_at_noon = time_at_noon)
# ------------------------- LEVEL 3 (MONTLY GRIDDED) ------------------------- #
[docs] def l3c_paths_to_monthly_grid(self, paths : list[Path], hem : str | None = None, sic_thresh_percent : float | None = 15.0, mask_strategy : str = "none", # {"none","quality","status","both"} include_snow : bool = True, include_flags : bool = True, min_obs : int = 1, # if an "n_observations" variable exists time_midpoint : bool = True) -> xr.Dataset: """ Assemble ESA L3C / AWI l3cp_release files (already on LAEA grids) into a single (time, y, x) dataset with hemispheric monthly means. This function mirrors the L2P driver logic, but **no resampling** is done: it reads gridded variables straight from the files and standardizes dims. Parameters ---------- paths : list[Path] Monthly L3 NetCDF paths (ESA L3C or AWI l3cp_release). hem : {'sh','nh'}, optional Hemisphere hint (controls sign in optional lat_cut if you add one later). Defaults to `self.hemisphere_dict['abbreviation']` if present. sic_thresh_percent : float or None, default 15.0 If set, mask cells with sea_ice_concentration < threshold (percent). mask_strategy : {"none","quality","status","both"}, default "none" - "quality" keeps cells with quality_flag == 0 - "status" keeps cells with status_flag == 0 - "both" requires both conditions include_snow : bool, default True Include snow_depth and snow_depth_uncertainty when present. include_flags : bool, default True Include flag fields (quality_flag, status_flag, region_code) when present. min_obs : int, default 1 If a count variable exists (e.g., "n_observations"), require count >= min_obs. time_midpoint : bool, default True If time_bnds variable exists, use midpoint as the time coordinate. Returns ------- xarray.Dataset Dimensions: time, y, x Coordinates: lon(y,x), lat(y,x), time Data variables: SIT(time,y,x) [m] SIT_unc(time,y,x) [m] per-cell uncertainty (SE) SIC(time,y,x) [%] if present snow_depth(time,y,x) [m] if requested/present snow_depth_unc(time,y,x) [m] if requested/present quality_flag/status_flag/region_code if requested/present SIT_hem(time) [m] hemispheric mean SIT_hem_unc(time) [m] propagated SE = sqrt(sum(w^2 * se^2))/sum(w) SIT_hem_u(time) [m] hemispheric mean of per-cell uncertainty SIT_hem_n(time) [#] count of valid contributing cells """ if not paths: raise ValueError("No L3 paths provided") hem = (hem or self.hemisphere_dict.get("abbreviation", "sh")).lower() # Store per-file outputs here then concat along time at the end out_list: list[xr.Dataset] = [] lat_template = None lon_template = None for fp in sorted(map(Path, paths)): try: ds = xr.open_dataset(fp, decode_times=True) except Exception: continue # Flexible getters def G(names): return self._get_first(ds, names) # Dim normalization (yc/xc or y/x) ydim = "yc" if "yc" in ds.dims else ("y" if "y" in ds.dims else None) xdim = "xc" if "xc" in ds.dims else ("x" if "x" in ds.dims else None) if ydim is None or xdim is None: ds.close() continue ds_work = ds.rename({ydim: "y", xdim: "x"}) # Coordinates lat2d = G(["lat"]) lon2d = G(["lon"]) if lat2d is None or lon2d is None: ds.close() continue # Persist a lat/lon template (assumed constant across files) lat_template = lat2d.values if lat_template is None else lat_template lon_template = lon2d.values if lon_template is None else lon_template # Variables of interest SIT = G(["sea_ice_thickness", "SIT", "sit"]) SIT_U = G(["sea_ice_thickness_uncertainty", "SIT_unc", "sit_unc", "uncertainty"]) SIC = G(["sea_ice_concentration", "sic"]) SD = G(["snow_depth", "snow_thickness"]) SDU = G(["snow_depth_uncertainty", "snow_thickness_uncertainty"]) QF = G(["quality_flag"]) SF = G(["status_flag"]) RC = G(["region_code"]) NOBS = G(["n_observations", "num_observations", "count"]) # Required: SIT & SIT_U at least if SIT is None or SIT_U is None: ds.close(); continue # Time coordinate (often size 1 per file, but robust to >1) time = G(["time"]) tbnd = G(["time_bnds", "time_bounds"]) if time is None: ds.close(); continue tvals = pd.to_datetime(time.values) if tvals.ndim == 0: tvals = tvals[None] if time_midpoint and (tbnd is not None): try: tb = xr.DataArray(tbnd).compute().values # (time, nv) tmid = np.mean(tb, axis=1) tvals = pd.to_datetime(tmid, unit="s", origin="unix") except Exception: # fall back to time center pass # Build a mask base_mask = np.isfinite(SIT) & np.isfinite(SIT_U) & np.isfinite(lat2d) & np.isfinite(lon2d) if SIC is not None and sic_thresh_percent is not None: base_mask &= np.isfinite(SIC) & (SIC >= float(sic_thresh_percent)) if NOBS is not None and isinstance(min_obs, (int, float)) and min_obs > 1: base_mask &= (NOBS >= int(min_obs)) if mask_strategy in ("quality", "both") and QF is not None: base_mask &= (QF == 0) if mask_strategy in ("status", "both") and SF is not None: base_mask &= (SF == 0) mask_strategy = (mask_strategy or "none").lower() # Ensure 3D shape: (time, y, x) def _as3(da): v = da.transpose(..., "yc", "xc").values if v.ndim == 2: v = v[None, ...] return v sit3 = _as3(SIT) sunc3 = _as3(SIT_U) mask3 = base_mask if mask3.ndim == 2: mask3 = mask3[None, ...] if SIC is not None: sic3 = _as3(SIC) if SD is not None and include_snow: sd3 = _as3(SD) if SDU is not None and include_snow: sdu3 = _as3(SDU) if QF is not None and include_flags: qf3 = _as3(QF) if SF is not None and include_flags: sf3 = _as3(SF) if RC is not None and include_flags: rc3 = _as3(RC) # Apply mask: keep NaNs outside valid cells S = np.where(mask3, sit3, np.nan).astype("float32", copy=False) U = np.where(mask3, sunc3, np.nan).astype("float32", copy=False) if SIC is not None: SICv = np.where(mask3, sic3, np.nan).astype("float32", copy=False) if SD is not None and include_snow: SDv = np.where(mask3, sd3, np.nan).astype("float32", copy=False) if SDU is not None and include_snow: SDUv = np.where(mask3, sdu3, np.nan).astype("float32", copy=False) # Hemispheric weighting W2 = np.cos(np.deg2rad(lat2d.values)).astype("float64") W3 = W2[None, :, :] valid = np.isfinite(S) wsum = (W3 * valid).sum(axis=(1, 2)) sit_num = (np.where(valid, S, 0.0) * W3).sum(axis=(1, 2)) u_num = (np.where(valid, U, 0.0) * W3).sum(axis=(1, 2)) se_num = ((W3**2) * np.where(valid, U, 0.0)**2).sum(axis=(1, 2)) SIT_hem = sit_num / np.where(wsum > 0, wsum, np.nan) SIT_hem_u = u_num / np.where(wsum > 0, wsum, np.nan) SIT_hem_unc = np.sqrt(se_num) / np.where(wsum > 0, wsum, np.nan) SIT_hem_n = valid.sum(axis=(1, 2)).astype("int32") # ---- NEW: concentration-weighted hemispheric mean (SIC weights) ---- SIT_wgt = np.full((S.shape[0],), np.nan, dtype="float64") SIT_wgt_u = np.full((S.shape[0],), np.nan, dtype="float64") SIT_wgt_unc = np.full((S.shape[0],), np.nan, dtype="float64") SIT_wgt_n = np.zeros((S.shape[0],), dtype="int32") if SIC is not None: # Convert SIC to fraction [0..1] for correct uncertainty propagation. sic_units = (SIC.attrs.get("units","") or "").lower() sic3_raw = _as3(SIC) if ("percent" in sic_units) or ("%" in sic_units): Wsic = sic3_raw.astype("float64") / 100.0 else: # heuristic fallback vmax = np.nanmax(sic3_raw) Wsic = (sic3_raw / 100.0) if np.isfinite(vmax) and vmax > 1.5 else sic3_raw Wsic = Wsic.astype("float64") # Honor the same base mask used for S/U (keeps SIC >= thresh, flags, etc.) # mask3 is (time,y,x); if it's 2D, lift it to 3D m3 = mask3 if getattr(m3, "ndim", 2) == 2: m3 = m3[None, ...] Wsic = np.where(m3, Wsic, np.nan) # Weighted mean thickness DOES NOT require uncertainty to be finite v_sit = np.isfinite(S) & np.isfinite(Wsic) & (Wsic > 0) wsum = (np.where(v_sit, Wsic, 0.0)).sum(axis=(1, 2)) num = (np.where(v_sit, S * Wsic, 0.0)).sum(axis=(1, 2)) SIT_wgt = num / np.where(wsum > 0, wsum, np.nan) SIT_wgt_n = v_sit.sum(axis=(1, 2)).astype("int32") # For the uncertainty aggregates, only use pixels where uncertainty is finite v_u = v_sit & np.isfinite(U) wsum_u = (np.where(v_u, Wsic, 0.0)).sum(axis=(1, 2)) num_u = (np.where(v_u, U * Wsic, 0.0)).sum(axis=(1, 2)) se_num_u = ((np.where(v_u, Wsic, 0.0) ** 2) * (np.where(v_u, U, 0.0) ** 2)).sum(axis=(1, 2)) SIT_wgt_u = num_u / np.where(wsum_u > 0, wsum_u, np.nan) SIT_wgt_unc = np.sqrt(se_num_u) / np.where(wsum_u > 0, wsum_u, np.nan) # Assemble per-file dataset (time may be >1) T = S.shape[0] ds_out = xr.Dataset(data_vars = dict(SIT = (("time","y","x"), S), SIT_unc = (("time","y","x"), U), SIT_hem = (("time",), SIT_hem.astype("float32")), SIT_hem_unc = (("time",), SIT_hem_unc.astype("float32")), SIT_hem_u = (("time",), SIT_hem_u.astype("float32")), SIT_hem_n = (("time",), SIT_hem_n), SIT_wgt = (("time",), SIT_wgt.astype("float32")), SIT_wgt_unc = (("time",), SIT_wgt_unc.astype("float32")), SIT_wgt_u = (("time",), SIT_wgt_u.astype("float32")), SIT_wgt_n = (("time",), SIT_wgt_n)), coords = dict(time = ("time", pd.to_datetime(tvals).tz_localize(None).to_numpy()), y = ("y", np.arange(lat2d.sizes["yc"], dtype=int)), x = ("x", np.arange(lat2d.sizes["xc"], dtype=int)), lon = (("y","x"), lon2d.values.astype("float64")), lat = (("y","x"), lat2d.values.astype("float64")))) if SIC is not None: ds_out["SIC"] = (("time","y","x"), SICv) ds_out["SIC"].attrs.update(dict(standard_name="sea_ice_area_fraction", units="percent")) if include_snow and SD is not None: ds_out["snow_depth"] = (("time","y","x"), SDv) ds_out["snow_depth"].attrs.update(dict(standard_name="surface_snow_thickness_where_sea_ice", units="m")) if include_snow and SDU is not None: ds_out["snow_depth_unc"] = (("time","y","x"), SDUv) ds_out["snow_depth_unc"].attrs.update(dict(standard_name="surface_snow_thickness_where_sea_ice standard_error", units="m")) if include_flags and QF is not None: ds_out["quality_flag"] = (("time","y","x"), qf3.astype("int8")) if include_flags and SF is not None: ds_out["status_flag"] = (("time","y","x"), sf3.astype("int8")) if include_flags and RC is not None: ds_out["region_code"] = (("time","y","x"), rc3.astype("int8")) out_list.append(ds_out) ds.close() if not out_list: # Empty shell with lat/lon if we managed to see one file’s coords return xr.Dataset(coords = dict(time = ("time", np.array([], dtype="datetime64[ns]")), y = ("y", np.arange(lat_template.shape[0], dtype=int) if lat_template is not None else np.array([], dtype=int)), x = ("x", np.arange(lat_template.shape[1], dtype=int) if lat_template is not None else np.array([], dtype=int)), lon = (("y","x"), lon_template if lon_template is not None else np.zeros((0,0))), lat = (("y","x"), lat_template if lat_template is not None else np.zeros((0,0))),)) ds_all = xr.concat(out_list, dim="time").sortby("time") ds_all["SIT"].attrs.update(dict(standard_name="sea_ice_thickness", units="m")) ds_all["SIT_unc"].attrs.update(dict(standard_name="sea_ice_thickness standard_error", units="m")) ds_all.attrs.update(dict(note="Assembled L3 (ESA L3C / AWI l3cp_release) monthly grids with optional QC & SIC threshold; hemispheric means use cos(lat) weights.")) return ds_all
[docs] def make_monthly_gridded_SIT_L3(self, dt0_str : str, dtN_str : str, hemisphere : str | None = None, institutions : Iterable[str] | None = ("ESA","AWI"), sensors : Iterable[str] | None = None, levels : Iterable[str] | None = ("L3C","l3cp_release"), versions : Iterable[str] | None = None, root_esa : Path | None = None, root_awi : Path | None = None, prefer : str = "last", sic_thresh_percent : float | None = 15.0, mask_strategy : str = "none", # {"none","quality","status","both"} include_snow : bool = True, include_flags : bool = True, min_obs : int = 1, time_midpoint : bool = True,) -> xr.Dataset: """ Create a monthly gridded SIT dataset from Level-3 products (ESA L3C / AWI l3cp_release). This convenience wrapper performs: 1) File discovery via `build_sea_ice_satellite_paths(...)`. 2) Indexing and de-duplication to one file per (year, month). 3) Assembly into a single (time, y, x) dataset using `l3c_paths_to_monthly_grid(...)`. Parameters ---------- dt0_str, dtN_str : str Start/end of the time window (inclusive). hemisphere : {"sh","nh"} or None, optional Hemisphere selector; defaults to `self.hemisphere_dict['abbreviation']` when present. institutions, sensors, levels, versions : iterable of str or None, optional Filters passed to `build_sea_ice_satellite_paths(...)`. By default, `levels` targets monthly L3 products (e.g., ("L3C","l3cp_release")). root_esa, root_awi : pathlib.Path or None, optional Root directories. If None, falls back to configured defaults (module globals or class config). prefer : {"first","last"}, default "last" If multiple files map to a month, choose first/last after deterministic sorting. sic_thresh_percent : float or None, default 15.0 If set, mask cells with SIC below this threshold (percent). mask_strategy : {"none","quality","status","both"}, default "none" Optional QC masking using quality_flag and/or status_flag when present. include_snow : bool, default True Include snow depth fields when present. include_flags : bool, default True Include flag fields (quality/status/region code) when present. min_obs : int, default 1 If a per-cell observation count exists, require count >= min_obs. time_midpoint : bool, default True If time bounds are present, set time coordinate to the midpoint. Returns ------- xarray.Dataset Monthly gridded dataset with dimensions (time, y, x), including: - SIT, SIT_unc, optional SIC/snow/flags - SIT_hem* aggregates and optional SIC-weighted SIT aggregates (SIT_wgt*) See Also -------- build_sea_ice_satellite_paths index_satellite_paths dedup_monthly_one_file_per_month l3c_paths_to_monthly_grid """ # default roots from module globals if not specified if root_esa is None: root_esa = globals().get("ROOT_ESA_DEF", None) if root_awi is None: root_awi = globals().get("ROOT_AWI_DEF", None) paths = self.build_sea_ice_satellite_paths(dt0_str = dt0_str, dtN_str = dtN_str, hemisphere = hemisphere, institutions = institutions, sensors = sensors, levels = levels, versions = versions, root_esa = root_esa, root_awi = root_awi) # QA & dedup to one per month df = self.index_satellite_paths(paths) df_mo = self.dedup_monthly_one_file_per_month(df, prefer=prefer) P_mos = df_mo["path"].tolist() return self.l3c_paths_to_monthly_grid(paths = P_mos, hem = hemisphere, sic_thresh_percent = sic_thresh_percent, mask_strategy = mask_strategy, include_snow = include_snow, include_flags = include_flags, min_obs = min_obs, time_midpoint = time_midpoint)