Model training inference v2 #146

Open · wants to merge 2 commits into main
9 changes: 6 additions & 3 deletions carbonplan_trace/v1/load.py
@@ -229,16 +229,19 @@ def biomass(tiles, year):


 def training(realm, y0=2003, y1=2010, reload=False, access_key_id=None, secret_access_key=None):
-    output_filename = f's3://carbonplan-climatetrace/v1/training/{realm}/all_data.parquet'
+    output_filename = f's3://carbonplan-climatetrace/v2/training/{realm}/all_data.parquet'
     if fs.exists(output_filename) and not reload:
         return pd.read_parquet(output_filename)
     else:
         output = []
         for yr in range(y0, y1):
-            folder_name = f's3://carbonplan-climatetrace/v1/training/{realm}/{yr}/'
+            print(yr)
+            folder_name = f's3://carbonplan-climatetrace/v2/training/{realm}/{yr}/'
             files = fs.ls(folder_name)
             for f in files:
-                output.append(pd.read_parquet(f's3://{f}'))
+                single_df = pd.read_parquet(f's3://{f}')
+                single_df['year'] = yr
+                output.append(single_df)
         output = pd.concat(output)
         utils.write_parquet(output, output_filename, access_key_id, secret_access_key)
         return output
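Note: the load.py change tags each per-year parquet with a 'year' column before concatenation, so the combined training table keeps per-row provenance (it also bumps the paths from v1 to v2). A minimal sketch of what the new column enables, assuming the module's fs/pandas setup and a hypothetical realm name:

    # 'tropics' is a hypothetical realm name used only for illustration
    df = training('tropics', y0=2003, y1=2010, reload=True)
    print(df.groupby('year').size())  # per-year row counts, only possible with the new column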
221 changes: 188 additions & 33 deletions notebooks/processing/inference.ipynb
@@ -50,6 +50,17 @@
"from carbonplan_trace.v1 import utils\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pyproj\n",
"\n",
"pyproj.__version__"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -88,7 +99,8 @@
" # spin up local cluster. must be on big enough machine\n",
" from dask.distributed import Client\n",
"\n",
" client = Client(n_workers=2, threads_per_worker=15, resources={\"workertoken\": 1})\n",
" # when very very huge use 8,8\n",
" client = Client(n_workers=8, threads_per_worker=8, resources={\"workertoken\": 1})\n",
" client\n",
"else:\n",
" gateway = Gateway()\n",
@@ -107,15 +119,6 @@
"# cluster.scale(100)"
]
},
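Note on the resources={"workertoken": 1} pattern in the cluster setup above: each worker advertises a single workertoken and each submitted task requests one, so at most n_workers scene predictions run concurrently regardless of threads_per_worker. A minimal sketch, assuming delayed predict tasks like the ones built later in this notebook:

    from dask.distributed import Client

    client = Client(n_workers=8, threads_per_worker=8, resources={"workertoken": 1})
    futures = [
        client.compute(t, resources={"workertoken": 1})  # each running task holds one token
        for t in delayed_tasks  # 'delayed_tasks' is assumed: a list of predict_delayed(...) objects
    ]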
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -145,6 +148,17 @@
" cluster.shutdown()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"shutdown_cluster(\"local\")"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -171,6 +185,20 @@
"tiles and write it out to a mapper with those specifications.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ul_lats = [\"10S\", \"20S\", \"30S\"]\n",
"ul_lons = [f\"{lon}E\" for lon in np.arange(110, 151, 10)]\n",
"lat_lon_tags = []\n",
"for ul_lat in ul_lats:\n",
" for ul_lon in ul_lons:\n",
" lat_lon_tags.append((ul_lat, ul_lon))"
]
},
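Note: the new cell above hard-codes tile tags covering Australia (upper-left corners 10S-30S by 110E-150E) instead of deriving them from the ecoregions mask listing. A hedged sketch of what the tag scheme appears to encode, assuming 10-degree tiles named by their upper-left corner (utils.parse_bounding_box_from_lat_lon_tags is the real parser; this stand-in is only illustrative):

    def parse_tag(ul_lat, ul_lon):
        # assumed convention: "10S", "140E" -> (min_lat, max_lat, min_lon, max_lon) = (-20, -10, 140, 150)
        lat = int(ul_lat[:-1]) * (-1 if ul_lat.endswith("S") else 1)
        lon = int(ul_lon[:-1]) * (-1 if ul_lon.endswith("W") else 1)
        return (lat - 10, lat, lon, lon + 10)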
{
"cell_type": "code",
"execution_count": null,
@@ -182,11 +210,12 @@
" \"palladium/production/s3fs-public/atoms/files/\"\n",
" \"WRS2_descending_0.zip\"\n",
")\n",
"bucket = \"s3://carbonplan-climatetrace/v1\"\n",
"bucket = \"s3://carbonplan-climatetrace/v2.1\"\n",
"\n",
"biomass_folder = \"s3://carbonplan-climatetrace/intermediate/ecoregions_mask/\"\n",
"biomass_files = fs.ls(biomass_folder)\n",
"lat_lon_tags = [utils.get_lat_lon_tags_from_tile_path(fp) for fp in biomass_files]\n",
"# biomass_folder = \"s3://carbonplan-climatetrace/intermediate/ecoregions_mask/\"\n",
"# biomass_files = fs.ls(biomass_folder) # just to get list of lat_lon tiles we want\n",
"# lat_lon_tags = [utils.get_lat_lon_tags_from_tile_path(fp) for fp in biomass_files]\n",
"# lat_lon_tags = [('60N', '130W')]#, ('40N', '130W')]#, ('00N', '060W')] #('50N', '130W'),\n",
"bounding_boxes = [utils.parse_bounding_box_from_lat_lon_tags(lat, lon) for lat, lon in lat_lon_tags]"
]
},
@@ -199,10 +228,11 @@
"from carbonplan_trace.v1.glas_allometric_eq import REALM_GROUPINGS\n",
"\n",
"processed_scenes = []\n",
"for year in np.arange(2014, 2021):\n",
" processed_scenes.extend(fs.ls(f\"{bucket}/inference/rf/{year}\", recursive=True))\n",
"for year in np.arange(2011, 2022):\n",
" processed_scenes.extend(fs.ls(f\"{bucket}/inference/xg/{year}\", recursive=True))\n",
"\n",
"processed_scenes = [scene[-19:-8] for scene in processed_scenes]"
"processed_scenes = [scene[-19:-8] for scene in processed_scenes]\n",
"len(processed_scenes)"
]
},
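Note: the scene[-19:-8] slice above relies on every key ending in "<year>/<path><row>.parquet". A worked check:

    key = "carbonplan-climatetrace/v2.1/inference/xg/2014/054018.parquet"
    # the last 19 chars are "2014/054018.parquet"; dropping the 8-char ".parquet" suffix
    # leaves "2014/054018", matching the output_name format f"{year}/{path:03d}{row:03d}"
    assert key[-19:-8] == "2014/054018"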
{
@@ -211,7 +241,15 @@
"metadata": {},
"outputs": [],
"source": [
"len(processed_scenes)"
"import carbonplan_trace"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll loop through every scene and every year and calculate biomass for that scene. Will produce\n",
"table of values [x, y, (both specific to utm projection), lat, lon, biomass].\n"
]
},
{
@@ -220,7 +258,9 @@
"metadata": {},
"outputs": [],
"source": [
"len(processed_scenes) - 57875"
"for bounding_box in bounding_boxes:\n",
" min_lat, max_lat, min_lon, max_lon = bounding_box\n",
" valid_scenes = gdf.cx[min_lon:max_lon, min_lat:max_lat][[\"PATH\", \"ROW\"]].values"
]
},
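Note: gdf.cx in the cell above is geopandas' coordinate-based indexer: it keeps the WRS-2 footprints whose geometry intersects the lon/lat box, yielding the Landsat path/row pairs covering a tile. A minimal sketch, assuming gdf is read from the WRS2_descending shapefile referenced earlier:

    import geopandas as gpd

    wrs2_zip = "WRS2_descending_0.zip"  # assumed local copy of the USGS file above
    gdf = gpd.read_file(wrs2_zip)
    min_lat, max_lat, min_lon, max_lon = (-20, -10, 140, 150)  # one Australian tile
    valid_scenes = gdf.cx[min_lon:max_lon, min_lat:max_lat][["PATH", "ROW"]].values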
{
@@ -229,15 +269,54 @@
"metadata": {},
"outputs": [],
"source": [
"len(bounding_boxes)"
"file_lengths = pd.DataFrame(\n",
" columns=[\"v1-rf\", \"v2-rf\", \"v2-xg\"],\n",
" index=[\"_\".join([str(path), str(row)]) for (path, row) in valid_scenes],\n",
")"
]
},
{
"cell_type": "markdown",
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# rerun_scenes = {'2010':[], '2014':[]}\n",
"# setups = [('v2', 'rf')]#, ('v2', 'xg')] #('v1', 'rf'),\n",
"# for year in ['2010', '2014']:\n",
"# for (version, model) in setups:\n",
"# for [path, row] in valid_scenes:\n",
"# output_name = f\"{year}/{path:03d}{row:03d}.parquet\"\n",
"# print(f's3://carbonplan-climatetrace/{version}/inference/{model}/{output_name}')\n",
"# if len(fs.ls(f's3://carbonplan-climatetrace/{version}/inference/{model}/{output_name}')) == 0:\n",
"# if [path, row] not in rerun_scenes[year]:\n",
"# rerun_scenes[year].append([path, row])\n",
"# i+=1\n",
"# file_length = len(pd.read_parquet(f's3://carbonplan-climatetrace/{version}/inference/{model}/{output_name}'))\n",
"# except FileNotFoundError:\n",
"# file_length = np.nan\n",
"\n",
"# file_lengths.loc[f'{path}_{row}', f'{version}-{model}'] = file_length"
]
},
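Note: the commented-out audit above looks garbled as committed (an orphaned i+=1 and an except with no try). A hedged reconstruction of what it appears to intend, recording each scene's parquet row count per version/model and collecting scenes whose output is missing:

    rerun_scenes = {"2010": [], "2014": []}
    setups = [("v1", "rf"), ("v2", "rf"), ("v2", "xg")]  # matching the file_lengths columns
    for year in ["2010", "2014"]:
        for version, model in setups:
            for path, row in valid_scenes:
                output_name = f"{year}/{path:03d}{row:03d}.parquet"
                url = f"s3://carbonplan-climatetrace/{version}/inference/{model}/{output_name}"
                try:
                    file_length = len(pd.read_parquet(url))
                except FileNotFoundError:
                    file_length = np.nan
                    if [path, row] not in rerun_scenes[year]:
                        rerun_scenes[year].append([path, row])
                file_lengths.loc[f"{path}_{row}", f"{version}-{model}"] = file_length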
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"We'll loop through every scene and every year and calculate biomass for that scene. Will produce\n",
"table of values [x, y, (both specific to utm projection), lat, lon, biomass].\n"
"# file_lengths.to_csv('files_to_repeat.csv')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# remove each entry in index"
]
},
{
@@ -249,25 +328,26 @@
"outputs": [],
"source": [
"landsat_bucket = \"s3://usgs-landsat/collection02/level-2/standard/etm/{}/{:03d}/{:03d}/\"\n",
"\n",
"with rio.Env(aws_session):\n",
" # tasks = []\n",
" tasks = []\n",
" task_ids = []\n",
" for bounding_box in bounding_boxes:\n",
" print(bounding_box)\n",
" min_lat, max_lat, min_lon, max_lon = bounding_box\n",
" scenes_in_tile = gdf.cx[min_lon:max_lon, min_lat:max_lat][[\"PATH\", \"ROW\"]].values\n",
" for year in np.arange(2014, 2021):\n",
" for year in np.arange(2011, 2022):\n",
" for [path, row] in scenes_in_tile:\n",
" scene_stores = fs.ls(landsat_bucket.format(year, path, row))\n",
" output_name = f\"{year}/{path:03d}{row:03d}\"\n",
" if len(scene_stores) == 0:\n",
" continue\n",
" elif output_name in processed_scenes:\n",
" continue\n",
" elif output_name in task_id:\n",
" continue\n",
" else:\n",
" tasks.append(\n",
" # predict(\n",
" # predict(\n",
" client.compute(\n",
" predict_delayed(\n",
" model_folder=f\"{bucket}/models/\",\n",
Expand All @@ -281,7 +361,7 @@
" resources={\"workertoken\": 1},\n",
" )\n",
" )\n",
" task_ids.append([path, row, year, max_lat, min_lon])"
" task_id.append(output_name)"
]
},
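A worked example of the landsat_bucket templating in the loop above, using a path/row that appears later in this notebook:

    landsat_bucket = "s3://usgs-landsat/collection02/level-2/standard/etm/{}/{:03d}/{:03d}/"
    print(landsat_bucket.format(2014, 54, 18))
    # -> s3://usgs-landsat/collection02/level-2/standard/etm/2014/054/018/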
{
@@ -292,7 +372,7 @@
},
"outputs": [],
"source": [
"len(tasks)"
"len(rerun_scenes[\"2014\"])"
]
},
{
@@ -307,6 +387,15 @@
"results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"results"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -320,8 +409,8 @@
"# row = task_id[i][1]\n",
"# year = task_id[i][2]\n",
"\n",
"path = 93\n",
"row = 11\n",
"path = 48\n",
"row = 22\n",
"year = 2014\n",
"\n",
"print(path, row, year)\n",
@@ -337,6 +426,72 @@
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.ls(\"s3://carbonplan-climatetrace/v2/inference/rf/2014/054018.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fs.ls(\"s3://carbonplan-climatetrace/v2/inference/xg/2014/054018.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"fs.ls(\"s3://carbonplan-climatetrace/v2/inference/rf/2014/054018.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# i = 0\n",
"# path = task_id[i][0]\n",
"# row = task_id[i][1]\n",
"# year = task_id[i][2]\n",
"\n",
"path = 54\n",
"row = 18\n",
"year = 2010\n",
"\n",
"print(path, row, year)\n",
"\n",
"predict(\n",
" model_folder=f\"{bucket}/models/\",\n",
" path=path,\n",
" row=row,\n",
" year=year,\n",
" access_key_id=access_key_id,\n",
" secret_access_key=secret_access_key,\n",
" output_write_bucket=f\"{bucket}/inference\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pd.read_parquet(\"s3://carbonplan-climatetrace/v2/inference/rf/2010/054018.parquet\")"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -404,9 +559,9 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": "Python [conda env:notebook] *",
"language": "python",
"name": "python3"
"name": "conda-env-notebook-py"
},
"language_info": {
"codemirror_mode": {