Source code for sea_ice_metrics

from __future__ import annotations
import re, shutil
import xarray as xr
import numpy  as np
import pandas as pd
from pathlib  import Path
__all__ = ["SeaIceMetrics"]
[docs] class SeaIceMetrics: """ Compute diagnostic and skill metrics for processed sea-ice classifications. This class aggregates common post-processing calculations used in AFIM workflows, including: - building standardised input dictionaries for metrics computations (fast ice / pack ice / total sea ice), - computing hemisphere-integrated time series (area, volume, thickness, rates), - computing spatial summary fields (temporal mean thickness; spatial rates), - computing inter-comparison skill statistics against observations, - computing seasonal statistics (timing, growth/retreat slopes, extrema), - computing persistence-distance statistics relative to an Antarctic coastline, - loading previously computed metrics from Zarr and padding/reindexing time. The principal entry point is `compute_sea_ice_metrics()`, which ingests a dictionary of masks and core prognostic/tendency fields (already prepared elsewhere), computes a suite of metrics, and optionally writes the results to a consolidated Zarr archive. Expected external configuration ------------------------------ This class is designed to operate inside the broader AFIM stack and expects several attributes to exist on `self`, typically injected via kwargs or a toolbox manager: - logger : logging.Logger For progress/debug logging. - CICE_dict : dict Must contain at least: * "time_dim" and "spatial_dims" * "drop_coords" (list of coordinate vars to drop before computations) * "FI_chunks" (chunking spec for small 1D/2D outputs) - dt0_str, dtN_str : str Analysis window bounds ("YYYY-MM-DD"). - D_metrics : str | Path Output directory for metrics Zarr archives. - metrics_name : str Suffix used when constructing default metrics filenames. - ice_type : str e.g., "FI", "FI_Tb_bin", etc. Cleaned internally to one of {"FI","PI","SI"}. - FIC_scale : float Scaling factor applied to area (commonly used to convert m² → km² or similar). - sim_config : dict Metadata inserted into output dataset attributes and/or scalars. - AF_FI_dict : dict Must include path to AF2020 fast-ice area observations (e.g., "P_AF2020_FIA"). - BAS_dict : dict Must include coastline shapefile path (e.g., "P_Ant_Cstln") for persistence distance. - BorC2T_type, ispd_thresh, D_zarr, etc. Used by `load_computed_metrics()` to locate metrics on disk. Required external methods (defined elsewhere in your stack) ---------------------------------------------------------- - compute_hemisphere_ice_area(...) - compute_hemisphere_ice_volume(...) - compute_hemisphere_ice_thickness(...) - compute_hemisphere_variable_aggregate(...) - compute_NSIDC_metrics() (for PI/SI observational comparisons) - persistence_stability_index(...) - load_cice_grid(slice_hem=True) - _prepare_BAS_coast(...) - compute_grounded_iceberg_area() (optional, for GI area accounting) Notes ----- - Many methods are written to be safe with Dask-backed inputs. Where appropriate, the implementation forces local computation to avoid shipping large task graphs. - Output writing uses Zarr v2 (explicitly `zarr_format=2`) for compatibility. """ def __init__(self, **kwargs): """ Initialise the metrics helper. Parameters ---------- **kwargs Optional configuration injected into `self` by the surrounding workflow. Typical keys include `logger`, `CICE_dict`, directory paths, and analysis settings (dt0/dtN, thresholds, etc.). Notes ----- - The shown implementation is a no-op; in production you typically assign kwargs onto `self` (as done in your other classes) or inherit a shared base class. """ return @staticmethod def _clean_zarr_chunks(ds): """ Remove per-variable chunk encodings and return an unchunked dataset. Zarr writes can be sensitive to stale or inconsistent `encoding["chunks"]` metadata inherited from prior reads. This helper deletes the `chunks` entry from each variable encoding (if present) and returns `ds.chunk(None)` to remove active Dask chunking. Parameters ---------- ds : xr.Dataset Dataset to sanitise prior to writing. Returns ------- xr.Dataset Dataset with `encoding["chunks"]` removed (where present) and with chunking disabled (`chunk(None)`). Notes ----- - This does not change data values; it only adjusts encoding and chunk metadata. - Use immediately before `.to_zarr(...)` to avoid RTD/runtime confusion and Zarr rechunk warnings. """ for var in ds.variables: if 'chunks' in ds[var].encoding: del ds[var].encoding['chunks'] return ds.chunk(None)
[docs] @staticmethod def thickness_tendency_to_mps(dvt: xr.DataArray) -> xr.DataArray: u = (dvt.attrs.get("units", "") or "").strip().lower() # common cases if u in ["m/s", "m s-1", "m s^-1"]: return dvt if u in ["cm/s", "cm s-1", "cm s^-1"]: return dvt * 0.01 if "cm/day" in u or "cm d-1" in u: return dvt * 0.01 / 86400.0 if "m/day" in u or "m d-1" in u: return dvt / 86400.0 # if unknown, do not silently scale; return as-is and force you to inspect raise ValueError(f"Unrecognized DVT units: '{dvt.attrs.get('units','')}'")
[docs] def metrics_data_dict(self, I_mask, I_data, A, *, ice_type = None, required = None, optional = None): """ Build a standardised input dictionary for FI/PI/SI metrics. This method: - validates required variables exist in `I_data`, - adds optional variables if present, - always includes the ice mask and T-grid area (`tarea`), - and (NEW) auto-adds any referenced cell-area measures (e.g., earea/narea) when a variable declares them via CF `cell_measures`, e.g.: cell_measures: "area: earea" This is needed for lateral-drag stress diagnostics (KuxE/KuxN/KuyE/KuyN). Parameters ---------- I_mask : xr.DataArray Ice-type mask (e.g., FI/PI/SI), typically with dims (time, nj, ni) or (nj, ni). I_data : xr.Dataset or dict-like Source dataset/dictionary containing model variables. A : xr.DataArray T-grid cell area (m^2), stored into the output dict as "tarea". ice_type : str, optional Ice type key ("FI","PI","SI"). Defaults to `self.ice_type`. required : list[str], optional Required variable names that must exist in `I_data`. optional : list[str], optional Optional variable names to include if present in `I_data`. Returns ------- dict Dictionary containing mask, tarea, and required/optional fields. Raises ------ KeyError If any required variables are missing. Notes ----- - NEW: If a present variable declares an area measure via `cell_measures`, and that area variable exists in `I_data`, it is added automatically. This supports stress diagnostics with earea/narea weights. """ import re ice_type = ice_type or self.ice_type self._check_ice_type(ice_type) self.define_ice_mask_name(ice_type=ice_type) # Defaults (tune as needed) required = required or ["aice", "hi", "uvel", "vvel"] optional = optional or ["strength", "dvidtt", "dvidtd", "daidtt", "daidtd", # NEW: lateral-drag stress outputs (if written by CICE) "KuxE", "KuxN", "KuyE", "KuyN", # NEW: possible area weights (often static 2-D fields) "earea", "narea", "uarea"] out = {self.mask_name: I_mask, "tarea": A} missing_required = [v for v in required if not self._has(I_data, v)] if missing_required: raise KeyError(f"metrics_data_dict missing required variables in I_data: {missing_required}") # Add required for v in required: out[v] = self._get(I_data, v) # Add optional missing_optional = [] for v in optional: if self._has(I_data, v): out[v] = self._get(I_data, v) else: missing_optional.append(v) if missing_optional: try: self.logger.info(f"metrics_data_dict: skipping missing optional vars: {missing_optional}") except Exception: pass # ------------------------------------------------------------------ # NEW: auto-add any referenced area measures from CF cell_measures # ------------------------------------------------------------------ try: area_vars_needed = set() pat = re.compile(r"area:\s*([A-Za-z0-9_]+)") for k, da in list(out.items()): if not isinstance(da, xr.DataArray): continue cm = da.attrs.get("cell_measures", "") or da.attrs.get("cell_measures:area", "") m = pat.search(str(cm)) if m: area_vars_needed.add(m.group(1)) for avar in sorted(area_vars_needed): if avar in out: continue if self._has(I_data, avar): out[avar] = self._get(I_data, avar) self.logger.info(f"metrics_data_dict: auto-added area measure '{avar}' referenced by cell_measures") else: self.logger.info(f"metrics_data_dict: variable(s) reference area '{avar}' via cell_measures, but '{avar}' not found in I_data") except Exception as e: self.logger.debug(f"metrics_data_dict: cell_measures parsing skipped ({e})") return out
def _is_all_nan(self, da): try: return bool(da.isnull().all().compute()) except Exception: try: return bool(np.all(np.isnan(da))) except Exception: return False def _is_valid_metric_input(self, obj): if obj is None: return False if isinstance(obj, xr.DataArray): try: return not self._is_all_nan(obj) except Exception: return True return True def _check_required_metric_inputs(self, required): """ Parameters ---------- required : dict[str, Any] Mapping of logical input name -> object Returns ------- tuple[bool, list[str]] (is_ok, missing_or_invalid_names) """ bad = [] for name, obj in required.items(): if obj is None: bad.append(name) elif isinstance(obj, xr.DataArray) and not self._is_valid_metric_input(obj): bad.append(name) return (len(bad) == 0), bad def _compute_named_metric(self, store, tag, out_key, func, required = None, post = None, load_result = False, **kwargs): """ Safely compute one metric and insert it into `store`. """ required = required or {} ok, bad = self._check_required_metric_inputs(required) if not ok: self.logger.info(f"skipping {tag}: missing/invalid inputs {bad}") return None try: out = func(**kwargs) if post is not None: out = post(out) if load_result and isinstance(out, xr.DataArray): out = out.load() store[out_key] = out return out except Exception as e: self.logger.warning(f"{tag} failed: {e}") return None def _parse_area_name_from_cell_measures(self, da): cm = str(da.attrs.get("cell_measures", "") or "") m = re.search(r"area:\s*([A-Za-z0-9_]+)", cm) return m.group(1) if m else None def _get_tau_area(self, tau, da_dict, fallback_area=None): """ Prefer explicit edge area from cell_measures, else fallback to tarea. """ aname = self._parse_area_name_from_cell_measures(tau) if aname and aname in da_dict and da_dict[aname] is not None: return da_dict[aname], aname if fallback_area is not None: return fallback_area, "tarea" return None, None def _get_tau_mask(self, tau, mask): """ Align the classification mask to stress field geometry where possible. """ if mask is None: return None try: m = (mask > 0) m, tau_al = xr.align(m, tau, join="inner") return m.astype(bool) except Exception: try: return (mask > 0) except Exception: return None def _add_stress_aggregate(self, mets, da_dict, tau, base_name, spatial_dim_names, mask = None, fallback_area = None): """ Compute area-weighted stress aggregates and append them to `mets`. """ if tau is None or not self._is_valid_metric_input(tau): self.logger.info(f"skipping {base_name}: missing or all-NaN") return A_tau, A_name = self._get_tau_area(tau, da_dict, fallback_area=fallback_area) if A_tau is None: self.logger.info(f"skipping {base_name}: no area weights available") return m_tau = self._get_tau_mask(tau, mask) self.logger.info(f"computing lateral-drag stress aggregate: {base_name}") self.logger.debug(f" using area weights : {A_name}") self.logger.debug(f" output units : Pa") try: ds_tau = self.compute_hemisphere_area_weighted_stress(tau = tau, A = A_tau, spatial_dim_names = spatial_dim_names, mask = m_tau, out_units = "Pa", return_abs = True, min_area_m2 = 0.0, name = base_name) if "valid_area_m2" in ds_tau: ds_tau = ds_tau.rename({"valid_area_m2": f"{base_name}_valid_area_m2"}) for v in ds_tau.data_vars: mets[v] = ds_tau[v] except Exception as e: self.logger.warning(f"{base_name} stress aggregate failed: {e}") def _compute_area_skill_and_seasonal(self, IA, ice_type): """ Return (IA_skill, IA_seasonal) """ IA_skill = {} IA_seasonal = {} if IA is None or not isinstance(IA, xr.DataArray) or not self._is_valid_metric_input(IA): self.logger.info("skipping skill/seasonal stats: IA not available") return IA_skill, IA_seasonal try: if ice_type == "FI": self.logger.info(f"loading FI obs: {self.AF_FI_dict['P_AF2020_FIA']}") IA_obs = xr.open_dataset(self.AF_FI_dict["P_AF2020_FIA"], engine="netcdf4")["AF2020"] elif ice_type in ("PI", "SI"): NSIDC = self.compute_NSIDC_metrics() IA_obs = NSIDC["SIA"] else: IA_obs = None if IA_obs is not None: IA_obs = IA_obs.load() result = self.compute_skill_statistics(IA, IA_obs) IA_skill = result if isinstance(result, dict) else {} except Exception as e: self.logger.warning(f"compute_skill_statistics failed: {e}") IA_skill = {} try: result = self.compute_seasonal_statistics(IA, stat_name=f"{ice_type}A") if isinstance(result, tuple): IA_seasonal = result[0] if isinstance(result[0], dict) else {} elif isinstance(result, dict): IA_seasonal = result else: IA_seasonal = {} except Exception as e: self.logger.warning(f"compute_seasonal_statistics failed: {e}") IA_seasonal = {} return IA_skill, IA_seasonal def _compute_persistence_summaries(self, ice_type, I_mask, A, IP): """ Return (IP_stab, IP_dist) """ IP_stab = {} IP_dist = {} if ice_type != "FI": return IP_stab, IP_dist if I_mask is not None and A is not None: try: result = self.persistence_stability_index(I_mask, A) IP_stab = result if isinstance(result, dict) else {} except Exception as e: self.logger.warning(f"persistence_stability_index failed: {e}") else: self.logger.info("skipping persistence_stability_index: missing mask or area") if IP is not None: try: result = self.persistence_ice_distance_mean_max(IP) IP_dist = result if isinstance(result, dict) else {} except Exception as e: self.logger.warning(f"persistence_ice_distance_mean_max failed: {e}") else: self.logger.info("skipping persistence distance: IP not available") return IP_stab, IP_dist def _build_metrics_dataset(self, mets, summary_vars=None, attrs=None): """ Assemble final metrics dataset from DataArrays and scalar summaries. """ summary_vars = summary_vars or {} attrs = attrs or {} ds = xr.Dataset() for k, v in mets.items(): if v is None: continue if isinstance(v, xr.DataArray): ds[k] = v else: ds[k] = xr.DataArray(v, dims=()) for k, v in summary_vars.items(): ds[k] = xr.DataArray(v, dims=()) for k, v in attrs.items(): ds.attrs[k] = v return ds def _prepare_dataset_for_zarr(self, ds): """ Rechunk final metrics dataset so every variable has uniform chunking acceptable to Zarr. For metrics outputs, using one chunk per dimension is usually fine. """ if hasattr(ds, "unify_chunks"): ds = ds.unify_chunks() chunk_map = {dim: -1 for dim in ds.dims} if chunk_map: ds = ds.chunk(chunk_map) ds = self._strip_unsafe_zarr_encoding(ds) return ds def _write_dataset_zarr(self, ds, store, overwrite=False, consolidated=True): import shutil store = Path(store) if store.exists(): if not overwrite: self.logger.info(f"Store already exists, skipping write: {store}") return False shutil.rmtree(store) ds = self._prepare_dataset_for_zarr(ds) ds.to_zarr(store, mode="w", consolidated=consolidated, zarr_format=2) return True
[docs] def compute_sea_ice_metrics(self, da_dict, P_mets_zarr, ice_type = None, dt0_str = None, dtN_str = None, ice_area_scale = None, overwrite_zarr = False): ice_type = ice_type or self.ice_type dt0_str = dt0_str or self.dt0_str dtN_str = dtN_str or self.dtN_str self._check_ice_type(ice_type) self.define_ice_mask_name(ice_type=ice_type) if ice_area_scale is None: ice_area_scale = self.FIC_scale if ice_type == "FI" else self.SIC_scale spatial_dim_names = self.CICE_dict["spatial_dims"] time_dim = self.CICE_dict["time_dim"] # Early skip: do this BEFORE several hours of compute if P_mets_zarr and Path(P_mets_zarr).exists() and not overwrite_zarr: self.logger.info(f"metrics store already exists, skipping compute: {P_mets_zarr}") try: return xr.open_zarr(P_mets_zarr, consolidated=True) except Exception: self.logger.warning(f"existing metrics store could not be opened cleanly: {P_mets_zarr}") return xr.Dataset() self.logger.info(f"¡¡¡ COMPUTING ICE METRICS for {ice_type} !!!") self.logger.info(f" results to {P_mets_zarr}") # Pull available inputs I_mask = da_dict.get(self.mask_name) I_C = da_dict.get("aice") I_T = da_dict.get("hi") I_S = da_dict.get("strength") I_TVT = da_dict.get("dvidtt") I_MVT = da_dict.get("dvidtd") I_TAT = da_dict.get("daidtt") I_MAT = da_dict.get("daidtd") A = da_dict.get("tarea") KuxE = da_dict.get("KuxE") KuxN = da_dict.get("KuxN") KuyE = da_dict.get("KuyE") KuyN = da_dict.get("KuyN") mets = {} # --- Core 1D metrics --- IA = self._compute_named_metric(mets, "ice area", f"{ice_type}A", self.compute_hemisphere_ice_area, required = {"aice": I_C, "tarea": A}, SIC = I_C, A = A, ice_area_scale = ice_area_scale) IV = self._compute_named_metric(mets, "ice volume", f"{ice_type}V", self.compute_hemisphere_ice_volume, required = {"aice": I_C, "hi": I_T, "tarea": A}, SIC = I_C, HI = I_T, A = A) IT = self._compute_named_metric(mets, "ice thickness", f"{ice_type}T", self.compute_hemisphere_ice_thickness, required = {"aice": I_C, "hi": I_T, "tarea": A}, SIC = I_C, HI = I_T, A = A) IS = self._compute_named_metric(mets, "ice strength (1D, area-weighted hPa)", f"{ice_type}S", self.compute_area_weighted_strength_hpa, required = {"aice": I_C, "hi": I_T, "strength": I_S, "tarea": A}, SIC = I_C, HI = I_T, IS = I_S, A = A, spatial_dim_names = spatial_dim_names, sic_threshold = self.icon_thresh, hmin = 0.05, assume_IS_units = "N/m") ITVR = None if IV is not None and A is not None: ITVR = self._compute_named_metric(mets, "thermo volume rate (1D)", f"{ice_type}TVR", self.compute_hemisphere_ice_volume_rate, required = {"aice": I_C, "dvidtt": I_TVT, "IV": IV, "tarea": A}, DVT = I_TVT, IV = IV, A = A, SIC = I_C, mode = "absolute", out_units = "per_day") else: self.logger.info("skipping thermo volume rate: requires IV and area") IMVR = None if IV is not None and A is not None: IMVR = self._compute_named_metric(mets, "dynamic volume rate (1D)", f"{ice_type}MVR", self.compute_hemisphere_ice_volume_rate, required = {"aice": I_C, "dvidtd": I_MVT, "IV": IV, "tarea": A}, DVT = I_MVT, IV = IV, A = A, SIC = I_C, mode = "absolute", out_units = "per_day") else: self.logger.info("skipping dynamic volume rate: requires IV and area") ITAR = None if IA is not None and A is not None: ITAR = self._compute_named_metric(mets, "thermo area rate (1D)", f"{ice_type}TAR", self.compute_hemisphere_ice_area_rate, required = {"daidtt": I_TAT, "IA": IA, "tarea": A}, DAT = I_TAT, IA = IA, A = A, mode = "absolute", out_units = "per_day") else: self.logger.info("skipping thermo area rate: requires IA and area") IMAR = None if IA is not None and A is not None: IMAR = self._compute_named_metric(mets, "dynamic area rate (1D)", f"{ice_type}MAR", self.compute_hemisphere_ice_area_rate, required = {"daidtd": I_MAT, "IA": IA, "tarea": A}, DAT = I_MAT, IA = IA, A = A, mode = "absolute", out_units = "per_day") else: self.logger.info("skipping dynamic area rate: requires IA and area") IP = self._compute_named_metric(mets, "persistence aggregate (2D)", f"{ice_type}P", self.compute_hemisphere_variable_aggregate, required = {"aice": I_C}, da = I_C) # --- Lateral drag stress aggregates --- self._add_stress_aggregate(mets, da_dict, KuxE, f"{ice_type}KuxE", spatial_dim_names = spatial_dim_names, mask = I_mask, fallback_area = A) self._add_stress_aggregate(mets, da_dict, KuyE, f"{ice_type}KuyE", spatial_dim_names = spatial_dim_names, mask = I_mask, fallback_area = A) self._add_stress_aggregate(mets, da_dict, KuxN, f"{ice_type}KuxN", spatial_dim_names = spatial_dim_names, mask = I_mask, fallback_area = A) self._add_stress_aggregate(mets, da_dict, KuyN, f"{ice_type}KuyN", spatial_dim_names = spatial_dim_names, mask = I_mask, fallback_area = A) if KuxE is not None and KuyE is not None and self._is_valid_metric_input(KuxE) and self._is_valid_metric_input(KuyE): try: KuE_mag = xr.apply_ufunc(np.hypot, KuxE, KuyE, dask="allowed") KuE_mag.name = "KuE_mag" self._add_stress_aggregate(mets, da_dict, KuE_mag, f"{ice_type}KuE_mag", spatial_dim_names = spatial_dim_names, mask = I_mask, fallback_area = A) except Exception as e: self.logger.warning(f"{ice_type}KuE_mag failed: {e}") if KuxN is not None and KuyN is not None and self._is_valid_metric_input(KuxN) and self._is_valid_metric_input(KuyN): try: KuN_mag = xr.apply_ufunc(np.hypot, KuxN, KuyN, dask="allowed") KuN_mag.name = "KuN_mag" self._add_stress_aggregate(mets, da_dict, KuN_mag, f"{ice_type}KuN_mag", spatial_dim_names = spatial_dim_names, mask = I_mask, fallback_area = A) except Exception as e: self.logger.warning(f"{ice_type}KuN_mag failed: {e}") # --- Spatial summary diagnostics --- self._compute_named_metric(mets, "ice thickness temporal-mean", f"{ice_type}HI", lambda da: da.mean(dim=time_dim), required = {"hi": I_T}, da = I_T) if I_S is not None and I_T is not None and self._is_valid_metric_input(I_S) and self._is_valid_metric_input(I_T): try: self.logger.info("computing ice strength temporal-sum; units mPa") mets[f"{ice_type}ST"] = (I_S / I_T.where(I_T > 0)).sum(dim=time_dim) / 1e6 except Exception as e: self.logger.warning(f"strength temporal-sum failed: {e}") if I_TVT is not None and self._is_valid_metric_input(I_TVT): try: self.logger.info("computing ice volume tendency spatial rate; units m/yr") mets[f"{ice_type}TVR_YR"] = ((I_TVT * 1e2).mean(dim=time_dim) / 3.65) except Exception as e: self.logger.warning(f"TVR_YR failed: {e}") if I_MVT is not None and self._is_valid_metric_input(I_MVT): try: self.logger.info("computing ice volume tendency spatial rate; units m/yr") mets[f"{ice_type}MVR_YR"] = ((I_MVT * 1e2).mean(dim=time_dim) / 3.65) except Exception as e: self.logger.warning(f"MVR_YR failed: {e}") if I_TAT is not None and A is not None and self._is_valid_metric_input(I_TAT) and self._is_valid_metric_input(A): try: self.logger.info("computing ice area tendency spatial rate") mets[f"{ice_type}TAR_YR"] = ((I_TAT * A).mean(dim=time_dim) / 31_536_000) except Exception as e: self.logger.warning(f"TAR_YR failed: {e}") if I_MAT is not None and A is not None and self._is_valid_metric_input(I_MAT) and self._is_valid_metric_input(A): try: self.logger.info("computing ice area tendency spatial rate") mets[f"{ice_type}MAR_YR"] = ((I_MAT * A).mean(dim=time_dim) / 31_536_000) except Exception as e: self.logger.warning(f"MAR_YR failed: {e}") # --- Skill / seasonal / persistence summaries --- IA_skill, IA_seasonal = self._compute_area_skill_and_seasonal(IA, ice_type) IP_stab, IP_dist = self._compute_persistence_summaries(ice_type, I_mask, A, IP) summary_vars = {} summary_vars.update({f"{ice_type}A_{k}": v for k, v in IA_seasonal.items()}) summary_vars.update(IP_stab) summary_vars.update(IP_dist) summary_vars.update(IA_skill) attrs = dict(self.sim_config) if hasattr(self, "sim_config") and self.sim_config else {} DS_METS = self._build_metrics_dataset(mets, summary_vars=summary_vars, attrs=attrs) # --- Save to Zarr --- if P_mets_zarr: written = self._write_dataset_zarr(DS_METS, P_mets_zarr, overwrite=overwrite_zarr, consolidated=True) if written: self.logger.info(f"Metrics written to {P_mets_zarr}") else: self.logger.info(f"Metrics write skipped: {P_mets_zarr}") return DS_METS
def _subset_and_pad_time(self, ds: xr.Dataset, dt0_str: str, dtN_str: str, time_dim: str = "time", drop_conflicting: bool = True) -> xr.Dataset: """ Reindex a dataset to a requested inclusive time axis, padding with NaNs as needed. Drops variables whose `time_dim` length conflicts with the indexed time coordinate. """ if time_dim not in ds.dims and time_dim not in ds.coords and time_dim not in ds.variables: return ds if dt0_str is None or dtN_str is None: return ds # Require an actual time coordinate/index variable if time_dim not in ds.variables: self.logger.warning(f"{time_dim!r} not present as a variable; skipping reindex") return ds t = ds[time_dim] if t.ndim != 1 or t.size == 0: return ds # Build requested axis is_datetime64 = np.issubdtype(t.values.dtype, np.datetime64) if is_datetime64: start = np.datetime64(dt0_str) end = np.datetime64(dtN_str) else: try: import cftime except Exception as e: raise RuntimeError( "Dataset time coordinate appears to be cftime/object, but cftime is not available." ) from e cal = t.encoding.get("calendar", None) or t.attrs.get("calendar", None) or "standard" start = xr.cftime_range(start=dt0_str, periods=1, calendar=cal)[0] end = xr.cftime_range(start=dtN_str, periods=1, calendar=cal)[0] if start > end: raise ValueError(f"Requested start > end: {dt0_str} > {dtN_str}") if t.size >= 2: step = t.values[1] - t.values[0] else: step = np.timedelta64(1, "D") if is_datetime64 else __import__("datetime").timedelta(days=1) if is_datetime64: desired = np.arange(start, end + step, step) else: desired = [] cur = start while cur <= end: desired.append(cur) cur = cur + step desired = np.array(desired, dtype=object) # Identify vars that cannot be aligned to this time axis tlen = t.sizes[time_dim] bad_vars = [ name for name, var in ds.variables.items() if (time_dim in var.dims and var.sizes[time_dim] != tlen) ] if bad_vars: msg = ( f"Found variables with conflicting {time_dim!r} length relative to " f"{time_dim!r} coord ({tlen}): {bad_vars}" ) if drop_conflicting: self.logger.warning(msg + " -- dropping them before reindex") ds = ds.drop_vars(bad_vars, errors="ignore") else: raise ValueError(msg) return ds.reindex({time_dim: desired})
[docs] def load_computed_metrics(self, class_method : str = "binary-days", BorC2T_type : str = None, ice_type : str = None, ispd_thresh : float = None, bin_min_days : int = None, bin_win_days : int = None, mean_period : int = None, zarr_directory : str = None, clip_to_self : bool = True, time_dim : str = "time", vars_keep : list[str] | None = None, eager : bool = False): BorC2T_type = BorC2T_type or self.BorC2T_type ice_type = ice_type or self.ice_type ispd_thresh = self.ispd_thresh if ispd_thresh is None else ispd_thresh D_zarr = zarr_directory or self.D_zarr P_mets_zarr = self._resolve_product_store( ice_type = ice_type, class_method = class_method, product = "mets", D_zarr = D_zarr, BorC2T_type = BorC2T_type, ispd_thresh = ispd_thresh, bin_win_days = bin_win_days, bin_min_days = bin_min_days, mean_period = mean_period, ) self.logger.info(f"Loading metrics Zarr: {P_mets_zarr}") if not Path(P_mets_zarr).exists(): raise FileNotFoundError(f"Metrics Zarr does not exist: {P_mets_zarr}") ds = xr.open_zarr( P_mets_zarr, consolidated=None, chunks=None if eager else "auto", decode_coords=True, create_default_indexes=True, ) if vars_keep is not None: missing = [v for v in vars_keep if v not in ds.variables] if missing: self.logger.warning(f"Requested vars not present: {missing}") keep = [v for v in vars_keep if v in ds.variables] if time_dim in ds.variables and time_dim not in keep: keep = [time_dim] + keep ds = ds[keep] if clip_to_self: self.logger.info(f"Subsetting and padding time to {self.dt0_str} and {self.dtN_str}") ds = self._subset_and_pad_time(ds, self.dt0_str, self.dtN_str, time_dim=time_dim) if eager: self.logger.info("Loading selected metrics eagerly into memory") ds = ds.load() return ds
[docs] def compute_hemisphere_area_weighted_stress(self, tau: xr.DataArray, A: xr.DataArray, spatial_dim_names: list | None = None, mask : xr.DataArray | None = None, out_units : str = "Pa", # "Pa" | "kPa" | "hPa" return_abs : bool = True, # if True, also return |tau| min_area_m2 : float = 0.0, # if >0, mask timesteps with too little valid area name : str | None = None) -> xr.Dataset: """ Compute a hemispheric, area-weighted aggregate of a stress component (or magnitude). Designed for stress-like diagnostics such as lateral-drag stresses (e.g., KuxE, KuyN) whose native units are N/m^2 (= Pa). This yields a hemispheric mean stress time series with **sensible physical units** (Pa, hPa, or kPa) and a robust treatment of masks. The aggregate is an area-weighted mean over the valid region: <tau>_A(t) = sum( tau(t,...) * A(...) ) / sum( A(...) ) If `return_abs=True`, this also returns the area-weighted mean of |tau|, which is often more interpretable for signed stress components that alternate in sign over space. Parameters ---------- tau : xr.DataArray Stress component field in N/m^2 (Pa). Must include a time dimension and spatial dimensions in `spatial_dim_names`. A : xr.DataArray Grid-cell area (m^2) corresponding to `tau`'s grid (e.g., earea for E-grid, narea for N-grid). Broadcastable to tau's spatial dims. spatial_dim_names : list[str], optional Spatial dimensions to reduce over (e.g., ["nj", "ni"]). Defaults to `self.CICE_dict["spatial_dims"]`. mask : xr.DataArray, optional Boolean mask (True=keep) broadcastable to tau. Use this to restrict aggregation to a region (e.g., fast-ice mask, SIC threshold mask, coastal mask, etc.). If None, uses all finite tau cells. out_units : {"Pa","hPa","kPa"}, default "Pa" Unit for the returned time series. return_abs : bool, default True If True, return both signed mean stress and mean absolute stress. min_area_m2 : float, default 0.0 If >0, timesteps where the valid area sum is <= min_area_m2 are masked (set to NaN). Helpful to prevent noisy values when the valid mask becomes tiny. name : str, optional Name root used for output variables. Defaults to `tau.name` if present, else "tau". Returns ------- xr.Dataset Dataset containing: - f"{name}_mean" : area-weighted mean stress (signed) - f"{name}_abs_mean" : area-weighted mean |stress| (if return_abs=True) - "valid_area_m2" : total valid area used in weighting Units of *_mean variables are Pa/hPa/kPa as requested. Notes ----- - N/m^2 is exactly Pascal. So converting to hPa uses /100, and to kPa uses /1000. - Prefer |tau| (absolute mean) when comparing “strength” of drag across experiments, because signed components can cancel spatially. - If you want a vector magnitude across components (e.g., sqrt(Kux^2+Kuy^2)), compute that first and pass as `tau`. """ spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict["spatial_dims"] nm = name or getattr(tau, "name", None) or "tau" self.logger.info(f"Computing **AREA-WEIGHTED STRESS AGGREGATE** for '{nm}'") self.logger.debug(f" • out_units : {out_units}") self.logger.debug(f" • return_abs : {return_abs}") self.logger.debug(f" • min_area_m2 : {min_area_m2:.3e} m^2") self.logger.debug(f" • Spatial dimensions : {spatial_dim_names}") self.logger.debug(f" • tau units (attrs) : {tau.attrs.get('units', 'unknown')}") self.logger.debug(f" • tau long_name : {tau.attrs.get('long_name', 'unknown')}") self.logger.debug("\n[Stress Aggregation Steps]\n" " 1. Define valid mask: finite(tau) & optional user mask\n" " 2. Compute area-weighted mean: sum(tau*A)/sum(A)\n" " 3. Optionally compute area-weighted mean absolute stress: sum(|tau|*A)/sum(A)\n" " 4. Convert Pa -> requested units (Pa/hPa/kPa)\n" " 5. Optionally mask timesteps where valid area is too small") # base validity mask valid = np.isfinite(tau) if mask is not None: valid = valid & mask w = A.where(valid) valid_area = w.sum(dim=spatial_dim_names) # signed mean tau_mean = (tau.where(valid) * w).sum(dim=spatial_dim_names) / valid_area # absolute mean if return_abs: tau_abs_mean = (np.abs(tau.where(valid)) * w).sum(dim=spatial_dim_names) / valid_area # unit conversion from Pa if out_units not in ("Pa", "hPa", "kPa"): raise ValueError(f"Unknown out_units='{out_units}'. Expected 'Pa', 'hPa', or 'kPa'.") if out_units == "hPa": scale = 1.0 / 100.0 elif out_units == "kPa": scale = 1.0 / 1000.0 else: scale = 1.0 tau_mean = tau_mean * scale tau_mean.attrs["units"] = out_units tau_mean.attrs["long_name"] = f"Area-weighted mean stress ({nm})" tau_mean.attrs["aggregation"] = f"area_weighted_mean_over={spatial_dim_names}" out = {f"{nm}_mean" : tau_mean, "valid_area_m2" : valid_area} if return_abs: tau_abs_mean = tau_abs_mean * scale tau_abs_mean.attrs["units"] = out_units tau_abs_mean.attrs["long_name"] = f"Area-weighted mean absolute stress ({nm})" tau_abs_mean.attrs["aggregation"] = f"area_weighted_mean_over={spatial_dim_names}" out[f"{nm}_abs_mean"] = tau_abs_mean ds_out = xr.Dataset(out) if min_area_m2 and float(min_area_m2) > 0.0: self.logger.debug("Applying min_area_m2 gating to output time series") gate = ds_out["valid_area_m2"] > min_area_m2 for v in ds_out.data_vars: if v != "valid_area_m2": ds_out[v] = ds_out[v].where(gate) return ds_out
[docs] def compute_hemisphere_lateral_drag_stress(self, KuxE : xr.DataArray | None = None, KuxN : xr.DataArray | None = None, KuyE : xr.DataArray | None = None, KuyN : xr.DataArray | None = None, earea : xr.DataArray | None = None, narea : xr.DataArray | None = None, spatial_dim_names : list | None = None, maskE : xr.DataArray | None = None, maskN : xr.DataArray | None = None, out_units : str = "Pa", min_area_m2 : float = 0.0) -> xr.Dataset: """ Convenience wrapper to compute hemispheric aggregates for lateral-drag stress components. This computes area-weighted mean and mean-absolute stresses for any subset of: KuxE, KuxN, KuyE, KuyN using their appropriate grid-cell area weights: - E-grid stresses weighted by `earea` - N-grid stresses weighted by `narea` Parameters ---------- KuxE, KuxN, KuyE, KuyN : xr.DataArray, optional Stress component fields (Pa) on E or N grids. earea, narea : xr.DataArray, optional Grid-cell areas (m^2) for E- and N-grids. spatial_dim_names : list[str], optional Spatial reduction dims. Defaults to `self.CICE_dict["spatial_dims"]`. maskE, maskN : xr.DataArray, optional Optional boolean masks for E-grid and N-grid fields. out_units : {"Pa","hPa","kPa"}, default "Pa" Units for output time series. min_area_m2 : float, default 0.0 Gating threshold for valid area per timestep. Returns ------- xr.Dataset Dataset containing time series for each provided field: - <name>_mean - <name>_abs_mean - <name>_valid_area_m2 Plus, if both x and y are present on the same grid, a magnitude aggregate: - K_uE_mag_mean / abs_mean (from KuxE,KuyE) - K_uN_mag_mean / abs_mean (from KuxN,KuyN) Notes ----- - For interpretation across experiments, the **absolute mean** and the **magnitude** are usually the most stable. """ spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict["spatial_dims"] self.logger.info("Computing **HEMISPHERE LATERAL-DRAG STRESS AGGREGATES**") self.logger.debug(f" • out_units : {out_units}") self.logger.debug(f" • min_area_m2 : {min_area_m2:.3e} m^2") self.logger.debug(f" • Spatial dimensions : {spatial_dim_names}") out = [] # --- E grid components --- if (KuxE is not None) and (earea is None): raise ValueError("KuxE provided but earea is None.") if (KuyE is not None) and (earea is None): raise ValueError("KuyE provided but earea is None.") if KuxE is not None: ds = self.compute_hemisphere_area_weighted_stress(tau = KuxE, A = earea, spatial_dim_names = spatial_dim_names, mask = maskE, out_units = out_units, return_abs = True, min_area_m2 = min_area_m2, name = "KuxE") ds = ds.rename({"valid_area_m2": "KuxE_valid_area_m2"}) out.append(ds) if KuyE is not None: ds = self.compute_hemisphere_area_weighted_stress(tau = KuyE, A = earea, spatial_dim_names = spatial_dim_names, mask = maskE, out_units = out_units, return_abs = True, min_area_m2 = min_area_m2, name = "KuyE") ds = ds.rename({"valid_area_m2": "KuyE_valid_area_m2"}) out.append(ds) if (KuxE is not None) and (KuyE is not None): KmagE = np.hypot(KuxE, KuyE) KmagE.name = "K_uE_mag" ds = self.compute_hemisphere_area_weighted_stress(tau = KmagE, A = earea, spatial_dim_names = spatial_dim_names, mask = maskE, out_units = out_units, return_abs = True, min_area_m2 = min_area_m2, name = "K_uE_mag") ds = ds.rename({"valid_area_m2": "K_uE_mag_valid_area_m2"}) out.append(ds) # --- N grid components --- if (KuxN is not None) and (narea is None): raise ValueError("KuxN provided but narea is None.") if (KuyN is not None) and (narea is None): raise ValueError("KuyN provided but narea is None.") if KuxN is not None: ds = self.compute_hemisphere_area_weighted_stress(tau = KuxN, A = narea, spatial_dim_names = spatial_dim_names, mask = maskN, out_units = out_units, return_abs = True, min_area_m2 = min_area_m2, name = "KuxN") ds = ds.rename({"valid_area_m2": "KuxN_valid_area_m2"}) out.append(ds) if KuyN is not None: ds = self.compute_hemisphere_area_weighted_stress(tau = KuyN, A = narea, spatial_dim_names = spatial_dim_names, mask = maskN, out_units = out_units, return_abs = True, min_area_m2 = min_area_m2, name = "KuyN") ds = ds.rename({"valid_area_m2": "KuyN_valid_area_m2"}) out.append(ds) if (KuxN is not None) and (KuyN is not None): KmagN = np.hypot(KuxN, KuyN) KmagN.name = "K_uN_mag" ds = self.compute_hemisphere_area_weighted_stress(tau = KmagN, A = narea, spatial_dim_names = spatial_dim_names, mask = maskN, out_units = out_units, return_abs = True, min_area_m2 = min_area_m2, name = "K_uN_mag") ds = ds.rename({"valid_area_m2": "K_uN_mag_valid_area_m2"}) out.append(ds) if not out: raise ValueError("No stress fields provided. Supply at least one of KuxE/KuxN/KuyE/KuyN.") return xr.merge(out)
[docs] def compute_hemisphere_ice_area_rate(self, DAT: xr.DataArray, IA: xr.DataArray, A: xr.DataArray, spatial_dim_names : list | None = None, mode : str = "fractional", # "fractional" (1/s or 1/day) OR "absolute" (km^2/s or km^2/day) out_units : str = "per_day", # "per_day" | "per_second" IA_min_m2 : float = 5e10, # denominator floor (m^2) for fractional mode mask_invalid : bool = True) -> xr.DataArray: """ Compute a hemisphere-aggregated ice-area tendency rate with optional denominator gating. This function aggregates a gridded area-tendency field over the hemisphere and returns either: (A) an **absolute** integrated tendency: d(Area)/dt [km^2/s] or [km^2/day] (B) a **fractional** tendency (normalised by instantaneous integrated ice area): (1/Area) d(Area)/dt [1/s] or [1/day] The fractional form is often the most comparable across experiments, but it can become numerically unstable when the integrated ice area is small. To mitigate this, a minimum area floor `IA_min_m2` is applied in fractional mode (values are masked where IA <= IA_min_m2). Parameters ---------- DAT : xr.DataArray Local ice-area tendency field, typically in [1/s] (e.g., dynamic area tendency), with dimensions including time and the spatial dimensions in `spatial_dim_names`. NOTE: `DAT` is assumed to act on the same area basis as `A` (grid-cell area). IA : xr.DataArray Hemisphere-integrated ice area time series in [m^2], used as the denominator in fractional mode. Must share the same time coordinate as `DAT`. A : xr.DataArray Grid-cell area in [m^2], broadcastable to the spatial dimensions of `DAT`. spatial_dim_names : list[str], optional Spatial dimensions to sum over (e.g., ["nj", "ni"]). Defaults to `self.CICE_dict["spatial_dims"]`. mode : {"fractional","absolute"}, default "fractional" - "fractional": return (DAT*A).sum / IA -> [1/s] or [1/day] - "absolute" : return (DAT*A).sum -> [m^2/s] converted to [km^2/s] or [km^2/day] out_units : {"per_day","per_second"}, default "per_day" Output time unit scaling. For "per_day", multiplies by 86400. IA_min_m2 : float, default 5e10 Minimum integrated ice area (m^2) required to compute fractional rates. Values where IA <= IA_min_m2 are masked (set to NaN) in fractional mode. A typical order-of-magnitude choice is 1e10–1e11 m^2 (~10^4–10^5 km^2). mask_invalid : bool, default True If True, masks non-finite values after calculation (NaN/inf). Returns ------- xr.DataArray Hemisphere-aggregated area tendency rate. Units: - mode="absolute" : km^2/s or km^2/day - mode="fractional": 1/s or 1/day Notes ----- - This function replaces the previous hard-coded `/1e9` scaling and instead returns physically interpretable units (km^2/time for absolute, 1/time for fractional). - If your `IA` is not in m^2, convert it before calling this function. - If `DAT` is not in 1/s, units will differ accordingly; the calculations here assume the common CICE convention for area tendencies. """ spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict["spatial_dims"] self.logger.info("Computing **HEMISPHERE ICE-AREA TENDENCY RATE**") self.logger.debug(f" • mode : {mode}") self.logger.debug(f" • out_units : {out_units}") self.logger.debug(f" • IA_min_m2 (fractional gating) : {IA_min_m2:.3e} m^2") self.logger.debug(f" • Spatial dimensions : {spatial_dim_names}") # 1) integrate local tendency over area: (1/s)*m^2 -> m^2/s self.logger.debug("\n[Area Tendency Aggregation Steps]\n" " 1. Multiply DAT by grid-cell area A and sum over spatial dims -> dA/dt in m^2/s\n" " 2. If mode='fractional', divide by IA (with IA floor) -> fractional rate in 1/s\n" " 3. Convert to requested output units (per_day vs per_second) and (if absolute) km^2 scaling") dA_dt_m2_s = (DAT * A).sum(dim=spatial_dim_names) if mode not in ("fractional", "absolute"): raise ValueError(f"Unknown mode='{mode}'. Expected 'fractional' or 'absolute'.") if out_units not in ("per_day", "per_second"): raise ValueError(f"Unknown out_units='{out_units}'. Expected 'per_day' or 'per_second'.") if mode == "fractional": # denominator gating to avoid blow-ups IA_safe = IA.where(IA > IA_min_m2) rate = dA_dt_m2_s / IA_safe # 1/s (if DAT is 1/s and IA is m^2) rate.attrs["long_name"] = "Hemisphere fractional ice-area tendency rate" rate.attrs["units"] = "1/s" else: # absolute: convert m^2/s -> km^2/s rate = dA_dt_m2_s / 1e6 rate.attrs["long_name"] = "Hemisphere absolute ice-area tendency rate" rate.attrs["units"] = "km^2/s" if out_units == "per_day": rate = rate * 86400.0 # adjust units string if mode == "fractional": rate.attrs["units"] = "1/day" else: rate.attrs["units"] = "km^2/day" if mask_invalid: rate = rate.where(np.isfinite(rate)) # keep a breadcrumb for provenance rate.attrs["aggregation"] = f"sum_over={spatial_dim_names}" if mode == "fractional": rate.attrs["IA_min_m2"] = float(IA_min_m2) return rate
[docs] def compute_hemisphere_ice_volume_rate(self, DVT: xr.DataArray, IV: xr.DataArray | None, A: xr.DataArray, SIC: xr.DataArray | None = None, spatial_dim_names: list | None = None, mode: str = "fractional", # "fractional" | "absolute" out_units: str = "per_day", # "per_day" | "per_second" IV_min_m3: float = 5e10, # denominator floor (m^3) for fractional mode sic_threshold: float | None = None, # optional mask (SIC > thresh) mask_invalid: bool = True, assume_units: str = "auto" # "auto" | "cm/day" | "m/day" | "m/s" ) -> xr.DataArray: """ Compute a hemisphere-aggregated ice-volume tendency rate with sensible units and optional denominator gating. This function aggregates a gridded "volume tendency" field (CICE history typically stores `dvidtt` / `dvidtd` in **cm/day**, representing an equivalent thickness tendency) into: (A) an **absolute** integrated volume tendency: dV/dt [km^3/s] or [km^3/day] (B) a **fractional** (normalised) tendency: (1/V) dV/dt [1/s] or [1/day] The fractional form is often the most comparable across experiments, but it can become numerically unstable when the integrated ice volume is small. To mitigate this, a minimum volume floor `IV_min_m3` is applied in fractional mode (values are masked where IV <= IV_min_m3). Parameters ---------- DVT : xr.DataArray Local ice "volume tendency" field. For standard CICE diagnostics: - dvidtt / dvidtd typically have units "cm/day" (equivalent thickness tendency). Must have dims including time and the spatial dims in `spatial_dim_names`. IV : xr.DataArray or None Hemisphere-integrated ice volume time series in [m^3], used as the denominator in fractional mode. Must share the same time coordinate as `DVT`. Required if mode="fractional"; can be None if mode="absolute". A : xr.DataArray Grid-cell area in [m^2], broadcastable to the spatial dimensions of `DVT`. (For CICE dvidtt/dvidtd: thickness tendency * area -> volume tendency.) SIC : xr.DataArray, optional Sea-ice concentration used to mask contributions. If provided, cells with SIC <= sic_threshold are excluded. If None, no SIC masking is applied. spatial_dim_names : list[str], optional Spatial dimensions to sum over (e.g., ["nj","ni"]). Defaults to `self.CICE_dict["spatial_dims"]`. mode : {"fractional","absolute"}, default "fractional" - "fractional": return (dV/dt)/IV -> [1/s] or [1/day] - "absolute" : return dV/dt -> [km^3/s] or [km^3/day] out_units : {"per_day","per_second"}, default "per_day" Output time unit. "per_day" scales by 86400 relative to per-second base. IV_min_m3 : float, default 5e10 Minimum integrated ice volume (m^3) required to compute fractional rates. Values where IV <= IV_min_m3 are masked (set to NaN) in fractional mode. Order-of-magnitude guidance: - fast-ice V ~ 1e10–1e11 m^3 (10–100 km^3) - sea-ice V ~ 1e12–1e13 m^3 (1000–10000 km^3) sic_threshold : float, optional SIC threshold for masking. Defaults to `self.icon_thresh` when SIC is provided. mask_invalid : bool, default True If True, mask non-finite results (NaN/inf) after calculation. assume_units : {"auto","cm/day","m/day","m/s"}, default "auto" How to interpret `DVT` units. If "auto", tries to parse `DVT.attrs["units"]`. If parsing fails and you have `self.thickness_tendency_to_mps`, it will be used as fallback. Returns ------- xr.DataArray Hemisphere-aggregated volume tendency rate time series. Units: - mode="absolute" : km^3/s or km^3/day - mode="fractional": 1/s or 1/day Notes ----- - For CICE dvidtt/dvidtd (cm/day): the physically consistent absolute tendency is: dV/dt [m^3/day] = (DVT * 1e-2) * A summed over space then converted to km^3/day by dividing by 1e9. - This function does NOT assume any hard-coded scaling like /1e9*86400 unless justified by units. """ spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict["spatial_dims"] if mode not in ("fractional", "absolute"): raise ValueError(f"Unknown mode='{mode}'. Expected 'fractional' or 'absolute'.") if out_units not in ("per_day", "per_second"): raise ValueError(f"Unknown out_units='{out_units}'. Expected 'per_day' or 'per_second'.") if (mode == "fractional") and (IV is None): raise ValueError("mode='fractional' requires IV (hemisphere-integrated volume time series in m^3).") sic_threshold = sic_threshold if sic_threshold is not None else getattr(self, "icon_thresh", 0.15) self.logger.info("Computing **HEMISPHERE ICE-VOLUME TENDENCY RATE**") self.logger.debug(f" • mode : {mode}") self.logger.debug(f" • out_units : {out_units}") self.logger.debug(f" • IV_min_m3 (fractional gating) : {IV_min_m3:.3e} m^3") self.logger.debug(f" • SIC mask applied? : {'yes' if SIC is not None else 'no'}") if SIC is not None: self.logger.debug(f" • sic_threshold : {sic_threshold:.2f}") self.logger.debug(f" • Spatial dimensions : {spatial_dim_names}") self.logger.debug( "\n[Volume Tendency Aggregation Steps]\n" " 1. Convert DVT to thickness tendency in m/s (unit-aware)\n" " 2. Optionally apply SIC mask (SIC > threshold)\n" " 3. Multiply by grid-cell area A and sum over spatial dims -> dV/dt in m^3/s\n" " 4. If mode='fractional', divide by IV (with IV floor) -> fractional rate in 1/s\n" " 5. Convert to requested output units (per_day vs per_second) and (if absolute) km^3 scaling" ) # ---- unit-aware conversion to m/s ---- def _to_m_per_s(da: xr.DataArray) -> xr.DataArray: if assume_units != "auto": u = assume_units.lower() else: u = str(da.attrs.get("units", "")).strip().lower() # Common CICE cases if ("cm" in u) and ("/day" in u or "d-1" in u or "day-1" in u): # cm/day -> m/s return da * 1e-2 / 86400.0 if ("m" in u) and ("/day" in u or "d-1" in u or "day-1" in u): # m/day -> m/s return da / 86400.0 if ("cm" in u) and ("/s" in u or "s-1" in u): # cm/s -> m/s return da * 1e-2 if ("m" in u) and ("/s" in u or "s-1" in u): # m/s -> m/s return da # Fallback to your existing helper if available if hasattr(self, "thickness_tendency_to_mps"): self.logger.debug(" • units not recognised; falling back to self.thickness_tendency_to_mps(DVT)") return self.thickness_tendency_to_mps(da) raise ValueError(f"Could not infer DVT units='{da.attrs.get('units','')}'. " f"Set assume_units='cm/day'|'m/day'|'m/s' or implement thickness_tendency_to_mps().") DVT_mps = _to_m_per_s(DVT) # ---- optional SIC mask ---- if SIC is not None: mask = (SIC > float(sic_threshold)) DVT_mps = DVT_mps.where(mask) # ---- integrate: (m/s)*m^2 -> m^3/s ---- dV_dt_m3_s = (DVT_mps * A).sum(dim=spatial_dim_names) if mode == "fractional": IV_safe = IV.where(IV > float(IV_min_m3)) rate = dV_dt_m3_s / IV_safe # 1/s rate.attrs["long_name"] = "Hemisphere fractional ice-volume tendency rate" rate.attrs["units"] = "1/s" rate.attrs["IV_min_m3"] = float(IV_min_m3) else: # absolute: m^3/s -> km^3/s rate = dV_dt_m3_s / 1e9 rate.attrs["long_name"] = "Hemisphere absolute ice-volume tendency rate" rate.attrs["units"] = "km^3/s" if out_units == "per_day": rate = rate * 86400.0 if mode == "fractional": rate.attrs["units"] = "1/day" else: rate.attrs["units"] = "km^3/day" if mask_invalid: rate = rate.where(np.isfinite(rate)) # provenance breadcrumbs rate.attrs["aggregation"] = f"sum_over={spatial_dim_names}" if SIC is not None: rate.attrs["sic_threshold"] = float(sic_threshold) return rate
[docs] def compute_area_weighted_strength_hpa(self, SIC: xr.DataArray, HI: xr.DataArray, IS: xr.DataArray, A: xr.DataArray, spatial_dim_names: list | None = None, sic_threshold: float | None = None, hmin: float = 0.05, assume_IS_units: str = "N/m", # "N/m" -> divide by HI to get Pa; "Pa" -> use IS directly mask_invalid: bool = True) -> xr.DataArray: """ Compute an area-weighted mean ice "pressure-equivalent" in hectopascals (hPa). This is a robust hemispheric aggregate designed to avoid two common pathologies: 1) **summing** a pressure-like field instead of averaging (domain-size dependence), 2) **division-by-small-thickness** spikes (handled via `hmin` and masking). The calculation proceeds as: - Define a valid-ice mask: SIC > sic_threshold AND HI > hmin AND finite(IS) - Convert the internal strength variable to a pressure-like field in Pa: * if IS is in N/m, then P = IS / HI (Pa) * if IS is already in Pa, then P = IS - Compute an area-weighted mean: P_mean = sum(P * A) / sum(A) over the masked region - Convert Pa -> hPa (divide by 100) Parameters ---------- SIC : xr.DataArray Sea ice concentration [0–1]. HI : xr.DataArray Sea ice thickness [m]. IS : xr.DataArray Internal ice strength field. Expected units depend on `assume_IS_units`: - "N/m": common for CICE strength diagnostics; converted to Pa via division by HI. - "Pa" : if your diagnostic is already pressure-like. A : xr.DataArray Grid-cell area [m^2], broadcastable to SIC/HI/IS. spatial_dim_names : list[str], optional Spatial dimensions to reduce over. Defaults to `self.CICE_dict["spatial_dims"]`. sic_threshold : float, optional Concentration threshold for including cells. Defaults to `self.icon_thresh`. hmin : float, default 0.05 Minimum thickness (m) required to include a cell. Prevents spikes from dividing by very small HI and excludes near-zero ice. assume_IS_units : {"N/m","Pa"}, default "N/m" Controls conversion to Pa. mask_invalid : bool, default True If True, masks non-finite values after calculation. Returns ------- xr.DataArray Area-weighted mean pressure-equivalent in hPa. Notes ----- - This is an *analysis* diagnostic: it translates a CICE internal strength-like quantity into a pressure-equivalent for hemispheric comparison. Interpret with care. - If you want the same mask to be applied across multiple experiments, pass a common mask or use a common SIC/HI selection upstream. """ spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict["spatial_dims"] sic_threshold = sic_threshold if sic_threshold is not None else self.icon_thresh self.logger.info("Computing **AREA-WEIGHTED INTERNAL ICE PRESSURE (hPa)**") self.logger.debug(f" • SIC threshold : {sic_threshold:.2f}") self.logger.debug(f" • HI minimum (hmin) : {hmin:.3f} m") self.logger.debug(f" • assume_IS_units : {assume_IS_units}") self.logger.debug(f" • Spatial dimensions : {spatial_dim_names}") self.logger.debug("\n[Area-Weighted Strength Steps]\n" f" 1. Build mask: (SIC > {sic_threshold:.2f}) & (HI > {hmin:.3f}) & finite(IS)\n" " 2. Convert to Pa: P = IS/HI if IS in N/m, else P = IS if already Pa\n" " 3. Area-weighted mean over mask: sum(P*A)/sum(A)\n" " 4. Convert Pa -> hPa by dividing by 100") mask = (SIC > sic_threshold) & (HI > hmin) & np.isfinite(IS) if assume_IS_units not in ("N/m", "Pa"): raise ValueError(f"Unknown assume_IS_units='{assume_IS_units}'. Expected 'N/m' or 'Pa'.") if assume_IS_units == "N/m": P_pa = (IS / HI).where(mask) else: P_pa = IS.where(mask) w = A.where(mask) # avoid 0/0 if mask is empty denom = w.sum(dim=spatial_dim_names) P_pa_mean = (P_pa * w).sum(dim=spatial_dim_names) / denom P_hpa = P_pa_mean / 100.0 if mask_invalid: P_hpa = P_hpa.where(np.isfinite(P_hpa)) P_hpa.attrs["long_name"] = "Area-weighted internal ice pressure-equivalent" P_hpa.attrs["units"] = "hPa" P_hpa.attrs["sic_threshold"] = float(sic_threshold) P_hpa.attrs["hmin_m"] = float(hmin) P_hpa.attrs["assume_IS_units"] = assume_IS_units P_hpa.attrs["aggregation"] = f"area_weighted_mean_over={spatial_dim_names}" return P_hpa
[docs] def compute_hemisphere_ice_strength(self, SIC, HI, IS, spatial_dim_names : list = None, sic_threshold : float = None, ice_strength_scale : float = 100): """ Compute hemispheric mean sea ice pressure in hectopascals (hPa), based on masked concentration, thickness, strength, and area fields. This function calculates an area-weighted mean ice thickness (m) by dividing total ice volume by total ice-covered area. It then converts internal ice strength (N/m) into an equivalent pressure field (Pa = N/m²), and finally into hectopascals (hPa). Grid cells below a threshold sea ice concentration are masked from the analysis. INPUTS: ------------------- SIC : xr.DataArray; Sea ice concentration (unitless, typically 0–1), per grid cell. HI : xr.DataArray; Sea ice thickness in meters, per grid cell. IS : xr.DataArray; Internal ice strength in N/m, per grid cell. A : xr.DataArray; Grid cell area in m². spatial_dim_names : list, optional; Names of the spatial dimensions to reduce over (e.g., ["nj", "ni"]). If not provided, defaults to values in self.CICE_dict['spatial_dims']. sic_threshold : float, optional; Threshold for masking sea ice concentration (default: self.icon_thresh). OUTPUTS: ------- P : xr.DataArray; Sea ice pressure in hectopascals (hPa), with masked cells excluded and computed as (IS / mean_thickness) / 100. Notes ----- - The output pressure is an approximation derived from CICE's internal strength field and is not a direct model output. - Use with caution where sea ice thickness is very small or near-zero. """ sic_threshold = sic_threshold if sic_threshold is not None else self.icon_thresh spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict['spatial_dims'] self.logger.info("Computing **INTERNAL ICE PRESSURE** for hemisphere") self.logger.debug(f" • Using SIC threshold : {sic_threshold:.2f}\n" f" • Spatial dimension names : {spatial_dim_names}") self.logger.debug("\n[Sea Ice Pressure Computation Steps]\n" f" 1. Apply SIC mask (SIC > {sic_threshold:.2f})\n" f" 2. Divide internal ice strength (N/m = kg*m/s^2) by thickness (m) to get ice pressure (kg/m*s^2 = Pa) then divide by 100 to get hPa") mask = SIC > sic_threshold HI = HI.where(mask) IS = IS.where(mask) return (IS / HI).sum(dim=spatial_dim_names) / ice_strength_scale
[docs] def compute_sector_ice_area(self, I_mask, area_grid, sector_defs, GI_area = False): """ Compute ice area per geographic sector and a domain total from an arbitrary ice mask. This method generalises sector-based area computations for any ice type (FI/PI/SI) by integrating a boolean mask over an area grid within sector bounding boxes. Parameters ---------- I_mask : xr.DataArray Ice mask (boolean or 0/1) on the same grid as `area_grid`. area_grid : xr.DataArray Grid-cell area field (typically m²). Must have coordinate variables `lon` and `lat` (either as DataArray attributes or attached coords) compatible with the sector definitions. sector_defs : dict Sector definition dictionary. For each sector key, expects: sector_defs[sec_name]["geo_region"] == (lon_min, lon_max, lat_min, lat_max) GI_area : bool, default False If True, include grounded iceberg area in the domain total. This uses `compute_grounded_iceberg_area()` (called internally) and adds the result (converted to km²) to the total. Returns ------- IA_da : xr.DataArray Sector ice areas (same units as `area_grid` integration), indexed by "sector". IA_tot : float Domain total ice area (sum of sectors) plus optional grounded iceberg contribution. Notes ----- - Current implementation assumes sectors do not cross the dateline in a way that requires wrap-aware longitude masking. If any sector bounds cross the dateline, adjust the masking logic accordingly. - If `I_mask` or `area_grid` are Dask-backed, each sector sum is computed independently; this is typically acceptable because the number of sectors is small. """ if GI_area: self.compute_grounded_iceberg_area() GI_ttl_area = self.compute_grounded_iceberg_area() / 1e6 self.logger.info(f"adding {GI_ttl_area} to ice area computation") else: GI_ttl_area = 0 sector_names = list(sector_defs.keys()) ia_values = [] for sec_name in sector_names: lon_min, lon_max, lat_min, lat_max = sector_defs[sec_name]["geo_region"] sec_mask = I_mask.where( (area_grid.lon >= lon_min) & (area_grid.lon <= lon_max) & (area_grid.lat >= lat_min) & (area_grid.lat <= lat_max) ) tmp = (sec_mask * area_grid).sum(skipna=True) if hasattr(tmp, "compute"): ia_val = float(tmp.compute()) else: ia_val = tmp.item() if hasattr(tmp, "item") else float(tmp) ia_values.append(ia_val) IA_da = xr.DataArray(ia_values, dims=["sector"], coords={"sector": sector_names}) IA_tot = sum(ia_values) + GI_ttl_area return IA_da, IA_tot
[docs] def compute_sector_FIA(self, FI_mask, area_grid, sector_defs, GI_area=False): """ Backwards-compatible wrapper for fast-ice area (FIA) sector computation. This method preserves the original FI-specific API by delegating to the ice-type-agnostic `compute_sector_ice_area()`. Parameters ---------- FI_mask : xr.DataArray Fast-ice mask on the same grid as `area_grid`. area_grid : xr.DataArray Grid-cell area field with lon/lat coordinates. sector_defs : dict Sector definition dictionary (see `compute_sector_ice_area()`). GI_area : bool, default False If True, add grounded iceberg area to the domain total. Returns ------- (xr.DataArray, float) Sector areas and domain total, as returned by `compute_sector_ice_area()`. """ return self.compute_sector_ice_area(FI_mask, area_grid, sector_defs, GI_area=GI_area)
[docs] def compute_hemisphere_ice_area(self, SIC, A, ice_area_scale = None, spatial_dim_names = None, sic_threshold = None, add_grounded_iceberg_area = None, grounded_iceberg_area = None, region = None): """ Compute the total sea ice area (IA) by integrating sea ice concentration (SIC) over the grid area, and optionally including the grounded iceberg area (GI_total_area). This method computes the sea ice area by multiplying the sea ice concentration (SIC) by the grid cell area (GC_area), summing the result over the specified spatial dimensions, and optionally adding the grounded iceberg area. The final result is then scaled by an optional ice area scale factor. Parameters: ----------- SIC : xarray.DataArray Sea ice concentration field with dimensions (nj, ni) representing the concentration of sea ice at each grid point. A : xarray.DataArray Grid cell area with the same dimensions (nj, ni) as SIC, representing the area of each grid cell in square meters. ice_area_scale : float, optional A scale factor for the sea ice area calculation. If not provided, the default scale is used from the `FIC_scale` attribute. spatial_dim_names : list of str, optional The dimension names over which to sum the sea ice area (typically latitude and longitude). If not provided, the default spatial dimensions are used from the `CICE_dict` attribute. sic_threshold : float, optional Minimum SIC value to be included in the area calculation. Points with SIC below this threshold are excluded. Defaults to `self.icon_thresh`. add_grounded_iceberg_area : bool, optional A flag indicating whether to include the grounded iceberg area in the ice area calculation. Defaults to the class attribute `use_gi`. grounded_iceberg_area : float, optional The grounded iceberg area in square meters to be added to the sea ice area. If not provided, the grounded iceberg area is computed using `compute_grounded_iceberg_area()`. Returns: -------- IA : xarray.DataArray The total sea ice area, including the optional grounded iceberg area, with the same spatial dimensions as SIC and GC_area. The value is returned after summing over the spatial dimensions and scaling by the provided `ice_area_scale`. Notes: ------ - The grounded iceberg area is added to the sea ice area calculation if `add_grounded_iceberg_area` is `True`. If no grounded iceberg area is provided, the method will compute it using the `compute_grounded_iceberg_area()` method. - The sea ice area is computed as the sum of SIC * GC_area across the specified spatial dimensions, and then scaled by `ice_area_scale`. """ add_grounded_iceberg_area = add_grounded_iceberg_area if add_grounded_iceberg_area is not None else self.use_gi sic_threshold = sic_threshold if sic_threshold is not None else self.icon_thresh spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict['spatial_dims'] ice_area_scale = ice_area_scale if ice_area_scale is not None else self.FIC_scale if region is not None: self.logger.info(f"Computing Ice **AREA** for {region} ") else: self.logger.info("Computing Ice **AREA** ") self.logger.debug(f" • Using SIC threshold : {sic_threshold:.2f}\n" f" • Spatial dimension names : {spatial_dim_names}\n" f" • Ice area scaling factor : {ice_area_scale:.2e} (to convert m² → km² or other units)\n" f" • Include grounded iceberg area : {add_grounded_iceberg_area}") if add_grounded_iceberg_area: GI_total_area = (grounded_iceberg_area if grounded_iceberg_area is not None else self.compute_grounded_iceberg_area() ) else: GI_total_area = 0 self.logger.debug("\n[Sea Ice Area Computation Steps]\n" f" 1. Apply SIC mask (SIC > {sic_threshold:.2f})\n" f" 2. Multiply SIC × area and sum over {spatial_dim_names}\n" f" 3. Add grounded iceberg area (GIA) = {GI_total_area:.2f}\n" f" 4. Divide by scale factor {ice_area_scale:.2e} to convert units") mask = SIC > sic_threshold SIC = SIC.where(mask) A = A.where(mask) IA = ((SIC * A).sum(dim=spatial_dim_names)) return (IA + GI_total_area) / ice_area_scale
[docs] def compute_hemisphere_ice_volume(self, SIC, HI, A, add_grounded_iceberg_area = None, ice_volume_scale = 1e12, spatial_dim_names = None, sic_threshold = None, grounded_iceberg_area = None): """ Compute total sea ice volume by integrating sea ice concentration, thickness, and grid cell area. This method calculates the total sea ice volume as the sum of the product of sea ice concentration (SIC), thickness (HI), and grid cell area (GC_area) across the model domain. A SIC threshold can be applied to exclude grid cells with low concentrations. Optionally includes grounded iceberg volume and applies a scaling factor for unit conversion. Parameters ---------- SIC : xarray.DataArray Sea ice concentration (unitless, typically between 0 and 1). HI : xarray.DataArray Sea ice thickness in meters. A : xarray.DataArray Grid cell area in square meters (m²). ice_volume_scale : float, optional Scale factor for the output volume. Default is `1e12`, converting m³ to 1000 km³. spatial_dim_names : list of str, optional Names of spatial dimensions to sum over (e.g., ['nj', 'ni']). Defaults to `self.CICE_dict['spatial_dims']`. sic_threshold : float, optional Minimum SIC value to be included in the volume calculation. Grid cells with SIC ≤ `sic_threshold` are masked. Defaults to `self.icon_thresh`. Returns ------- float Total sea ice volume, optionally including grounded ice and scaled (e.g., in 1000 km³ if default scale is used). Notes ----- - SIC, HI, and GC_area must share the same grid and be broadcastable. - Grid cells below `sic_threshold` are excluded from the volume calculation. - If `self.use_gi` is True, a grounded iceberg area is converted to volume and included. """ add_grounded_iceberg_area = add_grounded_iceberg_area if add_grounded_iceberg_area is not None else self.use_gi spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict['spatial_dims'] sic_threshold = sic_threshold if sic_threshold is not None else self.icon_thresh self.logger.info("Computing Ice **VOLUME** ") self.logger.debug(f" • Using SIC threshold : {sic_threshold:.2f}\n" f" • Spatial dimension names : {spatial_dim_names}\n" f" • Ice volume scaling factor : {ice_volume_scale:.2e} (to convert {1e6:.1e}-km^3)\n" f" • Include grounded iceberg area : {add_grounded_iceberg_area}") if add_grounded_iceberg_area: GI_total_area = (grounded_iceberg_area if grounded_iceberg_area is not None else self.compute_grounded_iceberg_area() ) else: GI_total_area = 0 self.logger.debug("\n[Sea Ice Volume Computation Steps]\n" f" 1. Apply SIC mask (SIC > {sic_threshold:.2f})\n" f" 2. Multiply SIC × thickness × area and sum over {spatial_dim_names}\n" f" 3. Add grounded iceberg area (GIA) = {GI_total_area:.2f}\n" f" 4. Divide by scale factor {ice_volume_scale:.2e} to convert units") mask = SIC > sic_threshold HI = HI.where(mask) SIC = SIC.where(mask) A = A.where(mask) IV = (SIC * HI * A).sum(dim=spatial_dim_names)#.chunk({'time': -1}).compute() return (IV + GI_total_area) / ice_volume_scale
[docs] def compute_hemisphere_ice_thickness(self, SIC, HI, A, spatial_dim_names = None, sic_threshold = None): """ Compute average sea ice thickness weighted by grid cell area and sea ice concentration. This method calculates the domain-averaged sea ice thickness as the ratio of the total sea ice volume to the total sea ice area. A SIC threshold is used to exclude low-concentration grid cells from both the numerator and denominator. Parameters ---------- HI : xarray.DataArray Sea ice thickness in meters. SIC : xarray.DataArray Sea ice concentration (unitless, typically between 0 and 1). GC_area : xarray.DataArray Grid cell area in square meters (m²). spatial_dim_names : list of str, optional Names of spatial dimensions over which to compute the sums. Defaults to `self.CICE_dict['spatial_dims']`. sic_threshold : float, optional Minimum SIC value to include in the average. Grid cells with SIC ≤ `sic_threshold` are excluded. Defaults to `self.icon_thresh`. Returns ------- float or xarray.DataArray Domain-averaged sea ice thickness, computed as the total volume divided by total area. Notes ----- - Computed as: ∑(HI × area) / ∑(SIC × area), with masking by `sic_threshold`. - Excludes cells with SIC below threshold from both numerator and denominator. - Units are in meters. """ spatial_dim_names = spatial_dim_names if spatial_dim_names is not None else self.CICE_dict['spatial_dims'] sic_threshold = sic_threshold if sic_threshold is not None else self.icon_thresh self.logger.info("Computing Ice **THICKNESS** ") self.logger.debug(f" • Using SIC threshold : {sic_threshold:.2f}\n" f" • Spatial dimension names : {spatial_dim_names}") self.logger.debug("\n[Sea Ice Thickness Computation Steps]\n" f" 1. Apply SIC mask (SIC > {sic_threshold:.2f})\n" f" 2. Compute ice area: Multiply SIC × area and sum over {spatial_dim_names}\n" f" 3. Compute ice volume: Multiply SIC × thickness × area and sum over {spatial_dim_names}\n" f" 4. Divide by area / volume = thickness in metres") mask = SIC > sic_threshold HI = HI.where(mask) SIC = SIC.where(mask) A = A.where(mask) IA = (SIC * A).sum(dim=spatial_dim_names) IV = (HI * A).sum(dim=spatial_dim_names) return (IV / IA)
[docs] def compute_hemisphere_variable_aggregate(self, da, time_coord_name='time', flat_mean=False, da2=None, scale=None): """ Compute a time-aggregated field from a DataArray across the specified time dimension. This method calculates the mean of the input variable over time, with an optional normalisation step based on the maximum value across the time dimension. Designed for Dask-backed DataArrays to support scalable computation. Parameters ---------- da : xarray.DataArray Input data array to aggregate over time. Typically a geophysical variable such as sea ice thickness (m), strength (N/m), or velocity (m/s), with time as one of the dimensions. time_coord_name : str, optional Name of the time dimension to aggregate over. Default is 'time'. flat_mean : bool, optional If True, the data has simple mean over time, Default is False. Returns ------- xarray.DataArray A time-aggregated version of the input array. If `normalise_max` is True, the result represents the normalised mean occurrence across the time dimension; otherwise, it is the simple time mean. Notes ----- - If the specified `time_coord_name` is not found in the input DataArray, the original array is returned with a warning. - If `normalise_max` is enabled, the output represents a relative frequency-like measure, especially useful for thresholded or binary input fields (e.g., fast ice presence). - Assumes that all non-time dimensions are to be preserved in the output. """ self.logger.info("Computing Ice **AGGREGATE OVER TIME** (percentage of occurence)") if time_coord_name not in da.dims: self.logger.warning(f"No time dimension '{time_coord_name}' found. Returning original array.") return da time_len = da[time_coord_name].sizes.get(time_coord_name, 1) if flat_mean: return da.mean(dim=time_coord_name) elif da2 and scale: return (da/da2).sum(dim=time_coord_name) / scale else: return da.sum(dim=time_coord_name) / time_len
# SEASONAL METRICS @staticmethod def _ensure_odd(n: int) -> int: return n if n % 2 == 1 else n + 1 @staticmethod def _safe_slope(y: np.ndarray, t_days: np.ndarray, scaling: float) -> float | None: """Least-squares slope without polyfit overhead; returns None if flat/NaN.""" mask = np.isfinite(y) & np.isfinite(t_days) if mask.sum() < 2: return None y = y[mask] t = t_days[mask] t = t - t[0] # improves conditioning t_mean = t.mean() y_mean = y.mean() denom = np.sum((t - t_mean) ** 2) if denom == 0: return None slope = np.sum((t - t_mean) * (y - y_mean)) / denom return float(slope * scaling) @staticmethod def _year_index_slices(times: np.ndarray) -> dict[int, np.ndarray]: years = pd.DatetimeIndex(times).year.values uniq = np.unique(years) return {int(y): np.where(years == y)[0] for y in uniq} @staticmethod def _drop_leap_day(times: np.ndarray, y: np.ndarray) -> tuple[np.ndarray, np.ndarray]: # Remove Feb 29 for DOY-consistent filters di = pd.DatetimeIndex(times) keep = ~((di.month == 2) & (di.day == 29)) return times[keep], y[keep] @staticmethod def _summary(x): x = [v for v in x if v is not None] return (float(np.mean(x)), float(np.std(x))) if len(x) else (None, None)
[docs] def compute_seasonal_statistics(self, da, stat_name : str = 'FIA', window : int = 15, polyorder : int = 2, min_onset_doy : int = 50, min_retreat_doy : int = 240, growth_range : tuple[int, int] = (71, 273), retreat_early_range : tuple[int, int] = (273, 330), retreat_late_range : tuple[tuple[int, int], tuple[int, int]] = ((331, 365), (1, 71)), scaling_factor : float = 1e6, drop_leap_day : bool = True, return_per_year : bool = False): """ Compute seasonal timing and rate statistics from a daily 1D time series. This method is optimised for long daily time series (multi-year) by: - loading the full 1D series once, - computing Savitzky–Golay derivatives once per year, - using NumPy slicing for seasonal windows (minimising xarray overhead). It returns summary statistics (mean and standard deviation across years) for: - annual maxima and minima, - day-of-year (DOY) of maxima and minima, - onset timing (first positive derivative after `min_onset_doy`), - retreat timing (last negative curvature after `min_retreat_doy`), - duration (onset → retreat, wrap-aware), - growth slope (within `growth_range`), - retreat slope(s): early, late, and combined. Parameters ---------- da : xr.DataArray Daily 1D time series with dimension "time" (e.g., ice area in km²). stat_name : str, default "FIA" Label used for logging and downstream reporting. window : int, default 15 Savitzky–Golay filter window length (days). Will be coerced to an odd integer and adjusted upward if too small for `polyorder`. polyorder : int, default 2 Polynomial order for Savitzky–Golay derivatives. min_onset_doy : int, default 50 Earliest DOY at which onset is permitted (limits spurious early detections). min_retreat_doy : int, default 240 Earliest DOY at which retreat is permitted. growth_range : tuple[int, int], default (71, 273) DOY window used to estimate growth slope. retreat_early_range : tuple[int, int], default (273, 330) DOY window used to estimate early retreat slope within the same year. retreat_late_range : tuple[tuple[int,int], tuple[int,int]], default ((331,365), (1,71)) Late retreat window that may span the year boundary: - first tuple: late-year DOY window in current year - second tuple: early-year DOY window in next year scaling_factor : float, default 1e6 Multiplier applied to slope estimates to yield convenient units (e.g., km²/day). drop_leap_day : bool, default True If True, removes Feb 29 to simplify DOY-consistent filtering. return_per_year : bool, default False If True, also return a per-year table (pandas.DataFrame) with raw annual values. Returns ------- summary : dict[str, float | None] Dictionary of mean/std summary metrics across years. per_year : pandas.DataFrame, optional Returned only if `return_per_year=True`. Indexed by year, containing annual extrema, DOYs, and slope estimates. Raises ------ ValueError If `da` does not have a "time" dimension. Notes ----- - The routine iterates over years except the final year (by design) because the late-retreat window can require samples from the subsequent year. - Slope estimates use a lightweight least-squares implementation (`_safe_slope`) instead of `np.polyfit` to reduce overhead. """ from scipy.signal import savgol_filter from tqdm import tqdm if 'time' not in da.dims: raise ValueError("Input DataArray must have a 'time' dimension") self.logger.info(f"computing SEASONAL STATISTICS for {stat_name} ... ") # 1) Load once (small 1D array) and optionally drop leap days self.logger.debug(" 1. loading array ") da = da.load() # safe for 1D daily series times = da.time.values y_full = da.values.astype(np.float32) self.logger.debug(" 2. drop leap-days") if drop_leap_day: times, y_full = self._drop_leap_day(times, y_full) self.logger.debug(" 3. compute DOY indeces once") doy_full = pd.DatetimeIndex(times).dayofyear.values idx_by_year = self._year_index_slices(times) years_sorted = sorted(idx_by_year.keys()) self.logger.debug(" 4. prepare statistical lists") per_year_rows = [] max_list, min_list = [], [] grow_list, ret_list = [], [] ret_early_list, ret_late_list = [], [] min_doy_list, max_doy_list = [], [] onset_list = [] duration_list = [] self.logger.debug(" 5. Ensure Savitzky–Golay (savgol) settings are valid") window = self._ensure_odd(window) if window < polyorder + 2: window = polyorder + 3 if (polyorder + 3) % 2 == 1 else polyorder + 4 self.logger.debug(" 6. compute derivatives once per-year") for i, year in enumerate(years_sorted[:-1]): self.logger.debug(f" ....{year}") idx_this = idx_by_year[year] idx_next = idx_by_year[years_sorted[i + 1]] t_this = times[idx_this] y_this = y_full[idx_this] doy_this = doy_full[idx_this] # Skip tiny years if y_this.size < max(window, polyorder + 2): continue # Derivatives once per year try: dy_dt = savgol_filter(y_this, window_length=window, polyorder=polyorder, deriv=1, mode='interp') d2y_dt2 = savgol_filter(y_this, window_length=window, polyorder=polyorder, deriv=2, mode='interp') except ValueError: # window too long for this year, skip continue # --- Max/Min & their DOY --- if np.all(~np.isfinite(y_this)): # skip year with all NaN continue with np.errstate(invalid='ignore'): max_idx = int(np.nanargmax(y_this)) min_idx = int(np.nanargmin(y_this)) max_val = float(y_this[max_idx]) min_val = float(y_this[min_idx]) max_doy = int(doy_this[max_idx]) min_doy = int(doy_this[min_idx]) max_list.append(max_val); min_list.append(min_val) max_doy_list.append(max_doy); min_doy_list.append(min_doy) # --- Onset via derivative (first positive slope after min_onset_doy) --- onset_candidates = np.flatnonzero((dy_dt > 0) & (doy_this >= min_onset_doy)) onset_doy = int(doy_this[onset_candidates[0]]) if onset_candidates.size else None # --- Retreat via inflection (last negative curvature after min_retreat_doy) --- retreat_candidates = np.flatnonzero((d2y_dt2 < 0) & (doy_this >= min_retreat_doy)) retreat_doy = int(doy_this[retreat_candidates[-1]]) if retreat_candidates.size else None # --- Duration (onset → retreat, handle wrap) --- if onset_doy is not None and retreat_doy is not None: duration = (retreat_doy - onset_doy) if (retreat_doy > onset_doy) else (365 - onset_doy + retreat_doy) duration_list.append(int(duration)) onset_list.append(int(onset_doy)) # --- Slopes (growth / retreat early / retreat late / overall retreat) --- # Build masks once (NumPy, no xarray .where) doy_mask = doy_this # alias g0, g1 = growth_range re0, re1 = retreat_early_range rl0a, rl1a = retreat_late_range[0] rl0b, rl1b = retreat_late_range[1] # growth season (this year) g_mask = (doy_mask >= g0) & (doy_mask <= g1) y_grow = y_this[g_mask] t_grow = (t_this[g_mask] - t_this[g_mask][0]) / np.timedelta64(1, 'D') if y_grow.size else np.array([]) slope_grow = self._safe_slope(y_grow, t_grow, scaling_factor) if slope_grow is not None: grow_list.append(slope_grow) # retreat early (this year) e_mask = (doy_mask >= re0) & (doy_mask <= re1) y_early = y_this[e_mask] t_early = (t_this[e_mask] - t_this[e_mask][0]) / np.timedelta64(1, 'D') if y_early.size else np.array([]) slope_early = self._safe_slope(y_early, t_early, scaling_factor) if slope_early is not None: ret_early_list.append(slope_early) # retreat late (next year) t_next = times[idx_next] y_next = y_full[idx_next] doy_next = doy_full[idx_next] l_mask_a = (doy_mask >= rl0a) & (doy_mask <= rl1a) l_mask_b = (doy_next >= rl0b) & (doy_next <= rl1b) y_late = np.concatenate([y_this[l_mask_a], y_next[l_mask_b]]) t_late = np.concatenate([(t_this[l_mask_a] - t_this[l_mask_a][0]) / np.timedelta64(1, 'D') if l_mask_a.any() else np.array([]), # Continue time axis across year boundary ((t_next[l_mask_b] - t_this[l_mask_a][0]) / np.timedelta64(1, 'D')) if l_mask_b.any() and l_mask_a.any() else ((t_next[l_mask_b] - t_next[l_mask_b][0]) / np.timedelta64(1, 'D')) if l_mask_b.any() else np.array([])]) if (l_mask_a.any() or l_mask_b.any()) else np.array([]) slope_late = self._safe_slope(y_late, t_late, scaling_factor) if slope_late is not None: ret_late_list.append(slope_late) # overall retreat (early + late concatenated) if y_early.size or y_late.size: y_ret = np.concatenate([y_early, y_late]) # stitch time so it's strictly increasing t_ret = np.concatenate([t_early if y_early.size else np.array([]), (t_late + (t_early[-1] + 1) if y_early.size and y_late.size else t_late)]) slope_ret = self._safe_slope(y_ret, t_ret, scaling_factor) if slope_ret is not None: # Keep sign convention negative (ablation) ret_list.append(-abs(slope_ret)) if return_per_year: per_year_rows.append({'year' : year, 'max' : max_val, 'min' : min_val, 'doy_max' : max_doy, 'doy_min' : min_doy, 'onset_doy' : onset_doy, 'retreat_doy' : retreat_doy, 'duration_days' : duration if (onset_doy is not None and retreat_doy is not None) else None, 'growth_slope' : slope_grow, 'retreat_early_slope' : slope_early, 'retreat_late_slope' : slope_late, 'retreat_slope' : slope_ret if slope_ret is not None else None,}) self.logger.debug(" 7. compile summary and return") summary = {"Maximum Mean" : self._summary(max_list)[0], "Maximum Std" : self._summary(max_list)[1], "Minimum Mean" : self._summary(min_list)[0], "Minimum Std" : self._summary(min_list)[1], "Growth Mean" : self._summary(grow_list)[0], "Growth Std" : self._summary(grow_list)[1], "Retreat Mean" : self._summary(ret_list)[0], "Retreat Std" : self._summary(ret_list)[1], "Retreat Early Mean" : self._summary(ret_early_list)[0], "Retreat Early Std" : self._summary(ret_early_list)[1], "Retreat Late Mean" : self._summary(ret_late_list)[0], "Retreat Late Std" : self._summary(ret_late_list)[1], "Duration-days Mean" : self._summary(duration_list)[0], "Duration-days Std" : self._summary(duration_list)[1], "DOY Min Mean" : self._summary(min_doy_list)[0], "DOY Min Std" : self._summary(min_doy_list)[1], "DOY Max Mean" : self._summary(max_doy_list)[0], "DOY Max Std" : self._summary(max_doy_list)[1], "DOY Onset Mean" : self._summary(onset_list)[0], "DOY Onset Std" : self._summary(onset_list)[1],} if return_per_year: return summary, pd.DataFrame(per_year_rows).set_index('year') return summary
# PERSISTENCE METRICS
[docs] def persistence_stability_index(self, I_mask, A, persistence_threshold: float = 0.8, winter_months : tuple = (5, 6, 7, 8, 9, 10), area_scale : float = 1e9) -> dict: """ Compute FIPSI (Fast-Ice Persistence Stability Index) from a binary fast-ice mask. Definition ---------- FIPSI = (area of grid cells whose WINTER fast-ice persistence ≥ threshold) / (area of grid cells that have fast ice at least once in WINTER), where persistence is the fraction of winter days (e.g., May–Oct) that a cell is fast. Parameters ---------- I_mask : xarray.DataArray or xarray.Dataset Binary fast-ice mask (0/1) with a 'time' dimension; if a Dataset is given, a variable named 'FI_mask' is used. A : xarray.DataArray or xarray.Dataset Grid-cell area on the same grid; if a Dataset is given, the first of {'tarea','area','TAREA'} is used. If A has a 'time' dim, the first slice is used. persistence_threshold : float, optional Minimum winter persistence to classify a cell as persistent (default 0.8). winter_months : tuple[int], optional Months to include for the winter persistence (default May–Oct). Returns ------- dict: {'persistence_stability_index': float, # FIPSI in [0,1] or NaN if undefined 'area_persistent_winter': float, # area of persistent winter FI (units of A) 'area_ever_winter': float # area with any winter FI (units of A)} Notes ----- • Uses the class's `spatial_dims` to sum areas. • Assumes `I_mask` is truly binary (0/1). For classification outputs that are boolean, they will be cast to float internally. • Designed to be Dask-friendly: only two area reductions are computed eagerly. """ import dask self.logger.info("Computing FIPSI: persistent-winter area / ever-winter area") I_mask = self._as_da_mask(I_mask).astype("float32") A = self._as_da_area(A) # Ensure A has only spatial dims if "time" in A.dims: A = A.isel(time=0) if "time" in A.coords: # safer than drop_vars which can produce a Dataset A = A.drop_vars("time", errors="ignore") A = A.squeeze(drop=True) spat_dims = self.CICE_dict["spatial_dims"] # restrict to winter months and form persistence (fraction in [0,1]) in_winter = I_mask["time"].dt.month.isin(list(winter_months)) FI_winter = I_mask.where(in_winter) prstnc = FI_winter.mean(dim="time", skipna=True) # masks for footprints persistent_mask = prstnc >= float(persistence_threshold) # persistent winter FI ever_mask = FI_winter.max(dim="time", skipna=True) > 0 # ever-winter FI # # area-weighted sums over spatial dims prstnc_A = (A.where(persistent_mask)).sum(dim=spat_dims, skipna=True) ttl_A = (A.where(ever_mask)).sum(dim=spat_dims, skipna=True) with dask.config.set(scheduler="threads"): # or "synchronous" prstnc_A = prstnc_A.compute() ttl_A = ttl_A.compute() prstnc_A = self._to_float_scalar(prstnc_A) ttl_A = self._to_float_scalar(ttl_A) psi = (prstnc_A / ttl_A) if (np.isfinite(ttl_A) and ttl_A > 0.0) else np.nan return {"persistence_stability_index": float(psi), "area_persistent_winter" : prstnc_A/area_scale, "area_ever_winter" : ttl_A/area_scale}
def _prepare_BAS_coast(self, path_coast_shape: str | None = None, crs_out: str = "EPSG:3031", target_spacing_km: float = 1.0, cache_dir: str | None = None, rebuild_cache: bool = False): """ Load BAS high-res coastline and build a KD-tree of densified coastline points in a metric CRS. This implementation is optimised for repeated calls: 1) It supports an on-disk cache of densified coastline points (NPZ), keyed by: (shapefile path, shapefile mtime, crs_out, target_spacing_km). 2) It uses Shapely 2.x `segmentize()` + `get_coordinates()` to densify in C (fast). If Shapely 2.x isn't available, it falls back to a slower Python interpolation loop. Parameters ---------- path_coast_shape : str, optional Path to the BAS polygon coastline shapefile. Defaults to `self.BAS_dict["P_Ant_Cstln"]`. crs_out : str, default "EPSG:3031" Target projected CRS for metric distances. target_spacing_km : float, default 1.0 Approx spacing along coastline for densification (km). Clipped to >= 0.1 km (100 m). cache_dir : str, optional Directory to store NPZ cache files. If None, uses: - `self.D_cache/coast_kdtree` if `self.D_cache` exists - else `~/.cache/AFIM/coast_kdtree` rebuild_cache : bool, default False If True, ignore any existing cache and rebuild from shapefile. Sets ---- self._coast_kdtree : scipy.spatial.cKDTree self._coast_xy_proj : tuple(ndarray, ndarray) of coastline points (meters) self._coast_crs : str (crs_out) self._coast_cache : str (path to cache file used, if any) """ import os import hashlib from pathlib import Path import numpy as np import geopandas as gpd from scipy.spatial import cKDTree from shapely.ops import unary_union # --- inputs / paths --- P_shp = Path(path_coast_shape) if path_coast_shape is not None else Path(self.BAS_dict["P_Ant_Cstln"]) if not P_shp.exists(): raise FileNotFoundError(f"Coastline shapefile not found: {P_shp}") spacing_m = max(100.0, float(target_spacing_km) * 1000.0) # >= 100 m # --- determine cache location --- if cache_dir is not None: P_cache_dir = Path(cache_dir) else: if hasattr(self, "D_cache") and self.D_cache is not None: P_cache_dir = Path(self.D_cache) / "coast_kdtree" else: P_cache_dir = Path.home() / ".cache" / "AFIM" / "coast_kdtree" P_cache_dir.mkdir(parents=True, exist_ok=True) # --- build cache key (path + mtime + crs + spacing) --- mtime = os.path.getmtime(P_shp) sig = f"{str(P_shp.resolve())}|{int(mtime)}|{crs_out}|{spacing_m:.3f}" h = hashlib.md5(sig.encode("utf-8")).hexdigest()[:12] P_npz = P_cache_dir / f"bas_coast_{h}.npz" self.logger.info("Preparing BAS coastline KD-tree") self.logger.debug(f" • shapefile : {P_shp}") self.logger.debug(f" • crs_out : {crs_out}") self.logger.debug(f" • target_spacing_km : {target_spacing_km}") self.logger.debug(f" • spacing_m : {spacing_m:.1f}") self.logger.debug(f" • cache_file : {P_npz}") self.logger.debug(f" • rebuild_cache : {rebuild_cache}") # --- fast path: load cached points --- if P_npz.exists() and not rebuild_cache: self.logger.info(f"Coast cache hit: {P_npz.name}") z = np.load(P_npz) x_coast = z["x"].astype(float, copy=False) y_coast = z["y"].astype(float, copy=False) kdt = cKDTree(np.column_stack([x_coast, y_coast])) self._coast_kdtree = kdt self._coast_xy_proj = (x_coast, y_coast) self._coast_crs = crs_out self._coast_cache = str(P_npz) self.logger.info(f"Loaded coastline points: {x_coast.size:,} (cached)") return # --- slow path: read + densify + cache --- self.logger.info(f"Coast cache miss: building from shapefile {P_shp.name}") gdf = gpd.read_file(P_shp) # CRS handling if gdf.crs is None: self.logger.warning("Input CRS is None; assuming EPSG:4326 (lon/lat).") gdf = gdf.set_crs("EPSG:4326", allow_override=True) gdf = gdf.to_crs(crs_out) # Prefer union of BOUNDARIES (linework) instead of polygon union (usually faster) self.logger.info("Extracting coastline boundary linework...") boundary_series = gdf.geometry.boundary # Merge boundaries to remove duplicates (still can be expensive, but less than polygon union) self.logger.info("Merging boundary linework...") boundary_geom = unary_union(boundary_series.values) # Densify # Shapely 2 path: segmentize + get_coordinates (fast) try: import shapely # shapely 2.x provides segmentize/get_coordinates at top-level have_segmentize = hasattr(shapely, "segmentize") and hasattr(shapely, "get_coordinates") self.logger.debug(f" • shapely version : {getattr(shapely, '__version__', 'unknown')}") self.logger.debug(f" • segmentize avail? : {have_segmentize}") if have_segmentize: self.logger.info("Densifying coastline using shapely.segmentize (fast path)...") densified = shapely.segmentize(boundary_geom, spacing_m) coords = shapely.get_coordinates(densified) # (N,2) if coords.size == 0: raise RuntimeError("Shapely densification produced zero coordinates.") x_coast = coords[:, 0].astype(float, copy=False) y_coast = coords[:, 1].astype(float, copy=False) else: raise ImportError("segmentize/get_coordinates not available") except Exception as e: # Fallback: Python interpolation along linework (slower) self.logger.warning(f"Falling back to Python densification (shapely2 not available or failed): {e}") from shapely.geometry import LineString, MultiLineString, GeometryCollection def _iter_lines(g): if g is None: return gt = getattr(g, "geom_type", "").lower() if gt == "linestring": yield g elif gt == "multilinestring": for gg in g.geoms: yield gg elif gt == "geometrycollection": for gg in g.geoms: yield from _iter_lines(gg) else: # last resort: try boundary try: yield from _iter_lines(g.boundary) except Exception: return xs, ys = [], [] n_lines = 0 for ln in _iter_lines(boundary_geom): try: if ln.length <= 0 or not np.isfinite(ln.length): continue n_lines += 1 n = max(2, int(np.ceil(ln.length / spacing_m)) + 1) dists = np.linspace(0.0, float(ln.length), n) for d in dists: p = ln.interpolate(d) x, y = p.coords[0] xs.append(x) ys.append(y) except Exception: continue if len(xs) == 0: raise RuntimeError("Fallback densification produced zero points.") x_coast = np.asarray(xs, dtype=float) y_coast = np.asarray(ys, dtype=float) self.logger.info(f"Fallback densified {n_lines} lines -> {x_coast.size:,} points") # Optional de-duplication (helps reduce KD-tree size if union missed duplicates) # Keep it cheap: round to 1 m and unique. self.logger.debug("De-duplicating coastline points (rounded to 1 m)...") coords_i = np.column_stack([np.round(x_coast, 0), np.round(y_coast, 0)]).astype(np.int64, copy=False) # Unique rows (stable-ish) _, idx = np.unique(coords_i, axis=0, return_index=True) idx.sort() x_coast = x_coast[idx] y_coast = y_coast[idx] if x_coast.size == 0: raise RuntimeError("Coastline point set is empty after processing.") self.logger.info(f"Built coastline point set: {x_coast.size:,} pts @ ~{target_spacing_km} km spacing") # Build KD-tree kdt = cKDTree(np.column_stack([x_coast, y_coast])) # Cache to disk try: np.savez_compressed(P_npz, x=x_coast.astype(np.float64), y=y_coast.astype(np.float64)) self.logger.info(f"Wrote coast cache: {P_npz}") except Exception as e: self.logger.warning(f"Failed to write coast cache '{P_npz}': {e}") # Cache on self self._coast_kdtree = kdt self._coast_xy_proj = (x_coast, y_coast) self._coast_crs = crs_out self._coast_cache = str(P_npz)
[docs] def persistence_ice_distance_mean_max(self, ice_prstnc, persistence_threshold : float = 0.8, path_coast_shape : str = None, crs_out : str = "EPSG:3031"): """ Compute mean and maximum distance from the Antarctic coastline for persistent ice. This method measures how far persistent ice (e.g., persistent fast ice) extends from the coastline by: 1) thresholding a persistence field `ice_prstnc` in [0, 1], 2) projecting grid coordinates into a metric CRS (default EPSG:3031), 3) querying nearest coastline points via a pre-built KD-tree, and 4) returning mean and maximum distances (km) of persistent grid cells. Parameters ---------- ice_prstnc : xr.DataArray Persistence field in [0, 1], typically computed from a binary mask over a seasonal window (e.g., May–Oct). Must be on the same spatial grid as `self.G_t`. persistence_threshold : float, default 0.8 Cells with persistence >= threshold are treated as “persistent” for distance computations. path_coast_shape : str, optional Path to a coastline shapefile. Defaults to `self.BAS_dict["P_Ant_Cstln"]`. crs_out : str, default "EPSG:3031" Output CRS used for distance computations. Must be a metric CRS. Returns ------- dict Dictionary with keys: - "persistence_mean_distance": mean distance (km) - "persistence_max_distance" : max distance (km) Raises ------ KeyError If longitude/latitude fields cannot be found in `self.G_t`. ValueError If `ice_prstnc` shape does not match the loaded grid coordinates. Exception Propagates errors from coordinate transforms or KD-tree construction. Notes ----- - The coastline KD-tree is cached on `self` as `_coast_kdtree` and rebuilt only if `crs_out` changes. - The method forces local computation of `ice_prstnc` (via `.compute()`) using the threaded scheduler to avoid distributing a large Dask graph. """ import dask from pyproj import Transformer P_shp = path_coast_shape if path_coast_shape is not None else self.BAS_dict["P_Ant_Cstln"] spat_dims = self.CICE_dict["spatial_dims"] self.logger.info("Computing persistence distance (USNIC coastline KD-Tree)") with dask.config.set(scheduler="threads"): ice_prstnc = ice_prstnc.compute() # instead of .load() if not getattr(self, "bgrid_loaded", False): self.load_cice_grid(slice_hem=True) Gt = self.G_t lon_name = 'TLON' if 'TLON' in Gt else ('lon' if 'lon' in Gt else None) lat_name = 'TLAT' if 'TLAT' in Gt else ('lat' if 'lat' in Gt else None) if lon_name is None or lat_name is None: raise KeyError("Could not find lon/lat fields (TLON/TLAT or lon/lat) in self.G_t.") grid_shape = tuple(ice_prstnc.sizes[d] for d in spat_dims) if grid_shape != Gt[lon_name].shape: self.logger.warning("Grid mismatch detected; reloading b-grid with slice_hem=True") self.load_cice_grid(slice_hem=True) Gt = self.G_t if tuple(ice_prstnc.sizes[d] for d in spat_dims) != Gt[lon_name].shape: raise ValueError("ice_prstnc and grid coordinate shapes still mismatch after reload.") # Prepare coastline KD-tree once (unchanged) need_kdtree = (not hasattr(self, "_coast_kdtree")) or (getattr(self, "_coast_crs", None) != crs_out) if need_kdtree: self._prepare_BAS_coast(P_shp, crs_out=crs_out, target_spacing_km=1.0) # Build persistent mask from fully-realized array prst = (ice_prstnc >= float(persistence_threshold)).values if not np.any(prst): self.logger.warning("No persistent fast-ice cells found at this threshold.") return {"persistence_mean_distance": float('nan'), "persistence_max_distance": float('nan')} # cache dict on self if not hasattr(self, "_grid_xy_cache"): self._grid_xy_cache = {} # key includes CRS + grid shape + hemisphere slicing state (shape is usually enough) proj_key = (crs_out, lon_name, lat_name, tuple(Gt[lon_name].shape)) if proj_key in self._grid_xy_cache: self.logger.debug(f"Using cached projected grid coords for {crs_out}") x_all, y_all = self._grid_xy_cache[proj_key] else: self.logger.info(f"Projecting grid lon/lat -> {crs_out} (cached thereafter)") lon = np.asarray(Gt[lon_name].values) lat = np.asarray(Gt[lat_name].values) transformer = Transformer.from_crs("EPSG:4326", crs_out, always_xy=True) x_all, y_all = transformer.transform(lon, lat) self._grid_xy_cache[proj_key] = (x_all, y_all) iy, ix = np.where(prst) # spat_dims order (nj, ni) x_p = x_all[iy, ix] y_p = y_all[iy, ix] if x_p.size == 0: return {"persistence_mean_distance": float('nan'), "persistence_max_distance": float('nan')} d_m, _ = self._coast_kdtree.query(np.column_stack([x_p, y_p]), k=1) d_km = d_m / 1000.0 return {"persistence_mean_distance": float(np.nanmean(d_km)), "persistence_max_distance": float(np.nanmax(d_km))}
# INTER-COMPARISONS @staticmethod def _skill_stats(x, y): from sklearn.metrics import mean_squared_error, mean_absolute_error from scipy.stats import pearsonr valid = (~np.isnan(x)) & (~np.isnan(y)) x, y = x[valid], y[valid] return {"Bias" : np.mean(x - y), "RMSE" : np.sqrt(mean_squared_error(y, x)), "MAE" : mean_absolute_error(y, x), "Corr" : pearsonr(x, y)[0], "SD_Model" : np.std(x), "SD_Obs" : np.std(y)} @staticmethod def _safe_load_array_to_memory(x): # --- Force local computation to avoid serializing Dask HLGs to the cluster --- # Keep memory light and avoid shipping graphs to distributed workers try: return x.astype("float32").compute(scheduler="single-threaded") except Exception: return x.compute(scheduler="single-threaded") if hasattr(x, "compute") else x
[docs] def compute_skill_statistics(self, mod, obs, min_points=100): """ Compute model-vs-observation skill statistics for a time series. The method attempts a direct time-series comparison by intersecting model and observation timestamps. If there are fewer than `min_points` intersecting times, it falls back to a climatological comparison using day-of-year means. Parameters ---------- mod : xr.DataArray Modelled time series (must have a "time" coordinate for direct alignment). obs : xr.DataArray Observational time series. Ideally shares the same time basis as `mod`. min_points : int, default 100 Minimum number of intersecting timestamps required to compute direct time-series statistics. Otherwise, a climatology comparison is used. Returns ------- dict Dictionary of skill metrics: - Bias - RMSE - MAE - Corr - SD_Model - SD_Obs Returns NaNs for all metrics if time alignment fails and climatology cannot be formed. Notes ----- - Uses `_safe_load_array_to_memory()` to compute Dask-backed series locally and minimise graph shipping. - Climatology fallback uses `groupby("time.dayofyear").mean("time")` for both series. """ mod = self._safe_load_array_to_memory(mod) obs = self._safe_load_array_to_memory(obs) if hasattr(mod, 'time') and hasattr(obs, 'time'): dt_mod = mod.time.values dt_obs = obs.time.values dt_xsct = np.intersect1d(dt_mod, dt_obs) if len(dt_xsct) > min_points: self.logger.warning(f"more than {min_points} date-times intersected, therefore do stats on time series") mod_data = mod.sel(time=dt_xsct).values obs_data = obs.sel(time=dt_xsct).values return self._skill_stats(mod_data, obs_data) else: self.logger.warning("date-time mismatch detected — falling back to climatology comparison.") mod_clim = mod.groupby('time.dayofyear').mean('time') obs_clim = obs.groupby('time.dayofyear').mean('time') doy_xsct = np.intersect1d(mod_clim.dayofyear.values, obs_clim.dayofyear.values) mod_data = mod_clim.sel(dayofyear=doy_xsct).values obs_data = obs_clim.sel(dayofyear=doy_xsct).values return self._skill_stats(mod_data, obs_data) self.logger.warning("Time dimension not found or could not align. Returning empty stats.") return {"Bias": np.nan, "RMSE": np.nan, "MAE": np.nan, "Corr": np.nan, "SD_Model": np.nan, "SD_Obs": np.nan}
[docs] def antarctic_year_labels(self, time_da: xr.DataArray) -> xr.DataArray: """ Construct Antarctic-year labels for a time coordinate (Jul→Jun “years”). Antarctic-year (AY) grouping is commonly defined such that AY YYYY runs from July YYYY through June YYYY+1. This helper returns an integer label "ayear" for each timestamp: - months Jan–Jun are assigned to the previous calendar year, - months Jul–Dec are assigned to the current calendar year. Parameters ---------- time_da : xr.DataArray Time coordinate DataArray supporting `.dt.year` and `.dt.month`. Returns ------- xr.DataArray Integer DataArray named "ayear" aligned with `time_da`. Examples -------- - 2000-06-15 -> ayear = 1999 - 2000-07-01 -> ayear = 2000 """ year = time_da.dt.year month = time_da.dt.month ayear = xr.where(month <= 6, year - 1, year) return ayear.rename("ayear")
[docs] def extrema_means_antarctic_year(self, da_ts: xr.DataArray): """ Compute mean maxima/minima across Antarctic years for a 1D time series. Parameters ---------- da_ts : xr.DataArray 1D time series with dimension "time" (e.g., fast-ice area in km²). Returns ------- dict Dictionary containing: - "max_mean": mean of per-AY maxima (xr.DataArray scalar) - "min_mean": mean of per-AY minima (xr.DataArray scalar) - "max_ts" : per-AY maxima time series (dims: ayear) - "min_ts" : per-AY minima time series (dims: ayear) Notes ----- - Uses `antarctic_year_labels()` and xarray groupby reductions. - For short 1D series, it is acceptable to fully load/compute in memory. """ # for tiny 1D series, it's fine to fully chunk or even compute up front # da_ts = da_ts.chunk({"time": -1}) ay = self.antarctic_year_labels(da_ts["time"]) g = da_ts.groupby(ay) max_ts = g.max(skipna=True) # dims: ayear min_ts = g.min(skipna=True) return { "max_mean": max_ts.mean("ayear", skipna=True), "min_mean": min_ts.mean("ayear", skipna=True), "max_ts": max_ts, "min_ts": min_ts, }
#######################################################################