157 changes: 144 additions & 13 deletions server/services/datacommons.py
@@ -14,9 +14,12 @@
"""Copy of Data Commons Python Client API Core without pandas dependency."""

import asyncio
import collections
import json
import logging
from typing import Dict, List
import urllib.parse

from flask import current_app
@@ -34,6 +37,7 @@
from server.services.discovery import get_service_url
from shared.lib.constants import MIXER_RESPONSE_ID_FIELD
from shared.lib.constants import MIXER_RESPONSE_ID_HEADER
from shared.lib.constants import PLACE_TYPE_RANK
from shared.lib.constants import SURFACE_HEADER_NAME
from shared.lib.constants import UNKNOWN_SURFACE

@@ -370,10 +374,86 @@ def v2event(node, prop):
return post(url, {"node": node, "property": prop})


-def get_place_info(dcids: List[str]) -> Dict:
def _extract_place_info(node_response_item: Dict) -> Dict:
"""Extracts place info (name, type) from a V2 Node response item."""
info = {}
# Extract name
if "name" in node_response_item.get("properties", {}):
info["name"] = node_response_item["properties"]["name"][0]

# Extract type
if "typeOf" in node_response_item.get("arcs", {}):
types = node_response_item["arcs"]["typeOf"].get("nodes", [])
if types:
# Prefer a type that is in our rank map, else take the first one
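      # e.g. if a node is typed both "CensusRegion" and "State" and only
      # "State" appears in PLACE_TYPE_RANK, "State" wins (hypothetical
      # example; the rank map itself lives in shared.lib.constants).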
chosen_type = types[0].get("dcid", "")
for t in types:
t_dcid = t.get("dcid", "")
if t_dcid in PLACE_TYPE_RANK:
chosen_type = t_dcid
break
info["type"] = chosen_type
return info


def get_place_info(place_dcids: List[str]) -> Dict:
"""Retrieves Place Info given a list of DCIDs."""
url = get_service_url("/v1/bulk/info/place")
return post(f"{url}", {"nodes": sorted(set(dcids))})

# Step 1: Fetch details for the requested nodes
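  # "->[name, typeOf, containedInPlace]" is a V2 Node property expression:
  # "->" requests outgoing arcs, and the bracketed list picks which
  # properties to fetch for each node.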
place_node_resp = v2node(place_dcids, "->[name, typeOf, containedInPlace]")

if "data" not in place_node_resp:
logger.warning("V2 Node API response missing 'data' key.")

# Intermediate storage
place_hierarchy_map = {}
unique_parent_dcids = set()

if "data" in place_node_resp:
for dcid, data in place_node_resp["data"].items():
info = {"self": _extract_place_info(data), "parents": []}

# Extract parents
parents = []
if "containedInPlace" in data.get("arcs", {}):
for node in data["arcs"]["containedInPlace"].get("nodes", []):
p_dcid = node.get("dcid")
if p_dcid:
parents.append(p_dcid)
unique_parent_dcids.add(p_dcid)

place_hierarchy_map[dcid] = {"info": info, "parent_dcids": parents}

# Step 2: Fetch details for all parents
parent_info_map = {}
if unique_parent_dcids:
parent_node_resp = v2node(list(unique_parent_dcids), "->[name, typeOf]")
if "data" in parent_node_resp:
for dcid, data in parent_node_resp["data"].items():
p_info = _extract_place_info(data)
p_info["dcid"] = dcid
parent_info_map[dcid] = p_info

# Step 3: Construct the final response
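  # Note: the shape built below mirrors the old /v1/bulk/info/place response
  # ({"data": [{"node": ..., "info": ...}]}), presumably so existing callers
  # survive the V1 -> V2 migration unchanged.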
result_data = []
for dcid in place_dcids:
if dcid in place_hierarchy_map:
entry = {"node": dcid, "info": place_hierarchy_map[dcid]["info"]}

# Populate parents list
parents_list = []
for p_dcid in place_hierarchy_map[dcid]["parent_dcids"]:
if p_dcid in parent_info_map:
parents_list.append(parent_info_map[p_dcid])

    # Sort parents by place-type rank; types missing from PLACE_TYPE_RANK
    # fall back to rank 100.
entry["info"]["parents"] = sorted(
parents_list,
key=lambda x: PLACE_TYPE_RANK.get(x.get("type", ""), 100))

result_data.append(entry)

return {"data": result_data}


def get_variable_group_info(nodes: List[str],
@@ -403,16 +483,67 @@ def get_variable_ancestors(dcid: str):
return get(url).get("ancestors", [])


-def get_series_dates(parent_entity, child_type, variables):
def _process_variable_dates(variable_dcid: str,
variable_observation_data: Dict) -> Dict:
"""Aggregates observation counts for a single variable from V2 response."""
  # Pivot the V2 nesting (entity -> facet -> dated observations) into
  # date -> facet -> entity count.
counts_by_date_and_facet = collections.defaultdict(
lambda: collections.defaultdict(int))

by_entity = variable_observation_data.get("byEntity", {})
  for entity_data in by_entity.values():
for facet_item in entity_data.get("orderedFacets", []):
facet_id = facet_item.get("facetId")
for obs in facet_item.get("observations", []):
date = obs.get("date")
if date:
counts_by_date_and_facet[date][facet_id] += 1
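  # At this point, counts_by_date_and_facet["2024"]["2176550201"] == 2 would
  # mean two child places reported a 2024 value under that facet (these
  # figures come from the unit test below).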

# Convert to list format
obs_dates = []
for date in sorted(counts_by_date_and_facet.keys()):
entity_counts = []
# Sort by facet_id for deterministic order
for facet_id in sorted(counts_by_date_and_facet[date].keys()):
count = counts_by_date_and_facet[date][facet_id]
entity_counts.append({"facet": facet_id, "count": count})
obs_dates.append({"date": date, "entityCount": entity_counts})

return {"variable": variable_dcid, "observationDates": obs_dates}


def get_series_dates(parent_place_dcid, child_place_type, variable_dcids):
"""Get series dates."""
url = get_service_url("/v1/bulk/observation-dates/linked")
return post(
url, {
"linked_property": "containedInPlace",
"linked_entity": parent_entity,
"entity_type": child_type,
"variables": variables,
})
# Fetch series data from V2
url = get_service_url("/v2/observation")
req = {
"select": ["date", "variable", "entity"],
"entity": {
"expression":
f"{parent_place_dcid}<-containedInPlace+{{typeOf:{child_place_type}}}"
},
"variable": {
"dcids": sorted(variable_dcids)
}
}
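  # The entity expression (e.g. "geoId/06<-containedInPlace+{typeOf:County}")
  # uses V2 relation syntax: "<-containedInPlace+" follows incoming
  # containedInPlace arcs transitively, and "{typeOf:...}" filters matches
  # to the requested child place type.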
resp = post(url, req)

# Aggregate counts locally
if "byVariable" not in resp:
logger.warning("V2 Observation API response missing 'byVariable' key.")
if "facets" not in resp:
logger.warning("V2 Observation API response missing 'facets' key.")

observations_by_variable = resp.get("byVariable", {})
facets = resp.get("facets", {})

result_list = []
# Sort by variable for deterministic order
for var in sorted(observations_by_variable.keys()):
data = observations_by_variable[var]
result_list.append(_process_variable_dates(var, data))

return {"datesByVariable": result_list, "facets": facets}


def resolve(nodes, prop):
2 changes: 0 additions & 2 deletions server/services/discovery.py
@@ -134,10 +134,8 @@ def get_service_url(self, endpoint_path: str) -> str:
'/translate',
'/search',
# v1
-        '/v1/bulk/info/place',
'/v1/bulk/info/variable',
'/v1/bulk/info/variable-group',
-        '/v1/bulk/observation-dates/linked',
'/v1/variable/ancestors',
'/v1/place/ranking',
'/v1/place/related',
188 changes: 188 additions & 0 deletions server/tests/services/datacommons_v1_to_v2_test.py
@@ -0,0 +1,188 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
from unittest import mock

import server.services.datacommons as dc


class TestV1ToV2Migration(unittest.TestCase):

@mock.patch('server.services.datacommons.post')
def test_get_place_info(self, mock_post):
# Mock responses for v2node calls
# First call: get info for geoId/06
# Second call: get info for parent country/USA

def side_effect(url, req, *args, **kwargs):
if "v2/node" in url:
nodes = req.get("nodes", [])
if "geoId/06" in nodes:
return {
"data": {
"geoId/06": {
"properties": {
"name": ["California"]
},
"arcs": {
"typeOf": {
"nodes": [{
"dcid": "State"
}]
},
"containedInPlace": {
"nodes": [{
"dcid": "country/USA"
}]
}
}
}
}
}
if "country/USA" in nodes:
return {
"data": {
"country/USA": {
"properties": {
"name": ["United States"]
},
"arcs": {
"typeOf": {
"nodes": [{
"dcid": "Country"
}]
}
}
}
}
}
return {}

mock_post.side_effect = side_effect

result = dc.get_place_info(["geoId/06"])

expected = {
"data": [{
"node": "geoId/06",
"info": {
"self": {
"name": "California",
"type": "State"
},
"parents": [{
"dcid": "country/USA",
"name": "United States",
"type": "Country"
}]
}
}]
}

self.assertEqual(result, expected)

@mock.patch('server.services.datacommons.post')
def test_get_series_dates(self, mock_post):
# Mock response for v2/observation based on real API response
v2_resp = {
"byVariable": {
"Count_Person": {
"byEntity": {
"geoId/06077": {
"orderedFacets": [{
"facetId": "2176550201",
"observations": [{
"date": "2024",
"value": 816108
}]
}, {
"facetId": "1145703171",
"observations": [{
"date": "2023",
"value": 787416
}]
}]
},
"geoId/06039": {
"orderedFacets": [{
"facetId": "2176550201",
"observations": [{
"date": "2024",
"value": 165432
}]
}, {
"facetId": "1145703171",
"observations": [{
"date": "2023",
"value": 158790
}]
}]
}
}
}
},
"facets": {
"2176550201": {
"importName":
"USCensusPEP_Annual_Population",
"measurementMethod":
"CensusPEPSurvey",
"observationPeriod":
"P1Y",
"provenanceUrl":
"https://www.census.gov/programs-surveys/popest.html"
},
"1145703171": {
"importName":
"CensusACS5YearSurvey",
"measurementMethod":
"CensusACS5yrSurvey",
"provenanceUrl":
"https://www.census.gov/programs-surveys/acs/data/data-via-ftp.html"
}
}
}

mock_post.return_value = v2_resp

result = dc.get_series_dates("geoId/06", "County", ["Count_Person"])

# We expect aggregated counts
# 2023: 2 entities (06077, 06039) for facet 1145703171
# 2024: 2 entities (06077, 06039) for facet 2176550201

expected = {
"datesByVariable": [{
"variable":
"Count_Person",
"observationDates": [{
"date": "2023",
"entityCount": [{
"facet": "1145703171",
"count": 2
}]
}, {
"date": "2024",
"entityCount": [{
"facet": "2176550201",
"count": 2
}]
}]
}],
"facets": v2_resp["facets"]
}

self.assertEqual(result, expected)