Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions covjsonkit/decoder/BoundingBox.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import logging

import numpy as np

try:
Expand All @@ -11,6 +13,16 @@
from ..encoder.encoder import sort_step_values
from .decoder import Decoder

logger = logging.getLogger(__name__)

# Default grid metadata for ECMWF oper data (TCo1279 → O1280 reduced Gaussian).
# Used when mars:grid is absent from the CoverageJSON. Will be replaced once
# polytope exposes real grid information.
_OPER_GRID_DEFAULTS = {
"gridType": "reduced_gg",
"N": 1280,
}


class BoundingBox(Decoder):
def __init__(self, covjson):
Expand Down Expand Up @@ -216,3 +228,185 @@ def to_xarray(self):
ds.attrs["date"] = self.get_coordinates()["t"]["values"][0]

return ds

# ------------------------------------------------------------------
# GRIB export
# ------------------------------------------------------------------

def to_grib(self, output_path="output.grib", backend="auto"):
"""Convert the CoverageJSON to a multi-message GRIB file.

Produces one GRIB message per field — i.e. for each unique
combination of (parameter, step, number, date/time) found in
the coverage collection. This mirrors the output of a standard
MARS ``retrieve`` with an ``area`` keyword.

Args:
output_path: Filesystem path for the output GRIB file.
backend: GRIB encoding backend to use. One of ``"auto"``
(try pymars2grib first, fall back to eccodes),
``"mars2grib"``, or ``"eccodes"``.

Returns:
The *output_path* that was written.

Raises:
ImportError: If no suitable GRIB backend is available.
"""
from .grib_backends import get_backend

grib_backend = get_backend(backend)

messages = []

for coverage in self.coverages:
mars_metadata = coverage.get("mars:metadata", {})
grid_metadata = coverage.get("mars:grid", {})

mars_dict = self._build_mars_dict(mars_metadata, coverage)
misc_dict = self._build_misc_dict(grid_metadata, coverage)

# Compute sort order for N→S, W→E point ordering (MARS convention)
coords = coverage["domain"]["axes"]["composite"]["values"]
sort_idx = self._nswe_sort_indices(coords)

# One GRIB message per parameter
for param_shortname in self.parameters:
values = coverage["ranges"][param_shortname]["values"]

# Reorder values to N→S, W→E
sorted_values = [values[i] for i in sort_idx]

field_mars = {**mars_dict, "param": self._shortname_to_param_id(param_shortname)}

msg_bytes = grib_backend.encode_message(sorted_values, field_mars, misc_dict)
messages.append(msg_bytes)

with open(output_path, "wb") as fh:
for msg in messages:
fh.write(msg)

logger.info("Wrote %d GRIB message(s) to %s", len(messages), output_path)
return output_path

# ------------------------------------------------------------------
# Private helpers for to_grib
# ------------------------------------------------------------------

@staticmethod
def _build_mars_dict(mars_metadata, coverage):
"""Normalise ``mars:metadata`` into the dict expected by GRIB backends.

Handles the ``Forecast date`` ISO-8601 string that polytope-mars
puts on each coverage, splitting it into separate ``date`` and
``time`` keys.
"""
mars = {}

# Direct MARS keys
for key in ("class", "stream", "type", "expver", "levtype", "domain"):
if key in mars_metadata:
mars[key] = mars_metadata[key]

# Date / time
forecast_date = mars_metadata.get("Forecast date", "")
if forecast_date:
# "2025-06-23T00:00:00Z" → date=20250623, time=0000
dt_str = str(forecast_date).replace("Z", "")
if "T" in dt_str:
date_part, time_part = dt_str.split("T", 1)
else:
date_part = dt_str
time_part = "0000"
mars["date"] = date_part.replace("-", "")
mars["time"] = time_part.replace(":", "")[:4]
elif "date" in mars_metadata:
mars["date"] = str(mars_metadata["date"])
if "time" in mars_metadata:
mars["time"] = str(mars_metadata["time"])

# Step
if "step" in mars_metadata:
mars["step"] = str(mars_metadata["step"])

# Ensemble number
if "number" in mars_metadata:
mars["number"] = str(mars_metadata["number"])

# Level — use the z-coordinate from the first composite point
coords = coverage["domain"]["axes"]["composite"]["values"]
if coords and len(coords[0]) > 2:
level = coords[0][2]
if level != 0:
mars["levelist"] = str(int(level))

return mars

@staticmethod
def _build_misc_dict(grid_metadata, coverage):
"""Build the ``misc`` dict with grid geometry for the GRIB backend.

When ``mars:grid`` is not present on the coverage (i.e. polytope
does not yet expose grid info), oper defaults are applied so that
the pipeline can be tested end-to-end.
"""
misc = {}

if grid_metadata:
misc.update(grid_metadata)

# Apply oper defaults when grid info is missing
if "gridType" not in misc:
logger.warning(
"No gridType in mars:grid metadata — assuming ECMWF oper defaults "
"(reduced_gg N1280). This will be replaced when polytope provides "
"real grid information."
)
for key, default in _OPER_GRID_DEFAULTS.items():
misc.setdefault(key, default)

# Compute area from coordinates if not already present
if "area" not in misc:
coords = coverage["domain"]["axes"]["composite"]["values"]
if coords:
lats = [c[0] for c in coords]
lons = [c[1] for c in coords]
misc["area"] = [max(lats), min(lons), min(lats), max(lons)] # N/W/S/E

# Compute Nj (number of latitude rows) and pl (points per row)
# from the coordinates if not already provided.
if "Nj" not in misc or "pl" not in misc:
coords = coverage["domain"]["axes"]["composite"]["values"]
if coords:
from collections import Counter

lat_counts = Counter(round(c[0], 9) for c in coords)
# Sort latitudes N→S
sorted_lats = sorted(lat_counts.keys(), reverse=True)
misc.setdefault("Nj", len(sorted_lats))
misc.setdefault("pl", [lat_counts[lat] for lat in sorted_lats])

return misc

@staticmethod
def _nswe_sort_indices(coords):
"""Return indices that sort composite coords into N→S, W→E order.

MARS GRIB convention: first grid point is the north-west corner,
scanning west→east within each latitude row, rows ordered north→south.
"""
# coords is a list of [lat, lon, level] tuples
# Sort by latitude descending (N→S), then longitude ascending (W→E)
indexed = list(enumerate(coords))
indexed.sort(key=lambda item: (-item[1][0], item[1][1]))
return [i for i, _ in indexed]

def _shortname_to_param_id(self, shortname):
"""Map a parameter shortname (e.g. ``'2t'``) to its numeric param ID."""
from covjsonkit.param_db import get_param_id_from_db

try:
return str(get_param_id_from_db(shortname))
except (KeyError, Exception):
# If the shortname is not in the DB, pass it through as-is
return shortname
3 changes: 3 additions & 0 deletions covjsonkit/decoder/Circle.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,6 @@ def to_xarray(self):
ds.attrs["date"] = self.get_coordinates()["t"]["values"][0]

return ds

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")
3 changes: 3 additions & 0 deletions covjsonkit/decoder/Frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,6 @@ def to_xarray(self):
ds.attrs[key] = val

return ds

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")
3 changes: 3 additions & 0 deletions covjsonkit/decoder/Grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,6 @@ def to_xarray(self):
ds[pname].attrs[k] = v

return ds

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")
3 changes: 3 additions & 0 deletions covjsonkit/decoder/Path.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,6 @@ def to_xarray(self):
ds.attrs[mars_metadata] = self.mars_metadata[0][mars_metadata]

return ds

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")
3 changes: 3 additions & 0 deletions covjsonkit/decoder/Position.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,6 @@ def _covers_domain(self, coverage, num, date, x, y, z):
and coverage["domain"]["axes"][self.y_name]["values"] == y
and coverage["domain"]["axes"][self.z_name]["values"] == z
)

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")
3 changes: 3 additions & 0 deletions covjsonkit/decoder/Shapefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,6 @@ def to_xarray(self):
ds.attrs[key] = val

return ds

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")
3 changes: 3 additions & 0 deletions covjsonkit/decoder/TimeSeries.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ def _covers_domain(self, coverage, num, date, x, y, z):
and coverage["domain"]["axes"][self.z_name]["values"] == z
)

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")

def _to_xarray_no_forecast_date(self):
"""Convert monthly-means CovJSON (no 'Forecast date' in metadata) to xarray.

Expand Down
3 changes: 3 additions & 0 deletions covjsonkit/decoder/VerticalProfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,6 @@ def to_xarray(self):
return ds[0]

return ds

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")
3 changes: 3 additions & 0 deletions covjsonkit/decoder/Wkt.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,6 @@ def to_xarray(self):
ds.attrs[key] = val

return ds

def to_grib(self, output_path="output.grib", backend="auto"):
raise NotImplementedError("to_grib() is only supported for BoundingBox domains.")
4 changes: 4 additions & 0 deletions covjsonkit/decoder/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,7 @@ def to_geojson(self):
@abstractmethod
def to_geotiff(self):
pass

@abstractmethod
def to_grib(self, output_path="output.grib", backend="auto"):
pass
47 changes: 47 additions & 0 deletions covjsonkit/decoder/grib_backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Swappable GRIB encoding backends.

Use :func:`get_backend` to obtain the best available backend at runtime.
The factory tries ``pymars2grib`` first (preferred) and falls back to
``eccodes`` when it is not installed.
"""

from .base import GribBackend # noqa: F401


def get_backend(preferred: str = "auto") -> GribBackend:
"""Return a ready-to-use GRIB encoding backend.

Args:
preferred: One of ``"auto"``, ``"mars2grib"``, or ``"eccodes"``.
* ``"auto"`` – try mars2grib first, fall back to eccodes.
* ``"mars2grib"`` – require mars2grib (raises ImportError if
unavailable).
* ``"eccodes"`` – use eccodes directly.

Returns:
A :class:`GribBackend` instance.

Raises:
ImportError: If the explicitly requested backend is not installed.
"""
if preferred in ("mars2grib", "auto"):
try:
from .mars2grib_backend import Mars2GribBackend

return Mars2GribBackend()
except ImportError:
if preferred == "mars2grib":
raise ImportError(
"pymars2grib is not installed. "
"Build metkit from source with pybind11 support, or use backend='eccodes'."
)

try:
from .eccodes_backend import EccodesBackend

return EccodesBackend()
except ImportError:
raise ImportError(
"No GRIB backend available. Install eccodes: pip install eccodes, "
"or build pymars2grib from metkit source."
)
31 changes: 31 additions & 0 deletions covjsonkit/decoder/grib_backends/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod


class GribBackend(ABC):
"""Abstract backend interface for encoding GRIB messages from MARS metadata + values.

Implementations translate (values, mars_dict, misc_dict) into a single
encoded GRIB message returned as ``bytes``. Two concrete backends are
provided:

* :class:`~.eccodes_backend.EccodesBackend` – uses the eccodes Python API
(available via ``pip install eccodes``).
* :class:`~.mars2grib_backend.Mars2GribBackend` – wraps ECMWF's
``pymars2grib`` pybind11 module (requires metkit built from source).
"""

@abstractmethod
def encode_message(self, values: list, mars: dict, misc: dict) -> bytes:
"""Encode a single GRIB message.

Args:
values: Field data values (one per grid point).
mars: MARS keys describing the field (class, stream, type, date,
time, step, param, levtype, levelist, …).
misc: Non-MARS metadata such as grid geometry (gridType, N, area,
…) and packing options.

Returns:
The fully encoded GRIB message as raw bytes.
"""
pass
Loading
Loading