diff --git a/.gitignore b/.gitignore index 942e36f..00d2caf 100644 --- a/.gitignore +++ b/.gitignore @@ -131,4 +131,5 @@ dmypy.json # Pyre type checker .pyre/ .cdsapirc -service-account.json \ No newline at end of file +service-account.json +.DS_Store \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 6e30600..515c359 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,5 +10,9 @@ services: - ./out:/out - ./src:/src environment: - - TOOL_RUN=download_era5_series - command: ["echo", "Run the tool like: docker compose run era5 python run.py"] \ No newline at end of file + # TOOL_RUN: download_era5_series + TOOL_RUN: download_cmip6_series + + # CHANGE this bucket name, if you change the service account to your account + BUCKET: vfw_ee_export + command: ["echo", "Run the tool like: docker compose run --rm era5 python run.py"] \ No newline at end of file diff --git a/in/inputs.json b/in/inputs.json index 4f30163..c275c09 100644 --- a/in/inputs.json +++ b/in/inputs.json @@ -17,7 +17,7 @@ "longitude": 8.0, "latitude": 48.0, "backend": "earthengine", - "model": "EC-Earth3", + "model": ["EC-Earth3"], "scenario": "ssp585" } } diff --git a/src/credentials.py b/src/credentials.py index 650c279..90981fe 100644 --- a/src/credentials.py +++ b/src/credentials.py @@ -1,4 +1,5 @@ from pathlib import Path +import os import ee @@ -21,6 +22,7 @@ def build_ee_credentials(credentials_path: str | Path = None): if not credentials_path.exists(): raise ValueError(f"Credentials file not found at {credentials_path}. Right now you need to mount the service-account.json file to /root/service-account.json. 
It needs the earth engine API activated and the cloud project needs to be registered for Earth Engine.") + os.environ.setdefault('GOOGLE_APPLICATION_CREDENTIALS', str(credentials_path.resolve())) credentials = ee.ServiceAccountCredentials( email=None, key_file=str(credentials_path) diff --git a/src/earthengine.py b/src/earthengine.py index ee468d2..d1eb4f9 100644 --- a/src/earthengine.py +++ b/src/earthengine.py @@ -1,12 +1,22 @@ -from pathlib import Path +from uuid import uuid4 from datetime import datetime, timedelta import pandas as pd import ee from json2args.logger import logger +from google.cloud import storage import time -from params import Params, ParamsCMIP6, map_dataset, map_variable, EE_CMIP6_MODELS, EE_CMIP6_MODELS_SHORT +from params import Params, ParamsCMIP6, map_dataset, map_variable +FAIL_MESSAGE = """Direct data fetch failed: {e} +This usually happens when the dataset is too large to download directly via getInfo(). +Proceeding with export to Google Cloud Storage bucket: '{bucket}'... +""" +PROCESSING_MESSAGE = """ +Earth Engine export task started. Check your Earth Engine Tasks tab (https://code.earthengine.google.com/tasks) +Once complete, the CSV is fetched automatically from the '{bucket}' Google Cloud Storage bucket and returned as a DataFrame (the temporary file is then deleted from the bucket). +The returned table is in long format (date, model, value); pivot it with Pandas if you want one column per model. 
+""" def download_era5_series(kwargs: Params) -> pd.DataFrame: point = ee.Geometry.Point(kwargs.longitude, kwargs.latitude) @@ -67,18 +77,18 @@ def get_value(image): return df - - - def download_cmip6_series(kwargs: ParamsCMIP6) -> pd.DataFrame: point = ee.Geometry.Point(kwargs.longitude, kwargs.latitude) - + start_date = kwargs.start_date if kwargs.end_date is None: end_date = datetime.now() else: end_date = kwargs.end_date + storage_client = storage.Client() + bucket = storage_client.bucket(kwargs.bucket) + # map variable name and dataset variable_name = map_variable(kwargs.variable, "cmip6", "earthengine") dataset = map_dataset("cmip6", "earthengine") @@ -87,77 +97,99 @@ def download_cmip6_series(kwargs: ParamsCMIP6) -> pd.DataFrame: # filter by scenario cmip6 = cmip6.filterDate(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")).filter(ee.Filter.eq('scenario', kwargs.scenario)) - cmip6 = cmip6.filter(ee.Filter.inList('model', EE_CMIP6_MODELS_SHORT)) - - # Create a feature collection with all the data - def get_value(image): - value = image.reduceRegion( - reducer=ee.Reducer.first(), - geometry=point, - scale=1000 - ) - - return ee.Feature(None, { - 'time': image.date().millis(), - **{f"{kwargs.variable}_{model_name.replace('-', '_')}": value.get(f"{variable_name}_{model_name}", -9999) for model_name in EE_CMIP6_MODELS} - }) + cmip6 = cmip6.filter(ee.Filter.inList('model', kwargs.model)) - def extract_data(image): - # Get date, model, and value - date = image.date().format('YYYY-MM-dd') # Format date as string + # Function to extract data for a single model and date + def extract_single_model_data(image): + date = image.date().format('YYYY-MM-dd') model = image.get('model') - # Extract the value at the point. - # Using scale from the dataset's projection if possible, or a default. 
- # The nominal scale for GDDP-CMIP6 is 0.25 arc degrees (~27830 meters at equator) - scale = image.projection().nominalScale() # Use image's native scale - # Use reduceRegion to get the value. Use firstNonNull reducer. - data_dict = image.reduceRegion( - reducer=ee.Reducer.firstNonNull(), + # The band name for CMIP6 variables is usually just the variable name (e.g., 'tas') + band_name = variable_name + + # Use the nominal scale of the image's projection + scale = image.projection().nominalScale() + + # Reduce region to get the value at the point + value = image.reduceRegion( + reducer=ee.Reducer.first(), # Use first() for single point extraction geometry=point, scale=scale, - crs=image.projection() # Ensure sampling in image's CRS - ) - - # Get the value for the variable, handle potential nulls - value = data_dict.get(variable_name) + crs=image.projection() + ).get(band_name) # Get the value for the specific variable band - # Return a Feature with properties. Set null value marker if necessary + # Return a feature with date, model, and the extracted value return ee.Feature(None, { 'date': date, 'model': model, - variable_name: ee.Algorithms.If(value, value, -9999) # Use -9999 for null + 'value': ee.Algorithms.If(value, value, -9999) # Use -9999 for null values }) - # Create the feature collection - # features = cmip6.filterDate( - # start_date.strftime("%Y-%m-%d"), - # end_date.strftime("%Y-%m-%d") - # ).filterBounds(point).map(get_value) # Filter to our point of interest - features = cmip6.map(extract_data) + # Map the extraction function over the filtered image collection + features_to_export = cmip6.map(extract_single_model_data) + + # try: + # logger.info("Attempting to fetch data directly (might be slow/fail for large datasets)...") + # data_list = features_to_export.getInfo()['features'] + + # parsed_data = [] + # for f in data_list: + # properties = f['properties'] + # parsed_data.append({ + # 'date': properties['date'], + # 'model': properties['model'], + 
# 'value': properties['value'] + # }) + + # # Create a Pandas DataFrame + # df = pd.DataFrame(parsed_data) + + # # Pivot the DataFrame to have models as columns + # df_pivot = df.pivot_table(index='date', columns='model', values='value') + + # # Rename columns for clarity (e.g., 'tas_GFDL_ESM4') + # df_pivot.columns = [f"{kwargs.variable}_{col.replace('-', '_')}" for col in df_pivot.columns] + + # # Reset index to make 'date' a regular column and sort by date + # df_pivot = df_pivot.reset_index() + # df_pivot['date'] = pd.to_datetime(df_pivot['date']) + # df_pivot = df_pivot.sort_values(by='date') + + # return df_pivot + + # except Exception as e: + # logger.info(FAIL_MESSAGE.format(e=e, bucket=kwargs.bucket)) + # pass + + # Selectors ensure only the desired columns are exported + selectors = ['date', 'model', 'value'] + + file_name = f"cmip_export_{uuid4()}" - # Export to Cloud Storage task = ee.batch.Export.table.toCloudStorage( - collection=features, - description=f"CMIP6_{kwargs.variable}_{kwargs.scenario}_{kwargs.longitude}_{kwargs.latitude}", - bucket="ee_loader_export", - fileNamePrefix=f"CMIP6_{kwargs.variable}_{kwargs.scenario}_{kwargs.longitude}_{kwargs.latitude}", - fileFormat="CSV", - selectors=['date', 'model', variable_name] # Specify columns to export + collection=features_to_export, + description=f'CMIP6_Export_{kwargs.variable}_{kwargs.scenario}', + bucket=kwargs.bucket, + fileNamePrefix=file_name, + fileFormat='CSV', + selectors=selectors ) - - # Start the export task.start() - + logger.info(PROCESSING_MESSAGE.format(bucket=kwargs.bucket)) + # Wait for the export to complete while task.active(): - logger.info("Exporting to Cloud Storage...") + logger.debug("Exporting to Cloud Storage...") time.sleep(30) # Check every 30 seconds if task.status()['state'] == 'COMPLETED': - logger.info("Export completed successfully!") - # Return a message instead of a DataFrame - return pd.DataFrame({"status": ["Export completed successfully"]}) + 
logger.info(f"Export completed successfully! Check the '{kwargs.bucket}' for {file_name}.csv") + + blob = bucket.blob(f"{file_name}.csv") + with blob.open() as f: + df = pd.read_csv(f) + + blob.delete() + return df else: logger.error(f"Export failed: {task.status()}") - raise RuntimeError(f"Export failed: {task.status()}") \ No newline at end of file diff --git a/src/params.py b/src/params.py index d7084b8..da61e56 100644 --- a/src/params.py +++ b/src/params.py @@ -10,8 +10,9 @@ class Params(BaseModel): latitude: float class ParamsCMIP6(Params): - model: str + model: list[str] scenario: str + bucket: str CDS_ERA5_LAND_VARIABLE_DAILY = { "precipitation": "total_precipitation", @@ -33,7 +34,7 @@ class ParamsCMIP6(Params): } EE_CMIP6_MODELS = ["ACCESS-CM2", "ACCESS-ESM1-5", "BCC-CSM2-MR", "CESM2", "CESM2-WACCM", "CMCC-CM2-SR5", "CMCC-ESM2", "CNRM-CM6-1", "CNRM-ESM2-1", "CanESM5", "EC-Earth3", "EC-Earth3-Veg-LR", "FGOALS-g3", "GFDL-CM4", "GFDL-ESM4", "GISS-E2-1-G", "HadGEM3-GC31-LL", "HadGEM3-GC31-MM", "IITM-ESM", "INM-CM4-8", "INM-CM5-0", "IPSL-CM6A-LR", "KACE-1-0-G", "KIOST-ESM", "MIROC-ES2L", "MIROC6", "MPI-ESM1-2-HR", "MPI-ESM1-2-LR", "MRI-ESM2-0", "NESM3", "NorESM2-LM", "NorESM2-MM", "TaiESM1", "UKESM1-0-LL"] -EE_CMIP6_MODELS_SHORT = ["ACCESS-CM2", "MPI-ESM1-2-HR", "EC-Earth3", "FGOALS-g3", "GISS-E2-1-G", "IPSL-CM6A-LR"] + def map_variable(variable: str, dataset: str, provider: str) -> str: if dataset == "era5-daily": diff --git a/src/tool.yml b/src/tool.yml index 390bd84..8a3ce6f 100644 --- a/src/tool.yml +++ b/src/tool.yml @@ -63,9 +63,45 @@ tools: description: The end date of the series. If omitted, the current date will be used. 
default: 2050-12-31 model: - type: string + type: enum + array: true + values: + - ACCESS-CM2 + - ACCESS-ESM1-5 + - BCC-CSM2-MR + - CESM2 + - CESM2-WACCM + - CMCC-CM2-SR5 + - CMCC-ESM2 + - CNRM-CM6-1 + - CNRM-ESM2-1 + - CanESM5 + - EC-Earth3 + - EC-Earth3-Veg-LR + - FGOALS-g3 + - GFDL-CM4 + - GFDL-ESM4 + - GISS-E2-1-G + - HadGEM3-GC31-LL + - HadGEM3-GC31-MM + - IITM-ESM + - INM-CM4-8 + - INM-CM5-0 + - IPSL-CM6A-LR + - KACE-1-0-G + - KIOST-ESM + - MIROC-ES2L + - MIROC6 + - MPI-ESM1-2-HR + - MPI-ESM1-2-LR + - MRI-ESM2-0 + - NESM3 + - NorESM2-LM + - NorESM2-MM + - TaiESM1 + - UKESM1-0-LL + default: ["ACCESS-CM2", "MPI-ESM1-2-HR", "EC-Earth3", "FGOALS-g3", "GISS-E2-1-G", "IPSL-CM6A-LR"] description: The GCM model to use. - default: "EC-Earth3" scenario: type: enum values: @@ -73,4 +109,7 @@ tools: - ssp585 description: The scenario to use. default: "ssp585" + bucket: + type: string + default: camels_plus_cazs