Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ ignore = [
"PLC0414",
# Misidentifies xarrary.DataArray as pandas.DataFrame
"PD003",
# Conflicts with code block examples:
"D412",
]

[tool.ruff.lint.per-file-ignores]
Expand Down
107 changes: 107 additions & 0 deletions src/ewatercycle/_forcings/_caravan_reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Holds which caravan variables belong to which datasets."""

ERA5_VARS = [ # data vars for ERA5
"dewpoint_temperature_2m_max",
"dewpoint_temperature_2m_mean",
"dewpoint_temperature_2m_min",
"potential_evaporation_sum",
"snow_depth_water_equivalent_max",
"snow_depth_water_equivalent_mean",
"snow_depth_water_equivalent_min",
"streamflow",
"surface_net_solar_radiation_max",
"surface_net_solar_radiation_mean",
"surface_net_solar_radiation_min",
"surface_net_thermal_radiation_max",
"surface_net_thermal_radiation_mean",
"surface_net_thermal_radiation_min",
"surface_pressure_max",
"surface_pressure_mean",
"surface_pressure_min",
"temperature_2m_max",
"temperature_2m_mean",
"temperature_2m_min",
"total_precipitation_sum",
"u_component_of_wind_10m_max",
"u_component_of_wind_10m_mean",
"u_component_of_wind_10m_min",
"v_component_of_wind_10m_max",
"v_component_of_wind_10m_mean",
"v_component_of_wind_10m_min",
"volumetric_soil_water_layer_1_max",
"volumetric_soil_water_layer_1_mean",
"volumetric_soil_water_layer_1_min",
"volumetric_soil_water_layer_2_max",
"volumetric_soil_water_layer_2_mean",
"volumetric_soil_water_layer_2_min",
"volumetric_soil_water_layer_3_max",
"volumetric_soil_water_layer_3_mean",
"volumetric_soil_water_layer_3_min",
"volumetric_soil_water_layer_4_max",
"volumetric_soil_water_layer_4_mean",
"volumetric_soil_water_layer_4_min",
]


CAMELS_VARS = [ # data vars for daymet/nldas/mauer
"area_gages2",
"area_geospa_fabric",
"baseflow_index",
"carbonate_rocks_frac",
"clay_frac",
"dayl",
"dom_land_cover",
"dom_land_cover_frac",
"elev_mean",
"evspsblpot",
"frac_forest",
"gauge_lat",
"gauge_lon",
"gauge_name",
"geol_1st_class",
"geol_2nd_class",
"geol_permeability",
"geol_porostiy",
"glim_1st_class_frac",
"glim_2nd_class_frac",
"gvf_diff",
"gvf_max",
"hfd_mean",
"high_prec_timing",
"high_q_dur",
"high_q_freq",
"huc_02",
"lai_diff",
"lai_max",
"low_prec_timing",
"low_q_dur",
"low_q_freq",
"max_water_content",
"organic_frac",
"other_frac",
"p_seasonality",
"pr",
"q5",
"q95",
"q_mean",
"root_depth_50",
"root_depth_99",
"runoff_ratio",
"sand_frac",
"silt_frac",
"slope_fdc",
"slope_mean",
"soil_conductivity",
"soil_depth_pelletier",
"soil_depth_statsgo",
"soil_porosity",
"srad",
"stream_elas",
"swe",
"tas",
"tasmax",
"tasmin",
"vp",
"water_frac",
"zero_q_freq",
]
156 changes: 121 additions & 35 deletions src/ewatercycle/_forcings/caravan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import xarray as xr
from cartopy.io import shapereader

from ewatercycle._forcings._caravan_reference import CAMELS_VARS, ERA5_VARS
from ewatercycle.base.forcing import DefaultForcing
from ewatercycle.util import get_time

COMMON_URL = "ca13056c-c347-4a27-b320-930c2a4dd207"
OPENDAP_URL = f"https://opendap.4tu.nl/thredds/dodsC/data2/djht/{COMMON_URL}/1/"
VERSION = 2
OPENDAP_URL = f"https://opendap.4tu.nl/thredds/dodsC/data2/djht/{COMMON_URL}/{VERSION}/"
SHAPEFILE_URL = (
f"https://data.4tu.nl/file/{COMMON_URL}/bbe94526-cf1a-4b96-8155-244f20094719"
)
Expand Down Expand Up @@ -71,11 +73,11 @@ class CaravanForcing(DefaultForcing):
HRU_id = 1022500

camels_forcing = sources['CaravanForcing'].generate(
start_time = experiment_start_date,
end_time = experiment_end_date,
directory = forcing_path,
basin_id = f"camels_0{HRU_id}"
)
start_time = experiment_start_date,
end_time = experiment_end_date,
directory = forcing_path,
basin_id = f"camels_0{HRU_id}"
)

which gives something like:

Expand Down Expand Up @@ -168,27 +170,63 @@ def generate( # type: ignore[override]
If none is specified, will be downloaded automatically.
kwargs: Additional keyword arguments.
basin_id: The ID of the desired basin. Data sets can be explored using
`CaravanForcing.get_dataset(dataset_name)` or
`CaravanForcing.get_basin_id(dataset_name)`
where `dataset_name` is the name of a dataset in Caravan
(for example, "camels" or "camelsgb").
For more information do `help(CaravanForcing.get_basin_id)` or see
https://www.ewatercycle.org/caravan-map/.
`CaravanForcing.get_dataset(dataset_name)` or
`CaravanForcing.get_basin_id(dataset_name)`
where `dataset_name` is the name of a dataset in Caravan
(for example, "camels" or "camelsgb").
For more information do `help(CaravanForcing.get_basin_id)` or see
https://www.ewatercycle.org/caravan-map/.
data_source: The ID of the data source to be used. For some datasets
multiple datasources are available. currently this is only
implemented for the (basins in the) "camels" (ie. camels US)
dataset. If "data_sources" is not specified, it defaults to
'era5_land' (the default for caravan). Options for Camels are:
- 'nldas'
- 'maurer'
- 'daymet'
See the documentation of Camels for details on the differences
between these data sources: https://dx.doi.org/10.5065/D6MW2F4D
retrieve_shape: If the shapefile should be retrieved (optional). By
default False.
"""
if "basin_id" not in kwargs:
msg = (
"You have to specify a basin ID to be able to generate forcing from"
" Caravan."
)
raise ValueError(msg)

basin_id = str(kwargs["basin_id"])

if "data_source" not in kwargs:
data_source = "era5_land"
elif kwargs["data_source"] in ["era5_land", "nldas", "maurer", "daymet"]:
data_source = str(kwargs["data_source"])
else:
msg = (
"If 'data_source' is provided it needs to be one of:\n"
" 'era5_land', 'nldas', 'maurer', 'daymet'"
)
raise ValueError(msg)

dataset: str = basin_id.split("_")[0]
ds = cls.get_dataset(dataset)
ds_basin = ds.sel(basin_id=basin_id.encode())

if dataset != "camels" and data_source != "era5_land":
msg = (
"Alternative data sources are only implemented for the camels "
"(USA) dataset"
)
raise ValueError(msg)
ds_basin = get_camels_data(ds_basin, data_source.encode())

ds_basin_time = crop_ds(ds_basin, start_time, end_time)

if shape is None:
retrieve_shape = (
bool(kwargs["retrieve_shape"]) if "retrieve_shape" in kwargs else True
)
if retrieve_shape:
shape = get_shapefiles(Path(directory), basin_id)

if len(variables) == 0:
Expand All @@ -197,36 +235,31 @@ def generate( # type: ignore[override]
# only return the properties which are also in property vars
properties = set(variables).intersection(PROPERTY_VARS)
non_property_vars = set(variables) - properties
variable_names = non_property_vars.intersection(
RENAME_ERA5.keys()
) # only take the vars also in Rename dict

# only take the vars also in Rename dict (ie. pr, tas, etc.)
variable_names = non_property_vars.intersection(set(RENAME_ERA5.values()))

for prop in properties:
ds_basin_time.coords.update({prop: ds_basin_time[prop].to_numpy()})

ds_basin_time = ds_basin_time.rename(RENAME_ERA5)
variables = tuple([RENAME_ERA5[var] for var in variable_names])

# convert units to Kelvin for compatibility with CMOR MIP table units
for temp in ["tas", "tasmin", "tasmax"]:
ds_basin_time[temp].attrs.update({"height": "2m"})
if (ds_basin_time[temp].attrs["unit"]) == "°C":
ds_basin_time[temp].values = ds_basin_time[temp].to_numpy() + 273.15
ds_basin_time[temp].attrs["unit"] = "K"

for var in ["evspsblpot", "pr"]:
if (ds_basin_time[var].attrs["unit"]) == "mm":
# mm/day --> kg m-2 s-1
ds_basin_time[var].values = ds_basin_time[var].to_numpy() / (86400)
ds_basin_time[var].attrs["unit"] = "kg m-2 s-1"
ds_basin_time = convert_units(ds_basin_time)

start_time_name = start_time[:10]
end_time_name = end_time[:10]

history_attrs = ds_basin_time.attrs["history"]
for var in variables:
for var in variable_names:
ds_basin_time[var].attrs["history"] = history_attrs
ds_basin_time[var].to_netcdf(

# Basin IDs can be duplicate because of problem in opendap data
# only select the one with data (if it exists)
valid_basin = ~ds_basin_time[var].isnull().all(dim="time")
if ds_basin_time[var]["basin_id"].size > 1 and valid_basin.any():
dout = ds_basin_time[var][~ds_basin_time[var].isnull().all(dim="time")]
else:
dout = ds_basin_time[var]

dout.isel(basin_id=0).to_netcdf(
Path(directory)
/ f"{basin_id}_{start_time_name}_{end_time_name}_{var}.nc"
)
Expand All @@ -235,15 +268,23 @@ def generate( # type: ignore[override]
directory=Path(directory),
start_time=start_time,
end_time=end_time,
shape=Path(shape),
shape=Path(shape) if shape is not None else None,
filenames={
var: f"{basin_id}_{start_time_name}_{end_time_name}_{var}.nc"
for var in variables
for var in variable_names
},
)
forcing.save()
return forcing

def to_xarray(self) -> xr.Dataset:
"""Return this Forcing object as an xarray Dataset."""
if len(self.filenames) == 0:
msg = "There are no variables stored in this Forcing object."
raise ValueError(msg)
fpaths = [self.directory / filename for _, filename in self.filenames.items()]
return xr.merge([xr.open_dataset(fpath, chunks="auto") for fpath in fpaths])


def get_shapefiles(directory: Path, basin_id: str) -> Path:
"""Retrieve shapefiles from data 4TU.nl ."""
Expand Down Expand Up @@ -319,3 +360,48 @@ def crop_ds(ds: xr.Dataset, start_time: str, end_time: str) -> xr.Dataset:
return ds.isel(
time=(ds["time"].to_numpy() >= start) & (ds["time"].to_numpy() <= end)
)


def get_camels_data(ds: xr.Dataset, data_source: bytes) -> xr.Dataset:
"""Grab the right source of input data for the camels (USA) dataset.

The way the dataset on OPENDAP is structured is not optimal, see the
discussion at:
https://github.com/eWaterCycle/ewatercycle/pull/433/

This function will select and rename the right variables.
"""
if data_source == b"era5_land":
ds_era5 = ds.sel(data_source=b"era5_land")
ds_era5 = ds_era5.drop_vars("data_source")
ds_era5 = ds_era5[[*PROPERTY_VARS, *ERA5_VARS, "streamflow"]]
return ds_era5.rename(RENAME_ERA5)

ds_common = ds.sel(data_source=b"era5_land")
ds_common = ds_common.drop_vars("data_source")
ds_common = ds_common[[*PROPERTY_VARS, "streamflow"]]
ds_common = ds_common.rename({"streamflow": "Q"})

ds_camels = ds.sel(data_source=data_source)
ds_camels = ds_camels.drop_vars("data_source")
ds_camels = ds_camels[CAMELS_VARS]

return xr.merge((ds_common, ds_camels))


def convert_units(ds: xr.Dataset) -> xr.Dataset:
"""Caravan uses degrees C, mm/d. We need Kelvin and km/m2/s."""
# convert units to Kelvin for compatibility with CMOR MIP table units
for temp in ["tas", "tasmin", "tasmax"]:
if ds[temp].attrs["unit"] in ["°C", "degC"]:
ds[temp].values = ds[temp].to_numpy() + 273.15
ds[temp].attrs["unit"] = "K"
ds[temp].attrs.update({"height": "2m"})

for var in ["evspsblpot", "pr"]:
if ds[var].attrs["unit"] in ["mm", "mm/d"]:
# mm/day --> kg m-2 s-1
ds[var].values = ds[var].to_numpy() / (86400)
ds[var].attrs["unit"] = "kg m-2 s-1"

return ds
24 changes: 23 additions & 1 deletion tests/src/base/forcing_files/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@ The data only includes a year of forcing for one catchment.

For own use, please download from the original source and cite correctly. The Caravan dataset itself is also a combination of data from seperate sources.

The Carvan dataset is originanly obtained from https://zenodo.org/records/7944025 and is explained in a paper by Kratzert, F. :'Caravan - A global community dataset for large-sample hydrology' found here: https://doi-org.tudelft.idm.oclc.org/10.1038/s41597-023-01975-w
The Carvan dataset is originally obtained from https://zenodo.org/records/7944025 and is explained in a paper by Kratzert, F. :'Caravan - A global community dataset for large-sample hydrology' found here: https://doi-org.tudelft.idm.oclc.org/10.1038/s41597-023-01975-w

Distributed under Creative Commons Attribution 4.0 International.


## Test data generation

The test data file was generated with:

```py
import xarray as xr

from ewatercycle._forcings.caravan import OPENDAP_URL

dataset="camels"
ds_dap = xr.open_dataset(f"{OPENDAP_URL}{dataset}.nc")

ds_test = xr.concat(
(ds_dap.sel(basin_id=b"camels_01022500"), ds_dap.sel(basin_id=b"camels_03439000")),
dim="basin_id",
)

# Can't write to hdf5 due to "name" variable
ds_test.to_netcdf("tests/src/base/forcing_files/test_caravan_file.nc", format="NETCDF3_CLASSIC")
```
Binary file modified tests/src/base/forcing_files/test_caravan_file.nc
Binary file not shown.
Loading