diff --git a/server/services/datacommons.py b/server/services/datacommons.py
index 553029a230..03977f32cf 100644
--- a/server/services/datacommons.py
+++ b/server/services/datacommons.py
@@ -14,9 +14,12 @@
 """Copy of Data Commons Python Client API Core without pandas dependency."""
 
 import asyncio
+import collections
+from itertools import groupby
 import json
 import logging
-from typing import Dict, List
+from operator import itemgetter
+from typing import Dict, List, Set
 import urllib.parse
 
 from flask import current_app
@@ -34,6 +37,7 @@
 from server.services.discovery import get_service_url
 from shared.lib.constants import MIXER_RESPONSE_ID_FIELD
 from shared.lib.constants import MIXER_RESPONSE_ID_HEADER
+from shared.lib.constants import PLACE_TYPE_RANK
 from shared.lib.constants import SURFACE_HEADER_NAME
 from shared.lib.constants import UNKNOWN_SURFACE
 
@@ -370,10 +374,86 @@ def v2event(node, prop):
   return post(url, {"node": node, "property": prop})
 
 
-def get_place_info(dcids: List[str]) -> Dict:
+def _extract_place_info(node_response_item: Dict) -> Dict:
+  """Extracts place info (name, type) from a V2 Node response item."""
+  info = {}
+  # Extract name
+  if "name" in node_response_item.get("properties", {}):
+    info["name"] = node_response_item["properties"]["name"][0]
+
+  # Extract type
+  if "typeOf" in node_response_item.get("arcs", {}):
+    types = node_response_item["arcs"]["typeOf"].get("nodes", [])
+    if types:
+      # Prefer a type that is in our rank map, else take the first one
+      chosen_type = types[0].get("dcid", "")
+      for t in types:
+        t_dcid = t.get("dcid", "")
+        if t_dcid in PLACE_TYPE_RANK:
+          chosen_type = t_dcid
+          break
+      info["type"] = chosen_type
+  return info
+
+
+def get_place_info(place_dcids: List[str]) -> Dict:
   """Retrieves Place Info given a list of DCIDs."""
-  url = get_service_url("/v1/bulk/info/place")
-  return post(f"{url}", {"nodes": sorted(set(dcids))})
+
+  # Step 1: Fetch details for the requested nodes
+  place_node_resp = v2node(place_dcids, "->[name, typeOf, containedInPlace]")
+
+  if "data" not in place_node_resp:
+    logging.warning("V2 Node API response missing 'data' key.")
+
+  # Intermediate storage
+  place_hierarchy_map = {}
+  unique_parent_dcids = set()
+
+  if "data" in place_node_resp:
+    for dcid, data in place_node_resp["data"].items():
+      info = {"self": _extract_place_info(data), "parents": []}
+
+      # Extract parents
+      parents = []
+      if "containedInPlace" in data.get("arcs", {}):
+        for node in data["arcs"]["containedInPlace"].get("nodes", []):
+          p_dcid = node.get("dcid")
+          if p_dcid:
+            parents.append(p_dcid)
+            unique_parent_dcids.add(p_dcid)
+
+      place_hierarchy_map[dcid] = {"info": info, "parent_dcids": parents}
+
+  # Step 2: Fetch details for all parents
+  parent_info_map = {}
+  if unique_parent_dcids:
+    parent_node_resp = v2node(list(unique_parent_dcids), "->[name, typeOf]")
+    if "data" in parent_node_resp:
+      for dcid, data in parent_node_resp["data"].items():
+        p_info = _extract_place_info(data)
+        p_info["dcid"] = dcid
+        parent_info_map[dcid] = p_info
+
+  # Step 3: Construct the final response
+  result_data = []
+  for dcid in place_dcids:
+    if dcid in place_hierarchy_map:
+      entry = {"node": dcid, "info": place_hierarchy_map[dcid]["info"]}
+
+      # Populate parents list
+      parents_list = []
+      for p_dcid in place_hierarchy_map[dcid]["parent_dcids"]:
+        if p_dcid in parent_info_map:
+          parents_list.append(parent_info_map[p_dcid])
+
+      # Sort parents
+      entry["info"]["parents"] = sorted(
+          parents_list,
+          key=lambda x: PLACE_TYPE_RANK.get(x.get("type", ""), 100))
+
+      result_data.append(entry)
+
+  return {"data": result_data}
 
 
 def get_variable_group_info(nodes: List[str],
@@ -403,16 +483,67 @@ def get_variable_ancestors(dcid: str):
   return get(url).get("ancestors", [])
 
 
-def get_series_dates(parent_entity, child_type, variables):
+def _process_variable_dates(variable_dcid: str,
+                            variable_observation_data: Dict) -> Dict:
+  """Aggregates observation counts for a single variable from V2 response."""
+  # Pivot: Entity -> Date -> Facet TO Date -> Facet -> Count
+  counts_by_date_and_facet = collections.defaultdict(
+      lambda: collections.defaultdict(int))
+
+  by_entity = variable_observation_data.get("byEntity", {})
+  for _, entity_data in by_entity.items():
+    for facet_item in entity_data.get("orderedFacets", []):
+      facet_id = facet_item.get("facetId")
+      for obs in facet_item.get("observations", []):
+        date = obs.get("date")
+        if date:
+          counts_by_date_and_facet[date][facet_id] += 1
+
+  # Convert to list format
+  obs_dates = []
+  for date in sorted(counts_by_date_and_facet.keys()):
+    entity_counts = []
+    # Sort by facet_id for deterministic order
+    for facet_id in sorted(counts_by_date_and_facet[date].keys()):
+      count = counts_by_date_and_facet[date][facet_id]
+      entity_counts.append({"facet": facet_id, "count": count})
+    obs_dates.append({"date": date, "entityCount": entity_counts})
+
+  return {"variable": variable_dcid, "observationDates": obs_dates}
+
+
+def get_series_dates(parent_place_dcid, child_place_type, variable_dcids):
   """Get series dates."""
-  url = get_service_url("/v1/bulk/observation-dates/linked")
-  return post(
-      url, {
-          "linked_property": "containedInPlace",
-          "linked_entity": parent_entity,
-          "entity_type": child_type,
-          "variables": variables,
-      })
+  # Fetch series data from V2
+  url = get_service_url("/v2/observation")
+  req = {
+      "select": ["date", "variable", "entity"],
+      "entity": {
+          "expression":
+              f"{parent_place_dcid}<-containedInPlace+{{typeOf:{child_place_type}}}"
+      },
+      "variable": {
+          "dcids": sorted(variable_dcids)
+      }
+  }
+  resp = post(url, req)
+
+  # Aggregate counts locally
+  if "byVariable" not in resp:
+    logging.warning("V2 Observation API response missing 'byVariable' key.")
+  if "facets" not in resp:
+    logging.warning("V2 Observation API response missing 'facets' key.")
+
+  observations_by_variable = resp.get("byVariable", {})
+  facets = resp.get("facets", {})
+
+  result_list = []
+  # Sort by variable for deterministic order
+  for var in sorted(observations_by_variable.keys()):
+    data = observations_by_variable[var]
+    result_list.append(_process_variable_dates(var, data))
+
+  return {"datesByVariable": result_list, "facets": facets}
 
 
 def resolve(nodes, prop):
diff --git a/server/services/discovery.py b/server/services/discovery.py
index b883ec31d5..1c085dd86a 100644
--- a/server/services/discovery.py
+++ b/server/services/discovery.py
@@ -134,10 +134,8 @@ def get_service_url(self, endpoint_path: str) -> str:
         '/translate',
         '/search',
         # v1
-        '/v1/bulk/info/place',
         '/v1/bulk/info/variable',
         '/v1/bulk/info/variable-group',
-        '/v1/bulk/observation-dates/linked',
         '/v1/variable/ancestors',
         '/v1/place/ranking',
         '/v1/place/related',
diff --git a/server/tests/services/datacommons_v1_to_v2_test.py b/server/tests/services/datacommons_v1_to_v2_test.py
new file mode 100644
index 0000000000..045d5df3bc
--- /dev/null
+++ b/server/tests/services/datacommons_v1_to_v2_test.py
@@ -0,0 +1,188 @@
+# Copyright 2026 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import unittest
+from unittest import mock
+
+import server.services.datacommons as dc
+
+
+class TestV1ToV2Migration(unittest.TestCase):
+
+  @mock.patch('server.services.datacommons.post')
+  def test_get_place_info(self, mock_post):
+    # Mock responses for v2node calls
+    # First call: get info for geoId/06
+    # Second call: get info for parent country/USA
+
+    def side_effect(url, req, *args, **kwargs):
+      if "v2/node" in url:
+        nodes = req.get("nodes", [])
+        if "geoId/06" in nodes:
+          return {
+              "data": {
+                  "geoId/06": {
+                      "properties": {
+                          "name": ["California"]
+                      },
+                      "arcs": {
+                          "typeOf": {
+                              "nodes": [{
+                                  "dcid": "State"
+                              }]
+                          },
+                          "containedInPlace": {
+                              "nodes": [{
+                                  "dcid": "country/USA"
+                              }]
+                          }
+                      }
+                  }
+              }
+          }
+        if "country/USA" in nodes:
+          return {
+              "data": {
+                  "country/USA": {
+                      "properties": {
+                          "name": ["United States"]
+                      },
+                      "arcs": {
+                          "typeOf": {
+                              "nodes": [{
+                                  "dcid": "Country"
+                              }]
+                          }
+                      }
+                  }
+              }
+          }
+      return {}
+
+    mock_post.side_effect = side_effect
+
+    result = dc.get_place_info(["geoId/06"])
+
+    expected = {
+        "data": [{
+            "node": "geoId/06",
+            "info": {
+                "self": {
+                    "name": "California",
+                    "type": "State"
+                },
+                "parents": [{
+                    "dcid": "country/USA",
+                    "name": "United States",
+                    "type": "Country"
+                }]
+            }
+        }]
+    }
+
+    self.assertEqual(result, expected)
+
+  @mock.patch('server.services.datacommons.post')
+  def test_get_series_dates(self, mock_post):
+    # Mock response for v2/observation based on real API response
+    v2_resp = {
+        "byVariable": {
+            "Count_Person": {
+                "byEntity": {
+                    "geoId/06077": {
+                        "orderedFacets": [{
+                            "facetId": "2176550201",
+                            "observations": [{
+                                "date": "2024",
+                                "value": 816108
+                            }]
+                        }, {
+                            "facetId": "1145703171",
+                            "observations": [{
+                                "date": "2023",
+                                "value": 787416
+                            }]
+                        }]
+                    },
+                    "geoId/06039": {
+                        "orderedFacets": [{
+                            "facetId": "2176550201",
+                            "observations": [{
+                                "date": "2024",
+                                "value": 165432
+                            }]
+                        }, {
+                            "facetId": "1145703171",
+                            "observations": [{
+                                "date": "2023",
+                                "value": 158790
+                            }]
+                        }]
+                    }
+                }
+            }
+        },
+        "facets": {
+            "2176550201": {
+                "importName":
+                    "USCensusPEP_Annual_Population",
+                "measurementMethod":
+                    "CensusPEPSurvey",
+                "observationPeriod":
+                    "P1Y",
+                "provenanceUrl":
+                    "https://www.census.gov/programs-surveys/popest.html"
+            },
+            "1145703171": {
+                "importName":
+                    "CensusACS5YearSurvey",
+                "measurementMethod":
+                    "CensusACS5yrSurvey",
+                "provenanceUrl":
+                    "https://www.census.gov/programs-surveys/acs/data/data-via-ftp.html"
+            }
+        }
+    }
+
+    mock_post.return_value = v2_resp
+
+    result = dc.get_series_dates("geoId/06", "County", ["Count_Person"])
+
+    # We expect aggregated counts
+    # 2023: 2 entities (06077, 06039) for facet 1145703171
+    # 2024: 2 entities (06077, 06039) for facet 2176550201
+
+    expected = {
+        "datesByVariable": [{
+            "variable":
+                "Count_Person",
+            "observationDates": [{
+                "date": "2023",
+                "entityCount": [{
+                    "facet": "1145703171",
+                    "count": 2
+                }]
+            }, {
+                "date": "2024",
+                "entityCount": [{
+                    "facet": "2176550201",
+                    "count": 2
+                }]
+            }]
+        }],
+        "facets": v2_resp["facets"]
+    }
+
+    self.assertEqual(result, expected)
diff --git a/shared/lib/constants.py b/shared/lib/constants.py
index 14ff10b9e7..70e9496c4c 100644
--- a/shared/lib/constants.py
+++ b/shared/lib/constants.py
@@ -475,3 +475,30 @@
 # Flask App env config constants
 LOG_EXTREME_MIXER_CALLS = "LOG_EXTREME_MIXER_CALLS"
 LOG_CACHED_MIXER_RESPONSE_USAGE = "LOG_CACHED_MIXER_RESPONSE_USAGE"
+
+# Rank for sorting place parents (City -> State -> Country).
+# The values are arbitrary and only their relative order matters:
+# a smaller number means a lower-level (more local) place type.
+PLACE_TYPE_RANK = {
+    "CensusZipCodeTabulationArea": 1,
+    "AdministrativeArea5": 2,
+    "AdministrativeArea4": 2,
+    "Village": 5,
+    "City": 5,
+    "Town": 5,
+    "Borough": 5,
+    "AdministrativeArea3": 5,
+    "County": 10,
+    "AdministrativeArea2": 10,
+    "EurostatNUTS3": 10,
+    "CensusDivision": 15,
+    "State": 20,
+    "AdministrativeArea1": 20,
+    "EurostatNUTS2": 20,
+    "EurostatNUTS1": 20,
+    "Country": 30,
+    "CensusRegion": 35,
+    "GeoRegion": 38,
+    "Continent": 40,
+    "Place": 50,
+}
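
Below is a minimal, self-contained sketch of the aggregation that `_process_variable_dates` performs: per-entity, per-facet observations are pivoted into per-date, per-facet entity counts. The `by_entity` payload and the `geoId/A`, `geoId/B`, and `f1` identifiers are invented for illustration and are not real `/v2/observation` output; the sketch only shows why the two counties in the test above collapse into a single count of 2 for each (date, facet) pair.

```python
import collections

# Hypothetical V2-style "byEntity" payload: two entities reporting the same
# facet for the same date (values themselves are ignored by the pivot).
by_entity = {
    "geoId/A": {
        "orderedFacets": [{
            "facetId": "f1",
            "observations": [{"date": "2023", "value": 1}]
        }]
    },
    "geoId/B": {
        "orderedFacets": [{
            "facetId": "f1",
            "observations": [{"date": "2023", "value": 2}]
        }]
    },
}

# Count how many entities report each (date, facet) pair.
counts = collections.defaultdict(lambda: collections.defaultdict(int))
for entity_data in by_entity.values():
  for facet_item in entity_data.get("orderedFacets", []):
    for obs in facet_item.get("observations", []):
      counts[obs["date"]][facet_item["facetId"]] += 1

print({date: dict(facets) for date, facets in counts.items()})
# -> {'2023': {'f1': 2}}, i.e. an entityCount of 2 for facet "f1" in 2023
```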