52 changes: 50 additions & 2 deletions api/server.py
@@ -13,6 +13,7 @@
import warnings
import os
import re
from contextlib import asynccontextmanager
from typing import Dict, List, Union, Annotated, Optional

from fastapi import Body, FastAPI, Query
@@ -25,11 +26,48 @@

SOLR_HOST = os.getenv("SOLR_HOST", "localhost")
SOLR_PORT = os.getenv("SOLR_PORT", "8983")
BABEL_VERSION = os.getenv("BABEL_VERSION")
BABEL_DOWNLOAD_URL_BASE = os.getenv("BABEL_DOWNLOAD_URL_BASE", "https://stars.renci.org/var/babel_outputs/")
CONFLATIONS = os.getenv("CONFLATIONS", "GeneProtein,DrugChemical")
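# Illustrative configuration (editor's sketch, not part of this diff): a deployment that
# only needs gene/protein conflation might start the server with, e.g.
#   CONFLATIONS=GeneProtein BABEL_VERSION=<release>
# Note that BABEL_VERSION has no default, so the download URL built below will be
# malformed if it is unset.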

app = FastAPI(**get_app_info())
logger = logging.getLogger(__name__)
logging.basicConfig(level=os.getenv("LOGLEVEL", logging.INFO))

conflations = {}
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialization code.
    for conflation_name in [conflation.strip() for conflation in CONFLATIONS.split(",")]:
        # TODO: measure memory usage so we can tell how much holding conflations in memory is costing us.
        conflations[conflation_name] = {}
        start_time = time.time_ns()

        # Download the conflation
        download_url = f"{BABEL_DOWNLOAD_URL_BASE}{BABEL_VERSION}/conflation/{conflation_name}.txt"
        logger.info(f"Downloading conflation {conflation_name} from {download_url} as a text file.")
        # TODO: download as a Gzip file (which will save us a whole bunch of time).
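        # Editor's sketch of the gzip TODO above, not part of this diff: assuming a
        # ".txt.gz" variant of each file exists at the same URL, the stream could be
        # decompressed incrementally with the standard library's zlib (wbits=16+MAX_WBITS
        # selects gzip framing); this would need "import zlib" at the top of the file:
        #
        #     decompressor = zlib.decompressobj(wbits=16 + zlib.MAX_WBITS)
        #     with httpx.stream("GET", download_url + ".gz") as gz_response:
        #         gz_response.raise_for_status()
        #         data = b"".join(decompressor.decompress(chunk) for chunk in gz_response.iter_raw())
        #         data += decompressor.flush()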
        with httpx.stream("GET", download_url) as response:
            response.raise_for_status()

            for line in response.iter_lines():
                # The simplest way to store the conflation information would be the same way we do it with NodeNorm:
                # we store conflation[ConflationName][curie2] = [curie1, curie2, ...]
                # Lots of duplication, but hopefully we can reuse the lists and it won't cost us too much.
                row = json.loads(line)
                for curie in row:
                    if curie in conflations[conflation_name]:
                        logger.warning(f"CURIE {curie} already loaded in conflation {conflation_name}, overwriting.")
                    conflations[conflation_name][curie] = row

        end_time = time.time_ns()
        logger.info(f"Loaded {len(conflations[conflation_name].keys()):,} conflations for {conflation_name} in {(end_time - start_time)/1_000_000:,.2f} ms.")

    # yield so that the FastAPI app can start up.
    yield

    # Cleanup code would go here if we had any.

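# Editor's sketch, not part of this diff: with the structure built above, expanding a
# CURIE through a conflation is a single dict lookup. A hypothetical helper (the name
# and the identity fallback are assumptions, not part of the PR) might look like:
def get_conflation(conflation_name: str, curie: str) -> List[str]:
    """Return the conflation list for a CURIE, or [curie] if it has none."""
    return conflations.get(conflation_name, {}).get(curie, [curie])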
app = FastAPI(lifespan=lifespan, **get_app_info())
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
@@ -71,7 +109,7 @@ async def status() -> Dict:
    result = response.json()

    # Do we know the Babel version and version URL? It will be stored in an environment variable if we do.
    babel_version = os.environ.get("BABEL_VERSION", "unknown")
    babel_version = BABEL_VERSION
    babel_version_url = os.environ.get("BABEL_VERSION_URL", "")
    # We should have a status for name_lookup_shard1_replica_n1.
@@ -82,6 +120,15 @@ async def status() -> Dict:
    if 'index' in core:
        index = core['index']

        conflation_information = {}
        for conflation_name in conflations:
            conflation_information[conflation_name] = {
                'distinct_curies': len(conflations[conflation_name].keys()),
                # Since we reuse row objects in the conflation, we can count the distinct number of id()s to figure out
                # the number of distinct conflations.
                'distinct_conflations': len({id(v) for v in conflations[conflation_name].values()}),
            }
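        # Editor's note, not part of this diff: for example, if the loader stored rows
        # ["A", "B"] and ["C"], the three CURIE keys share two list objects, so
        # distinct_curies would be 3 while distinct_conflations would be 2.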

        return {
            'status': 'ok',
            'message': 'Reporting results from primary core.',
@@ -95,6 +142,7 @@ async def status() -> Dict:
            'segmentCount': index.get('segmentCount', ''),
            'lastModified': index.get('lastModified', ''),
            'size': index.get('size', ''),
            'conflations': conflation_information,
        }
    else:
        return {