@@ -44,13 +44,6 @@
from datetime import datetime
from typing import Any

# from functools import partial
from retriever.data_tiers.tier_1.elasticsearch.attribute_types import (
AttrFieldMeta,
AttributeFilterQuery,
AttrValType,
SingleAttributeFilterQueryPayload,
)
from retriever.data_tiers.tier_1.elasticsearch.constraints.attributes.meta_info import (
ATTR_META,
)
@@ -61,6 +54,14 @@
from retriever.data_tiers.tier_1.elasticsearch.constraints.attributes.ops.handle_match import (
handle_match,
)

# from functools import partial
from retriever.data_tiers.tier_1.elasticsearch.constraints.types.attribute_types import (
AttrFieldMeta,
AttributeFilterQuery,
AttrValType,
SingleAttributeFilterQueryPayload,
)
from retriever.types.trapi import AttributeConstraintDict
from retriever.utils import biolink

@@ -1,4 +1,6 @@
from retriever.data_tiers.tier_1.elasticsearch.attribute_types import AttrFieldMeta
from retriever.data_tiers.tier_1.elasticsearch.constraints.types.attribute_types import (
AttrFieldMeta,
)

keyword_field = AttrFieldMeta(container="scalar", value_type="keyword", curie=False)

@@ -1,6 +1,6 @@
from typing import cast

from retriever.data_tiers.tier_1.elasticsearch.attribute_types import (
from retriever.data_tiers.tier_1.elasticsearch.constraints.types.attribute_types import (
ComparisonOperator,
ESTermComparisonClause,
ESValueComparisonQuery,
@@ -1,7 +1,7 @@
import re
from typing import Any

from retriever.data_tiers.tier_1.elasticsearch.attribute_types import (
from retriever.data_tiers.tier_1.elasticsearch.constraints.types.attribute_types import (
AttrFieldMeta,
ESRegexQuery,
RegexTerm,
@@ -1,4 +1,4 @@
from retriever.data_tiers.tier_1.elasticsearch.qualifier_types import (
from retriever.data_tiers.tier_1.elasticsearch.constraints.types.qualifier_types import (
ESConstraintsChainedQuery,
ESQueryForSingleQualifierConstraint,
ESTermClause,
64 changes: 57 additions & 7 deletions src/retriever/data_tiers/tier_1/elasticsearch/driver.py
@@ -14,8 +14,17 @@
run_batch_query,
run_single_query,
)
from retriever.data_tiers.tier_1.elasticsearch.meta import (
extract_metadata_entries_from_blob,
generate_operations,
get_t1_indices,
get_t1_metadata,
merge_operations,
)
from retriever.data_tiers.tier_1.elasticsearch.types import ESEdge, ESPayload
from retriever.data_tiers.utils import parse_dingo_metadata
from retriever.data_tiers.utils import (
parse_dingo_metadata_unhashed,
)
from retriever.types.dingo import DINGO_ADAPTER, DINGOMetadata
from retriever.types.metakg import Operation, OperationNode
from retriever.types.trapi import BiolinkEntity, Infores
@@ -212,20 +221,61 @@ async def _get_metadata(self, url: str, retries: int = 0) -> dict[str, Any] | None:
metadata = ormsgpack.unpackb(metadata_pack)
return metadata

@override
async def get_metadata(self) -> dict[str, Any] | None:
async def legacy_get_metadata(self) -> dict[str, Any] | None:
"""Legacy method for loading metadata remotely."""
return await self._get_metadata(CONFIG.tier1.metadata_url)

@override
async def get_operations(
async def legacy_get_operations(
self,
) -> tuple[list[Operation], dict[BiolinkEntity, OperationNode]]:
metadata = await self.get_metadata()
"""Legacy method for getting operations based on unified metadata."""
metadata = await self.legacy_get_metadata()
if metadata is None:
raise ValueError(
"Unable to obtain metadata from backend, cannot parse operations."
)
infores = Infores(CONFIG.tier1.backend_infores)
operations, nodes = parse_dingo_metadata(DINGOMetadata(**metadata), 1, infores)
# operations, nodes = parse_dingo_metadata(DINGOMetadata(**metadata), 1, infores)
operations, nodes = parse_dingo_metadata_unhashed(
DINGOMetadata(**metadata), 1, infores
)
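# Parse to unhashed operations first so duplicates can be merged before hashing.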
operations = merge_operations(operations)
log.success(f"Parsed {infores} as a Tier 1 resource.")

return operations, nodes

@override
async def get_metadata(self) -> dict[str, Any] | None:
return await get_t1_metadata(
es_connection=self.es_connection,
indices_alias=CONFIG.tier1.elasticsearch.index_name,
)

@override
async def get_operations(
self,
) -> tuple[list[Operation], dict[BiolinkEntity, OperationNode]]:
# return await self.legacy_get_operations()
return await self.get_t1_operations()

async def get_t1_operations(
self,
) -> tuple[list[Operation], dict[BiolinkEntity, OperationNode]]:
"""Get tier1 operations based on metadata."""
metadata_blob = await self.get_metadata()

if metadata_blob is None:
raise ValueError(
"Unable to obtain metadata from backend, cannot parse operations."
)

if self.es_connection is None:
raise ValueError("Elasticsearch connection not configured.")

indices = await get_t1_indices(self.es_connection)

metadata_list = extract_metadata_entries_from_blob(metadata_blob, indices)
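# One "graph" entry per backing index; indices without one are filtered out.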

operations, nodes = await generate_operations(metadata_list)

return operations, nodes
236 changes: 236 additions & 0 deletions src/retriever/data_tiers/tier_1/elasticsearch/meta.py
@@ -0,0 +1,236 @@
from collections import defaultdict
from copy import deepcopy
from typing import Any

import ormsgpack
from elasticsearch import AsyncElasticsearch
from loguru import logger as log

from retriever.config.general import CONFIG
from retriever.data_tiers.utils import (
generate_operation,
get_simple_op_hash,
parse_dingo_metadata_unhashed,
)
from retriever.types.dingo import DINGOMetadata
from retriever.types.metakg import Operation, OperationNode, UnhashedOperation
from retriever.types.trapi import BiolinkEntity, Infores, MetaAttributeDict
from retriever.utils.redis import REDIS_CLIENT
from retriever.utils.trapi import hash_hex

T1MetaData = dict[str, Any]

CACHE_KEY = "TIER1_META"


async def get_t1_indices(
client: AsyncElasticsearch,
) -> list[str]:
"""For fetch a list of indices from ES."""
resp = await client.indices.resolve_index(
name=CONFIG.tier1.elasticsearch.index_name
)
if "aliases" not in resp:
raise Exception(
f"Failed to get indices from ES: {CONFIG.tier1.elasticsearch.index_name}"
)

backing_indices: list[str] = []
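# Keep only the backing indices registered under the 'dingo' alias.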
for a in resp.get("aliases", []):
if a["name"] == "dingo":
backing_indices.extend(a["indices"])

return backing_indices


async def save_metadata_cache(key: str, payload: T1MetaData) -> None:
"""Wrapper for persist metadata in Redis."""
await REDIS_CLIENT.set(
hash_hex(hash(key)),
ormsgpack.packb(payload),
compress=True,
)


async def read_metadata_cache(key: str) -> T1MetaData | None:
"""Wrapper for retrieving persisted metadata in Redis."""
metadata_pack = await REDIS_CLIENT.get(hash_hex(hash(key)), compressed=True)
if metadata_pack is not None:
return ormsgpack.unpackb(metadata_pack)

return None


def extract_metadata_entries_from_blob(
blob: T1MetaData, indices: list[str]
) -> list[T1MetaData]:
"""Extract a list of metadata entries from raw blob."""
meta_entries: list[T1MetaData] = list(
filter(
None,
[blob[index_name].get("graph") for index_name in indices],
)
)

return meta_entries


async def retrieve_metadata_from_es(
es_connection: AsyncElasticsearch, indices_alias: str
) -> T1MetaData:
"""Method to retrieve prefetched metadata from Elasticsearch."""
mappings = await es_connection.indices.get_mapping(index=indices_alias)
tier1_indices = await get_t1_indices(es_connection)

# Pull one metadata entry per backing index, rather than a single unified blob.

meta: T1MetaData = defaultdict(dict)
for index_name in tier1_indices:
raw = mappings[index_name]["mappings"]["_meta"]
keys = ["graph", "release"]

for key in keys:
blob = raw.get(key)
if blob:
meta[index_name].update({key: blob})

if not meta:
raise ValueError("No metadata retrieved from Elasticsearch.")

return meta


RETRY_LIMIT = 3


async def get_t1_metadata(
es_connection: AsyncElasticsearch | None, indices_alias: str, retries: int = 0
) -> T1MetaData | None:
"""Caller to orchestrate retrieving t1 metadata."""
meta_blob = await read_metadata_cache(CACHE_KEY)
if not meta_blob:
try:
if es_connection is None:
raise ValueError(
"Invalid Elasticsearch connection. Driver must be initialized and connected."
)
meta_blob = await retrieve_metadata_from_es(es_connection, indices_alias)
await save_metadata_cache(CACHE_KEY, meta_blob)
except ValueError as e:
# if exceeds retries or ES connection is invalid, return None
if retries == RETRY_LIMIT or str(e).startswith(
"Invalid Elasticsearch connection"
):
return None
return await get_t1_metadata(es_connection, indices_alias, retries + 1)

log.success("DINGO Metadata retrieved!")
return meta_blob


def hash_meta_attribute(attr: MetaAttributeDict) -> int:
"""Method to hash MetaAttributeDict."""
keys = [
"attribute_type_id",
"attribute_source",
"original_attribute_names",
"constraint_use",
"constraint_name",
]
values: list[Any] = []
for key in keys:
val: list[str] | None = attr.get(key)
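# Lists are unhashable, so fold them into tuples before the final hash() call.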
if isinstance(val, list):
values.append(tuple(val))
else:
values.append(val)
return hash(tuple(values))


def merge_nodes(
nodes: dict[BiolinkEntity, OperationNode],
curr_nodes: dict[BiolinkEntity, OperationNode],
infores: Infores,
) -> dict[BiolinkEntity, OperationNode]:
"""Merge OperationNodes generated."""
for category, node in curr_nodes.items():
# Category not seen before → initialize
if category not in nodes:
nodes[category] = deepcopy(node)
continue

existing = nodes[category]
# Merge prefixes
existing.prefixes[infores].extend(node.prefixes[infores])
# Merge attributes
existing.attributes[infores].extend(node.attributes[infores])

return nodes


def dedupe_nodes(
nodes: dict[BiolinkEntity, OperationNode], infores: Infores
) -> dict[BiolinkEntity, OperationNode]:
"""De-duplicate OperationNodes generated."""
for current in nodes.values():
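# Prefixes dedupe via set(); attribute dicts are unhashable and need the structural hash below.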
current.prefixes[infores] = list(set(current.prefixes[infores]))

seen_attr: set[int] = set()
attrs: list[MetaAttributeDict] = current.attributes[infores]
deduped: list[MetaAttributeDict] = []
for attr in attrs:
hash_code = hash_meta_attribute(attr)
if hash_code not in seen_attr:
deduped.append(attr)
seen_attr.add(hash_code)

current.attributes[infores] = deduped

return nodes


def merge_operations(ops_unhashed: list[UnhashedOperation]) -> list[Operation]:
"""Merge duplicate operations."""
seen_op = dict[str, Operation]()
operations = list[Operation]()

for op in ops_unhashed:
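# Operations sharing a simple hash are duplicates; their attributes and qualifiers are merged into the first occurrence.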
op_hash = get_simple_op_hash(op)
if op_hash not in seen_op:
operation = generate_operation(op, op_hash)
operations.append(operation)
seen_op[op_hash] = operation
# needs merging if seen
else:
hashed_op = seen_op[op_hash]
if hashed_op.attributes is not None and op.attributes is not None:
hashed_op.attributes.extend(op.attributes)
if hashed_op.qualifiers is not None and op.qualifiers is not None:
hashed_op.qualifiers.update(op.qualifiers)

# ignoring access_metadata for now

return operations


async def generate_operations(
meta_entries: list[T1MetaData],
) -> tuple[list[Operation], dict[BiolinkEntity, OperationNode]]:
"""Generate operations and associated nodes based on metadata provided."""
infores = Infores(CONFIG.tier1.backend_infores)

operations_unhashed: list[UnhashedOperation] = []
nodes: dict[BiolinkEntity, OperationNode] = {}

for meta_entry in meta_entries:
curr_ops, curr_nodes = parse_dingo_metadata_unhashed(
DINGOMetadata(**meta_entry), 1, infores
)
operations_unhashed.extend(curr_ops)
nodes = merge_nodes(nodes, curr_nodes, infores)

operations = merge_operations(operations_unhashed)
nodes = dedupe_nodes(nodes, infores)

log.success(f"Parsed {infores} as a Tier 1 resource.")
return operations, nodes
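
A minimal sketch of how these meta.py pieces compose, mirroring get_t1_operations in driver.py above. The ES endpoint URL is a placeholder, and get_t1_metadata presupposes a configured REDIS_CLIENT and CONFIG:

import asyncio

from elasticsearch import AsyncElasticsearch

from retriever.config.general import CONFIG
from retriever.data_tiers.tier_1.elasticsearch.meta import (
    extract_metadata_entries_from_blob,
    generate_operations,
    get_t1_indices,
    get_t1_metadata,
)


async def main() -> None:
    # Endpoint is a placeholder; point this at your deployment's ES cluster.
    es = AsyncElasticsearch("http://localhost:9200")
    try:
        # Cache-first metadata fetch (Redis), falling back to per-index _meta in ES.
        blob = await get_t1_metadata(
            es_connection=es,
            indices_alias=CONFIG.tier1.elasticsearch.index_name,
        )
        if blob is None:
            raise RuntimeError("No Tier 1 metadata available.")
        indices = await get_t1_indices(es)
        entries = extract_metadata_entries_from_blob(blob, indices)
        operations, nodes = await generate_operations(entries)
        print(f"{len(operations)} operations, {len(nodes)} node categories")
    finally:
        await es.close()


asyncio.run(main())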
@@ -5,7 +5,7 @@
from retriever.data_tiers.tier_1.elasticsearch.constraints.attributes.attribute import (
process_attribute_constraints,
)
from retriever.data_tiers.tier_1.elasticsearch.constraints.qualifier import (
from retriever.data_tiers.tier_1.elasticsearch.constraints.qualifiers.qualifier import (
process_qualifier_constraints,
)
from retriever.data_tiers.tier_1.elasticsearch.types import (