Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements_all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ netCDF4
protobuf
rasterio
rdp
requests-mock
s2sphere
sentence-transformers
tabula-py
Expand Down
52 changes: 28 additions & 24 deletions scripts/earthengine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import re
import sys
import tempfile
from pathlib import Path
from typing import Union

from absl import logging
import datacommons as dc
from dateutil.relativedelta import relativedelta
from geopy import distance
import s2sphere
Expand All @@ -41,12 +41,12 @@
os.path.join(os.path.dirname(os.path.dirname(_SCRIPTS_DIR)), 'util'))

from config_map import ConfigMap, read_py_dict_from_file, write_py_dict_to_file
from dc_api_wrapper import dc_api_wrapper
from dc_api_wrapper import dc_api_get_node_property

# Constants
_MAX_LATITUDE = 90.0
_MAX_LONGITUDE = 180.0
_DC_API_ROOT = 'http://autopush.api.datacommons.org'
_DC_API_ROOT = 'https://api.datacommons.org'

# Utilities for dicts.

Expand Down Expand Up @@ -366,27 +366,31 @@ def place_id_to_lat_lng(placeid: str,
placeid)
elif dc_api_lookup:
# Get the lat/lng from the DC API
latlng = []
for prop in ['latitude', 'longitude']:
# dc.utils._API_ROOT = 'http://autopush.api.datacommons.org'
# resp = dc.get_property_values([placeid], prop)
resp = dc_api_wrapper(
function=dc.get_property_values,
args={
'dcids': [placeid],
'prop': prop,
},
use_cache=True,
api_root=_DC_API_ROOT,
)
if not resp or placeid not in resp:
return (0, 0)
values = resp[placeid]
if not len(values):
return (0, 0)
latlng.append(float(values[0]))
lat = latlng[0]
lng = latlng[1]
resp = dc_api_get_node_property(
[placeid],
['latitude', 'longitude'],
{
'dc_api_version': 'V2',
'dc_api_use_cache': True,
'dc_api_root': _DC_API_ROOT,
},
)
node_props = resp.get(placeid) if resp else None
if not node_props:
return (0, 0)

def _parse_coordinate(val):
if isinstance(val, list):
val = val[0] if val else None
if isinstance(val, str):
val = val.split(',')[0].strip().strip('"')
return str_get_numeric_value(val)

lat = _parse_coordinate(node_props.get('latitude'))
lng = _parse_coordinate(node_props.get('longitude'))

if lat is None or lng is None:
return (0, 0)
return (lat, lng)


Expand Down
25 changes: 25 additions & 0 deletions scripts/earthengine/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

import math
import os
from pathlib import Path
import sys
import tempfile
import unittest
from unittest import mock

from absl import logging
import s2sphere
Expand Down Expand Up @@ -371,3 +373,26 @@ def test_date_format_by_time_period(self):
utils.date_format_by_time_period('2022-04-10', 'P3M'))
self.assertEqual('2021',
utils.date_format_by_time_period('2021-01-10', '1Y'))


class PlaceUtilsTest(unittest.TestCase):
    """Tests for place utilities that resolve lat/lng through the DC API."""

    def test_place_id_to_lat_lng_dc_api(self):
        """Verifies lat/lng parsing from a mocked DC API node response."""
        place_dcid = 'geoId/06085'
        api_response = {
            place_dcid: {
                'latitude': '"37.221614","37.36"',
                'longitude': '"-121.68954","-121.97"',
            }
        }
        expected_config = {
            'dc_api_version': 'V2',
            'dc_api_use_cache': True,
            'dc_api_root': utils._DC_API_ROOT,
        }
        patcher = mock.patch('utils.dc_api_get_node_property',
                             return_value=api_response)
        with patcher as mock_get:
            lat, lng = utils.place_id_to_lat_lng(place_dcid,
                                                 dc_api_lookup=True)
        # Only the first quoted value of each comma-separated list is used.
        self.assertAlmostEqual(37.221614, lat)
        self.assertAlmostEqual(-121.68954, lng)
        mock_get.assert_called_once_with([place_dcid],
                                         ['latitude', 'longitude'],
                                         expected_config)
85 changes: 72 additions & 13 deletions scripts/rff/preprocess_raster.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import datacommons as dc
import glob
import json
import numpy as np
import os
from osgeo import gdal
from pathlib import Path
from shapely import geometry
import sys

RFF_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(RFF_DIR)
from rff import util
REPO_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(REPO_ROOT))
from util.dc_api_wrapper import dc_api_batched_wrapper, get_datacommons_client
from scripts.rff import util

bandname_to_gdcStatVars = {
"std_dev": "StandardDeviation_<c_var>",
Expand All @@ -36,16 +50,61 @@ def get_dcid(sp_scale, lat, lon):


def get_county_geoid(lat, lon):
    """Returns the dcid of the US county containing (lat, lon), or None.

    Fetches all US counties from the Data Commons API, then runs a
    point-in-polygon test against each county's simplified geoJson
    (geoJsonCoordinatesDP1).  Counties missing the simplified property are
    retried in a single batched fallback lookup of the full
    geoJsonCoordinates property.

    Args:
      lat: latitude of the point, in degrees.
      lon: longitude of the point, in degrees.

    Returns:
      The county dcid whose polygon contains the point, or None if no
      county matches (or the point falls in a county with no geometry).
    """
    config = {'dc_api_use_cache': True}
    client = get_datacommons_client(config)

    def extract_geojson(node_data, prop_name):
        # Pull the first property value out of a V2 node response,
        # handling both dict and attribute-style node representations.
        nodes = node_data.get('arcs', {}).get(prop_name, {}).get('nodes', [])
        if not nodes:
            return None
        first_node = nodes[0]
        if isinstance(first_node, dict):
            return first_node.get('value')
        return first_node.value

    counties_result = client.node.fetch_place_children(
        place_dcids=['country/USA'],
        children_type='County',
        as_dict=True,
    )
    counties = [
        node.get('dcid')
        for node in counties_result.get('country/USA', [])
        if node.get('dcid')
    ]
    counties_simp = dc_api_batched_wrapper(
        function=client.node.fetch_property_values,
        dcids=counties,
        args={'properties': 'geoJsonCoordinatesDP1'},
        dcid_arg_kw='node_dcids',
        config=config,
    )
    # Shapely points are (x, y) == (lon, lat).
    point = geometry.Point(lon, lat)
    counties_missing_dp1 = []
    for county in counties:
        node_data = counties_simp.get(county, {})
        geojson = extract_geojson(node_data, 'geoJsonCoordinatesDP1')
        if not geojson:
            # Defer to the full-geometry fallback lookup below.
            counties_missing_dp1.append(county)
            continue
        if geometry.shape(json.loads(geojson)).contains(point):
            return county
    fallback = {}
    if counties_missing_dp1:
        fallback = dc_api_batched_wrapper(
            function=client.node.fetch_property_values,
            dcids=counties_missing_dp1,
            args={'properties': 'geoJsonCoordinates'},
            dcid_arg_kw='node_dcids',
            config=config,
        )
    for county in counties_missing_dp1:
        node_data = fallback.get(county, {})
        geojson = extract_geojson(node_data, 'geoJsonCoordinates')
        if not geojson:  # property not defined for one county in alaska
            continue
        if geometry.shape(json.loads(geojson)).contains(point):
            return county
    return None


Expand Down
122 changes: 122 additions & 0 deletions scripts/rff/preprocess_raster_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from pathlib import Path
import types
import unittest
from unittest import mock

REPO_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(REPO_ROOT))

if "osgeo" not in sys.modules:
osgeo_module = types.ModuleType("osgeo")
gdal_module = types.ModuleType("gdal")
osgeo_module.gdal = gdal_module
sys.modules["osgeo"] = osgeo_module
sys.modules["osgeo.gdal"] = gdal_module

from scripts.rff import preprocess_raster


class FakeNodeEndpoint:
    """Test double for the DC client's node endpoint.

    Serves canned place-children data and fails loudly if property
    values are ever requested directly.
    """

    def __init__(self, place_children):
        # Canned children returned by every fetch_place_children call.
        self._place_children = place_children

    def fetch_place_children(self, place_dcids, children_type, as_dict):
        """Returns the canned children keyed under 'country/USA'."""
        response = {"country/USA": self._place_children}
        return response

    def fetch_property_values(self, node_dcids, properties):
        """Always fails: direct property lookups are unexpected here."""
        raise AssertionError("fetch_property_values should not be called")


class FakeClient:
    """Minimal stand-in for the Data Commons client; exposes only `.node`."""

    def __init__(self, node):
        # The fake node endpoint used by code under test.
        self.node = node


class PreprocessRasterTest(unittest.TestCase):
    """Tests get_county_geoid against a faked DC client and API wrapper."""

    def test_get_county_geoid_dp1(self):
        """A point inside the DP1 polygon resolves with no fallback call."""
        county_dcid = "geoId/06085"
        polygon_json = (
            '{"type":"Polygon","coordinates":[[[0,0],[0,2],[2,2],[2,0],[0,0]]]}'
        )
        dp1_response = {
            county_dcid: {
                "arcs": {
                    "geoJsonCoordinatesDP1": {
                        "nodes": [{
                            "value": polygon_json
                        }],
                    },
                },
            },
        }
        fake_client = FakeClient(
            FakeNodeEndpoint(place_children=[{"dcid": county_dcid}]))
        patch_client = mock.patch.object(preprocess_raster,
                                         "get_datacommons_client",
                                         return_value=fake_client)
        patch_wrapper = mock.patch.object(preprocess_raster,
                                          "dc_api_batched_wrapper",
                                          return_value=dp1_response)
        with patch_client, patch_wrapper as mock_wrapper:
            found = preprocess_raster.get_county_geoid(1.0, 1.0)
        self.assertEqual(found, county_dcid)
        # Exactly one batched lookup: DP1 succeeded, no fallback needed.
        self.assertEqual(mock_wrapper.call_count, 1)

    def test_get_county_geoid_fallback(self):
        """Empty DP1 nodes trigger a second lookup of geoJsonCoordinates."""
        county_dcid = "geoId/06085"
        polygon_json = (
            '{"type":"Polygon","coordinates":[[[0,0],[0,2],[2,2],[2,0],[0,0]]]}'
        )
        dp1_response = {
            county_dcid: {
                "arcs": {
                    "geoJsonCoordinatesDP1": {
                        "nodes": [],
                    },
                },
            },
        }
        fallback_response = {
            county_dcid: {
                "arcs": {
                    "geoJsonCoordinates": {
                        "nodes": [{
                            "value": polygon_json
                        }],
                    },
                },
            },
        }
        fake_client = FakeClient(
            FakeNodeEndpoint(place_children=[{"dcid": county_dcid}]))
        patch_client = mock.patch.object(preprocess_raster,
                                         "get_datacommons_client",
                                         return_value=fake_client)
        patch_wrapper = mock.patch.object(preprocess_raster,
                                          "dc_api_batched_wrapper",
                                          side_effect=[
                                              dp1_response, fallback_response
                                          ])
        with patch_client, patch_wrapper as mock_wrapper:
            found = preprocess_raster.get_county_geoid(1.0, 1.0)
        self.assertEqual(found, county_dcid)
        # Two batched lookups: DP1 first, then the full-geometry fallback.
        self.assertEqual(mock_wrapper.call_count, 2)


if __name__ == '__main__':
unittest.main()
19 changes: 13 additions & 6 deletions scripts/us_census/enhanced_tmcf/process_etmcf.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import csv
import datacommons as dc
import os
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple

from absl import app
from absl import flags
from dataclasses import dataclass
from typing import Dict, List, Tuple

REPO_ROOT = Path(__file__).resolve().parents[3]
sys.path.insert(0, str(REPO_ROOT))

from util.dc_api_wrapper import dc_api_get_node_property

GEO_ID_COLUMN = 'GEO_ID'
NUM_DCIDS_TO_QUERY = 50
Expand Down Expand Up @@ -70,9 +76,10 @@ def _get_places_not_found(census_geoids: List[str]) -> List[str]:
for i in range(0, len(geo_ids), NUM_DCIDS_TO_QUERY):
selected_geo_ids = geo_ids[i:i + NUM_DCIDS_TO_QUERY]
selected_dcids = [geoId_to_dcids[g] for g in selected_geo_ids]
res = dc.get_property_values(selected_dcids, 'name')
res = dc_api_get_node_property(selected_dcids, 'name')
for index in range(len(selected_dcids)):
if not res[selected_dcids[index]]:
name = res.get(selected_dcids[index], {}).get('name')
if not name:
geoIds_not_found.append(selected_geo_ids[index])
return geoIds_not_found

Expand Down Expand Up @@ -292,4 +299,4 @@ def process_enhanced_tmcf(input_folder, output_folder, etmcf_filename,
# Use the existing input CSV, the new_csv_columns list and maps of geoIds to DCIDs (for places)
# and a list of geoIds not found to produce the processed (traditional) TMCF and corresponding CSV.
_write_modified_csv(input_csv_filepath, csv_out, new_csv_columns,
geo_ids_to_dcids, geo_ids_not_found)
geo_ids_to_dcids, geo_ids_not_found)
Loading
Loading