Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 221 additions & 14 deletions server/services/datacommons.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Copy of Data Commons Python Client API Core without pandas dependency."""

import asyncio
import collections
import json
import logging
from typing import Dict, List
Expand Down Expand Up @@ -370,12 +371,6 @@ def v2event(node, prop):
return post(url, {"node": node, "property": prop})


def get_place_info(dcids: List[str]) -> Dict:
  """Fetch place info for the given DCIDs from the v1 bulk endpoint.

  Args:
    dcids: DCIDs of the places to look up. Duplicates are collapsed and the
      remaining values are sent in sorted order.

  Returns:
    Whatever `post` returns for the `/v1/bulk/info/place` request.
  """
  # NOTE(review): another `get_place_info` is defined further down in this
  # file; at import time the later definition replaces this one - confirm
  # which of the two is meant to survive.
  endpoint = get_service_url("/v1/bulk/info/place")
  unique_nodes = sorted(set(dcids))
  return post(f"{endpoint}", {"nodes": unique_nodes})


def get_variable_group_info(nodes: List[str],
entities: List[str],
numEntitiesExistence=1) -> Dict:
Expand Down Expand Up @@ -403,16 +398,228 @@ def get_variable_ancestors(dcid: str):
return get(url).get("ancestors", [])


# Rank assigned to each known place type. `get_place_info` keeps only
# ancestors whose type appears here and sorts them ascending by this rank.
# Types that share a rank are listed together.
PLACE_TYPE_RANK = {
    place_type: rank for rank, place_types in (
        (1, ("CensusZipCodeTabulationArea",)),
        (2, ("AdministrativeArea5", "AdministrativeArea4")),
        (5, ("Village", "City", "Town", "Borough", "AdministrativeArea3")),
        (10, ("County", "AdministrativeArea2", "EurostatNUTS3")),
        (15, ("CensusDivision",)),
        (20, ("State", "AdministrativeArea1", "EurostatNUTS2",
              "EurostatNUTS1")),
        (30, ("Country",)),
        (35, ("CensusRegion",)),
        (38, ("GeoRegion",)),
        (40, ("Continent",)),
        (50, ("Place",)),
    ) for place_type in place_types
}


def _arc_nodes(resp: Dict, dcid: str, prop: str) -> List[Dict]:
  """Returns the `nodes` list stored under `data[dcid].arcs[prop]` in `resp`."""
  arcs_obj = resp.get('data', {}).get(dcid, {}).get('arcs', {}).get(prop, {})
  return arcs_obj.get('nodes', []) if isinstance(arcs_obj, dict) else []


def _fetch_parent_graph(dcids: List[str], max_depth: int = 10) -> Dict:
  """Builds a child -> [parent dcids] map by walking `containedInPlace` upward.

  One `v2node` request is issued per level, for at most `max_depth` levels.
  """
  parent_graph = {}
  visited = set()
  frontier = set(dcids)
  for _ in range(max_depth):
    # Skip already-fetched nodes so cycles terminate; sort so the request
    # payload is identical across runs for the same input.
    fetch_dcids = sorted(frontier - visited)
    if not fetch_dcids:
      break

    resp = v2node(fetch_dcids, '->containedInPlace')
    visited.update(fetch_dcids)

    frontier = set()
    for dcid in fetch_dcids:
      parents = [
          node['dcid']
          for node in _arc_nodes(resp, dcid, 'containedInPlace')
          if 'dcid' in node
      ]
      if parents:
        parent_graph[dcid] = parents
        frontier.update(parents)
  return parent_graph


def _collect_ancestors(dcid: str, parent_graph: Dict) -> set:
  """Returns every node reachable from `dcid` in `parent_graph`, excluding it."""
  ancestors = set()
  seen = {dcid}  # Seeding with `dcid` keeps the node out of its own ancestors.
  queue = collections.deque([dcid])
  while queue:
    for parent in parent_graph.get(queue.popleft(), []):
      if parent not in seen:
        seen.add(parent)
        ancestors.add(parent)
        queue.append(parent)
  return ancestors


def get_place_info(dcids: List[str]) -> Dict:
  """Retrieves Place Info given a list of DCIDs.

  Ancestors are discovered level by level because, per the original author's
  note, v2/node does not support a recursive `->containedInPlace+` query.

  Args:
    dcids: DCIDs of the places to describe. Duplicates are ignored; the first
      occurrence fixes the position in the result.

  Returns:
    `{'data': [...]}` with one entry per distinct input DCID, shaped as
    `{'node': dcid, 'info': {'self': {'dcid', 'type', 'name'},
    'parents': [{'dcid', 'type', 'name'}, ...]}}`. Only ancestors whose first
    `typeOf` value is a key of `PLACE_TYPE_RANK` are kept; they are ordered by
    ascending rank, ties broken by DCID so the output is deterministic.
  """
  # De-duplicate while preserving caller order so each place is reported once.
  dcids = list(dict.fromkeys(dcids))
  if not dcids:
    return {'data': []}

  parent_graph = _fetch_parent_graph(dcids)
  ancestors_map = {
      dcid: _collect_ancestors(dcid, parent_graph) for dcid in dcids
  }

  # Fetch types and names for the inputs and all their ancestors in one batch
  # per property.
  all_dcids = sorted(set(dcids).union(*ancestors_map.values()))
  types_resp = v2node(all_dcids, '->typeOf')
  names_resp = v2node(all_dcids, '->name')

  def first_value(resp, dcid, prop, key='dcid'):
    nodes = _arc_nodes(resp, dcid, prop)
    return nodes[0].get(key, '') if nodes else ''

  result_data = []
  for dcid in dcids:
    ranked_parents = []
    # Iterating in sorted order plus a stable sort on rank gives a
    # (rank, dcid) ordering instead of one that depends on set hashing.
    for anc_dcid in sorted(ancestors_map[dcid]):
      anc_type = first_value(types_resp, anc_dcid, 'typeOf')
      if anc_type not in PLACE_TYPE_RANK:
        continue
      ranked_parents.append((PLACE_TYPE_RANK[anc_type], {
          'dcid': anc_dcid,
          'type': anc_type,
          'name': first_value(names_resp, anc_dcid, 'name', 'value'),
      }))
    ranked_parents.sort(key=lambda item: item[0])

    result_data.append({
        'node': dcid,
        'info': {
            'self': {
                'dcid': dcid,
                'type': first_value(types_resp, dcid, 'typeOf'),
                'name': first_value(names_resp, dcid, 'name', 'value'),
            },
            'parents': [parent for _, parent in ranked_parents],
        },
    })

  return {'data': result_data}


def get_series_dates(parent_entity, child_type, variables):
  """Get series dates.

  Counts, for every variable, date and facet, how many children of
  `parent_entity` with type `child_type` have an observation.

  Args:
    parent_entity: DCID of the containing place.
    child_type: Type DCID the children must have (matched against `typeOf`).
    variables: DCIDs of the statistical variables to inspect.

  Returns:
    `{"datesByVariable": [{"variable": ..., "observationDates": [{"date": ...,
    "entityCount": [{"count": ..., "facet": <facet id>}]}]}], "facets": ...}`
    where `facets` is the `facets` map of the observation response. Both
    fields are empty when no child of the requested type exists.
  """
  # NOTE(review): only direct `containedInPlace` children are considered; the
  # v1 "linked" endpoint this replaces may have been transitive - confirm.
  children_resp = v2node([parent_entity], '<-containedInPlace')
  node_data = children_resp.get('data', {}).get(parent_entity, {})
  arcs_obj = node_data.get('arcs', {}).get('containedInPlace', {})
  nodes_list = arcs_obj.get('nodes', []) if isinstance(arcs_obj, dict) else []
  possible_children = [x['dcid'] for x in nodes_list if 'dcid' in x]

  # Keep only the children that carry the requested type.
  child_dcids = []
  if possible_children:
    # The property expression needs the outgoing arrow, like every other
    # v2node call in this file.
    type_resp = v2node(possible_children, '->typeOf')
    for child in possible_children:
      c_data = type_resp.get('data', {}).get(child, {})
      c_arcs = c_data.get('arcs', {}).get('typeOf', {})
      c_types = c_arcs.get('nodes', []) if isinstance(c_arcs, dict) else []
      if any(t.get('dcid') == child_type for t in c_types):
        child_dcids.append(child)

  if not child_dcids:
    return {"datesByVariable": [], "facets": {}}

  obs_resp = v2observation(
      select=['date', 'variable', 'entity', 'value', 'facet'],
      entity={'dcids': child_dcids},
      variable={'dcids': variables})

  # Aggregate results: { variable: { date: { facet: count } } }
  agg_data = collections.defaultdict(
      lambda: collections.defaultdict(lambda: collections.defaultdict(int)))

  for var, var_data in obs_resp.get('byVariable', {}).items():
    for ent_data in var_data.get('byEntity', {}).values():
      ordered_facets = ent_data.get('orderedFacets', [])
      if ordered_facets:
        # Shape documented for the v2 observation API: observations grouped
        # per facet, with the facet id on the group.
        for facet in ordered_facets:
          facet_id = facet.get('facetId', "")
          for obs in facet.get('observations', []):
            date = obs.get('date')
            if date:
              agg_data[var][date][facet_id] += 1
      else:
        # Flat per-entity series carrying the facet id on each observation.
        # NOTE(review): kept as a fallback - confirm which shape the
        # `v2observation` helper actually returns.
        for obs in ent_data.get('series', []):
          date = obs.get('date')
          if date:
            agg_data[var][date][obs.get('facet', "")] += 1

  # Convert the nested counters into the v1-style list response.
  resp_dates = []
  for var, dates_map in agg_data.items():
    obs_dates = [{
        "date": date,
        "entityCount": [{
            "count": count,
            "facet": facet_id
        } for facet_id, count in facet_counts.items()]
    } for date, facet_counts in dates_map.items()]
    resp_dates.append({"variable": var, "observationDates": obs_dates})

  return {"datesByVariable": resp_dates, "facets": obs_resp.get('facets', {})}


def resolve(nodes, prop):
Expand Down
Loading
Loading