From fcd82b9d2a1706ec1b70713c20b6dff87728d456 Mon Sep 17 00:00:00 2001 From: Her0n24 Date: Wed, 11 Feb 2026 21:20:34 +0000 Subject: [PATCH 1/4] Feature: Added France solar generation pipeline --- .../configs/admin_region_lat_lon.csv | 13 + .../scripts/fetch_pvlive_data.py | 1 - .../scripts/generate_combined_gsp.py | 15 +- .../scripts/get_generation_csv.py | 147 ++++++++ .../inspect_france_training_pipeline.py | 296 +++++++++++++++ .../scripts/process_france_data.py | 348 ++++++++++++++++++ 6 files changed, 813 insertions(+), 7 deletions(-) create mode 100644 src/open_data_pvnet/configs/admin_region_lat_lon.csv create mode 100644 src/open_data_pvnet/scripts/get_generation_csv.py create mode 100644 src/open_data_pvnet/scripts/inspect_france_training_pipeline.py create mode 100644 src/open_data_pvnet/scripts/process_france_data.py diff --git a/src/open_data_pvnet/configs/admin_region_lat_lon.csv b/src/open_data_pvnet/configs/admin_region_lat_lon.csv new file mode 100644 index 0000000..3615438 --- /dev/null +++ b/src/open_data_pvnet/configs/admin_region_lat_lon.csv @@ -0,0 +1,13 @@ +region,principal_municipality,latitude,longitude +"Auvergne-Rhône-Alpes","Lyon",45.7640,4.8357 +"Bourgogne-Franche-Comté","Dijon",47.3220,5.0415 +"Bretagne","Rennes",48.1173,-1.6778 +"Centre-Val-de-Loire","Orléans",47.9030,1.9093 +"Grand-Est","Strasbourg",48.5734,7.7521 +"Hauts-de-France","Lille",50.6292,3.0573 +"Ile-de-France","Paris",48.8566,2.3522 +"Normandie","Rouen",49.4432,1.0993 +"Nouvelle-Aquitaine","Bordeaux",44.8378,-0.5792 +"Occitanie","Toulouse",43.6047,1.4442 +"Pays-de-la-Loire","Nantes",47.2184,-1.5536 +"PACA","Marseille",43.2965,5.3698 \ No newline at end of file diff --git a/src/open_data_pvnet/scripts/fetch_pvlive_data.py b/src/open_data_pvnet/scripts/fetch_pvlive_data.py index 0847a9b..a24ebab 100644 --- a/src/open_data_pvnet/scripts/fetch_pvlive_data.py +++ b/src/open_data_pvnet/scripts/fetch_pvlive_data.py @@ -1,7 +1,6 @@ from pvlive_api import PVLive import logging - 
logger = logging.getLogger(__name__) diff --git a/src/open_data_pvnet/scripts/generate_combined_gsp.py b/src/open_data_pvnet/scripts/generate_combined_gsp.py index 804b541..0d26871 100644 --- a/src/open_data_pvnet/scripts/generate_combined_gsp.py +++ b/src/open_data_pvnet/scripts/generate_combined_gsp.py @@ -33,12 +33,13 @@ from src.open_data_pvnet.scripts.fetch_pvlive_data import PVLiveData -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + def main( start_year: int = typer.Option(2020, help="Start year for data collection"), end_year: int = typer.Option(2025, help="End year for data collection"), - output_folder: str = typer.Option("data", help="Output folder for the zarr dataset") + output_folder: str = typer.Option("data", help="Output folder for the zarr dataset"), ): """ Generate combined GSP data for all GSPs and save as a zarr dataset. @@ -51,15 +52,15 @@ def main( all_dataframes = [] # Changed range to start from 0 to include gsp_id=0 - for gsp_id in range(0, 319): + for gsp_id in range(0, 319): logging.info(f"Processing GSP ID {gsp_id}") df = data_source.get_data_between( start=range_start, end=range_end, entity_id=gsp_id, - extra_fields="capacity_mwp,installedcapacity_mwp" + extra_fields="capacity_mwp,installedcapacity_mwp", ) - + if df is not None and not df.empty: # Add gsp_id column to the dataframe df["gsp_id"] = gsp_id @@ -87,7 +88,9 @@ def main( xr_pv.to_zarr(output_path, mode="w", consolidated=True) logging.info(f"Successfully saved combined GSP dataset to {output_path}") - logging.info(f"Dataset contains GSPs 0-318 for period {range_start.date()} to {range_end.date()}") + logging.info( + f"Dataset contains GSPs 0-318 for period {range_start.date()} to {range_end.date()}" + ) if __name__ == "__main__": diff --git a/src/open_data_pvnet/scripts/get_generation_csv.py 
"""
France PVNet Data Download Script

This script downloads and processes mainland France solar generation data from RTE's éCO2mix platform for PVNet training.

Data source:
RTE éCO2mix Dataset: https://www.rte-france.com/en/data-publications/eco2mix/download-indicators
 - Half Hourly data for the 12 administrative regions of France, from Jan 2020 to Dec 2023 (definitive data)
 - Consolidated data for Jan to Dec 2024 (in-progress data)
 - Capacity (TCH) data available from Jan 2020

Usage:
    python get_generation_csv.py --start_yr 2019 --end_yr 2023 --consolidate_yr 2024
    # where users need to determine the consolidate year for assignment of a year in the file name
    # based on the latest available data on RTE. This way filenames will be consistent with the year of data they contain.
"""

import requests
import pandas as pd
import os
from time import sleep
import zipfile
import argparse
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# CSVs are written to a "tmp" directory three levels above the working
# directory (assumed to be the project root when run from
# src/open_data_pvnet/scripts — TODO confirm against the repo layout).
base_dir = os.getcwd()
parent_3_levels_up = os.path.dirname(os.path.dirname(os.path.dirname(base_dir)))
output_dir = os.path.join(parent_3_levels_up, "tmp")

admin_region_list = [
    "Auvergne-Rhône-Alpes",
    "Bourgogne-Franche-Comté",
    "Bretagne",
    "Centre-Val-de-Loire",
    "Grand-Est",
    "Hauts-de-France",
    "Ile-de-France",
    "Normandie",
    "Nouvelle-Aquitaine",
    "Occitanie",
    "Pays-de-la-Loire",
    "PACA",
]


def get_region_generation_csv(region, year, consolidated=False) -> None:
    """Download and extract the annual generation CSV for a given region and year.

    Downloads the RTE ZIP archive, extracts the single XLS(X) entry (which is
    actually tab-separated text), and re-saves it as a UTF-8 CSV in
    ``output_dir``. Temporary files are removed afterwards.

    Args:
        region (str): The name of the region.
        year (int): The year for which to download the data.
        consolidated (bool): If True, download consolidated (in-progress) data,
            otherwise download definitive data.
    """
    if consolidated:
        url = f"https://eco2mix.rte-france.com/download/eco2mix/eCO2mix_RTE_{region}_En-cours-Consolide.zip"
        data_type = "Consolidated"
    else:
        url = f"https://eco2mix.rte-france.com/download/eco2mix/eCO2mix_RTE_{region}_Annuel-Definitif_{year}.zip"
        data_type = "Definitive"

    # Download the ZIP file; fail after 60s rather than hanging forever on a
    # stalled connection.
    response = requests.get(url, timeout=60)

    # Check if request was successful
    if response.status_code != 200:
        logger.error(
            f"Failed to download {region} {year} ({data_type}): HTTP {response.status_code}"
        )
        return

    # Save to temporary ZIP file
    temp_zip = f"temp_{region}_{year}.zip"
    with open(temp_zip, "wb") as f:
        f.write(response.content)

    # Try to extract the ZIP file
    try:
        with zipfile.ZipFile(temp_zip, "r") as zip_ref:
            file_list = zip_ref.namelist()
            # The archive is expected to contain exactly one XLS/XLSX file;
            # IndexError (no match) is handled below.
            xls_file = [f for f in file_list if f.endswith((".xls", ".xlsx"))][0]
            # Extract just that file (into the current working directory)
            zip_ref.extract(xls_file)
    except zipfile.BadZipFile:
        logger.warning(f"Skipping {region} {year} ({data_type}): Not a valid ZIP file")
        os.remove(temp_zip)
        return
    except IndexError:
        logger.warning(f"Skipping {region} {year} ({data_type}): No XLS file found in ZIP")
        os.remove(temp_zip)
        return

    # The .xls file is actually tab-separated text, not Excel format.
    # Read as CSV with tab delimiter and proper encoding for French characters.
    df = pd.read_csv(xls_file, sep="\t", encoding="latin-1", low_memory=False)

    # Save as CSV in the output directory
    os.makedirs(output_dir, exist_ok=True)
    csv_filename = os.path.join(output_dir, f"eCO2mix_RTE_{region}_Annuel_{year}.csv")
    df.to_csv(csv_filename, index=False)

    # Clean up temporary files
    os.remove(temp_zip)
    os.remove(xls_file)

    logger.info(f"Saved {csv_filename}")
    sleep(1)  # rate-limit requests to the RTE server


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Download France RTE éCO2mix generation data for specified years"
    )
    # BUG FIX: the help text advertised defaults but none were configured, so
    # omitting any option left the value as None and crashed in range() below.
    parser.add_argument(
        "--start_yr",
        type=int,
        default=2019,
        help="Start year for definitive data download (default: 2019)",
    )
    parser.add_argument(
        "--end_yr",
        type=int,
        default=2023,
        help="End year for definitive data download (default: 2023)",
    )
    parser.add_argument(
        "--consolidate_yr",
        type=int,
        default=2024,
        help="Year for consolidated (in-progress) data download (default: 2024)",
    )

    args = parser.parse_args()

    year_list = list(range(args.start_yr, args.end_yr + 1))

    # Run for consolidated data
    for region in admin_region_list:
        get_region_generation_csv(region, args.consolidate_yr, consolidated=True)

    # Run for all regions and definitive years
    for region in admin_region_list:
        for year in year_list:
            get_region_generation_csv(region, year)
"""
This script inspects the France PV training pipeline.

Validates:
1. France Solar Zarr dataset loads correctly and data looks as expected
2. GFS NWP data for France is accessible from S3
3. Data timestamps align for training
"""

import xarray as xr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import logging
import fsspec
import os

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

gfs_path = "s3://ocf-open-data-pvnet/data/gfs/v4/2024.zarr"

# Resolve the solar zarr inside the project-level "data" directory so the
# script does not depend on the current working directory containing the file.
base_dir = os.getcwd()
parent_3_levels_up = os.path.dirname(os.path.dirname(os.path.dirname(base_dir)))
output_dir = os.path.join(parent_3_levels_up, "data")
solar_path = os.path.join(output_dir, "france_solar_combined.zarr")
print(f"Loading {solar_path}...\n")

# Define France latitude and longitude bounds
MIN_LAT, MAX_LAT = 41.5, 51.5
MIN_LON, MAX_LON = -5.5, 9.0


def test_france_solar_data(zarr_path):
    """Test loading and inspecting the France solar zarr dataset.

    Prints an overview, time-resolution/completeness checks, data-quality
    stats, per-region stats, dataset attributes, and saves a sample plot.

    Returns:
        bool: True when the inspection ran to completion (so main() can
        report PASSED instead of treating the None return as SKIPPED).
    """
    logger.info("\n" + "=" * 60)
    logger.info("Testing France Solar Zarr Dataset")
    logger.info("=" * 60)

    ds = xr.open_zarr(zarr_path)

    # 1. Basic Dataset Info
    print("=" * 60)
    print("DATASET OVERVIEW")
    print("=" * 60)
    print(ds)
    print("\n")

    # 2. Dimensions and Coordinates
    print("=" * 60)
    print("DIMENSIONS & COORDINATES")
    print("=" * 60)
    print(f"Locations: {len(ds.location_id)} regions")
    print(f"Time steps: {len(ds.time_utc)}")
    print(f"Time range: {ds.time_utc.values[0]} to {ds.time_utc.values[-1]}")
    print(f"\nRegions: {list(ds.location_id.values)}")
    print(f"\nLatitudes: {ds.latitude.values}")
    print(f"Longitudes: {ds.longitude.values}")
    print("\n")

    # 3. Check Time Resolution
    print("=" * 60)
    print("TIME RESOLUTION CHECK")
    print("=" * 60)
    time_diff = pd.Series(ds.time_utc.values).diff()
    print("Expected resolution: 30 minutes")
    print(f"Actual resolution (mode): {time_diff.mode()[0]}")

    # Check for irregular gaps.
    # BUG FIX: the first diff is always NaT and used to be miscounted as an
    # irregular gap; exclude NaT from the mask.
    irregular_mask = time_diff.notna() & (time_diff != pd.Timedelta("30min"))
    irregular_count = int(irregular_mask.sum())
    if irregular_count > 0:
        print(f"Found {irregular_count} irregular time gaps")
        irregular_times = ds.time_utc.values[irregular_mask.to_numpy()]
        print(f"First few irregular gaps: {irregular_times[:5]}")
    else:
        print("✓ All timesteps are regular 30-minute intervals")

    # Check completeness
    expected_count = len(pd.date_range(ds.time_utc.values[0], ds.time_utc.values[-1], freq="30min"))
    actual_count = len(ds.time_utc)
    print(f"\nExpected timesteps: {expected_count}")
    print(f"Actual timesteps: {actual_count}")
    print(f"Complete: {'✓ Yes' if expected_count == actual_count else '✗ No'}")
    print("\n")

    # 4. Data Quality Check
    print("=" * 60)
    print("DATA QUALITY")
    print("=" * 60)

    # Generation data
    gen_data = ds["generation_mw"].values
    print("Generation (MW):")
    print(f"  Shape: {gen_data.shape}")
    print(f"  Range: [{np.nanmin(gen_data):.2f}, {np.nanmax(gen_data):.2f}] MW")
    print(f"  Mean: {np.nanmean(gen_data):.2f} MW")
    print(
        f"  NaN count: {np.isnan(gen_data).sum()} ({100*np.isnan(gen_data).sum()/gen_data.size:.2f}%)"
    )

    # Capacity data
    cap_data = ds["capacity_mwp"].values
    print("\nCapacity (MWp):")
    print(f"  Shape: {cap_data.shape}")
    print(f"  Range: [{np.nanmin(cap_data):.2f}, {np.nanmax(cap_data):.2f}] MWp")
    print(f"  Mean: {np.nanmean(cap_data):.2f} MWp")
    print(
        f"  NaN count: {np.isnan(cap_data).sum()} ({100*np.isnan(cap_data).sum()/cap_data.size:.2f}%)"
    )
    print("\n")

    # 5. Per-Region Stats
    print("=" * 60)
    print("PER-REGION STATISTICS")
    print("=" * 60)
    for i, region in enumerate(ds.location_id.values):
        gen = ds["generation_mw"].isel(location_id=i).values
        cap = ds["capacity_mwp"].isel(location_id=i).values
        print(f"\n{region}:")
        print(
            f"  Generation: [{np.nanmin(gen):.1f}, {np.nanmax(gen):.1f}] MW, "
            f"Mean: {np.nanmean(gen):.1f} MW, NaN: {100*np.isnan(gen).sum()/len(gen):.1f}%"
        )
        print(
            f"  Capacity: {np.nanmean(cap):.1f} MWp, NaN: {100*np.isnan(cap).sum()/len(cap):.1f}%"
        )
    print("\n")

    # 6. Dataset Attributes
    print("=" * 60)
    print("DATASET ATTRIBUTES")
    print("=" * 60)
    for key, value in ds.attrs.items():
        print(f"{key}: {value}")
    print("\n")

    # 7. Sample Time Series Plot
    print("=" * 60)
    print("SAMPLE TIME SERIES")
    print("=" * 60)
    print("Creating sample plot for first 30 days...")

    # Plot all regions for the first 30 days
    sample_days = 30
    sample_times = ds.time_utc[: 48 * sample_days]  # 48 half-hours per day

    fig, axes = plt.subplots(2, 1, figsize=(12, 8))

    # Generation
    for region in ds.location_id.values:
        ds["generation_mw"].sel(location_id=region, time_utc=sample_times).plot.line(
            ax=axes[0], label=region
        )
    axes[0].set_title("Solar Generation (First 30 Days)")
    axes[0].set_ylabel("Generation (MW)")
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)

    # Capacity
    for region in ds.location_id.values:
        ds["capacity_mwp"].sel(location_id=region, time_utc=sample_times).plot.line(
            ax=axes[1], label=region
        )
    axes[1].set_title("Solar Capacity (First 30 Days)")
    axes[1].set_ylabel("Capacity (MWp)")
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.savefig("zarr_inspection_sample.png", dpi=150, bbox_inches="tight")
    print("✓ Saved plot to zarr_inspection_sample.png")

    print("\n" + "=" * 60)
    print("INSPECTION COMPLETE")
    print("=" * 60)

    return True


def test_gfs_data_access():
    """Test accessing GFS NWP data from S3.

    Returns:
        True on success, None when fsspec is unavailable (skipped),
        False on failure.
    """
    logger.info("\n" + "=" * 60)
    logger.info("Testing GFS NWP Data Access")
    logger.info("=" * 60)

    try:
        logger.info(f"Opening GFS data from: {gfs_path}")
        store = fsspec.get_mapper(gfs_path, anon=True)

        # decode_timedelta=True silences xarray's future-behaviour warning
        ds = xr.open_zarr(store, consolidated=True, decode_timedelta=True)

        logger.info("GFS Dataset accessed successfully!")
        logger.info(f"Variables: {list(ds.data_vars)[:10]}...")  # First 10
        logger.info(f"Dimensions: {dict(ds.sizes)}")

        # Check latitude/longitude coverage
        if "latitude" in ds.dims:
            lats = ds["latitude"].values
            lons = ds["longitude"].values
            logger.info("\nSpatial Coverage:")
            logger.info(f"  Latitude: {lats.min():.1f} to {lats.max():.1f}")
            logger.info(f"  Longitude: {lons.min():.1f} to {lons.max():.1f}")

            # Check if France is within bounds
            if (
                (lats.min() <= MIN_LAT <= lats.max())
                and (lats.min() <= MAX_LAT <= lats.max())
                and (lons.min() <= MIN_LON <= lons.max())
                and (lons.min() <= MAX_LON <= lons.max())
            ):
                logger.info("GFS data covers France region")
            else:
                logger.warning("GFS data does NOT cover France region")
        else:
            logger.warning("GFS dataset does not have latitude/longitude dimensions")

        # Check time dimension
        if "init_time" in ds.dims or "time" in ds.dims:
            time_dim = "init_time" if "init_time" in ds.dims else "time"
            times = ds[time_dim].values
            logger.info("\nTime Coverage:")
            logger.info(f"  First: {pd.Timestamp(times[0])}")
            logger.info(f"  Last: {pd.Timestamp(times[-1])}")

        logger.info("\n GFS Data Access: PASSED")
        return True

    except ImportError:
        logger.warning("fsspec not available - skipping S3 test")
        return None
    except Exception as e:
        logger.error(f"Failed to access GFS data: {e}")
        return False


def test_time_alignment(solar_path, gfs_path):
    """Check if France solar data and GFS data have overlapping times."""
    logger.info("\n" + "=" * 60)
    logger.info("Testing Time Alignment")
    logger.info("=" * 60)

    try:
        ds_solar = xr.open_zarr(str(solar_path))
        solar_times = ds_solar["time_utc"].values

        ds_gfs = xr.open_zarr(str(gfs_path))
        gfs_times = ds_gfs["time"].values

        # Check for overlapping times
        overlap = np.intersect1d(solar_times, gfs_times)
        if len(overlap) > 0:
            logger.info(f"Found {len(overlap)} overlapping time steps")
        else:
            logger.warning("No overlapping time steps found")

    except Exception as e:
        logger.error(f"Failed to check time alignment: {e}")


def main():
    """Run the solar-zarr inspection and the GFS access test, then summarise."""
    logger.info("=" * 60)
    logger.info("FRANCE PVNET DATA PIPELINE TEST")
    logger.info("=" * 60)

    results = {}
    results["solar_data"] = test_france_solar_data(solar_path)
    results["gfs_access"] = test_gfs_data_access()

    # Summary
    logger.info("\n" + "=" * 60)
    logger.info("TEST SUMMARY")
    logger.info("=" * 60)

    for test, result in results.items():
        status = "PASSED" if result else ("SKIPPED" if result is None else "FAILED")
        logger.info(f"  {test}: {status}")

    all_passed = all(r is True or r is None for r in results.values())
    if all_passed:
        logger.info("\nAll tests passed!")
    else:
        logger.info("\nSome tests failed - check logs above")

    # Skip time alignment test since GFS is on S3
    logger.info("\nSkipping time alignment test (GFS is on S3)")


if __name__ == "__main__":
    main()
import pandas as pd
import os
import logging
import numpy as np
import argparse

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Project-relative paths: CSV inputs live in <root>/tmp, zarr output in
# <root>/data (assumes the script is run from src/open_data_pvnet/scripts —
# TODO confirm against the repo layout).
base_dir = os.getcwd()
metadata_file_dir = os.path.join(os.path.dirname(base_dir), "configs")
parent_3_levels_up = os.path.dirname(os.path.dirname(os.path.dirname(base_dir)))
generation_data_dir = os.path.join(parent_3_levels_up, "tmp")
output_dir = os.path.join(parent_3_levels_up, "data")
start_yr = 2020
end_yr = 2024


def process_location_csv(generation_data_dir, region, year) -> pd.DataFrame:
    """Load and clean one region-year eCO2mix generation CSV.

    Parses local timestamps to UTC (handling DST transitions), keeps only the
    half-hourly rows, enforces a monotonic, duplicate-free, gap-free
    30-minute index, and derives installed capacity from generation and the
    TCH load-factor column.

    Args:
        generation_data_dir (str): The directory where the generation CSV files are stored.
        region (str): The name of the region.
        year (int): The year for which to process the data.

    Returns:
        pd.DataFrame: Indexed by UTC datetime, with columns
        ``generation_mw``, ``tch_solaire_percent`` and ``capacity_mwp``.
    """
    df = pd.read_csv(
        os.path.join(generation_data_dir, f"eCO2mix_RTE_{region}_Annuel_{year}.csv"),
        low_memory=False,
    )
    # Keep only solar related columns and date/time
    df = df[["Date", "Heures", "Solaire", "TCH Solaire (%)"]]

    # Build a datetime column from the date and time columns
    df["datetime"] = pd.to_datetime(df["Date"] + " " + df["Heures"])

    # Handle DST transitions:
    # - nonexistent (spring forward): mark as NaT and drop
    # - ambiguous (fall back): treat as DST/summer time (first occurrence)
    df["datetime"] = df["datetime"].dt.tz_localize(
        "Europe/Paris", ambiguous=True, nonexistent="NaT"
    )

    # Drop any NaT values created during the spring-forward transition
    df = df.dropna(subset=["datetime"])

    # Convert to UTC
    df["datetime"] = df["datetime"].dt.tz_convert("UTC")

    # Only the :00 and :30 rows carry data in the source files
    df = df[df["datetime"].dt.minute.isin([0, 30])]

    # Ensure time is monotonic
    if not df["datetime"].is_monotonic_increasing:
        logger.warning("Datetime column is not monotonic increasing. Sorting by datetime.")
        df = df.sort_values("datetime")
    else:
        logger.info("Datetime is monotonic increasing.")

    # Check for duplicates and keep the first occurrence
    duplicates = df[df.duplicated(subset=["datetime"], keep=False)]
    if len(duplicates) > 0:
        duplicate_times = df[df.duplicated(subset=["datetime"], keep="first")]["datetime"].unique()
        logger.warning(f"Found {len(duplicates)} duplicate timestamps:")
        for dt in duplicate_times:
            logger.warning(f"  Duplicate: {dt}")
        df = df.drop_duplicates(subset=["datetime"], keep="first")
    else:
        logger.info("No duplicate timestamps found.")

    # Ensure all 30-minute timesteps are present
    df = df.set_index("datetime")
    full_range = pd.date_range(start=df.index.min(), end=df.index.max(), freq="30min")
    missing_timestamps = full_range.difference(df.index)

    if len(missing_timestamps) > 0:
        logger.warning(f"Found {len(missing_timestamps)} missing 30-minute timesteps:")
        for dt in missing_timestamps[:20]:  # Show first 20 to avoid excessive logging
            logger.warning(f"  Missing: {dt}")
        if len(missing_timestamps) > 20:
            logger.warning(f"  ... and {len(missing_timestamps) - 20} more missing timesteps")
        df = df.reindex(full_range)
        df.index.name = "datetime"
    else:
        logger.info("All 30-minute timesteps are present.")

    # Reset index to make datetime a column again for the rename/drop step
    df = df.reset_index()
    df = df.rename(columns={"Solaire": "generation_mw", "TCH Solaire (%)": "tch_solaire_percent"})
    df.drop(columns=["Date", "Heures"], inplace=True)

    # Capacity = generation / load-factor. TCH is 0 (or missing) at night,
    # which yields inf/NaN; forward- then backward-fill to carry the daytime
    # capacity across. (A redundant pre-rename computation of this column was
    # removed — it was immediately overwritten here.)
    df["capacity_mwp"] = (
        ((df["generation_mw"] / df["tch_solaire_percent"]) * 100)
        .replace([np.inf, -np.inf], np.nan)
        .round(1)
    )
    df["capacity_mwp"] = df["capacity_mwp"].ffill().bfill()
    df.set_index("datetime", inplace=True)

    return df


# Create a France wide aggregate
def create_France_aggregate(generation_data_dir, region_list, year_list) -> pd.DataFrame:
    """Create a France-wide aggregate DataFrame from regional generation data.

    Args:
        generation_data_dir (str): The directory where the generation CSV files are stored.
        region_list (list): List of regions to include in the aggregate.
        year_list (list): List of years to include in the aggregate.

    Returns:
        pd.DataFrame: Columns ``datetime``, ``generation_mw`` and
        ``capacity_mwp`` summed across regions per timestamp.
    """
    frames = []
    for region in region_list:
        for year in year_list:
            logger.info(f"Processing {region} {year}")
            df = process_location_csv(generation_data_dir, region, year)
            df["region"] = region  # Keep region for traceability
            frames.append(df)

    # BUG FIX: concatenate WITHOUT ignore_index — the datetime index must be
    # preserved, otherwise the groupby below has no "datetime" to group on
    # and raises KeyError.
    france_df = pd.concat(frames)

    # Sum generation and capacity across regions for each timestamp
    france_aggregate = (
        france_df.groupby("datetime")
        .agg({"generation_mw": "sum", "capacity_mwp": "sum"})
        .reset_index()
    )

    return france_aggregate


def create_xarray_dataset(
    generation_data_dir,
    region_list,
    year_list,
    metadata_file=f"{metadata_file_dir}/admin_region_lat_lon.csv",
) -> "xr.Dataset":
    """Create an xarray Dataset combining all regions with their lat/lon coordinates.

    Args:
        generation_data_dir (str): The directory where the generation CSV files are stored.
        region_list (list): List of regions to include.
        year_list (list): List of years to include.
        metadata_file (str): Path to CSV file with columns: region, latitude, longitude

    Returns:
        xr.Dataset: Dataset with dimensions (location_id, time_utc)
    """
    # Imported lazily so the pandas-only helpers above remain usable in
    # environments without xarray installed.
    import xarray as xr

    # Load metadata - if not absolute path, assume it's relative to generation_data_dir parent
    if not os.path.isabs(metadata_file):
        metadata_path = os.path.join(os.path.dirname(generation_data_dir), metadata_file)
    else:
        metadata_path = metadata_file

    metadata = pd.read_csv(metadata_path)
    metadata = metadata.set_index("region")

    # Collect data for all regions
    all_data = []
    location_ids = []
    latitudes = []
    longitudes = []

    for region in region_list:
        logger.info(f"Processing region: {region}")

        # Collect data across all years for this region
        region_dfs = []
        for year in year_list:
            try:
                df = process_location_csv(generation_data_dir, region, year)
                region_dfs.append(df)
            except FileNotFoundError:
                logger.warning(f"File not found for {region} {year}, skipping")
                continue

        # Concatenate all years for this region
        if region_dfs:
            region_df = pd.concat(region_dfs).sort_index()

            all_data.append(region_df)
            location_ids.append(region)
            latitudes.append(metadata.loc[region, "latitude"])
            longitudes.append(metadata.loc[region, "longitude"])

    # Create common time index (union of all timestamps)
    all_times = pd.DatetimeIndex([])
    for df in all_data:
        all_times = all_times.union(df.index)
    all_times = all_times.sort_values()

    # Create a complete 30-minute grid to ensure no gaps
    min_time = all_times.min()
    max_time = all_times.max()
    complete_range = pd.date_range(start=min_time, end=max_time, freq="30min")

    logger.info(f"Original union has {len(all_times)} timestamps")
    logger.info(f"Complete 30-min grid has {len(complete_range)} timestamps")
    logger.info(f"Missing {len(complete_range) - len(all_times)} timestamps in union")

    # Use complete range instead of union
    all_times = complete_range

    # Zarr cannot serialize tz-aware timestamps: convert to timezone-naive
    # datetime64[ns] (values are already UTC)
    if hasattr(all_times, "tz") and all_times.tz is not None:
        all_times = all_times.tz_localize(None)
    elif isinstance(all_times, pd.Index):
        # If it's a regular Index, convert to DatetimeIndex and remove timezone
        all_times = pd.DatetimeIndex(all_times).tz_localize(None)

    # Reindex all dataframes to common time index
    generation_data = []
    capacity_data = []

    for df in all_data:
        # Remove timezone from dataframe index to match all_times
        df_naive = df.copy()
        df_naive.index = df_naive.index.tz_localize(None)

        df_reindexed = df_naive.reindex(all_times)
        generation_data.append(df_reindexed["generation_mw"].values)
        capacity_data.append(df_reindexed["capacity_mwp"].values)

    # Stack into 2D arrays (location, time)
    generation_array = np.stack(generation_data, axis=0)
    capacity_array = np.stack(capacity_data, axis=0)

    # Create xarray Dataset
    ds = xr.Dataset(
        {
            "generation_mw": (["location_id", "time_utc"], generation_array),
            "capacity_mwp": (["location_id", "time_utc"], capacity_array),
        },
        coords={
            "location_id": location_ids,
            "time_utc": all_times,
            "latitude": ("location_id", latitudes),
            "longitude": ("location_id", longitudes),
        },
    )

    # Add attributes
    ds.attrs["description"] = (
        "France Réseau de Transport d’Électricité (RTE) solar generation data for PVNet training"
    )
    ds.attrs["source"] = "https://www.rte-france.com/eco2mix"
    ds.attrs["schema"] = "ocf-data-sampler generation format"
    ds.attrs["time_resolution"] = "0.5 hour"
    ds.attrs["date_range"] = f"{all_times.min().isoformat()} to {all_times.max().isoformat()}"
    ds.attrs["created"] = pd.Timestamp.now().isoformat()

    ds["generation_mw"].attrs["units"] = "MW"
    ds["generation_mw"].attrs["long_name"] = "Solar generation"

    ds["capacity_mwp"].attrs["units"] = "MWp"
    ds["capacity_mwp"].attrs["long_name"] = "Solar capacity"

    return ds


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Process France RTE solar generation data and create xarray dataset"
    )
    parser.add_argument(
        "--generation-data-dir",
        type=str,
        default="downloads",
        help="Directory containing the generation CSV files (default: downloads)",
    )
    parser.add_argument(
        "--start-yr", type=int, default=2020, help="Start year for data processing (default: 2020)"
    )
    parser.add_argument(
        "--end-yr", type=int, default=2024, help="End year for data processing (default: 2024)"
    )
    parser.add_argument(
        "--output",
        type=str,
        default="france_solar_combined.zarr",
        help="Output file name (default: france_solar_combined.zarr)",
    )
    parser.add_argument(
        "--metadata-file",
        type=str,
        default=f"{metadata_file_dir}/admin_region_lat_lon.csv",
        help="Metadata CSV file with region lat/lon (default: configs/admin_region_lat_lon.csv)",
    )

    args = parser.parse_args()

    # Convert to absolute path if relative
    if not os.path.isabs(args.generation_data_dir):
        generation_data_dir = os.path.join(os.getcwd(), args.generation_data_dir)
    else:
        generation_data_dir = args.generation_data_dir

    # Define regions and years
    admin_region_list = [
        "Auvergne-Rhône-Alpes",
        "Bourgogne-Franche-Comté",
        "Bretagne",
        "Centre-Val-de-Loire",
        "Grand-Est",
        "Hauts-de-France",
        "Ile-de-France",
        "Normandie",
        "Nouvelle-Aquitaine",
        "Occitanie",
        "Pays-de-la-Loire",
        "PACA",
    ]

    year_list = list(range(args.start_yr, args.end_yr + 1))

    logger.info(f"Processing data from {generation_data_dir}")
    logger.info(f"Years: {args.start_yr} to {args.end_yr}")

    # Create xarray dataset
    ds = create_xarray_dataset(
        generation_data_dir, admin_region_list, year_list, args.metadata_file
    )

    # Save to Zarr (make sure the output directory exists first)
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, args.output)
    ds.to_zarr(output_file, mode="w")
    logger.info(f"Saved dataset to {output_file}")

    # Display info
    print(ds)
Hence, initially we use 2020 to 2024 (5 years) of data. + +## Summer Time Behavior +When transitioning to summer time (e.g. 26 Mar 2023 2:00 to 03:00), entries between 2:00 and 3:00 are duplicated. +When transitioning back to winter time (e.g. 29 Oct 2023 3:00 to 2:00), data entries are ambiguous and 2 timesteps will be missing. + +### ZARR File +The converted zarr file is available on huggingface, link: +https://huggingface.co/datasets/hhhn2/France_PV_data + +### Scripts +| File | Description | +|------|-------------| +| `scripts/download_mendeley_india.py` | Dataset download instructions | +| `scripts/process_india_data.py` | Excel → Zarr conversion | +| `scripts/test_india_pipeline.py` | Pipeline validation tests | +| `scripts/train_india_baseline.py` | Solar-only baseline model | + +## Testing +- Ran process_france_data.py successfully +- Validated output with inspect_france_training_pipeline.py + +## Data Processing Results +Data Quality + +Generation (MW): + Shape: (12, 87696) + Range: [0.00, 4002.00] MW + Mean: 174.98 MW + NaN count: 120 (0.01%) + +Capacity (MWp): + Shape: (12, 87696) + Range: [122.70, 6000.00] MWp + Mean: 1170.15 MWp + NaN count: 0 (0.00%) + +Per-Region Statistics + +0: +Generation: [0.0, 2194.0] MW, Mean: 238.2 MW, NaN: 0.0% +Capacity: 1655.3 MWp, NaN: 0.0% + +1: + Generation: [0.0, 883.0] MW, Mean: 78.7 MW, NaN: 0.0% + Capacity: 537.8 MWp, NaN: 0.0% + +2: + Generation: [0.0, 568.0] MW, Mean: 49.3 MW, NaN: 0.0% + Capacity: 364.3 MWp, NaN: 0.0% + +3: + Generation: [0.0, 975.0] MW, Mean: 97.5 MW, NaN: 0.0% + Capacity: 665.7 MWp, NaN: 0.0% + +4: + Generation: [0.0, 1337.0] MW, Mean: 134.2 MW, NaN: 0.0% + Capacity: 998.7 MWp, NaN: 0.0% + +5: + Generation: [0.0, 629.0] MW, Mean: 49.0 MW, NaN: 0.0% + Capacity: 361.4 MWp, NaN: 0.0% + +6: + Generation: [0.0, 306.0] MW, Mean: 26.8 MW, NaN: 0.0% + Capacity: 218.7 MWp, NaN: 0.0% + +7: + Generation: [0.0, 464.0] MW, Mean: 32.1 MW, NaN: 0.0% + Capacity: 247.4 MWp, NaN: 0.0% + +8: + Generation: [0.0, 
4002.0] MW, Mean: 534.9 MW, NaN: 0.0% + Capacity: 3524.9 MWp, NaN: 0.0% + +9: + Generation: [0.0, 3287.0] MW, Mean: 438.6 MW, NaN: 0.0% + Capacity: 2799.3 MWp, NaN: 0.0% + +10: + Generation: [0.0, 1213.0] MW, Mean: 118.8 MW, NaN: 0.0% + Capacity: 860.4 MWp, NaN: 0.0% + +11: + Generation: [0.0, 1942.0] MW, Mean: 301.8 MW, NaN: 0.0% + Capacity: 1807.9 MWp, NaN: 0.0% + +## Next Steps diff --git a/src/open_data_pvnet/configs/admin_region_lat_lon.csv b/src/open_data_pvnet/configs/admin_region_lat_lon.csv index 3615438..a56737d 100644 --- a/src/open_data_pvnet/configs/admin_region_lat_lon.csv +++ b/src/open_data_pvnet/configs/admin_region_lat_lon.csv @@ -1,13 +1,13 @@ -region,principal_municipality,latitude,longitude -"Auvergne-Rhône-Alpes","Lyon",45.7640,4.8357 -"Bourgogne-Franche-Comté","Dijon",47.3220,5.0415 -"Bretagne","Rennes",48.1173,-1.6778 -"Centre-Val-de-Loire","Orléans",47.9030,1.9093 -"Grand-Est","Strasbourg",48.5734,7.7521 -"Hauts-de-France","Lille",50.6292,3.0573 -"Ile-de-France","Paris",48.8566,2.3522 -"Normandie","Rouen",49.4432,1.0993 -"Nouvelle-Aquitaine","Bordeaux",44.8378,-0.5792 -"Occitanie","Toulouse",43.6047,1.4442 -"Pays-de-la-Loire","Nantes",47.2184,-1.5536 -"PACA","Marseille",43.2965,5.3698 \ No newline at end of file +location_id,region,principal_municipality,latitude,longitude +0,"Auvergne-Rhône-Alpes","Lyon",45.7640,4.8357 +1,"Bourgogne-Franche-Comté","Dijon",47.3220,5.0415 +2,"Bretagne","Rennes",48.1173,-1.6778 +3,"Centre-Val-de-Loire","Orléans",47.9030,1.9093 +4,"Grand-Est","Strasbourg",48.5734,7.7521 +5,"Hauts-de-France","Lille",50.6292,3.0573 +6,"Ile-de-France","Paris",48.8566,2.3522 +7,"Normandie","Rouen",49.4432,1.0993 +8,"Nouvelle-Aquitaine","Bordeaux",44.8378,-0.5792 +9,"Occitanie","Toulouse",43.6047,1.4442 +10,"Pays-de-la-Loire","Nantes",47.2184,-1.5536 +11,"PACA","Marseille",43.2965,5.3698 \ No newline at end of file diff --git a/src/open_data_pvnet/scripts/inspect_france_training_pipeline.py 
b/src/open_data_pvnet/scripts/inspect_france_training_pipeline.py index 05185d4..e176966 100644 --- a/src/open_data_pvnet/scripts/inspect_france_training_pipeline.py +++ b/src/open_data_pvnet/scripts/inspect_france_training_pipeline.py @@ -25,12 +25,16 @@ gfs_path = "s3://ocf-open-data-pvnet/data/gfs/v4/2024.zarr" + # Load the zarr dataset base_dir = os.getcwd() parent_3_levels_up = os.path.dirname(os.path.dirname(os.path.dirname(base_dir))) output_dir = os.path.join(parent_3_levels_up, "data") -solar_path = "france_solar_combined.zarr" +solar_path = os.path.join(parent_3_levels_up, "data", "france_solar_combined.zarr") print(f"Loading {solar_path}...\n") +gfs_path_local = os.path.join( + parent_3_levels_up, "data", "gfs_2023.zarr" +) # For testing local access if needed # Define France latitude and longitude bounds MIN_LAT, MAX_LAT = 41.5, 51.5 @@ -181,8 +185,10 @@ def test_france_solar_data(zarr_path): print("INSPECTION COMPLETE") print("=" * 60) + return True + -def test_gfs_data_access(): +def test_gfs_data_access(gfs_path): """Test accessing GFS NWP data from S3.""" logger.info("\n" + "=" * 60) logger.info("Testing GFS NWP Data Access") @@ -192,8 +198,8 @@ def test_gfs_data_access(): logger.info(f"Opening GFS data from: {gfs_path}") store = fsspec.get_mapper(gfs_path, anon=True) - # Open with limited variables to test access - ds = xr.open_zarr(store, consolidated=True) + # Open with decode_timedelta to suppress warning + ds = xr.open_zarr(store, consolidated=True, decode_timedelta=True) logger.info("GFS Dataset accessed successfully!") logger.info(f"Variables: {list(ds.data_vars)[:10]}...") # First 10 @@ -221,12 +227,18 @@ def test_gfs_data_access(): logger.warning("GFS dataset does not have latitude/longitude dimensions") # Check time dimension - if "init_time" in ds.dims or "time" in ds.dims: - time_dim = "init_time" if "init_time" in ds.dims else "time" + if "init_time_utc" in ds.dims or "init_time" in ds.dims: + time_dim = "init_time_utc" if 
"init_time_utc" in ds.dims else "init_time" times = ds[time_dim].values logger.info("\nTime Coverage:") - logger.info(f" First: {pd.Timestamp(times[0])}") - logger.info(f" Last: {pd.Timestamp(times[-1])}") + logger.info(f" First init_time: {pd.Timestamp(times[0])}") + logger.info(f" Last init_time: {pd.Timestamp(times[-1])}") + + if "step" in ds.dims: + steps = ds["step"].values + logger.info(f" Forecast steps: {len(steps)} steps") + logger.info(f" First step: {steps[0]}") + logger.info(f" Last step: {steps[-1]}") logger.info("\n GFS Data Access: PASSED") return True @@ -250,18 +262,53 @@ def test_time_alignment(solar_path, gfs_path): ds_solar = xr.open_zarr(str(solar_path)) solar_times = ds_solar["time_utc"].values - ds_gfs = xr.open_zarr(str(gfs_path)) - gfs_times = ds_gfs["time"].values + # Open GFS with decode_timedelta to handle step coordinate + store = fsspec.get_mapper(gfs_path, anon=True) + ds_gfs = xr.open_zarr(store, consolidated=True, decode_timedelta=True) + + # GFS uses init_time_utc + step for valid time + # Calculate valid times from init_time and forecast steps + init_times = ds_gfs["init_time_utc"].values + steps = ds_gfs["step"].values + + logger.info("\nGFS time structure:") + logger.info(f" Init times: {len(init_times)} values") + logger.info(f" Forecast steps: {len(steps)} values") + logger.info(f" First init_time: {pd.Timestamp(init_times[0])}") + logger.info(f" First step: {steps[0]}") + + # Calculate all valid times (init_time + step for all combinations) + gfs_valid_times = [] + for init_time in init_times: + for step in steps: + valid_time = pd.Timestamp(init_time) + pd.Timedelta(step) + gfs_valid_times.append(valid_time) + + gfs_valid_times = np.array(gfs_valid_times, dtype="datetime64[ns]") + gfs_valid_times = np.unique(gfs_valid_times) # Remove duplicates + + logger.info(f"\nTotal unique GFS valid times: {len(gfs_valid_times)}") # Check for overlapping times - overlap = np.intersect1d(solar_times, gfs_times) + overlap = 
np.intersect1d(solar_times, gfs_valid_times) if len(overlap) > 0: logger.info(f"Found {len(overlap)} overlapping time steps") + logger.info(f" First overlap: {pd.Timestamp(overlap[0])}") + logger.info(f" Last overlap: {pd.Timestamp(overlap[-1])}") else: logger.warning("No overlapping time steps found") + logger.warning( + f" Solar time range: {pd.Timestamp(solar_times[0])} to {pd.Timestamp(solar_times[-1])}" + ) + logger.warning( + f" GFS time range: {pd.Timestamp(gfs_valid_times[0])} to {pd.Timestamp(gfs_valid_times[-1])}" + ) + + return True except Exception as e: logger.error(f"Failed to check time alignment: {e}") + return False def main(): @@ -271,7 +318,13 @@ def main(): results = {} results["solar_data"] = test_france_solar_data(solar_path) - results["gfs_access"] = test_gfs_data_access() + results["gfs_access"] = test_gfs_data_access( + gfs_path + ) # testing S3 access -- can switch to local GFS + if os.path.exists(gfs_path_local): + results["time_alignment"] = test_time_alignment( + solar_path, gfs_path_local + ) # Use local GFS for time alignment test # Summary logger.info("\n" + "=" * 60) @@ -288,9 +341,6 @@ def main(): else: logger.info("\nSome tests failed - check logs above") - # Skip time alignment test since GFS is on S3 - logger.info("\nSkipping time alignment test (GFS is on S3)") - if __name__ == "__main__": main() diff --git a/src/open_data_pvnet/scripts/process_france_data.py b/src/open_data_pvnet/scripts/process_france_data.py index b8d945e..99ed217 100644 --- a/src/open_data_pvnet/scripts/process_france_data.py +++ b/src/open_data_pvnet/scripts/process_france_data.py @@ -1,4 +1,3 @@ - import pandas as pd import xarray as xr import os @@ -22,6 +21,7 @@ start_yr = 2020 end_yr = 2024 + def process_location_csv(generation_data_dir, region, year) -> pd.DataFrame: """Process the location CSV for a given region and year. 
Args: @@ -113,9 +113,6 @@ def process_location_csv(generation_data_dir, region, year) -> pd.DataFrame: return df - - - # Create a France wide aggregate def create_France_aggregate(generation_data_dir, region_list, year_list) -> pd.DataFrame: """Create a France-wide aggregate DataFrame from regional generation data. @@ -198,7 +195,8 @@ def create_xarray_dataset( region_df = region_df.sort_index() # Sort by datetime all_data.append(region_df) - location_ids.append(region) + # Use integer location_id from metadata + location_ids.append(int(metadata.loc[region, "location_id"])) latitudes.append(metadata.loc[region, "latitude"]) longitudes.append(metadata.loc[region, "longitude"]) @@ -276,6 +274,7 @@ def create_xarray_dataset( return ds + if __name__ == "__main__": parser = argparse.ArgumentParser( description="Process France RTE solar generation data and create xarray dataset" @@ -283,14 +282,14 @@ def create_xarray_dataset( parser.add_argument( "--generation-data-dir", type=str, - default="downloads", - help="Directory containing the generation CSV files (default: downloads)", + default=None, + help=f"Directory containing the generation CSV files (default: {generation_data_dir})", ) parser.add_argument( - "--start-yr", type=int, default=2020, help="Start year for data processing (default: 2020)" + "--start_yr", type=int, default=2020, help="Start year for data processing (default: 2020)" ) parser.add_argument( - "--end-yr", type=int, default=2024, help="End year for data processing (default: 2024)" + "--end_yr", type=int, default=2024, help="End year for data processing (default: 2024)" ) parser.add_argument( "--output", @@ -307,8 +306,11 @@ def create_xarray_dataset( args = parser.parse_args() - # Convert to absolute path if relative - if not os.path.isabs(args.generation_data_dir): + # Use global generation_data_dir if no argument provided, otherwise convert to absolute path if relative + if args.generation_data_dir is None: + # Use the global variable defined at 
top of file + pass # generation_data_dir already set + elif not os.path.isabs(args.generation_data_dir): generation_data_dir = os.path.join(os.getcwd(), args.generation_data_dir) else: generation_data_dir = args.generation_data_dir From e33acf3a43f992657aa3ff9d1977ffc35262d342 Mon Sep 17 00:00:00 2001 From: "Heron N." <162735136+Her0n24@users.noreply.github.com> Date: Thu, 12 Feb 2026 00:42:37 +0000 Subject: [PATCH 3/4] edited France README --- docs/france_readme.md | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/docs/france_readme.md b/docs/france_readme.md index 302eab0..6fe1a81 100644 --- a/docs/france_readme.md +++ b/docs/france_readme.md @@ -24,14 +24,6 @@ When transitioning back to winter time (e.g. 29 Oct 2023 3:00 to 2:00), data ent The converted zarr file is available on huggingface, link: https://huggingface.co/datasets/hhhn2/France_PV_data -### Scripts -| File | Description | -|------|-------------| -| `scripts/download_mendeley_india.py` | Dataset download instructions | -| `scripts/process_india_data.py` | Excel → Zarr conversion | -| `scripts/test_india_pipeline.py` | Pipeline validation tests | -| `scripts/train_india_baseline.py` | Solar-only baseline model | - ## Testing - Ran process_france_data.py successfully - Validated output with inspect_france_training_pipeline.py @@ -102,3 +94,5 @@ Capacity: 1655.3 MWp, NaN: 0.0% Capacity: 1807.9 MWp, NaN: 0.0% ## Next Steps +- Make a NOAA GFS pipeline for France +- Compare with baseline model From f2782cac994026bfe5eab5e9b4d78190902d3ef0 Mon Sep 17 00:00:00 2001 From: Her0n24 Date: Fri, 20 Feb 2026 20:02:20 +0000 Subject: [PATCH 4/4] reorganized metadata info, added config and gfs download --- .../configuration/france_configuration.yaml | 117 +++++ .../configs/france_gfs_config.yaml | 92 ++++ .../configs/france_pv_data_config.yaml | 98 ++++ .../scripts/fra/download_gfs_france_fast.py | 465 ++++++++++++++++++ .../scripts/{ => fra}/get_generation_csv.py | 35 +- 
.../inspect_france_training_pipeline.py | 17 +- .../scripts/{ => fra}/process_france_data.py | 84 +++- 7 files changed, 854 insertions(+), 54 deletions(-) create mode 100644 src/open_data_pvnet/configs/PVNet_configs/datamodule/configuration/france_configuration.yaml create mode 100644 src/open_data_pvnet/configs/france_gfs_config.yaml create mode 100644 src/open_data_pvnet/configs/france_pv_data_config.yaml create mode 100644 src/open_data_pvnet/scripts/fra/download_gfs_france_fast.py rename src/open_data_pvnet/scripts/{ => fra}/get_generation_csv.py (84%) rename src/open_data_pvnet/scripts/{ => fra}/inspect_france_training_pipeline.py (96%) rename src/open_data_pvnet/scripts/{ => fra}/process_france_data.py (83%) diff --git a/src/open_data_pvnet/configs/PVNet_configs/datamodule/configuration/france_configuration.yaml b/src/open_data_pvnet/configs/PVNet_configs/datamodule/configuration/france_configuration.yaml new file mode 100644 index 0000000..3ba54bf --- /dev/null +++ b/src/open_data_pvnet/configs/PVNet_configs/datamodule/configuration/france_configuration.yaml @@ -0,0 +1,117 @@ +general: + description: Configuration for training PVNet on France solar PV data with GFS data + name: france_pvnet_config + +input_data: + # # Either use Site OR GSP configuration + # site: + # # Path to Site data in NetCDF format + # file_path: PLACEHOLDER.nc + # # Path to metadata in CSV format + # metadata_file_path: PLACEHOLDER.csv + # time_resolution_minutes: 15 + # interval_start_minutes: -60 + # # Specified for intraday currently + # interval_end_minutes: 480 + # dropout_timedeltas_minutes: null + # dropout_fraction: 0 # Fraction of samples with dropout + + # France solar PV generation data + # Uses 'generation:' (generic PV data) not 'gsp:' (UK-specific Grid Supply Point) + generation: + # Path to generation data in zarr format -- update to S3 path when uploaded + # hugging face: https://huggingface.co/datasets/hhhn2/France_PV_data/tree/main + zarr_path: 
"./data/fra/france_pv_generation.zarr" + interval_start_minutes: -60 + interval_end_minutes: 480 + time_resolution_minutes: 30 # half hourly resolution + # Random value from the list below will be chosen as the delay when dropout is used + # If set to null no dropout is applied. Only values before t0 are dropped out for GSP. + # Values after t0 are assumed as targets and cannot be dropped. + dropout_timedeltas_minutes: [] + dropout_fraction: 0.0 # Fraction of samples with dropout + public: False # set to true when uploaded + + nwp: + gfs: + time_resolution_minutes: 180 # Match the dataset's resolution (3 hours) + interval_start_minutes: -180 + interval_end_minutes: 540 + dropout_fraction: 0.0 + dropout_timedeltas_minutes: [] + zarr_path: "./data/fra/gfs/france_gfs_2024_02.zarr" + provider: "gfs" + image_size_pixels_height: 2 + image_size_pixels_width: 2 + public: False # set to true when uploaded + channels: + - dlwrf # downwards long-wave radiation flux + - dswrf # downwards short-wave radiation flux + - hcc # high cloud cover + - mcc # medium cloud cover + - lcc # low cloud cover + - prate # precipitation rate + - r # relative humidity + - t # 2-metre temperature + - tcc # total cloud cover + - u10 # 10-metre wind U component + - u100 # 100-metre wind U component + - v10 # 10-metre wind V component + - v100 # 100-metre wind V component + - vis # visibility + normalisation_constants: + dlwrf: + mean: 298.342 + std: 96.305916 + dswrf: + mean: 168.12321 + std: 246.18533 + hcc: + mean: 35.272 + std: 42.525383 + lcc: + mean: 43.578342 + std: 44.3732 + mcc: + mean: 33.738823 + std: 43.150745 + prate: + mean: 2.8190969e-05 + std: 0.00010159573 + r: + mean: 18.359747 + std: 25.440672 + sde: + mean: 0.36937004 + std: 0.43345627 + t: + mean: 278.5223 + std: 22.825893 + tcc: + mean: 66.841606 + std: 41.030598 + u10: + mean: -0.0022310058 + std: 5.470838 + u100: + mean: 0.0823025 + std: 6.8899174 + v10: + mean: 0.06219831 + std: 4.7401133 + v100: + mean: 0.0797807 + std: 
6.076132 + vis: + mean: 19628.32 + std: 8294.022 + u: + mean: 11.645444 + std: 10.614556 + v: + mean: 0.12330122 + std: 7.176398 + solar_position: + interval_start_minutes: -60 + interval_end_minutes: 480 + time_resolution_minutes: 30 \ No newline at end of file diff --git a/src/open_data_pvnet/configs/france_gfs_config.yaml b/src/open_data_pvnet/configs/france_gfs_config.yaml new file mode 100644 index 0000000..2090d3e --- /dev/null +++ b/src/open_data_pvnet/configs/france_gfs_config.yaml @@ -0,0 +1,92 @@ +general: + name: "france_gfs_config" + description: "Configuration for France GFS data" + +input_data: + nwp: + gfs: + # GFS provides 3-hourly forecasts globally + time_resolution_minutes: 180 + interval_start_minutes: -180 + interval_end_minutes: 540 + dropout_timedeltas_minutes: null + dropout_fraction: 0.0 + accum_channels: [] + max_staleness_minutes: 540 + + # Use existing OCF GFS data - filter to France bounds at runtime + zarr_path: "s3://ocf-open-data-pvnet/data/gfs/v4/2023.zarr" + provider: "gfs" + public: true + + geographic_bounds: + latitude_min: 42.0 + latitude_max: 51.5 + longitude_min: -5.5 + longitude_max: 9.0 + + # Spatial sampling + image_size_pixels_height: 2 + image_size_pixels_width: 2 + + # Weather channels for solar prediction + channels: + - dlwrf # downwards long-wave radiation flux + - dswrf # downwards short-wave radiation flux + - hcc # high cloud cover + - lcc # low cloud cover + - mcc # medium cloud cover + - prate # precipitation rate + - r # relative humidity + - t # 2-metre temperature + - tcc # total cloud cover + - u10 # 10-metre wind U component + - u100 # 100-metre wind U component + - v10 # 10-metre wind V component + - v100 # 100-metre wind V component + - vis # visibility + + # Normalisation constants (using global GFS stats from UK config) + normalisation_constants: + dlwrf: + mean: 298.342 + std: 96.305916 + dswrf: + mean: 168.12321 + std: 246.18533 + hcc: + mean: 35.272 + std: 42.525383 + lcc: + mean: 43.578342 + std: 
44.3732 + mcc: + mean: 33.738823 + std: 43.150745 + prate: + mean: 2.8190969e-05 + std: 0.00010159573 + r: + mean: 18.359747 + std: 25.440672 + t: + mean: 278.5223 + std: 22.825893 + tcc: + mean: 66.841606 + std: 41.030598 + u10: + mean: -0.0022310058 + std: 5.470838 + u100: + mean: 0.0823025 + std: 6.8899174 + v10: + mean: 0.06219831 + std: 4.7401133 + v100: + mean: 0.0797807 + std: 6.076132 + vis: + mean: 19628.32 + std: 8294.022 \ No newline at end of file diff --git a/src/open_data_pvnet/configs/france_pv_data_config.yaml b/src/open_data_pvnet/configs/france_pv_data_config.yaml new file mode 100644 index 0000000..452ede5 --- /dev/null +++ b/src/open_data_pvnet/configs/france_pv_data_config.yaml @@ -0,0 +1,98 @@ +general: + name: "france_pv_config" + description: "Configuration for France PV generation data from RTÉ" + +input_data: + generation: + # Path to processed France solar Zarr + zarr_path: "./data/fra/france_pv_generation.zarr" + time_resolution_minutes: 30 + start_date: "2020-01-01" + end_date: "2024-12-31" + timezone: "UTC" + public: True + +metadata: + country: "France" + gsp_id: "FR" + latitude: 46.6 + longitude: 2.5 + +output: + format: "zarr" + csv_backup: True + +# France administrative regions metadata +# Coordinates represent principal municipality of each region +regions: + - location_id: 0 + name: "Auvergne-Rhône-Alpes" + principal_municipality: "Lyon" + latitude: 45.7640 + longitude: 4.8357 + + - location_id: 1 + name: "Bourgogne-Franche-Comté" + principal_municipality: "Dijon" + latitude: 47.3220 + longitude: 5.0415 + + - location_id: 2 + name: "Bretagne" + principal_municipality: "Rennes" + latitude: 48.1173 + longitude: -1.6778 + + - location_id: 3 + name: "Centre-Val-de-Loire" + principal_municipality: "Orléans" + latitude: 47.9030 + longitude: 1.9093 + + - location_id: 4 + name: "Grand-Est" + principal_municipality: "Strasbourg" + latitude: 48.5734 + longitude: 7.7521 + + - location_id: 5 + name: "Hauts-de-France" + 
principal_municipality: "Lille" + latitude: 50.6292 + longitude: 3.0573 + + - location_id: 6 + name: "Ile-de-France" + principal_municipality: "Paris" + latitude: 48.8566 + longitude: 2.3522 + + - location_id: 7 + name: "Normandie" + principal_municipality: "Rouen" + latitude: 49.4432 + longitude: 1.0993 + + - location_id: 8 + name: "Nouvelle-Aquitaine" + principal_municipality: "Bordeaux" + latitude: 44.8378 + longitude: -0.5792 + + - location_id: 9 + name: "Occitanie" + principal_municipality: "Toulouse" + latitude: 43.6047 + longitude: 1.4442 + + - location_id: 10 + name: "Pays-de-la-Loire" + principal_municipality: "Nantes" + latitude: 47.2184 + longitude: -1.5536 + + - location_id: 11 + name: "PACA" + principal_municipality: "Marseille" + latitude: 43.2965 + longitude: 5.3698 \ No newline at end of file diff --git a/src/open_data_pvnet/scripts/fra/download_gfs_france_fast.py b/src/open_data_pvnet/scripts/fra/download_gfs_france_fast.py new file mode 100644 index 0000000..3b7ac7e --- /dev/null +++ b/src/open_data_pvnet/scripts/fra/download_gfs_france_fast.py @@ -0,0 +1,465 @@ +""" +Download and process NOAA GFS data for France region. +Adopted from Raakshass:feature/india-solar-pipeline & Sharkyii:germany in open-data-pvnet repo. + +Two download modes: + 1. NOMADS GRIB filter (fast) — Selects specific variables + France subregion + in a single HTTP request. Returns ~100-200KB per file vs 300MB full GRIB. + Only available for last ~10 days of data. + 2. Herbie byte-range (fallback) — For historical data from S3. + Uses .idx index files to download specific variables. + +Output: OCF-compatible Zarr with dims (init_time_utc, step, latitude, longitude) +and 14 data variables matching existing GFS schema. 
+ +Usage: + # Fast mode — recent data via NOMADS filter (recommended for testing) + python src/open_data_pvnet/scripts/fra/download_gfs_france_fast.py --year 2026 --months 2 --max-days 1 + + # Historical data via Herbie S3 byte-range + python src/open_data_pvnet/scripts/fra/download_gfs_france_fast.py --year 2024 --months 1 --max-days 1 --source herbie + + # Parallel downloads (10 workers) + python src/open_data_pvnet/scripts/fra/download_gfs_france_fast.py --year 2024 --months 1 --max-days 3 --workers 10 + +Requirements: + pip install xarray cfgrib eccodes numpy pandas zarr requests + pip install herbie-data # only needed for --source herbie +""" + +import argparse +import logging +import os +import tempfile +import warnings +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime, timedelta +from pathlib import Path + +import numpy as np +import pandas as pd +import requests +import xarray as xr + +try: + from tqdm import tqdm +except ImportError: + tqdm = None + +warnings.filterwarnings("ignore", category=FutureWarning) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s | %(levelname)s | %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + +# Define France latitude and longitude bounds +FRANCE_MIN_LAT, FRANCE_MAX_LAT = 42.0, 51.5 +FRANCE_MIN_LON, FRANCE_MAX_LON = -5.5, 9.0 + +# --------------------------------------------------------------------------- # +# OCF channel mapping +# --------------------------------------------------------------------------- # + +OCF_CHANNELS = { + "dlwrf": { + "nomads": "DLWRF", + "search": ":DLWRF:surface", + "level": "surface", + }, + "dswrf": { + "nomads": "DSWRF", + "search": ":DSWRF:surface", + "level": "surface", + }, + "hcc": { + "nomads": "HCDC", + "search": ":[HT]CDC:high cloud layer:", + "level": "high_cloud_layer", + }, + "lcc": { + "nomads": "LCDC", + "search": ":[LT]CDC:low cloud layer:", + "level": "low_cloud_layer", + }, + "mcc": 
{ + "nomads": "MCDC", + "search": ":[MT]CDC:middle cloud layer:", + "level": "middle_cloud_layer", + }, + "prate": { + "nomads": "PRATE", + "search": ":PRATE:surface:(?!.*ave)", + "level": "surface", + }, + "r": { + "nomads": "RH", + "search": ":RH:850 mb", + "level": "850_mb", + }, + "t": { + "nomads": "TMP", + "search": ":TMP:2 m above ground", + "level": "2_m_above_ground", + }, + "tcc": { + "nomads": "TCDC", + "search": ":TCDC:entire atmosphere:", + "level": "entire_atmosphere_(considered_as_a_single_layer)", + }, + "u10": { + "nomads": "UGRD", + "search": ":UGRD:10 m above ground", + "level": "10_m_above_ground", + }, + "u100": { + "nomads": "UGRD", + "search": ":UGRD:100 m above ground", + "level": "100_m_above_ground", + }, + "v10": { + "nomads": "VGRD", + "search": ":VGRD:10 m above ground", + "level": "10_m_above_ground", + }, + "v100": { + "nomads": "VGRD", + "search": ":VGRD:100 m above ground", + "level": "100_m_above_ground", + }, + "vis": { + "nomads": "VIS", + "search": ":VIS:surface", + "level": "surface", + }, +} + +# GFS forecast hours (17 steps: 0-48h at 3h intervals) +FORECAST_HOURS = list(range(0, 49, 3)) + +# GFS initialization hours (4x daily) +INIT_HOURS = [0, 6, 12, 18] + +# NOMADS base URL +NOMADS_BASE = "https://nomads.ncep.noaa.gov/cgi-bin/filter_gfs_0p25.pl" + +# --------------------------------------------------------------------------- # +# NOMADS GRIB Filter — fast, subregion-aware downloads +# --------------------------------------------------------------------------- # + +def build_nomads_url(date: datetime, init_hour: int, fxx: int) -> str: + """Build NOMADS GRIB filter URL for France-subset GFS download.""" + date_str = date.strftime("%Y%m%d") + + params = { + "dir": f"/gfs.{date_str}/{init_hour:02d}/atmos", + "file": f"gfs.t{init_hour:02d}z.pgrb2.0p25.f{fxx:03d}", + "subregion": "", + "toplat": str(FRANCE_MAX_LAT), + "bottomlat": str(FRANCE_MIN_LAT), + "leftlon": str(FRANCE_MIN_LON), + "rightlon": str(FRANCE_MAX_LON), + } + + 
nomads_vars = set() + for spec in OCF_CHANNELS.values(): + nomads_vars.add(spec["nomads"]) + for var in sorted(nomads_vars): + params[f"var_{var}"] = "on" + + nomads_levels = set() + for spec in OCF_CHANNELS.values(): + nomads_levels.add(spec["level"]) + for level in sorted(nomads_levels): + params[f"lev_{level}"] = "on" + + param_str = "&".join(f"{k}={v}" for k, v in params.items()) + return f"{NOMADS_BASE}?{param_str}" + + +def download_nomads_step( + date: datetime, + init_hour: int, + fxx: int, + tmp_dir: str, + timeout: int = 60, +) -> str | None: + """Download a single forecast step via NOMADS grib filter.""" + url = build_nomads_url(date, init_hour, fxx) + fname = f"gfs_{date.strftime('%Y%m%d')}_{init_hour:02d}z_f{fxx:03d}.grib2" + local_path = os.path.join(tmp_dir, fname) + + if os.path.exists(local_path) and os.path.getsize(local_path) > 1000: + return local_path + + try: + resp = requests.get(url, timeout=timeout) + if resp.status_code == 200 and len(resp.content) > 1000: + with open(local_path, "wb") as f: + f.write(resp.content) + return local_path + return None + except Exception: + return None + + +def extract_variables_from_grib(grib_path: str) -> dict[str, xr.DataArray]: + """Extract OCF variables from a subsetted GRIB file.""" + variables = {} + for ocf_name, spec in OCF_CHANNELS.items(): + try: + ds = xr.open_dataset( + grib_path, + engine="cfgrib", + backend_kwargs={ + "filter_by_keys": { + "shortName": spec["nomads"].lower(), + }, + "errors": "ignore", + }, + ) + if len(ds.data_vars) == 0: + continue + var_name = list(ds.data_vars)[0] + da = ds[var_name].load() + keep = {"latitude", "longitude"} + drop = [c for c in da.coords if c not in keep] + da = da.drop_vars(drop, errors="ignore") + da.name = ocf_name + variables[ocf_name] = da.astype(np.float32) + ds.close() + except Exception: + pass + return variables + + +# --------------------------------------------------------------------------- # +# Herbie byte-range downloads — for historical S3 
data +# --------------------------------------------------------------------------- # + +def download_herbie_variable(herbie_obj, ch_name: str) -> tuple[str, xr.DataArray] | None: + """Download a single variable using an initialized Herbie object.""" + spec = OCF_CHANNELS[ch_name] + try: + ds = herbie_obj.xarray(spec["search"], verbose=False) + if ds is None or len(ds.data_vars) == 0: + return None + var_name = list(ds.data_vars)[0] + da = ds[var_name].load() + # Subset to France + da = da.sel( + latitude=slice(FRANCE_MAX_LAT, FRANCE_MIN_LAT), + longitude=slice(FRANCE_MIN_LON, FRANCE_MAX_LON), + ) + keep = {"latitude", "longitude"} + drop = [c for c in da.coords if c not in keep] + da = da.drop_vars(drop, errors="ignore") + da.name = ch_name + return ch_name, da.astype(np.float32) + except Exception: + return None + +def download_herbie_step( + date_str: str, + init_hour: int, + fxx: int, + channels: list[str], + workers: int = 4, +) -> dict[str, xr.DataArray]: + """Download variables via Herbie byte-range from S3 in parallel.""" + from herbie import Herbie + variables = {} + try: + H = Herbie( + date_str, + model="gfs", + fxx=fxx, + product="pgrb2.0p25", + verbose=False, + priority=['aws', 'nomads', 'google', 'azure'] + ) + # Parallelize variable downloads within the forecast step + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = {executor.submit(download_herbie_variable, H, ch): ch for ch in channels} + for future in as_completed(futures): + result = future.result() + if result: + ch_name, da = result + variables[ch_name] = da + except Exception as e: + logger.warning(f" Herbie failed for f{fxx:03d}: {e}") + return variables + + +# --------------------------------------------------------------------------- # +# Core processing pipeline +# --------------------------------------------------------------------------- # + +def process_single_init_time( + date: datetime, + init_hour: int, + source: str = "nomads", + workers: int = 6, + channels: 
list[str] | None = None, +) -> xr.Dataset | None: + """Process all forecast steps for a single GFS init time.""" + if channels is None: + channels = list(OCF_CHANNELS.keys()) + init_time = pd.Timestamp(date.strftime("%Y-%m-%d")) + pd.Timedelta(hours=init_hour) + logger.info(f"Processing {init_time} [{source}]") + + step_datasets = {} + + if source == "nomads": + with tempfile.TemporaryDirectory(prefix="gfs_germany_") as tmp_dir: + grib_paths = {} + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = { + executor.submit(download_nomads_step, date, init_hour, fxx, tmp_dir): fxx + for fxx in FORECAST_HOURS + } + for future in as_completed(futures): + fxx = futures[future] + try: + path = future.result() + if path: grib_paths[fxx] = path + except Exception: pass + for fxx in sorted(grib_paths.keys()): + variables = extract_variables_from_grib(grib_paths[fxx]) + if not variables: continue + ds = xr.Dataset(variables).expand_dims({"step": [np.timedelta64(fxx, "h")]}) + step_datasets[fxx] = ds + else: + date_str = date.strftime("%Y-%m-%d") + # Parallelize forecast steps for Herbie too + with ThreadPoolExecutor(max_workers=max(1, workers // 2)) as executor: + futures = { + executor.submit(download_herbie_step, date_str, init_hour, fxx, channels): fxx + for fxx in FORECAST_HOURS + } + for future in as_completed(futures): + fxx = futures[future] + variables = future.result() + if variables: + ds = xr.Dataset(variables).expand_dims({"step": [np.timedelta64(fxx, "h")]}) + step_datasets[fxx] = ds + + if not step_datasets: return None + + # Sort by step before concat + sorted_steps = [step_datasets[fxx] for fxx in sorted(step_datasets.keys())] + combined = xr.concat(sorted_steps, dim="step") + combined = combined.expand_dims({"init_time_utc": [init_time]}) + return combined + + +def process_month( + year: int, month: int, output_dir: str, + max_days: int = None, source: str = "nomads", + workers: int = 6, channels: list[str] = None +) -> str | None: + 
"""Process one month of GFS data for Germany and save as Zarr.""" + start = datetime(year, month, 1) + if month == 12: end = datetime(year + 1, 1, 1) + else: end = datetime(year, month + 1, 1) + + # Calculate total days in the month + total_days = (end - start).days + + # Default max_days to total days in month if not specified + if max_days is None: + max_days = total_days + + dates = [] + current = start + while current < end: + dates.append(current) + current += timedelta(days=1) + dates = dates[:max_days] # Always slice, using calculated max_days + + logger.info(f"Processing {year}-{month:02d} [{source}]") + all_datasets = [] + + # Generate all (date, init_hour) pairs + init_pairs = [] + for date in dates: + for init_hour in INIT_HOURS: + init_pairs.append((date, init_hour)) + + pbar = None + if tqdm: + pbar = tqdm(total=len(init_pairs), desc=f"Downloading {year}-{month:02d}") + + def _process_one(pair): + d, h = pair + try: + return process_single_init_time(d, h, source, workers, channels) + except Exception as e: + logger.error(f"Failed {d.strftime('%Y-%m-%d')} {h:02d}Z: {e}") + return None + + # Parallelize at the init_time level for historical data + # We use fewer workers here if the inner loop is also parallelized + top_workers = 1 if source == "nomads" else max(1, workers // 4) + if source == "herbie": + # For historical data, top-level parallelism is more efficient + logger.info(f"Using {top_workers} concurrent initialization times for historical download") + with ThreadPoolExecutor(max_workers=top_workers) as executor: + futures = {executor.submit(_process_one, p): p for p in init_pairs} + for future in as_completed(futures): + ds = future.result() + if ds is not None: + all_datasets.append(ds) + if pbar: pbar.update(1) + else: + # NOMADS is usually fast enough and supports its own internal parallelism + for pair in init_pairs: + ds = _process_one(pair) + if ds is not None: + all_datasets.append(ds) + if pbar: pbar.update(1) + + if pbar: pbar.close() + 
+ if not all_datasets: + logger.warning(f"No data captured for {year}-{month:02d}") + return None + + combined = xr.concat(all_datasets, dim="init_time_utc").sortby("init_time_utc") + if combined.latitude[0] < combined.latitude[-1]: + combined = combined.isel(latitude=slice(None, None, -1)) + + output_path = os.path.join(output_dir, f"france_gfs_{year}_{month:02d}.zarr") + os.makedirs(output_dir, exist_ok=True) + combined.to_zarr(output_path, mode="w", consolidated=True, zarr_version=2) + logger.info(f"✓ Saved: {output_path}") + return output_path + + +def main(): + parser = argparse.ArgumentParser(description="Download NOAA GFS data for France → OCF Zarr") + parser.add_argument("--year", type=int, required=True) + parser.add_argument("--months", type=int, nargs="+", required=True) + parser.add_argument("--output-dir", type=str, default="data/fra/gfs") + parser.add_argument("--max-days", type=int, default=None, + help="Maximum number of days to process from start of month (default: all days in the month)") + parser.add_argument("--source", choices=["nomads", "herbie"], default="nomads") + parser.add_argument("--workers", type=int, default=8, help="Parallel workers for steps/variables") + parser.add_argument("--channels", type=str, nargs="+", default=None) + parser.add_argument("--merge", action="store_true") + args = parser.parse_args() + + monthly_paths = [] + for month in args.months: + path = process_month(args.year, month, args.output_dir, args.max_days, args.source, args.workers, args.channels) + if path: monthly_paths.append(path) + + if args.merge and len(monthly_paths) > 1: + yearly = os.path.join(args.output_dir, f"france_gfs_{args.year}.zarr") + datasets = [xr.open_zarr(p) for p in monthly_paths] + xr.concat(datasets, dim="init_time_utc").sortby("init_time_utc").to_zarr(yearly, mode="w", consolidated=True, zarr_version=2) + logger.info(f"✓ Merged: {yearly}") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git 
a/src/open_data_pvnet/scripts/get_generation_csv.py b/src/open_data_pvnet/scripts/fra/get_generation_csv.py similarity index 84% rename from src/open_data_pvnet/scripts/get_generation_csv.py rename to src/open_data_pvnet/scripts/fra/get_generation_csv.py index b983622..0beddf5 100644 --- a/src/open_data_pvnet/scripts/get_generation_csv.py +++ b/src/open_data_pvnet/scripts/fra/get_generation_csv.py @@ -10,7 +10,7 @@ - Capacity (TCH) data available from Jan 2020 Usage: - python get_generation_csv.py --start_yr 2019 --end_yr 2023 --consolidate_yr 2024 + python get_generation_csv.py --start_yr 2020 --end_yr 2023 --consolidate_yr 2024 # where users need to determine the consolidate year for assignment of a year in the file name # based on the latest available data on RTE. This way filenames will be consistent with the year of data they contain. """ @@ -18,6 +18,7 @@ import requests import pandas as pd import os +import sys from time import sleep import zipfile import argparse @@ -31,24 +32,20 @@ ) logger = logging.getLogger(__name__) -base_dir = os.getcwd() -parent_3_levels_up = os.path.dirname(os.path.dirname(os.path.dirname(base_dir))) -output_dir = os.path.join(parent_3_levels_up, "tmp") - -admin_region_list = [ - "Auvergne-Rhône-Alpes", - "Bourgogne-Franche-Comté", - "Bretagne", - "Centre-Val-de-Loire", - "Grand-Est", - "Hauts-de-France", - "Ile-de-France", - "Normandie", - "Nouvelle-Aquitaine", - "Occitanie", - "Pays-de-la-Loire", - "PACA", -] +# Get paths relative to this script's location +script_dir = os.path.dirname(os.path.abspath(__file__)) # .../scripts/fra/ +workspace_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(script_dir)))) # 4 levels up +metadata_file_dir = os.path.join(workspace_root, "src", "open_data_pvnet", "configs") +output_dir = os.path.join(workspace_root, "tmp") + +# Load admin regions from CSV +# try catching metadata file not found error and log it +try: + metadata_df = pd.read_csv(os.path.join(metadata_file_dir, 
"admin_region_lat_lon.csv")) + admin_region_list = metadata_df["region"].tolist() +except FileNotFoundError: + logger.error(f"Metadata file not found: {os.path.join(metadata_file_dir, 'admin_region_lat_lon.csv')}") + sys.exit(1) def get_region_generation_csv(region, year, consolidated=False) -> None: diff --git a/src/open_data_pvnet/scripts/inspect_france_training_pipeline.py b/src/open_data_pvnet/scripts/fra/inspect_france_training_pipeline.py similarity index 96% rename from src/open_data_pvnet/scripts/inspect_france_training_pipeline.py rename to src/open_data_pvnet/scripts/fra/inspect_france_training_pipeline.py index e176966..0850d96 100644 --- a/src/open_data_pvnet/scripts/inspect_france_training_pipeline.py +++ b/src/open_data_pvnet/scripts/fra/inspect_france_training_pipeline.py @@ -6,7 +6,6 @@ 2. GFS NWP data for France is accessible from S3 3. Data timestamps align for training """ - import xarray as xr import pandas as pd import numpy as np @@ -25,19 +24,15 @@ gfs_path = "s3://ocf-open-data-pvnet/data/gfs/v4/2024.zarr" - # Load the zarr dataset -base_dir = os.getcwd() -parent_3_levels_up = os.path.dirname(os.path.dirname(os.path.dirname(base_dir))) -output_dir = os.path.join(parent_3_levels_up, "data") -solar_path = os.path.join(parent_3_levels_up, "data", "france_solar_combined.zarr") -print(f"Loading {solar_path}...\n") -gfs_path_local = os.path.join( - parent_3_levels_up, "data", "gfs_2023.zarr" -) # For testing local access if needed +base_dir = os.path.dirname(os.path.abspath(__file__)) # .../scripts/fra/ +workspace_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(base_dir)))) # 4 levels up +output_dir = os.path.join(workspace_root, "data") +solar_path = os.path.join(workspace_root, "data", "france_solar_combined.zarr") +gfs_path_local = os.path.join(workspace_root, "data", "fra","gfs_2023.zarr") # For testing local access if needed # Define France latitude and longitude bounds -MIN_LAT, MAX_LAT = 41.5, 51.5 +MIN_LAT, MAX_LAT = 
42.0, 51.5 MIN_LON, MAX_LON = -5.5, 9.0 diff --git a/src/open_data_pvnet/scripts/process_france_data.py b/src/open_data_pvnet/scripts/fra/process_france_data.py similarity index 83% rename from src/open_data_pvnet/scripts/process_france_data.py rename to src/open_data_pvnet/scripts/fra/process_france_data.py index 99ed217..80f5407 100644 --- a/src/open_data_pvnet/scripts/process_france_data.py +++ b/src/open_data_pvnet/scripts/fra/process_france_data.py @@ -1,9 +1,50 @@ +""" +Process France RTE Solar Generation Data + +This script processes CSV files from RTE's éCO2mix platform and creates an xarray Dataset +in Zarr format compatible with OCF data-sampler for PVNet training. + +Features: +- Processes regional solar generation data for 12 French administrative regions +- Handles timezone conversions (Europe/Paris → UTC) +- Manages DST transitions (drops nonexistent times, keeps first occurrence for ambiguous) +- Fills missing 30-minute timesteps +- Creates multi-region xarray Dataset with coordinates from metadata CSV + +Input: +- CSV files: eCO2mix_RTE_{region}_Annuel_{year}.csv in tmp/ directory +- Metadata: admin_region_lat_lon.csv with region coordinates + +Output: +- Zarr dataset with dimensions (location_id, time_utc) +- Variables: generation_mw, capacity_mwp +- Coordinates: location_id, time_utc, latitude, longitude + +Usage: + # Process all years (2020-2024) for all regions + python process_france_data.py + + # Process specific years + python process_france_data.py --start_yr 2022 --end_yr 2023 + + # Custom output location + python process_france_data.py --output france_solar_2020_2024.zarr + + # Custom data directory + python process_france_data.py --generation-data-dir /path/to/csvs + +Requirements: + pip install pandas xarray numpy zarr +""" + import pandas as pd import xarray as xr import os +import sys import logging import numpy as np import argparse +from pathlib import Path # Configure logging logging.basicConfig( @@ -13,13 +54,12 @@ ) logger = 
logging.getLogger(__name__) -base_dir = os.getcwd() -metadata_file_dir = os.path.join(os.path.dirname(base_dir), "configs") -parent_3_levels_up = os.path.dirname(os.path.dirname(os.path.dirname(base_dir))) -generation_data_dir = os.path.join(parent_3_levels_up, "tmp") -output_dir = os.path.join(parent_3_levels_up, "data") -start_yr = 2020 -end_yr = 2024 +# Get paths relative to this script's location +script_dir = os.path.dirname(os.path.abspath(__file__)) # .../scripts/fra/ +workspace_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(script_dir)))) # 4 levels up +metadata_file_dir = os.path.join(workspace_root, "src", "open_data_pvnet", "configs") +generation_data_dir = os.path.join(workspace_root, "tmp") +output_dir = os.path.join(workspace_root, "data", "fra") def process_location_csv(generation_data_dir, region, year) -> pd.DataFrame: @@ -82,7 +122,7 @@ def process_location_csv(generation_data_dir, region, year) -> pd.DataFrame: # Ensure all 30-minute timesteps are present df = df.set_index("datetime") full_range = pd.date_range(start=df.index.min(), end=df.index.max(), freq="30min") - missing_timestamps = full_range.difference(df.index) + missing_timestamps = full_range.difference(pd.DatetimeIndex(df.index)) if len(missing_timestamps) > 0: logger.warning(f"Found {len(missing_timestamps)} missing 30-minute timesteps:") @@ -135,6 +175,10 @@ def create_France_aggregate(generation_data_dir, region_list, year_list) -> pd.D france_df = pd.concat([france_df, df], ignore_index=True) # Now we have a DataFrame with all regions and years. We can create an aggregate by summing generation and capacity across regions for each timestamp. + if france_df is None: + logger.error("No data was processed. 
france_df is None.") + return pd.DataFrame() + france_aggregate = ( france_df.groupby("datetime") .agg({"generation_mw": "sum", "capacity_mwp": "sum"}) @@ -315,21 +359,13 @@ def create_xarray_dataset( else: generation_data_dir = args.generation_data_dir - # Define regions and years - admin_region_list = [ - "Auvergne-Rhône-Alpes", - "Bourgogne-Franche-Comté", - "Bretagne", - "Centre-Val-de-Loire", - "Grand-Est", - "Hauts-de-France", - "Ile-de-France", - "Normandie", - "Nouvelle-Aquitaine", - "Occitanie", - "Pays-de-la-Loire", - "PACA", - ] + # Load admin regions from CSV + try: + metadata_df = pd.read_csv(args.metadata_file) + admin_region_list = metadata_df["region"].tolist() + except FileNotFoundError: + logger.error(f"Metadata file not found: {args.metadata_file}") + sys.exit(1) year_list = list(range(args.start_yr, args.end_yr + 1)) @@ -343,7 +379,7 @@ def create_xarray_dataset( # Save to Zarr output_file = os.path.join(output_dir, args.output) - ds.to_zarr(output_file, mode="w") + ds.to_zarr(Path(output_file), mode="w") # type: ignore[arg-type] logger.info(f"Saved dataset to {output_file}") # Display info