from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import glob, re, yaml, os, shutil
import pandas as pd
[docs]
@dataclass
class SimPaths:
sim_name : str
root : Path # AFIM-registered root for this sim
cice_files : list[Path] # ordered history files (daily or subdaily)
mom_files : list[Path] # ordered MOM output files (daily/monthly)
grid_files : dict[str, Path] # optional: ocean_hgrid, cice grid, etc.
meta : dict # run metadata (commit hashes, start/end, etc.)
[docs]
class AccessOM3Run:
"""
Adapter for ACCESS-OM3 Payu/NUOPC output structure -> AFIM-consumable paths.
Works without copying data (symlinks recommended).
"""
def __init__(self, run_dir: str | Path):
self.run_dir = Path(run_dir).expanduser().resolve()
# If user points at sim top, try to infer work/
if (self.run_dir / "work").is_dir():
self.work = self.run_dir / "work"
else:
self.work = self.run_dir
# -------- discovery --------
[docs]
def discover_cice(self) -> list[Path]:
# Common NUOPC-style names you’ve shown: access-om3.cice.h.1day.mean.YYYY-MM-DD.nc
patterns = [
str(self.work / "access-om3.cice.h.*.nc"),
str(self.work / "*cice*.h.*.nc"),
str(self.work / "iceh*.nc"), # fallback if it exists
str(self.work / "history" / "*.nc"), # some runs keep history/ subdir
]
files = []
for pat in patterns:
files.extend(glob.glob(pat))
files = sorted(set(map(str, files)))
files = [Path(f) for f in files if Path(f).is_file()]
return self._sort_by_date(files)
[docs]
def discover_mom(self) -> list[Path]:
patterns = [
str(self.work / "access-om3.ocean.*.nc"),
str(self.work / "*mom*.nc"),
str(self.work / "ocean" / "*.nc"),
]
files = []
for pat in patterns:
files.extend(glob.glob(pat))
files = sorted(set(map(str, files)))
files = [Path(f) for f in files if Path(f).is_file()]
return self._sort_by_date(files)
[docs]
def discover_grids(self) -> dict[str, Path]:
grids = {}
# best-effort: grab any obvious grid files sitting in INPUT/ or work/
for cand in [
self.work / "INPUT" / "ocean_hgrid.nc",
self.work / "INPUT" / "cice_grid.nc",
self.work / "ocean_hgrid.nc",
]:
if cand.is_file():
grids[cand.name] = cand
return grids
[docs]
def infer_time_range(self, files: list[Path]) -> tuple[str | None, str | None]:
# Extract YYYY-MM-DD from filename if present
dates = []
for f in files:
m = re.search(r"(\d{4}-\d{2}-\d{2})", f.name)
if m:
dates.append(m.group(1))
if not dates:
return None, None
dates = sorted(dates)
return dates[0], dates[-1]
# -------- registration (symlinked AFIM view) --------
[docs]
def register_to_afim(
self,
sim_name: str,
afim_root: str | Path = "/g/data/gv90/da1339/afim_output",
mode: str = "symlink", # "symlink" | "copy" | "none"
) -> SimPaths:
afim_root = Path(afim_root).expanduser().resolve()
sim_root = afim_root / sim_name
raw_root = sim_root / "raw"
cice_dst = raw_root / "cice"
mom_dst = raw_root / "mom"
meta_dst = sim_root / "meta"
for d in (cice_dst, mom_dst, meta_dst):
d.mkdir(parents=True, exist_ok=True)
cice_files = self.discover_cice()
mom_files = self.discover_mom()
grids = self.discover_grids()
# Create links/copies if requested
if mode in ("symlink", "copy"):
self._materialize(cice_files, cice_dst, mode=mode)
self._materialize(mom_files, mom_dst, mode=mode)
# Metadata to make later debugging trivial
c0, cN = self.infer_time_range(cice_files)
m0, mN = self.infer_time_range(mom_files)
meta = dict(
source_run_dir=str(self.run_dir),
source_work_dir=str(self.work),
cice_count=len(cice_files),
mom_count=len(mom_files),
cice_time_range=[c0, cN],
mom_time_range=[m0, mN],
grids={k: str(v) for k, v in grids.items()},
mode=mode,
)
with open(meta_dst / "runinfo.yaml", "w") as f:
yaml.safe_dump(meta, f, sort_keys=False)
# Return paths pointing at the registered view if we materialized; else original
if mode in ("symlink", "copy"):
cice_reg = self._sort_by_date(list(cice_dst.glob("*.nc")))
mom_reg = self._sort_by_date(list(mom_dst.glob("*.nc")))
root = sim_root
return SimPaths(sim_name, root, cice_reg, mom_reg, grids, meta)
else:
# consume directly from work dir (no archive view)
return SimPaths(sim_name, self.work, cice_files, mom_files, grids, meta)
# -------- internals --------
def _materialize(self, src_files: list[Path], dst_dir: Path, mode: str):
for src in src_files:
dst = dst_dir / src.name
if dst.exists():
continue
if mode == "symlink":
os.symlink(src, dst)
elif mode == "copy":
shutil.copy2(src, dst)
def _sort_by_date(self, files: list[Path]) -> list[Path]:
def key(f: Path):
m = re.search(r"(\d{4}-\d{2}-\d{2})", f.name)
return m.group(1) if m else f.name
return sorted(files, key=key)
[docs]
class SeaIceACCESS:
"""
Interface to the ACCESS-NRI Intake catalog for loading ACCESS-OM* CICE daily output
and writing it into an AFIM-friendly monthly Zarr layout.
This class is intended to:
1) Query the ACCESS-NRI Intake catalog for available CICE variables for a given
ACCESS experiment.
2) Load a daily dataset (1day frequency) for a selected set of variables and
time window.
3) Standardise basic indexing/coordinates (e.g., ensure `nj`/`ni` coordinates
exist) so downstream merging/processing is reliable.
4) Persist the daily time series to monthly Zarr groups (one directory per month).
Notes
-----
- The Intake catalog must be installed and the ACCESS-NRI catalog must be available
as `intake.cat.access_nri`.
- This class assumes CICE daily output is labelled as a daily mean associated with
the *following* day (a common convention); `write_ACCESS_to_monthly_zarr()` shifts
timestamps back by 1 day to align with the intended averaging period.
- Dask is optional, but recommended for performance. If `self.client` is not set,
operations may still work but can be slower and may run locally.
Expected Attributes
-------------------
logger : logging.Logger
Logger used for status and warnings.
client : dask.distributed.Client or None
Dask client (optional). If None, a warning is emitted at initialisation.
AOM2_dict : dict
Configuration dictionary expected to contain at least `experiment` (str) when
`experiment` is not passed explicitly to methods.
dt0_str, dtN_str : str
Default ISO date strings (e.g. "1993-01-01") used when method arguments are None.
cice_vars_reqd : list[str]
Required CICE variable names you want to extract from the loaded dataset.
cice_var_list : list[str]
Variable names used to query the catalog. This can match `cice_vars_reqd`, or be
a broader list (e.g., including coordinates/auxiliary vars).
D_zarr : str or pathlib.Path
Output directory where monthly Zarr groups are written.
Raises
------
ValueError
If `self.cice_vars_reqd` is not defined at initialisation.
"""
def __init__(self, **kwargs):
"""
Initialise the ACCESS-NRI CICE catalog helper.
Parameters
----------
**kwargs
Implementation-dependent configuration parameters. Typical usage is to pass
or set attributes such as `logger`, `client`, `AOM2_dict`, `dt0_str`, `dtN_str`,
`cice_vars_reqd`, `cice_var_list`, and `D_zarr`.
Notes
-----
- If no Dask client is provided (`self.client is None`), a warning is printed.
- `self.cice_vars_reqd` must be set before completion; otherwise a ValueError is raised.
Raises
------
ValueError
If `self.cice_vars_reqd` is None.
"""
if self.client is None:
print("Warning: No Dask client was provided to SeaIceACCESS.")
if self.cice_vars_reqd is None:
raise ValueError("Missing required variable list: self.cice_vars_reqd")
[docs]
def list_access_cice_variables(self, experiment=None):
"""
List available daily CICE variables in the ACCESS-NRI Intake catalog.
This method queries the ACCESS-NRI catalog for the specified experiment and returns
the unique set of variable names found at daily frequency (frequency="1day").
Parameters
----------
experiment : str, optional
ACCESS experiment identifier (e.g., "access-om2-01-iaf").
If None, the method falls back to `self.AOM2_dict['experiment']`.
Returns
-------
list[str]
Sorted list of unique CICE variable names available for the experiment at
1-day frequency. Returns an empty list if no matching results are found.
Raises
------
ValueError
If `experiment` is not provided and `self.AOM2_dict['experiment']` is missing.
RuntimeError
If the ACCESS-NRI catalog is not available as `intake.cat.access_nri`.
Notes
-----
- The ACCESS-NRI Intake catalog must be installed and configured (i.e., the catalog
is discoverable under `intake.cat`).
- Catalog entries sometimes store variable names as a list per asset. This method
flattens those lists into a unique set.
"""
import intake
experiment = experiment or self.AOM2_dict.get('experiment', None)
if experiment is None:
raise ValueError("Experiment name must be provided or set in self.AOM2_dict['experiment'].")
if not hasattr(intake.cat, "access_nri"):
raise RuntimeError("ACCESS-NRI catalog not found in Intake. Run `intake catalog add access_nri ...` first.")
self.logger.info(f"Querying ACCESS-NRI catalog for available CICE variables in '{experiment}'...")
results = intake.cat.access_nri[experiment].search(frequency="1day")
if results is None or results.df.empty:
self.logger.warning(f"No CICE results found for experiment '{experiment}' at 1-day frequency.")
return []
# Flatten list of lists to extract individual variable names
var_lists = results.df["variable"].dropna().tolist()
flat_vars = {var for sublist in var_lists for var in (sublist if isinstance(sublist, list) else [sublist])}
var_list = sorted(flat_vars)
self.logger.info(f"Found {len(var_list)} unique variables.")
return var_list
[docs]
def load_ACCESS_OM_CICE(self, experiment=None, dt0_str=None, dtN_str=None):
"""
Load daily CICE output from the ACCESS-NRI Intake catalog as an xarray.Dataset.
The method searches the ACCESS-NRI catalog for a daily (frequency="1day") CICE dataset
matching `self.cice_var_list`, opens it via `to_dataset_dict()`, extracts the variables
listed in `self.cice_vars_reqd` (if present), and subsets the time dimension to the
requested window.
Parameters
----------
experiment : str, optional
ACCESS experiment identifier. If None, defaults to `self.AOM2_dict['experiment']`.
dt0_str : str, optional
Inclusive start date string (ISO-like), e.g. "1993-01-01".
If None, defaults to `self.dt0_str`.
dtN_str : str, optional
Inclusive end date string (ISO-like), e.g. "1993-12-31".
If None, defaults to `self.dtN_str`.
Returns
-------
xr.Dataset
Dataset containing the extracted variables on the time subset. The dataset is
merged from individual variables to ensure consistent coordinate labelling.
Raises
------
RuntimeError
If the ACCESS-NRI catalog is not available as `intake.cat.access_nri`.
KeyError
If `experiment` is None and `self.AOM2_dict['experiment']` is missing.
ValueError
If the catalog search returns zero or multiple daily datasets (expects exactly one).
Warns
-----
UserWarning (via logger)
If any of `self.cice_vars_reqd` are missing from the loaded dataset.
Notes
-----
- Uses `use_cftime=True` for robust non-standard calendars.
- Adds explicit `nj` and `ni` coordinates (integer labels) to each extracted variable
to avoid merge failures or downstream ambiguity when datasets lack labelled indices.
- The catalog search uses `self.cice_var_list`; variables actually returned are then
filtered to `self.cice_vars_reqd` (if present).
"""
import intake
import numpy as np
import xarray as xr
experiment = experiment if experiment is not None else self.AOM2_dict['experiment']
dt0_str = dt0_str if dt0_str is not None else self.dt0_str
dtN_str = dtN_str if dtN_str is not None else self.dtN_str
t_slice = slice(dt0_str, dtN_str)
if not hasattr(intake.cat, "access_nri"):
raise RuntimeError("ACCESS-NRI catalog not found in Intake. Did you add it with `intake catalog add`?")
result = intake.cat.access_nri[experiment].search(variable=self.cice_var_list, frequency="1day")
ds_dict = result.to_dataset_dict(xarray_open_kwargs={"use_cftime": True,
"decode_coords": True,
"decode_timedelta": False})
if len(ds_dict) != 1:
raise ValueError(f"Expected one daily dataset, got {len(ds_dict)}.\n"
f"Available keys: {list(ds_dict.keys())}")
ds = list(ds_dict.values())[0]
missing_vars = [var for var in self.cice_vars_reqd if var not in ds]
if missing_vars:
self.logger.warning(f"Missing variables in dataset: {missing_vars}")
extracted_vars = [var for var in self.cice_vars_reqd if var in ds]
self.logger.info(f"Extracting variables from ESM datastore: {extracted_vars}")
DS = {var: ds[var].sel(time=t_slice) for var in extracted_vars}
#DS = {var: ds[var].sel(time=t_slice) for var in self.cice_vars_reqd if var in ds}
# Add nj, ni coordinate labels so merge works cleanly
for var, da in DS.items():
DS[var] = da.assign_coords(nj=("nj", np.arange(da.shape[1])),
ni=("ni", np.arange(da.shape[2])))
return xr.merge(DS.values())
[docs]
def write_ACCESS_to_monthly_zarr(self, DS, overwrite=False):
"""
Write a daily ACCESS-CICE Dataset into monthly Zarr groups.
The input dataset is assumed to have a daily time axis. The method:
1) Creates/ensures the output directory exists (`self.D_zarr`).
2) Shifts timestamps back by one day to correct the common "daily mean labelled
by the following day" convention in some CICE outputs.
3) Splits the dataset into monthly subsets (YYYY-MM).
4) Writes each month to `iceh_YYYY-MM.zarr` under `self.D_zarr`.
Parameters
----------
DS : xr.Dataset
Daily CICE dataset with a `time` dimension. Expected to also have spatial
dimensions `nj` and `ni`.
overwrite : bool, default False
If True, existing monthly Zarr directories are overwritten. If False, existing
outputs are skipped.
Returns
-------
None
This method writes Zarr stores to disk and does not return a value.
Side Effects
------------
Writes monthly Zarr groups to:
<self.D_zarr>/iceh_YYYY-MM.zarr
Raises
------
OSError
If the output directory cannot be created.
Exception
Any exception raised by `xarray.Dataset.to_zarr()` will be logged and propagated
unless caught (current implementation logs errors per month).
Notes
-----
- Time shift: `time = time - 1 day`. If your upstream data is already correctly
labelled, remove or disable this adjustment.
- Chunking: this method applies a default chunking
`{"time": -1, "nj": 540, "ni": 1440}`. Adjust to match your grid shape and
storage/performance requirements.
"""
import datetime
import numpy as np
import pandas as pd
import xarray as xr
from pathlib import Path
D_out = Path(self.D_zarr)
D_out.mkdir(parents=True, exist_ok=True)
DS["time"] = xr.DataArray([t - datetime.timedelta(days=1) for t in DS["time"].values], dims="time", coords={"time": DS["time"].values})
self.logger.info(f"First timestamp now: {str(DS.time.values[0])}")
self.logger.info("Shifted all timestamps back by one day to correct for CICE daily average label.")
time_vals = pd.to_datetime([t.isoformat() for t in DS.time.values])
mo_strs = np.unique(time_vals.strftime("%Y-%m"))
for m_str in mo_strs:
ds_month = DS.sel(time=time_vals.strftime("%Y-%m") == m_str)
if ds_month.time.size == 0:
self.logger.warning(f"No data found for {m_str}")
continue
P_zarr = Path(D_out,f"iceh_{m_str}.zarr")
if P_zarr.exists() and not overwrite:
self.logger.info(f"Skipping {P_zarr} (already exists)")
continue
self.logger.info(f"Writing {m_str} with {ds_month.time.size} days → {P_zarr}")
ds_month = ds_month.chunk({"time": -1, "nj": 540, "ni": 1440})
try:
ds_month.to_zarr(P_zarr, mode="w", consolidated=True)
except Exception as e:
self.logger.error(f"Failed to write {P_zarr}: {e}")