import os, shutil
import xarray as xr
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime
[docs]
class SeaIceIcebergs:
def __init__(self, **kwargs):
return
######################################################################
# small helpers
######################################################################
@staticmethod
def _first_existing_column(columns, candidates):
for c in candidates:
if c in columns:
return c
return None
def _grid_da(self, values, name, dtype=None, attrs=None):
arr = np.asarray(values, dtype=dtype) if dtype is not None else np.asarray(values)
da = xr.DataArray(
data=arr,
dims=self.spatial_dims,
coords={
"lat": (self.spatial_dims, self.G_t["lat"].values),
"lon": (self.spatial_dims, self.G_t["lon"].values),
},
name=name,
)
if attrs:
da.attrs.update(attrs)
return da
def _dedup_gi_features(self, df, uid_col="uid"):
"""
Collapse repeated observations of the same iceberg UID into one feature row.
"""
if uid_col not in df.columns:
return df.reset_index(drop=True)
uid = df[uid_col]
if uid.isna().all():
return df.reset_index(drop=True)
agg = {}
for c in ["x_m", "y_m", "lon", "lat"]:
if c in df.columns:
agg[c] = "mean"
for c in ["area_m2", "perim_m", "bed_depth"]:
if c in df.columns:
agg[c] = "median"
for c in ["timestamp", "date_range", "orbit", "swath_mode",
"source_type", "geometry_type", "source_path"]:
if c in df.columns:
agg[c] = "first"
out = df.groupby(uid_col, as_index=False).agg(agg)
out["obs_count"] = (
df.groupby(uid_col).size().reindex(out[uid_col]).to_numpy(dtype=np.int32)
)
return out.reset_index(drop=True)
######################################################################
# canonical source loader
######################################################################
[docs]
def load_grounded_iceberg_source(
self,
GI_P_raw=None,
GI_layer="grounded_icebergs",
csv_lon_col=None,
csv_lat_col=None,
dedup_uid=True,
normalise_lon_to="0-360",
use_attr_area=True,
):
"""
Read CSV or GPKG grounded iceberg input and return a canonical feature table.
Returns columns as available:
uid, lon, lat, x_m, y_m, area_m2, perim_m, bed_depth, obs_count,
source_type, geometry_type, source_path
"""
GI_P_raw = GI_P_raw if GI_P_raw is not None else self.GI_dict["P_raw"]
P_raw = Path(GI_P_raw)
ext = P_raw.suffix.lower()
if ext == ".csv":
raw = pd.read_csv(P_raw)
if csv_lon_col is None:
csv_lon_col = self._first_existing_column(
raw.columns,
["X", "x", "lon", "Lon", "LON", "longitude", "LONGITUDE"],
)
if csv_lat_col is None:
csv_lat_col = self._first_existing_column(
raw.columns,
["Y", "y", "lat", "Lat", "LAT", "latitude", "LATITUDE"],
)
if csv_lon_col is None or csv_lat_col is None:
raise ValueError(
f"Could not identify lon/lat columns in CSV: {P_raw}\n"
f"Available columns: {list(raw.columns)}"
)
df = self.read_grounded_iceberg_csv(
P_csv=P_raw,
lon_col=csv_lon_col,
lat_col=csv_lat_col,
normalise_lon_to=normalise_lon_to,
).copy()
if "uid" not in df.columns:
df["uid"] = np.arange(len(df), dtype=np.int64)
df["x_m"] = np.nan
df["y_m"] = np.nan
if "area_m2" not in df.columns:
df["area_m2"] = np.nan
if "perim_m" not in df.columns:
df["perim_m"] = np.nan
if "bed_depth" not in df.columns:
df["bed_depth"] = np.nan
df["obs_count"] = 1
df["source_type"] = "csv_point"
df["geometry_type"] = "Point"
df["source_path"] = str(P_raw)
elif ext == ".gpkg":
df = self.read_grounded_iceberg_gpkg(
P_gpkg=P_raw,
layer=GI_layer,
dedup_uid=False, # dedup ourselves so we can preserve obs_count
use_attr_area=use_attr_area,
normalise_lon_to=normalise_lon_to,
).copy()
df["obs_count"] = 1
df["source_type"] = "gpkg_polygon"
df["geometry_type"] = "Polygon"
df["source_path"] = str(P_raw)
if dedup_uid:
df = self._dedup_gi_features(df, uid_col="uid")
else:
raise ValueError(
f"Unsupported grounded iceberg input: {P_raw}\n"
f"Expected .csv or .gpkg"
)
self.GI_features = df.reset_index(drop=True)
self.logger.info(
f"Loaded grounded iceberg source: {P_raw} "
f"(n_features={len(self.GI_features)})"
)
return self.GI_features
######################################################################
# mapping to model grid
######################################################################
[docs]
def assign_grounded_icebergs_to_tgrid(
self,
GI_features=None,
proj_crs="EPSG:3031",
max_dist_km=None,
):
"""
Map canonical GI features to nearest T-grid cell.
Notes
-----
map_xy_to_tgrid / map_points_to_tgrid return:
ii -> ni index
jj -> nj index
"""
if GI_features is None:
if not hasattr(self, "GI_features"):
raise ValueError("No GI_features found. Run load_grounded_iceberg_source first.")
GI_features = self.GI_features
self.load_cice_grid(slice_hem=False, build_faces=True)
df = GI_features.copy()
has_xy = (
("x_m" in df.columns) and ("y_m" in df.columns)
and np.isfinite(df["x_m"]).any() and np.isfinite(df["y_m"]).any()
)
if has_xy:
ni_i, nj_i, dist_m, valid = self.map_xy_to_tgrid(
x_m=df["x_m"].to_numpy(dtype=float),
y_m=df["y_m"].to_numpy(dtype=float),
proj_crs=proj_crs,
max_dist_km=max_dist_km,
)
else:
ni_i, nj_i, dist_m, valid = self.map_points_to_tgrid(
lon_deg=df["lon"].to_numpy(dtype=float),
lat_deg=df["lat"].to_numpy(dtype=float),
proj_crs=proj_crs,
max_dist_km=max_dist_km,
)
df["ni_i"] = ni_i.astype(np.int32)
df["nj_i"] = nj_i.astype(np.int32)
df["map_dist_m"] = dist_m.astype(np.float64)
df["map_valid"] = valid.astype(bool)
df = df[df["map_valid"]].reset_index(drop=True)
self.GI_assignments = df
self.GI_clean = df.copy() # backward compatibility
self.logger.info(
f"Mapped grounded icebergs to T-grid: "
f"n_valid={len(df)}"
)
return self.GI_assignments
######################################################################
# cellwise aggregation
######################################################################
[docs]
def aggregate_grounded_icebergs_on_tgrid(self, GI_assignments=None):
"""
Aggregate mapped features to model-grid cell diagnostics.
"""
if GI_assignments is None:
if not hasattr(self, "GI_assignments"):
raise ValueError("No GI_assignments found. Run assign_grounded_icebergs_to_tgrid first.")
GI_assignments = self.GI_assignments
self.load_cice_grid(slice_hem=False, build_faces=True)
df = GI_assignments.copy()
nj, ni = self.G_t["lon"].shape
if "obs_count" not in df.columns:
df["obs_count"] = 1
if "area_m2" not in df.columns:
df["area_m2"] = np.nan
if "perim_m" not in df.columns:
df["perim_m"] = np.nan
if "bed_depth" not in df.columns:
df["bed_depth"] = np.nan
grouped = (
df.groupby(["nj_i", "ni_i"], sort=False)
.agg(
GI_count=("nj_i", "size"),
GI_obs_count=("obs_count", "sum"),
GI_unique_uid=("uid", "nunique") if "uid" in df.columns else ("nj_i", "size"),
GI_area_sum_m2=("area_m2", "sum"),
GI_area_max_m2=("area_m2", "max"),
GI_perim_sum_m=("perim_m", "sum"),
GI_bed_depth_mean=("bed_depth", "mean"),
GI_map_dist_mean_m=("map_dist_m", "mean"),
)
.reset_index()
)
def zeros_int():
return np.zeros((nj, ni), dtype=np.int32)
def zeros_float():
return np.full((nj, ni), np.nan, dtype=np.float32)
GI_count = zeros_int()
GI_obs_count = zeros_int()
GI_unique_uid = zeros_int()
GI_area_sum_m2 = zeros_float()
GI_area_max_m2 = zeros_float()
GI_perim_sum_m = zeros_float()
GI_bed_depth_mean = zeros_float()
GI_map_dist_mean_m = zeros_float()
jj = grouped["nj_i"].to_numpy(dtype=np.int32)
ii = grouped["ni_i"].to_numpy(dtype=np.int32)
GI_count[jj, ii] = grouped["GI_count"].to_numpy(dtype=np.int32)
GI_obs_count[jj, ii] = grouped["GI_obs_count"].to_numpy(dtype=np.int32)
GI_unique_uid[jj, ii] = grouped["GI_unique_uid"].to_numpy(dtype=np.int32)
GI_area_sum_m2[jj, ii] = grouped["GI_area_sum_m2"].to_numpy(dtype=np.float32)
GI_area_max_m2[jj, ii] = grouped["GI_area_max_m2"].to_numpy(dtype=np.float32)
GI_perim_sum_m[jj, ii] = grouped["GI_perim_sum_m"].to_numpy(dtype=np.float32)
GI_bed_depth_mean[jj, ii] = grouped["GI_bed_depth_mean"].to_numpy(dtype=np.float32)
GI_map_dist_mean_m[jj, ii] = grouped["GI_map_dist_mean_m"].to_numpy(dtype=np.float32)
ds = xr.Dataset(
data_vars={
"GI_count": self._grid_da(GI_count, "GI_count", dtype=np.int32),
"GI_obs_count": self._grid_da(GI_obs_count, "GI_obs_count", dtype=np.int32),
"GI_unique_uid": self._grid_da(GI_unique_uid, "GI_unique_uid", dtype=np.int32),
"GI_area_sum_m2": self._grid_da(GI_area_sum_m2, "GI_area_sum_m2", dtype=np.float32),
"GI_area_max_m2": self._grid_da(GI_area_max_m2, "GI_area_max_m2", dtype=np.float32),
"GI_perim_sum_m": self._grid_da(GI_perim_sum_m, "GI_perim_sum_m", dtype=np.float32),
"GI_bed_depth_mean": self._grid_da(GI_bed_depth_mean, "GI_bed_depth_mean", dtype=np.float32),
"GI_map_dist_mean_m": self._grid_da(GI_map_dist_mean_m, "GI_map_dist_mean_m", dtype=np.float32),
}
)
ds["GI_occupied"] = (ds["GI_count"] >= 1).astype(np.int8)
ds["GI_occupied"].attrs["long_name"] = "binary occupied GI cell"
self.GI_cell_stats = ds
self.GI_cnts = ds["GI_count"].rename("GI_counts") # backward compatibility
self.logger.info(
f"Aggregated grounded icebergs to grid: "
f"occupied_cells={int((ds['GI_count'].values > 0).sum())}"
)
return self.GI_cell_stats
######################################################################
# old public entrypoint, now upgraded
######################################################################
[docs]
def process_raw_grounded_iceberg_database(
self,
GI_P_raw=None,
GI_layer="grounded_icebergs",
csv_lon_col=None,
csv_lat_col=None,
dedup_uid=True,
normalise_lon_to="0-360",
proj_crs="EPSG:3031",
max_dist_km=None,
use_attr_area=True,
):
"""
Load, map, and aggregate grounded iceberg source data.
Backward-compatible public entry point.
"""
self.load_grounded_iceberg_source(
GI_P_raw=GI_P_raw,
GI_layer=GI_layer,
csv_lon_col=csv_lon_col,
csv_lat_col=csv_lat_col,
dedup_uid=dedup_uid,
normalise_lon_to=normalise_lon_to,
use_attr_area=use_attr_area,
)
self.assign_grounded_icebergs_to_tgrid(
proj_crs=proj_crs,
max_dist_km=max_dist_km,
)
self.aggregate_grounded_icebergs_on_tgrid()
return self.GI_cell_stats
######################################################################
# thinning
######################################################################
def _is_isolated(self, y, x, mask):
ny, nx = mask.shape
adjacent_coords = [
(y-1, x-1), (y-1, x), (y-1, x+1),
(y, x-1), (y, x+1),
(y+1, x-1), (y+1, x), (y+1, x+1),
]
for adj_y, adj_x in adjacent_coords:
if 0 <= adj_y < ny and 0 <= adj_x < nx:
if mask[adj_y, adj_x]:
return False
return True
def _thin_mask(
self,
mask,
counts,
p_min=0.2,
p_max=0.8,
scaling_exponent=2.0,
random_state=0,
protect_isolated=True,
):
"""
Legacy probabilistic thinning, now reproducible via random_state.
"""
rng = np.random.default_rng(random_state)
ny, nx = mask.shape
thinned_mask = np.zeros_like(mask, dtype=bool)
count_min = np.nanmin(counts)
count_max = np.nanmax(counts)
if count_max > count_min:
norm_counts = (counts - count_min) / (count_max - count_min)
norm_counts = norm_counts ** scaling_exponent
else:
norm_counts = np.ones_like(counts, dtype=float)
retain_probs = p_min + norm_counts * (p_max - p_min)
retained_count = 0
total_count = int(np.count_nonzero(mask))
for y in range(ny):
for x in range(nx):
if not mask[y, x]:
continue
keep = False
if protect_isolated and self._is_isolated(y, x, mask):
keep = True
elif rng.random() < retain_probs[y, x]:
keep = True
if keep:
thinned_mask[y, x] = True
retained_count += 1
GI_thinning_factor = 1.0 - (retained_count / total_count) if total_count > 0 else 0.0
return thinned_mask, GI_thinning_factor
def _ranked_thin_mask(
self,
occupied,
score,
keep_fraction=0.5,
protect_isolated=True,
):
"""
Deterministic thinning by retaining the highest-scoring occupied cells.
"""
occupied = np.asarray(occupied, dtype=bool)
score = np.asarray(score, dtype=float)
total = int(np.count_nonzero(occupied))
if total == 0:
return np.zeros_like(occupied, dtype=bool), 0.0
keep_fraction = float(np.clip(keep_fraction, 0.0, 1.0))
n_keep_target = int(np.ceil(keep_fraction * total))
keep = np.zeros_like(occupied, dtype=bool)
if protect_isolated:
ys, xs = np.where(occupied)
for y, x in zip(ys, xs):
if self._is_isolated(y, x, occupied):
keep[y, x] = True
remaining_need = max(n_keep_target - int(keep.sum()), 0)
eligible = occupied & (~keep)
if remaining_need > 0 and np.any(eligible):
flat = np.flatnonzero(eligible)
flat_scores = score.ravel()[flat]
flat_scores = np.where(np.isfinite(flat_scores), flat_scores, -np.inf)
order = flat[np.argsort(flat_scores)[::-1]]
chosen = order[:remaining_need]
keep.ravel()[chosen] = True
thinning_factor = 1.0 - (int(keep.sum()) / total)
return keep, thinning_factor
[docs]
def thin_grounded_icebergs(
self,
strategy="legacy_count_prob",
keep_fraction=None,
rank_var="GI_area_sum_m2",
p_min=0.1,
p_max=0.9,
scaling_exponent=1.5,
random_state=0,
protect_isolated=True,
):
"""
Thin occupied GI cells using either the legacy stochastic method or a new
deterministic ranked method.
"""
if not hasattr(self, "GI_cell_stats"):
raise ValueError("GI_cell_stats not found. Run process_raw_grounded_iceberg_database first.")
counts = self.GI_cell_stats["GI_count"].values
occupied = counts >= 1
if strategy == "legacy_count_prob":
thin_mask, thin_fact = self._thin_mask(
mask=occupied,
counts=counts,
p_min=p_min,
p_max=p_max,
scaling_exponent=scaling_exponent,
random_state=random_state,
protect_isolated=protect_isolated,
)
score_used = counts.astype(float)
elif strategy in {"ranked_area", "ranked_count", "ranked"}:
if keep_fraction is None:
keep_fraction = 0.5
if strategy == "ranked_count":
score_name = "GI_count"
elif strategy == "ranked_area":
score_name = "GI_area_sum_m2"
else:
score_name = rank_var
if score_name not in self.GI_cell_stats:
raise ValueError(
f"Requested rank_var='{score_name}' not found in GI_cell_stats. "
f"Available: {list(self.GI_cell_stats.data_vars)}"
)
score_used = self.GI_cell_stats[score_name].values
thin_mask, thin_fact = self._ranked_thin_mask(
occupied=occupied,
score=score_used,
keep_fraction=keep_fraction,
protect_isolated=protect_isolated,
)
else:
raise ValueError(
f"Unknown thinning strategy '{strategy}'. "
f"Supported: legacy_count_prob, ranked_area, ranked_count, ranked"
)
self.GI_keep_mask = thin_mask
self.GI_thin_mask = thin_mask
self.GI_thin_fact = thin_fact
self.GI_thin_strategy = strategy
self.GI_keep_score = self._grid_da(score_used, "GI_keep_score", dtype=np.float32)
return thin_mask, thin_fact
######################################################################
# modified landmask
######################################################################
[docs]
def modify_landmask_with_grounded_icebergs(
self,
GI_P_raw=None,
GI_layer="grounded_icebergs",
csv_lon_col=None,
csv_lat_col=None,
dedup_uid=True,
normalise_lon_to="0-360",
proj_crs="EPSG:3031",
max_dist_km=None,
use_attr_area=True,
strategy="legacy_count_prob",
keep_fraction=None,
rank_var="GI_area_sum_m2",
p_min=0.1,
p_max=0.9,
scaling_exponent=1.5,
random_state=0,
protect_isolated=True,
):
"""
Full GI-to-KMT workflow.
This remains backward-compatible with the old usage, but now supports:
- CSV or GPKG input
- legacy stochastic thinning
- deterministic ranked thinning
"""
if (GI_P_raw is not None) or (not hasattr(self, "GI_cell_stats")):
self.process_raw_grounded_iceberg_database(
GI_P_raw=GI_P_raw,
GI_layer=GI_layer,
csv_lon_col=csv_lon_col,
csv_lat_col=csv_lat_col,
dedup_uid=dedup_uid,
normalise_lon_to=normalise_lon_to,
proj_crs=proj_crs,
max_dist_km=max_dist_km,
use_attr_area=use_attr_area,
)
thin_mask, thin_fact = self.thin_grounded_icebergs(
strategy=strategy,
keep_fraction=keep_fraction,
rank_var=rank_var,
p_min=p_min,
p_max=p_max,
scaling_exponent=scaling_exponent,
random_state=random_state,
protect_isolated=protect_isolated,
)
counts = self.GI_cell_stats["GI_count"].values
GI_thinned = counts * thin_mask
self.GI_thin_da = self._grid_da(
GI_thinned,
"GI_counts",
dtype=np.int32,
attrs={
"thinning_strategy": strategy,
"thinning_factor": float(thin_fact),
},
)
kmt = xr.open_dataset(self.P_KMT_org).kmt.values.copy()
thin_nj, thin_ni = np.where(thin_mask)
kmt[thin_nj, thin_ni] = 0
self.GI_KMT = self._grid_da(
kmt,
"kmt",
dtype=kmt.dtype,
attrs={
"modified_with_grounded_icebergs": 1,
"thinning_strategy": strategy,
"thinning_factor": float(thin_fact),
},
)
self.logger.info(
f"Modified landmask with grounded icebergs: "
f"strategy={strategy}, thinning_factor={thin_fact:.3f}"
)
return self.GI_KMT
[docs]
def compute_grounded_iceberg_cell_count(self, hemisphere_only=True, return_area=False):
"""
Count modified landmask cells introduced by grounded icebergs.
A grounded-iceberg cell is defined here as:
(kmt_org > 0) AND (kmt_mod == 0)
Parameters
----------
hemisphere_only : bool, default True
If True, apply the configured hemisphere slice before counting.
return_area : bool, default False
If True, also return the summed T-cell area (m^2) of those cells.
Returns
-------
int
Number of grounded-iceberg-modified T-grid cells.
or
tuple
(n_cells, area_m2) if return_area=True
"""
self.load_cice_grid(slice_hem=hemisphere_only, build_faces=False)
if not hasattr(self, "kmt_org"):
raise ValueError("Original landmask (kmt_org) not loaded.")
if not hasattr(self, "kmt_mod"):
raise ValueError("Modified landmask (kmt_mod) not loaded. Set use_gi=True and/or define P_KMT_mod.")
kmt_org = self.kmt_org["kmt_org"].values
kmt_mod = self.kmt_mod["kmt_mod"].values
gi_mask = (kmt_org > 0) & (kmt_mod == 0)
n_cells = int(np.count_nonzero(gi_mask))
if not return_area:
return n_cells
area_m2 = float(np.nansum(self.G_t["area"].values[gi_mask]))
return n_cells, area_m2
[docs]
def write_modified_landmask_and_counts_to_disk(self,
GI_thin_fact : float|int = None,
GI_version_str : str = None,
iteration : int = 0,
overwrite : bool = False):
"""
Save the thinned grounded iceberg counts and modified KMT landmask to NetCDF files.
File paths are generated from the config template using GI thinning factor and version.
"""
GI_thin_fact = GI_thin_fact if GI_thin_fact is not None else self.GI_thin_fact
GI_version_str = GI_version_str if GI_version_str is not None else self.GI_version_str
GI_thin_fact_str = f"{GI_thin_fact:.2f}".replace('.', 'p')
F_thin = self.GI_dict['GI_thin_fmt'].format(GI_thin=GI_thin_fact_str, version=GI_version_str, iteration=iteration)
F_KMT_mod = self.GI_dict['KMT_mod_fmt'].format(GI_thin=GI_thin_fact_str, version=GI_version_str, iteration=iteration)
self.GI_P_counts = os.path.join(self.GI_dict['D_GI_thin'], F_thin)
self.P_KMT_mod = os.path.join(self.GI_dict['D_GI_thin'], F_KMT_mod)
print(f"modified KMT : {self.GI_P_counts}")
print(f"thinned GI count : {self.P_KMT_mod}")
if os.path.exists(self.GI_P_counts) or os.path.exists(self.P_KMT_mod):
if not overwrite:
print(f"GI counts file and/or KMT file already exists:\n - {self.GI_P_counts}\n - {self.P_KMT_mod}\n"
"These files may have been used in model simulations. Set `overwrite=True` in the constructor to allow overwriting." )
else:
date_tag = datetime.now().strftime("%Y-%m-%d")
base_gi = os.path.splitext(self.GI_P_counts)[0]
base_kmt = os.path.splitext(self.P_KMT_mod)[0]
backup_gi = f"{base_gi}_{date_tag}.nc"
backup_kmt = f"{base_kmt}_{date_tag}.nc"
shutil.copy2(self.GI_P_counts, backup_gi)
shutil.copy2(self.P_KMT_mod, backup_kmt)
print("*** WARNING OVER-WRITING EXISTING FILES ***")
print(f"Backups created:\n - {backup_gi}\n - {backup_kmt}")
print("Saving thinned GI count and modified KMT files")
self.GI_thin_da.to_netcdf(self.GI_P_counts, mode='w')
self.GI_KMT.to_netcdf(self.P_KMT_mod, mode='w')
else:
print("Modified KMT and GI-count-file do not exist. Writing out the above files")
self.GI_thin_da.to_netcdf(self.GI_P_counts, mode='w')
self.GI_KMT.to_netcdf(self.P_KMT_mod, mode='w')
# import os, shutil
# import xarray as xr
# import numpy as np
# import pandas as pd
# import geopandas as gpd
# from scipy.spatial import cKDTree
# from pathlib import Path
# from datetime import datetime
# from scipy.spatial import cKDTree
# class SeaIceIcebergs:
# def __init__(self, **kwargs):
# return
# def load_GI_lon_lats(self):
# self.load_cice_grid(slice_hem=True)
# lon = self.G_t["lon"].reset_coords(drop=True)
# lat = self.G_t["lat"].reset_coords(drop=True)
# mask = self.G_GI["mask"].reset_coords(drop=True).astype(bool)
# return {"lon": lon.data[mask.data].ravel(),
# "lat": lat.data[mask.data].ravel()}
# def _region_lon_mask(self, lon2d, lon_min, lon_max):
# """
# Build a longitude mask that works for either:
# - non-wrapping bounds (lon_min <= lon_max)
# - seam-crossing bounds (lon_min > lon_max) e.g. 330..10 on a 0..360 grid
# """
# if lon_min <= lon_max:
# return (lon2d >= lon_min) & (lon2d <= lon_max)
# else: # crosses seam
# return (lon2d >= lon_min) | (lon2d <= lon_max)
# def _lonlat_to_unit_xyz(lon, lat):
# """
# Convert lon/lat in degrees to 3D unit-sphere Cartesian coordinates.
# This makes nearest-neighbour search more robust near the pole/dateline.
# """
# lon = np.asarray(lon, dtype=float)
# lat = np.asarray(lat, dtype=float)
# lon_rad = np.deg2rad(((lon + 180.0) % 360.0) - 180.0)
# lat_rad = np.deg2rad(lat)
# clat = np.cos(lat_rad)
# x = clat * np.cos(lon_rad)
# y = clat * np.sin(lon_rad)
# z = np.sin(lat_rad)
# return np.column_stack((x, y, z))
# def _first_existing_column(columns, candidates):
# for c in candidates:
# if c in columns:
# return c
# return None
# def _warn(logger, msg):
# if logger is not None:
# logger.warning(msg)
# else:
# print(f"WARNING: {msg}")
# def compute_grounded_iceberg_area(self, region=None, scale=1e6):
# """
# Returns:
# - dict of km^2 by region if region is provided
# - total GI area (m^2) if region is None (or km^2 if you divide by scale)
# """
# self.load_cice_grid(slice_hem=True)
# # IMPORTANT: use FULL-grid lon/lat for region tests (no NaNs)
# lon2d = self.G_t["lon"].values
# lat2d = self.G_t["lat"].values
# # GI area field already has NaNs outside GI in your loader
# area2d = self.G_GI["area"].values
# # IMPORTANT: convert mask back to boolean for indexing
# gi_mask = self.G_GI["mask"].values.astype(bool)
# # Logging (nan-safe)
# self.logger.info(f"GI-area_calc: grid lon min/max: {np.nanmin(lon2d):.2f} / {np.nanmax(lon2d):.2f}")
# self.logger.info(f"GI-area_calc: grid lat min/max: {np.nanmin(lat2d):.2f} / {np.nanmax(lat2d):.2f}")
# if region is None:
# # total GI area (m^2)
# return np.nansum(area2d[gi_mask])
# # Determine grid lon convention
# grid_is_0360 = (np.nanmin(lon2d) >= 0.0) and (np.nanmax(lon2d) > 180.0)
# area_dict = {}
# for reg_name, reg_cfg in region.items():
# lon_min, lon_max, lat_min, lat_max = reg_cfg["plot_region"]
# # Convert region bounds if grid is 0..360 but bounds are negative / -180..180 style
# if grid_is_0360:
# lon_min_c = self.normalise_longitudes(lon_min)
# lon_max_c = self.normalise_longitudes(lon_max)
# else:
# lon_min_c, lon_max_c = lon_min, lon_max
# lon_mask = self._region_lon_mask(lon2d, lon_min_c, lon_max_c)
# lat_mask = (lat2d >= lat_min) & (lat2d <= lat_max)
# region_mask = lon_mask & lat_mask
# # Sum GI areas within region
# area_km2 = np.nansum(area2d[gi_mask & region_mask]) / scale
# area_dict[reg_name] = area_km2
# self.logger.info(f"GI-area_calc: region {reg_name} GI area = {area_km2:0.2f} km^2")
# return area_dict
# def check_GI_coverage(self, da, varname="GI_counts", lon_name="lon", lat_name="lat"):
# """
# Assess grounded iceberg (GI) or landmask coverage across Antarctica.
# Parameters
# ----------
# da : xr.DataArray
# The GI or KMT data.
# varname : str
# Determines masking logic: 'GI_counts' uses >0, anything else uses >1.
# lon_name : str, default 'lon'
# Name of the longitude coordinate.
# lat_name : str, default 'lat'
# Name of the latitude coordinate.
# Returns
# -------
# report : dict
# Dictionary of summary statistics and Antarctic sector coverage.
# """
# report = {}
# lon = da[lon_name]
# lat = da[lat_name]
# lon_vals = lon.values
# lat_vals = lat.values
# # Handle longitude wrapping
# if lon_vals.max() <= 180:
# lon_wrapped = np.mod(lon_vals + 360, 360)
# if lon_vals.ndim == 1:
# da = da.assign_coords({lon_name: (da[lon_name].dims, lon_wrapped)})
# da = da.sortby(lon_name)
# elif lon_vals.ndim == 2:
# da = da.assign_coords({lon_name: (da[lon_name].dims, lon_wrapped)})
# self.logger.warning("Skipping sortby(): longitude is 2D.")
# else:
# raise ValueError("Longitude data must be 1D or 2D.")
# # Construct binary coverage mask
# if varname == "GI_counts":
# mask = da.values > 0
# else:
# mask = da.values > 1
# # Basic spatial stats
# report['lon_range'] = (float(np.nanmin(lon_vals)), float(np.nanmax(lon_vals)))
# report['lat_range'] = (float(np.nanmin(lat_vals)), float(np.nanmax(lat_vals)))
# report['n_valid_cells'] = int(np.count_nonzero(mask))
# report['n_total_cells'] = int(np.prod(mask.shape))
# report['coverage_fraction'] = report['n_valid_cells'] / report['n_total_cells']
# report['nonfinite_count'] = int(np.sum(~np.isfinite(da.values)))
# # Value stats
# report['mean_value'] = float(np.nanmean(da.values)) if np.any(mask) else float('nan')
# report['max_value'] = float(np.nanmax(da.values)) if np.any(mask) else float('nan')
# report['min_nonzero'] = float(np.nanmin(da.values[da.values > 0])) if np.any(da.values > 0) else float('nan')
# # Sector-based coverage check
# if lon_vals.ndim == 2 and lat_vals.ndim == 2:
# try:
# sector_edges = np.linspace(0, 360, 9) # 8 sectors
# presence_by_sector = []
# for i in range(len(sector_edges) - 1):
# lon_min, lon_max = sector_edges[i], sector_edges[i + 1]
# sector_mask = (
# (lon_vals >= lon_min) & (lon_vals < lon_max) &
# (lat_vals < -60) # Limit to Antarctic region
# )
# if np.any(sector_mask):
# sector_data = da.values[sector_mask]
# presence_by_sector.append(np.any(sector_data > 0))
# else:
# presence_by_sector.append(False)
# report['sector_gaps'] = [i for i, present in enumerate(presence_by_sector) if not present]
# report['sector_gap_count'] = len(report['sector_gaps'])
# except Exception as e:
# report['sector_check_failed'] = str(e)
# return report
# def load_existing_thinned(self):
# """
# Load previously saved thinned grounded iceberg data and modified KMT file.
# Sets the attributes `self.GI_thin_da`, `self.GI_thin_mask`, and `self.GI_KMT`
# if their respective NetCDF files exist.
# """
# if os.path.exists(self.P_GI_thin):
# self.GI_thin_da = xr.open_dataarray(self.P_GI_thin)
# self.logger.info(f"GROUNDED ICEBERG COUNTS PER GRID CELL REPORT {self.P_GI_thin}")
# self.logger.info(self.check_GI_coverage(self.GI_thin_da))
# self.GI_thin_mask = self.GI_thin_da.values > 0
# if os.path.exists(self.P_KMT_mod):
# self.GI_KMT = xr.open_dataarray(self.P_KMT_mod)
# self.logger.info(f"MODIFIED LANDMASK REPORT {self.P_KMT_mod}")
# self.logger.info(self.check_GI_coverage(self.GI_KMT, varname="kmt"))
# def _read_grounded_iceberg_points(self,
# P_raw,
# layer=None,
# x_col=None,
# y_col=None,
# assume_gpkg_crs="EPSG:3031",
# logger=None,
# ):
# """
# Read grounded iceberg point data from CSV or GPKG and return a pandas DataFrame
# with lon/lat stored in columns 'X' and 'Y'.
# Parameters
# ----------
# P_raw : str or Path
# Input CSV or GPKG path.
# layer : str, optional
# GeoPackage layer name. If None, geopandas will use its default behaviour.
# x_col, y_col : str, optional
# Explicit coordinate column names for CSV input.
# assume_gpkg_crs : str, default 'EPSG:4326'
# CRS to assign if a GPKG has no CRS metadata.
# logger : logging.Logger, optional
# Returns
# -------
# pandas.DataFrame
# DataFrame with at least columns ['X', 'Y'].
# """
# P_raw = Path(P_raw)
# suffix = P_raw.suffix.lower()
# if suffix == ".csv":
# df = pd.read_csv(P_raw)
# if x_col is None:
# x_col = _first_existing_column(
# df.columns,
# ["X", "x", "lon", "Lon", "LON", "longitude", "LONGITUDE"],
# )
# if y_col is None:
# y_col = _first_existing_column(
# df.columns,
# ["Y", "y", "lat", "Lat", "LAT", "latitude", "LATITUDE"],
# )
# if x_col is None or y_col is None:
# raise ValueError(
# f"Could not identify longitude/latitude columns in CSV: {P_raw}\n"
# f"Available columns: {list(df.columns)}\n"
# f"Pass x_col=... and y_col=... explicitly."
# )
# out = df.copy()
# out["X"] = pd.to_numeric(out[x_col], errors="coerce")
# out["Y"] = pd.to_numeric(out[y_col], errors="coerce")
# out = out.dropna(subset=["X", "Y"]).copy()
# return out
# elif suffix == ".gpkg":
# try:
# import geopandas as gpd
# except ImportError as e:
# raise ImportError(
# "Reading GeoPackage files requires geopandas. "
# "Install geopandas (plus pyogrio/fiona) in your environment."
# ) from e
# gdf = gpd.read_file(P_raw, layer=layer)
# if gdf.empty:
# raise ValueError(f"No features found in GeoPackage: {P_raw}")
# if gdf.geometry is None:
# raise ValueError(f"No geometry column found in GeoPackage: {P_raw}")
# gdf = gdf[gdf.geometry.notna()].copy()
# # Expand MultiPoint to individual Point features where relevant
# try:
# gdf = gdf.explode(index_parts=False, ignore_index=True)
# except TypeError:
# # compatibility with older geopandas
# gdf = gdf.explode()
# if gdf.crs is None:
# _warn(
# logger,
# f"{P_raw} has no CRS metadata; assuming {assume_gpkg_crs}.",
# )
# gdf = gdf.set_crs(assume_gpkg_crs)
# # Convert to lon/lat for consistency with model grid lookup
# gdf = gdf.to_crs("EPSG:4326")
# non_point = ~gdf.geometry.geom_type.isin(["Point"])
# if np.any(non_point):
# _warn(
# logger,
# f"{P_raw} contains non-Point geometries; using representative points.",
# )
# geom = gdf.geometry.copy()
# geom.loc[non_point] = gdf.loc[non_point].representative_point()
# gdf = gdf.set_geometry(geom)
# out = pd.DataFrame(gdf.drop(columns=[gdf.geometry.name]))
# out["X"] = gdf.geometry.x.to_numpy()
# out["Y"] = gdf.geometry.y.to_numpy()
# out = out.dropna(subset=["X", "Y"]).copy()
# return out
# else:
# raise ValueError(f"Unsupported grounded iceberg file type: {P_raw.suffix}. "
# f"Supported types are .csv and .gpkg")
# def process_raw_grounded_iceberg_database(
# self,
# GI_P_raw=None,
# GI_layer=None,
# x_col=None,
# y_col=None,
# assume_gpkg_crs="EPSG:4326",
# ):
# """
# Load and spatially map raw grounded iceberg locations from CSV or GPKG to
# the model grid.
# Uses a KD-tree nearest-neighbour search on 3D unit-sphere coordinates to
# assign iceberg coordinates to grid cells.
# Sets
# ----
# self.GI_clean : pandas.DataFrame
# Original iceberg table with attached model-grid indices (nj_i, ni_i)
# and standardised lon/lat columns (X, Y).
# self.GI_cnts : xarray.DataArray
# 2D grounded iceberg counts on the model grid.
# Parameters
# ----------
# GI_P_raw : str or Path, optional
# Path to raw grounded iceberg database (.csv or .gpkg). If omitted,
# uses self.GI_dict['P_raw'].
# GI_layer : str, optional
# GeoPackage layer name, only used for .gpkg input.
# x_col, y_col : str, optional
# Explicit lon/lat column names for CSV input.
# assume_gpkg_crs : str, default 'EPSG:4326'
# CRS to assume if a GPKG is missing CRS metadata.
# """
# self.load_cice_grid()
# GI_P_raw = GI_P_raw if GI_P_raw is not None else self.GI_dict["P_KJ"]
# logger = getattr(self, "logger", None)
# GI_clean = self._read_grounded_iceberg_points(
# P_raw=GI_P_raw,
# layer=GI_layer,
# x_col=x_col,
# y_col=y_col,
# assume_gpkg_crs=assume_gpkg_crs,
# logger=logger,
# )
# # Model grid lon/lat
# G_lon = np.asarray(self.G_t["lon"].values)
# G_lat = np.asarray(self.G_t["lat"].values)
# # KD-tree in 3D Cartesian coordinates on the unit sphere
# G_pts = _lonlat_to_unit_xyz(G_lon.ravel(), G_lat.ravel())
# tree = cKDTree(G_pts)
# GI_locs = _lonlat_to_unit_xyz(
# GI_clean["X"].to_numpy(),
# GI_clean["Y"].to_numpy(),
# )
# _, nrst_i = tree.query(GI_locs)
# nj, ni = G_lon.shape
# nj_i, ni_i = np.unravel_index(nrst_i, (nj, ni))
# GI_clean = GI_clean.copy()
# GI_clean["nj_i"] = nj_i.astype(np.int32)
# GI_clean["ni_i"] = ni_i.astype(np.int32)
# self.GI_clean = GI_clean
# GI_cnts = np.zeros((nj, ni), dtype=np.int32)
# GI_cnts_df = (
# self.GI_clean
# .groupby(["nj_i", "ni_i"], sort=False)
# .size()
# .reset_index(name="count")
# )
# GI_cnts[
# GI_cnts_df["nj_i"].to_numpy(),
# GI_cnts_df["ni_i"].to_numpy()
# ] = GI_cnts_df["count"].to_numpy()
# GI_cnts_da = xr.DataArray(
# GI_cnts,
# dims=self.spatial_dims,
# name="GI_counts",
# )
# GI_cnts_da.attrs["source_file"] = str(GI_P_raw)
# if GI_layer is not None:
# GI_cnts_da.attrs["source_layer"] = GI_layer
# self.GI_cnts = GI_cnts_da
# def modify_landmask_with_grounded_icebergs(self, p_min=0.1, p_max=0.9, scaling_exponent=1.5):
# """
# Apply probabilistic thinning to the grounded iceberg mask based on iceberg counts.
# Parameters:
# -----------
# p_min : float
# Minimum probability of retaining a cell with low counts.
# p_max : float
# Maximum probability of retaining a cell with high counts.
# scaling_exponent : float
# Controls steepness of probability scaling with count.
# Sets:
# ------
# self.GI_thin_da : xarray.DataArray
# self.GI_thin_mask : ndarray (boolean)
# self.GI_thin_fact : float (fraction thinned out)
# """
# kmt = xr.open_dataset(self.P_KMT_org).kmt.values
# mask = self.GI_cnts.values >= 1
# counts = self.GI_cnts.values
# thin_mask, thin_fact = self._thin_mask(mask, counts, p_min, p_max, scaling_exponent)
# GI_thinned = counts * thin_mask
# self.GI_thin_da = xr.DataArray(data = GI_thinned,
# dims = self.spatial_dims,
# coords = {'lat': (self.spatial_dims, self.G_t['lat'].values),
# 'lon': (self.spatial_dims, self.G_t['lon'].values)},
# name = 'GI_counts')
# self.GI_thin_mask = thin_mask
# self.GI_thin_fact = thin_fact
# thin_nj, thin_ni = np.where(thin_mask)
# kmt[thin_nj, thin_ni] = 0
# self.GI_KMT = xr.DataArray(data = kmt,
# dims = self.spatial_dims,
# coords = {'lat': (self.spatial_dims, self.G_t['lat'].values),
# 'lon': (self.spatial_dims, self.G_t['lon'].values)},
# name = 'kmt' )
# def write_modified_landmask_and_counts_to_disk(self,
# GI_thin_fact : float|int = None,
# GI_version_str : str = None,
# overwrite : bool = None):
# """
# Save the thinned grounded iceberg counts and modified KMT landmask to NetCDF files.
# File paths are generated from the config template using GI thinning factor and version.
# """
# GI_thin_fact = GI_thin_fact if GI_thin_fact is not None else self.GI_thin_fact
# GI_version_str = GI_version_str if GI_version_str is not None else self.GI_version_str
# overwrite = overwrite if overwrite is not None else self.overwrite
# GI_thin_fact_str = f"{GI_thin_fact:.2f}".replace('.', 'p')
# F_thin = self.GI_dict['GI_thin_fmt'].format(GI_thin_fact=GI_thin_fact_str, version=GI_version_str)
# F_KMT_mod = self.GI_dict['KMT_mod_fmt'].format(GI_thin_fact=GI_thin_fact_str, version=GI_version_str)
# self.GI_P_counts = os.path.join(self.GI_dict['D_GI_thin'], F_thin)
# self.P_KMT_mod = os.path.join(self.GI_dict['D_GI_thin'], F_KMT_mod)
# print(f"modified KMT : {self.GI_P_counts}")
# print(f"thinned GI count : {self.P_KMT_mod}")
# if os.path.exists(self.GI_P_counts) or os.path.exists(self.P_KMT_mod):
# if not overwrite:
# print(f"GI counts file and/or KMT file already exists:\n - {self.GI_P_counts}\n - {self.P_KMT_mod}\n"
# "These files may have been used in model simulations. Set `overwrite=True` in the constructor to allow overwriting." )
# else:
# date_tag = datetime.now().strftime("%Y-%m-%d")
# base_gi = os.path.splitext(self.GI_P_counts)[0]
# base_kmt = os.path.splitext(self.P_KMT_mod)[0]
# backup_gi = f"{base_gi}_{date_tag}.nc"
# backup_kmt = f"{base_kmt}_{date_tag}.nc"
# shutil.copy2(self.GI_P_counts, backup_gi)
# shutil.copy2(self.P_KMT_mod, backup_kmt)
# print("*** WARNING OVER-WRITING EXISTING FILES ***")
# print(f"Backups created:\n - {backup_gi}\n - {backup_kmt}")
# print("Saving thinned GI count and modified KMT files")
# self.GI_thin_da.to_netcdf(self.GI_P_counts, mode='w')
# self.GI_KMT.to_netcdf(self.P_KMT_mod, mode='w')
# else:
# print("Modified KMT and GI-count-file do not exist. Writing out the above files")
# self.GI_thin_da.to_netcdf(self.GI_P_counts, mode='w')
# self.GI_KMT.to_netcdf(self.P_KMT_mod, mode='w')
# def _is_isolated(self, y, x, mask):
# """
# Determine if a given grid cell is isolated (no grounded icebergs in 8-connected neighbors).
# Parameters:
# -----------
# y, x : int
# Grid coordinates to test.
# mask : ndarray
# Boolean mask where True = grounded iceberg.
# Returns:
# --------
# bool
# True if isolated, False otherwise.
# """
# ny, nx = mask.shape
# adjacent_coords = [(y-1, x-1), (y-1, x) , (y-1, x+1),
# (y, x-1), (y, x+1),
# (y+1, x-1), (y+1, x) , (y+1, x+1) ]
# for adj_y, adj_x in adjacent_coords:
# if 0 <= adj_y < ny and 0 <= adj_x < nx:
# if mask[adj_y, adj_x]:
# return False
# return True
# def _thin_mask(self, mask, counts, p_min=0.2, p_max=0.8, scaling_exponent=2.0):
# """
# Perform probabilistic thinning of grounded iceberg cells based on their counts.
# Parameters:
# -----------
# mask : ndarray
# Boolean array of original grounded iceberg presence.
# counts : ndarray
# Corresponding iceberg counts.
# p_min : float
# Minimum probability of retaining a cell.
# p_max : float
# Maximum probability of retaining a cell.
# scaling_exponent : float
# Controls how sharply retention probability scales with count.
# Returns:
# --------
# thinned_mask : ndarray (bool)
# Mask after thinning.
# GI_thinning_factor : float
# Fraction of cells that were removed.
# """
# ny, nx = mask.shape
# thinned_mask = np.zeros_like(mask, dtype=bool)
# count_min, count_max = np.nanmin(counts), np.nanmax(counts)
# if count_max > count_min:
# norm_counts = (counts - count_min) / (count_max - count_min)
# norm_counts = norm_counts ** scaling_exponent
# else:
# norm_counts = np.ones_like(counts)
# retain_probs = p_min + norm_counts * (p_max - p_min)
# retained_count = 0
# total_count = np.count_nonzero(mask)
# for y in range(ny):
# for x in range(nx):
# if mask[y, x]:
# retain_prob = retain_probs[y, x]
# if self._is_isolated(y, x, mask) or np.random.rand() < retain_prob:
# thinned_mask[y, x] = True
# retained_count += 1
# GI_thinning_factor = 1 - (retained_count / total_count) if total_count > 0 else 0
# return thinned_mask, GI_thinning_factor
# def regrid_gi_to_fip_bool(self, gi_mask_da: xr.DataArray, gi_lon2d: xr.DataArray, gi_lat2d: xr.DataArray, fip_lon2d: xr.DataArray, fip_lat2d: xr.DataArray,
# dilate_pixels: int = 1): # set 0 to disable adjacency expansion
# """
# Regrids a boolean GI mask from the model grid onto the FIP grid via nearest-neighbour
# on great-circle distance (xesmf 'nearest_s2d'). Returns a boolean mask on the FIP grid.
# Optionally dilates by N pixels on the target grid to include coast-adjacent cells.
# """
# from scipy.ndimage import binary_dilation
# import xesmf as xe
# # xESMF expects float fields; cast True->1, False->0
# gi_float = gi_mask_da.astype(np.float32)
# # Build source/target 'grid' dictionaries from 2D lon/lat
# src = {'lon': gi_lon2d.values, 'lat': gi_lat2d.values}
# dst = {'lon': fip_lon2d.values, 'lat': fip_lat2d.values}
# # Regridder (nearest source-to-destination). reuse_weights=True if you call repeatedly.
# regridder = xe.Regridder(src, dst, method='nearest_s2d', locstream_in=False, locstream_out=False)
# gi_on_fip_float = regridder(gi_float.values) # ndarray on target shape
# gi_on_fip = xr.DataArray(gi_on_fip_float > 0.5, coords=fip_lon2d.coords, dims=fip_lon2d.dims)
# # Optional: 1-pixel dilation to catch coast-adjacent cells next to GI
# if dilate_pixels and dilate_pixels > 0:
# # ndimage expects numpy array; preserve NaNs as False for mask ops
# mask_np = np.asarray(gi_on_fip.fillna(False).values, dtype=bool)
# structure = np.ones((1 + 2*dilate_pixels, 1 + 2*dilate_pixels), dtype=bool)
# mask_np = binary_dilation(mask_np, structure=structure)
# gi_on_fip = xr.DataArray(mask_np, coords=fip_lon2d.coords, dims=fip_lon2d.dims)
# gi_on_fip.name = "GI_mask_on_FIP"
# gi_on_fip.attrs.update(dict(long_name="Grounded iceberg mask (regridded to FIP)", comment="nearest_s2d + optional dilation"))
# return gi_on_fip