-
Notifications
You must be signed in to change notification settings - Fork 0
Carlos download changes on cds loader #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4fd8f08
d19c175
68c640d
5995900
65abe2c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -131,4 +131,5 @@ dmypy.json | |
| # Pyre type checker | ||
| .pyre/ | ||
| .cdsapirc | ||
| service-account.json | ||
| service-account.json | ||
| .DS_Store | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,22 @@ | ||
| from pathlib import Path | ||
| from uuid import uuid4 | ||
| from datetime import datetime, timedelta | ||
| import pandas as pd | ||
| import ee | ||
| from json2args.logger import logger | ||
| from google.cloud import storage | ||
| import time | ||
|
|
||
| from params import Params, ParamsCMIP6, map_dataset, map_variable, EE_CMIP6_MODELS, EE_CMIP6_MODELS_SHORT | ||
| from params import Params, ParamsCMIP6, map_dataset, map_variable | ||
|
|
||
| FAIL_MESSAGE = """Direct data fetch failed: {e} | ||
| This usually happens when the dataset is too large to download directly via getInfo(). | ||
| Proceeding with export to Google Cloud Storage bucket: '{bucket}'... | ||
| """ | ||
| PROCESSING_MESSAGE = """ | ||
| Earth Engine export task started. Check your Earth Engine Tasks tab (https://code.earthengine.google.com/tasks) | ||
| Once complete, your CSV will be available in the '{bucket}' Google Cloud Storage bucket. | ||
| You will need to manually pivot the data using Pandas/Excel after downloading it from GCS if you want models as columns. | ||
| """ | ||
|
|
||
| def download_era5_series(kwargs: Params) -> pd.DataFrame: | ||
| point = ee.Geometry.Point(kwargs.longitude, kwargs.latitude) | ||
|
|
@@ -67,18 +77,18 @@ def get_value(image): | |
| return df | ||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
| def download_cmip6_series(kwargs: ParamsCMIP6) -> pd.DataFrame: | ||
| point = ee.Geometry.Point(kwargs.longitude, kwargs.latitude) | ||
|
|
||
| start_date = kwargs.start_date | ||
| if kwargs.end_date is None: | ||
| end_date = datetime.now() | ||
| else: | ||
| end_date = kwargs.end_date | ||
|
|
||
| storage_client = storage.Client() | ||
| bucket = storage_client.bucket(kwargs.bucket) | ||
|
|
||
| # map variable name and dataset | ||
| variable_name = map_variable(kwargs.variable, "cmip6", "earthengine") | ||
| dataset = map_dataset("cmip6", "earthengine") | ||
|
|
@@ -87,77 +97,99 @@ def download_cmip6_series(kwargs: ParamsCMIP6) -> pd.DataFrame: | |
| # filter by scenario | ||
| cmip6 = cmip6.filterDate(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")).filter(ee.Filter.eq('scenario', kwargs.scenario)) | ||
|
|
||
| cmip6 = cmip6.filter(ee.Filter.inList('model', EE_CMIP6_MODELS_SHORT)) | ||
|
|
||
| # Create a feature collection with all the data | ||
| def get_value(image): | ||
| value = image.reduceRegion( | ||
| reducer=ee.Reducer.first(), | ||
| geometry=point, | ||
| scale=1000 | ||
| ) | ||
|
|
||
| return ee.Feature(None, { | ||
| 'time': image.date().millis(), | ||
| **{f"{kwargs.variable}_{model_name.replace('-', '_')}": value.get(f"{variable_name}_{model_name}", -9999) for model_name in EE_CMIP6_MODELS} | ||
| }) | ||
| cmip6 = cmip6.filter(ee.Filter.inList('model', kwargs.model)) | ||
|
|
||
| def extract_data(image): | ||
| # Get date, model, and value | ||
| date = image.date().format('YYYY-MM-dd') # Format date as string | ||
| # Function to extract data for a single model and date | ||
| def extract_single_model_data(image): | ||
| date = image.date().format('YYYY-MM-dd') | ||
| model = image.get('model') | ||
|
|
||
| # Extract the value at the point. | ||
| # Using scale from the dataset's projection if possible, or a default. | ||
| # The nominal scale for GDDP-CMIP6 is 0.25 arc degrees (~27830 meters at equator) | ||
| scale = image.projection().nominalScale() # Use image's native scale | ||
| # Use reduceRegion to get the value. Use firstNonNull reducer. | ||
| data_dict = image.reduceRegion( | ||
| reducer=ee.Reducer.firstNonNull(), | ||
| # The band name for CMIP6 variables is usually just the variable name (e.g., 'tas') | ||
| band_name = variable_name | ||
|
|
||
| # Use the nominal scale of the image's projection | ||
| scale = image.projection().nominalScale() | ||
|
|
||
| # Reduce region to get the value at the point | ||
| value = image.reduceRegion( | ||
| reducer=ee.Reducer.first(), # Use first() for single point extraction | ||
| geometry=point, | ||
| scale=scale, | ||
| crs=image.projection() # Ensure sampling in image's CRS | ||
| ) | ||
|
|
||
| # Get the value for the variable, handle potential nulls | ||
| value = data_dict.get(variable_name) | ||
| crs=image.projection() | ||
| ).get(band_name) # Get the value for the specific variable band | ||
|
|
||
| # Return a Feature with properties. Set null value marker if necessary | ||
| # Return a feature with date, model, and the extracted value | ||
| return ee.Feature(None, { | ||
| 'date': date, | ||
| 'model': model, | ||
| variable_name: ee.Algorithms.If(value, value, -9999) # Use -9999 for null | ||
| 'value': ee.Algorithms.If(value, value, -9999) # Use -9999 for null values | ||
| }) | ||
|
|
||
| # Create the feature collection | ||
| # features = cmip6.filterDate( | ||
| # start_date.strftime("%Y-%m-%d"), | ||
| # end_date.strftime("%Y-%m-%d") | ||
| # ).filterBounds(point).map(get_value) # Filter to our point of interest | ||
| features = cmip6.map(extract_data) | ||
| # Map the extraction function over the filtered image collection | ||
| features_to_export = cmip6.map(extract_single_model_data) | ||
|
|
||
| # try: | ||
| # logger.info("Attempting to fetch data directly (might be slow/fail for large datasets)...") | ||
| # data_list = features_to_export.getInfo()['features'] | ||
|
|
||
| # parsed_data = [] | ||
| # for f in data_list: | ||
| # properties = f['properties'] | ||
| # parsed_data.append({ | ||
| # 'date': properties['date'], | ||
| # 'model': properties['model'], | ||
| # 'value': properties['value'] | ||
| # }) | ||
|
|
||
| # # Create a Pandas DataFrame | ||
| # df = pd.DataFrame(parsed_data) | ||
|
|
||
| # # Pivot the DataFrame to have models as columns | ||
| # df_pivot = df.pivot_table(index='date', columns='model', values='value') | ||
|
|
||
| # # Rename columns for clarity (e.g., 'tas_GFDL_ESM4') | ||
| # df_pivot.columns = [f"{kwargs.variable}_{col.replace('-', '_')}" for col in df_pivot.columns] | ||
|
|
||
| # # Reset index to make 'date' a regular column and sort by date | ||
| # df_pivot = df_pivot.reset_index() | ||
| # df_pivot['date'] = pd.to_datetime(df_pivot['date']) | ||
| # df_pivot = df_pivot.sort_values(by='date') | ||
|
|
||
| # return df_pivot | ||
|
|
||
| # except Exception as e: | ||
| # logger.info(FAIL_MESSAGE.format(e=e, bucket=kwargs.bucket)) | ||
| # pass | ||
|
|
||
|
Comment on lines
+131
to
+163
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Remove or document the commented code block. Large blocks of commented code should be removed or properly documented if they serve as a reference. If this code represents an alternative approach that might be useful in the future, consider moving it to documentation or creating a separate method with a clear deprecation notice. 🤖 Prompt for AI Agents |
||
| # Selectors ensure only the desired columns are exported | ||
| selectors = ['date', 'model', 'value'] | ||
|
|
||
| file_name = f"cmip_export_{uuid4()}" | ||
|
|
||
| # Export to Cloud Storage | ||
| task = ee.batch.Export.table.toCloudStorage( | ||
| collection=features, | ||
| description=f"CMIP6_{kwargs.variable}_{kwargs.scenario}_{kwargs.longitude}_{kwargs.latitude}", | ||
| bucket="ee_loader_export", | ||
| fileNamePrefix=f"CMIP6_{kwargs.variable}_{kwargs.scenario}_{kwargs.longitude}_{kwargs.latitude}", | ||
| fileFormat="CSV", | ||
| selectors=['date', 'model', variable_name] # Specify columns to export | ||
| collection=features_to_export, | ||
| description=f'CMIP6_Export_{kwargs.variable}_{kwargs.scenario}', | ||
| bucket=kwargs.bucket, | ||
| fileNamePrefix=file_name, | ||
| fileFormat='CSV', | ||
| selectors=selectors | ||
| ) | ||
|
|
||
| # Start the export | ||
| task.start() | ||
|
|
||
| logger.info(PROCESSING_MESSAGE.format(bucket=kwargs.bucket)) | ||
|
|
||
| # Wait for the export to complete | ||
| while task.active(): | ||
| logger.info("Exporting to Cloud Storage...") | ||
| logger.debug("Exporting to Cloud Storage...") | ||
| time.sleep(30) # Check every 30 seconds | ||
|
|
||
| if task.status()['state'] == 'COMPLETED': | ||
| logger.info("Export completed successfully!") | ||
| # Return a message instead of a DataFrame | ||
| return pd.DataFrame({"status": ["Export completed successfully"]}) | ||
| logger.info(f"Export completed successfully! Check the '{kwargs.bucket}' for {file_name}.csv") | ||
|
|
||
| blob = bucket.blob(f"{file_name}.csv") | ||
| with blob.open() as f: | ||
| df = pd.read_csv(f) | ||
|
|
||
| blob.delete() | ||
| return df | ||
| else: | ||
| logger.error(f"Export failed: {task.status()}") | ||
| raise RuntimeError(f"Export failed: {task.status()}") | ||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -63,14 +63,53 @@ tools: | |||||
| description: The end date of the series. If omitted, the current date will be used. | ||||||
| default: 2050-12-31 | ||||||
| model: | ||||||
| type: string | ||||||
| type: enum | ||||||
| array: true | ||||||
| values: | ||||||
| - ACCESS-CM2 | ||||||
| - ACCESS-ESM1-5 | ||||||
| - BCC-CSM2-MR | ||||||
| - CESM2 | ||||||
| - CESM2-WACCM | ||||||
| - CMCC-CM2-SR5 | ||||||
| - CMCC-ESM2 | ||||||
| - CNRM-CM6-1 | ||||||
| - CNRM-ESM2-1 | ||||||
| - CanESM5 | ||||||
| - EC-Earth3 | ||||||
| - EC-Earth3-Veg-LR | ||||||
| - FGOALS-g3 | ||||||
| - GFDL-CM4 | ||||||
| - GFDL-ESM4 | ||||||
| - GISS-E2-1-G | ||||||
| - HadGEM3-GC31-LL | ||||||
| - HadGEM3-GC31-MM | ||||||
| - IITM-ESM | ||||||
| - INM-CM4-8 | ||||||
| - INM-CM5-0 | ||||||
| - IPSL-CM6A-LR | ||||||
| - KACE-1-0-G | ||||||
| - KIOST-ESM | ||||||
| - MIROC-ES2L | ||||||
| - MIROC6 | ||||||
| - MPI-ESM1-2-HR | ||||||
| - MPI-ESM1-2-LR | ||||||
| - MRI-ESM2-0 | ||||||
| - NESM3 | ||||||
| - NorESM2-LM | ||||||
| - NorESM2-MM | ||||||
| - TaiESM1 | ||||||
| - UKESM1-0-LL | ||||||
| default: ["ACCESS-CM2", "MPI-ESM1-2-HR", "EC-Earth3", "FGOALS-g3", "GISS-E2-1-G", "IPSL-CM6A-LR"] | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Fix formatting: extra spaces after comma. Remove the extra spaces after the comma to comply with YAML formatting standards. - default: ["ACCESS-CM2", "MPI-ESM1-2-HR", "EC-Earth3", "FGOALS-g3", "GISS-E2-1-G", "IPSL-CM6A-LR"]
+ default: ["ACCESS-CM2", "MPI-ESM1-2-HR", "EC-Earth3", "FGOALS-g3", "GISS-E2-1-G", "IPSL-CM6A-LR"] 📝 Committable suggestion
Suggested change
🧰 Tools 🪛 YAMLlint (1.37.1) [warning] 103-103: too many spaces after comma (commas) 🤖 Prompt for AI Agents |
||||||
| description: The GCM model to use. | ||||||
| default: "EC-Earth3" | ||||||
| scenario: | ||||||
| type: enum | ||||||
| values: | ||||||
| - ssp245 | ||||||
| - ssp585 | ||||||
| description: The scenario to use. | ||||||
| default: "ssp585" | ||||||
| bucket: | ||||||
| type: string | ||||||
| default: camels_plus_cazs | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix inline comment formatting.
Add proper spacing before inline comments to comply with PEP 8.
Also applies to: 121-121, 127-127
🧰 Tools
🪛 Flake8 (7.2.0)
[error] 117-117: at least two spaces before inline comment
(E261)
🤖 Prompt for AI Agents