Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions code/ARAX/ARAXQuery/Expand/expand_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import sys
import os
import traceback
from typing import Union, Optional
from typing import Union, Optional, cast

sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../../UI/OpenAPI/python-flask-server/")
from openapi_server.models.knowledge_graph import KnowledgeGraph
Expand Down Expand Up @@ -290,8 +290,9 @@ def get_curie_synonyms(curie: Union[str, list[str]], log: Optional[ARAXResponse]
except Exception:
tb = traceback.format_exc()
error_type, error, _ = sys.exc_info()
error_code = error_type.__name__ if error_type is not None else "UnknownError"
log.error(f"Encountered a problem using NodeSynonymizer: {tb}",
error_code=error_type.__name__) # type: ignore[union-attr]
error_code=error_code)
return []
else:
if equivalent_curies_dict is not None:
Expand Down Expand Up @@ -320,8 +321,9 @@ def get_curie_synonyms_dict(curie: Union[str, list[str]],
except Exception:
tb = traceback.format_exc()
error_type, error, _ = sys.exc_info()
error_code = error_type.__name__ if error_type is not None else "UnknownError"
log.error(f"Encountered a problem using NodeSynonymizer: {tb}",
error_code=error_type.__name__) # type: ignore[union-attr]
error_code=error_code)
return dict()
else:
if equivalent_curies_dict is not None:
Expand All @@ -348,15 +350,17 @@ def get_canonical_curies_dict(curie: Union[str, list[str]], log: ARAXResponse) -
except Exception:
tb = traceback.format_exc()
error_type, error, _ = sys.exc_info()
error_code = error_type.__name__ if error_type is not None else "UnknownError"
log.error(f"Encountered a problem using NodeSynonymizer: {tb}",
error_code=error_type.__name__) # type: ignore[union-attr]
error_code=error_code)
return {}
else:
if canonical_curies_dict is not None:
unrecognized_curies = {input_curie for input_curie in canonical_curies_dict if not canonical_curies_dict.get(input_curie)}
if unrecognized_curies:
log.warning(f"NodeSynonymizer did not recognize: {unrecognized_curies}")
return canonical_curies_dict
# Cast to proper type since we know the structure from NodeSynonymizer
return cast(dict[str, dict[str, str]], canonical_curies_dict)
else:
log.error("NodeSynonymizer returned None", error_code="NodeNormalizationIssue")
return {}
Expand All @@ -372,8 +376,9 @@ def get_canonical_curies_list(curie: Union[str, list[str]], log: ARAXResponse) -
except Exception:
tb = traceback.format_exc()
error_type, error, _ = sys.exc_info()
error_code = error_type.__name__ if error_type is not None else "UnknownError"
log.error(f"Encountered a problem using NodeSynonymizer: {tb}",
error_code=error_type.__name__) # type: ignore[union-attr]
error_code=error_code)
return []
else:
if canonical_curies_dict is not None:
Expand Down Expand Up @@ -420,7 +425,8 @@ def get_curie_names(curie: Union[str, list[str]], log: ARAXResponse) -> dict[str
synonymizer = NodeSynonymizer()
log.debug(f"Looking up names for {len(curies)} input curies using NodeSynonymizer.get_curie_names()")
curie_to_name_map = synonymizer.get_curie_names(curies)
return curie_to_name_map
# Cast to proper type since we know the structure from NodeSynonymizer
return cast(dict[str, str], curie_to_name_map)


def qg_is_fulfilled(query_graph: QueryGraph,
Expand Down Expand Up @@ -486,7 +492,7 @@ def get_connected_qedge_keys(qnode_key: str, qg: QueryGraph) -> set[str]:


def is_subclass_self_qedge(qedge: QEdge) -> bool:
return qedge.subject == qedge.object and qedge.predicates == ["biolink:subclass_of"]
return bool(qedge.subject == qedge.object and qedge.predicates == ["biolink:subclass_of"])


def flip_edge(edge: Edge, new_predicate: str) -> Edge:
Expand Down Expand Up @@ -601,7 +607,8 @@ def remove_semmeddb_edges_and_nodes_with_low_publications(kg: KnowledgeGraph,
except Exception:
tb = traceback.format_exc()
error_type, error, _ = sys.exc_info()
log.error(tb, error_code=error_type.__name__) # type: ignore[union-attr]
error_code = error_type.__name__ if error_type is not None else "UnknownError"
log.error(tb, error_code=error_code)
log.error("Something went wrong removing semmeddb edges from the knowledge graph")
else:
log.info(f"{edges_removed_counter} Semmeddb Edges with low publication count successfully removed")
Expand Down
31 changes: 24 additions & 7 deletions code/ARAX/ARAXQuery/Expand/kp_info_cacher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import requests_cache
import sys
from datetime import datetime, timedelta
from typing import Optional
from typing import Optional, Dict, cast

sys.path.append(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
Expand Down Expand Up @@ -57,20 +57,24 @@ def refresh_kp_info_caches(self):

smart_api_cache_contents = {"allowed_kp_urls": allowed_kp_urls,
"kps_excluded_by_version": smart_api_helper.kps_excluded_by_version,
"kps_excluded_by_maturity": smart_api_helper.kps_excluded_by_maturity}
"kps_excluded_by_maturity": smart_api_helper.kps_excluded_by_maturity,
"kps_excluded_by_black_list": getattr(smart_api_helper, 'kps_excluded_by_black_list', set())}

else:
eprint("Keeping pre-existing SmartAPI cache since we got no results back from SmartAPI")
with open(self.smart_api_and_meta_map_cache, "rb") as cache_file:
smart_api_cache_contents = pickle.load(cache_file)['smart_api_cache']

# Grab KPs' meta map info based off of their /meta_knowledge_graph endpoints
meta_map = self._build_meta_map(allowed_kps_dict=smart_api_cache_contents["allowed_kp_urls"])
allowed_kp_urls = cast(Dict[str, str], smart_api_cache_contents["allowed_kp_urls"])
meta_map, valid_kps, kp_status_codes = self._build_meta_map(allowed_kps_dict=allowed_kp_urls)


common_cache = {
"smart_api_cache": smart_api_cache_contents,
"meta_map_cache": meta_map
"meta_map_cache": meta_map,
"valid_kps": valid_kps,
"kp_status_codes": kp_status_codes,
}

with open(f"{self.smart_api_and_meta_map_cache}.tmp", "wb") as smart_api__and_meta_map_cache_temp:
Expand Down Expand Up @@ -140,9 +144,12 @@ def load_kp_info_caches(self, log: ARAXResponse):
cache = pickle.load(cache)
smart_api_info = cache['smart_api_cache']
meta_map = cache['meta_map_cache']
# Handle backwards compatibility - older caches might not have valid_kps
valid_kps = cache.get('valid_kps', set(meta_map.keys()))
kp_status_codes = cache.get('kp_status_codes', {})


return smart_api_info, meta_map
return smart_api_info, meta_map, valid_kps, kp_status_codes

# --------------------------------- METHODS FOR BUILDING META MAP ----------------------------------------------- #
# --- Note: These methods can't go in KPSelector because it would create a circular dependency with this class -- #
Expand All @@ -152,10 +159,15 @@ def _build_meta_map(self, allowed_kps_dict: dict[str, str]):
cache_file = pathlib.Path(self.smart_api_and_meta_map_cache )
if cache_file.exists():
with open(self.smart_api_and_meta_map_cache, "rb") as cache:
meta_map = pickle.load(cache)['meta_map_cache']
cache_data = pickle.load(cache)
meta_map = cache_data['meta_map_cache']
else:
meta_map = dict()

# Track which KPs have fresh/valid meta-KG data (returned HTTP 200)
valid_kps = set()
kp_status_codes = {} # Track status codes for better error messages

# Then (try to) get updated meta info from each KP
for kp_infores_curie, kp_endpoint_url in allowed_kps_dict.items():
if kp_endpoint_url:
Expand All @@ -166,9 +178,12 @@ def _build_meta_map(self, allowed_kps_dict: dict[str, str]):
except requests.exceptions.Timeout:
eprint(f" Timed out when trying to hit {kp_infores_curie}'s /meta_knowledge_graph endpoint "
f"(waited 10 seconds)")
kp_status_codes[kp_infores_curie] = "timeout"
except Exception:
eprint(f" Ran into a problem getting {kp_infores_curie}'s meta info")
kp_status_codes[kp_infores_curie] = "connection_error"
else:
kp_status_codes[kp_infores_curie] = str(kp_response.status_code)
if kp_response.status_code == 200:
try:
kp_meta_kg = kp_response.json()
Expand All @@ -182,6 +197,8 @@ def _build_meta_map(self, allowed_kps_dict: dict[str, str]):
meta_map[kp_infores_curie] = {"predicates": self._convert_meta_kg_to_meta_map(kp_meta_kg),
"prefixes": {category: meta_node["id_prefixes"]
for category, meta_node in kp_meta_kg["nodes"].items()}}
# Track that this KP returned valid data
valid_kps.add(kp_infores_curie)
else:
eprint(f"Unable to access {kp_infores_curie}'s /meta_knowledge_graph endpoint "
f"(returned status of {kp_response.status_code} for URL {kp_endpoint_url})")
Expand All @@ -193,7 +210,7 @@ def _build_meta_map(self, allowed_kps_dict: dict[str, str]):
eprint(f"Detected a stale KP in meta map ({stale_kp}) - deleting it")
del meta_map[stale_kp]

return meta_map
return meta_map, valid_kps, kp_status_codes

@staticmethod
def _convert_meta_kg_to_meta_map(kp_meta_kg: dict) -> dict:
Expand Down
41 changes: 32 additions & 9 deletions code/ARAX/ARAXQuery/Expand/kp_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,38 @@ def __init__(self, kg2_mode: bool = False, log: ARAXResponse = ARAXResponse()):
self.log = log
self.kg2_mode = kg2_mode
self.kp_cacher = KPInfoCacher()
self.meta_map, self.kp_urls, self.kps_excluded_by_version, self.kps_excluded_by_maturity = self._load_cached_kp_info()
(self.meta_map,
self.kp_urls,
self.kps_excluded_by_version,
self.kps_excluded_by_maturity,
self.kps_excluded_by_black_list,
self.valid_kps_from_cache,
self.kp_status_codes) = self._load_cached_kp_info()
if (not self.kg2_mode) and (self.kp_urls is None):
raise ValueError("KP info cache has not been filled and we are not in KG2 mode; cannot initialize KP selector")
self.valid_kps = {"infores:rtx-kg2"} if self.kg2_mode else set(self.kp_urls.keys())
self.valid_kps = {"infores:rtx-kg2"} if self.kg2_mode else self.valid_kps_from_cache
self.bh = BiolinkHelper()

def _load_cached_kp_info(self) -> tuple:
if self.kg2_mode:
# We don't need any KP meta info when in KG2 mode, because there are no KPs to choose from
return None, None, None, None
return None, None, None, None, set(), {}
else:
# Load cached KP info
kp_cacher = KPInfoCacher()
try:
smart_api_info, meta_map = kp_cacher.load_kp_info_caches(self.log)
smart_api_info, meta_map, valid_kps, kp_status_codes = kp_cacher.load_kp_info_caches(self.log)
except Exception as e:
self.log.error(f"Failed to load KP info caches due to {e}", error_code="LoadKPCachesFailed")
return None, None, None, None
return None, None, None, None, set(), {}

return (meta_map,
smart_api_info["allowed_kp_urls"],
smart_api_info["kps_excluded_by_version"],
smart_api_info["kps_excluded_by_maturity"])
smart_api_info["kps_excluded_by_maturity"],
smart_api_info.get("kps_excluded_by_black_list", set()),
valid_kps,
kp_status_codes)

def get_kps_for_single_hop_qg(self, qg: QueryGraph) -> Optional[set[str]]:
"""
Expand All @@ -70,10 +79,19 @@ def get_kps_for_single_hop_qg(self, qg: QueryGraph) -> Optional[set[str]]:
obj_categories = set(self.bh.get_descendants(qg.nodes[qedge.object].categories))
predicates = set(self.bh.get_descendants(qedge_predicates))

# use metamap to check kp for predicate triple
self.log.debug(f"selecting from {len(self.valid_kps)} kps")
# use metamap to check kp for predicate triple - only use valid KPs (those that returned HTTP 200 during cache refresh)
self.log.debug(f"selecting from {len(self.valid_kps)} valid kps")
accepting_kps = set()
for kp in self.meta_map:

# Log skipped KPs that have meta-KG data but are not valid (didn't return HTTP 200 during cache refresh)
stale_kps = set(self.meta_map.keys()).difference(self.valid_kps)
for stale_kp in stale_kps:
status_code = self.kp_status_codes.get(stale_kp, "unknown")
self.log.update_query_plan(qedge_key, stale_kp, "Skipped", f"MetaKG endpoint was unreachable ({status_code})")

# Check if valid KPs support the query (only consider KPs that are both valid and have meta-KG data)
valid_kps_with_meta = self.valid_kps.intersection(set(self.meta_map.keys()))
for kp in valid_kps_with_meta:
if self._triple_is_in_meta_map(kp,
sub_categories,
predicates,
Expand All @@ -100,6 +118,11 @@ def get_kps_for_single_hop_qg(self, qg: QueryGraph) -> Optional[set[str]]:
self.log.update_query_plan(qedge_key, kp, "Skipped", f"KP does not have a {maturity} TRAPI {version} endpoint")
self.log.debug(f"Skipped {kp}: KP does not have a {maturity} TRAPI {version} endpoint")

# Log hard-blocklisted KPs
for kp in set(filter(None, getattr(self, 'kps_excluded_by_black_list', set()))):
self.log.update_query_plan(qedge_key, kp, "Skipped", "Blocklisted by ARAX (KP is unstable)")
self.log.debug(f"Skipped {kp}: Blocklisted by ARAX (KP is unstable)")

return accepting_kps

def kp_accepts_single_hop_qg(self, qg: QueryGraph, kp: str) -> Optional[bool]:
Expand Down
6 changes: 6 additions & 0 deletions code/ARAX/ARAXQuery/Expand/mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[mypy]
# Global mypy configuration for RTX Expand module

# Suppress import-not-found errors for internal modules that use dynamic path manipulation
# These modules exist at runtime but mypy can't find them due to sys.path.append() usage
ignore_missing_imports = True
27 changes: 20 additions & 7 deletions code/ARAX/ARAXQuery/Expand/smartapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@

def eprint(*args, **kwargs): print(*args, file=sys.stderr, **kwargs)


# Hardcoded blacklist for KPs that are known to be failing or problematic
# These infores CURIEs will be excluded from all SmartAPI-derived KP lists
BLOCKLISTED_KPS = {
'infores:spoke',
}

class SmartAPI:
"""SmartAPI."""

Expand All @@ -17,6 +24,7 @@ def __init__(self):
self.base_url = "http://smart-api.info/api"
self.kps_excluded_by_version = set()
self.kps_excluded_by_maturity = set()
self.kps_excluded_by_black_list = set()
self.kps_accepted = set()


Expand All @@ -35,7 +43,7 @@ def get_all_trapi_endpoint_info(self):
try:
response_content.raise_for_status()
response_dict = response_content.json()
except:
except Exception:
return endpoints

hits = response_dict["hits"]["hits"] if "hits" in response_dict["hits"] else response_dict["hits"]
Expand Down Expand Up @@ -75,7 +83,7 @@ def get_trapi_endpoints(self, version=None, whitelist=None, blacklist=None):
try:
response_content.raise_for_status()
response_dict = response_content.json()
except:
except Exception:
return endpoints

hits = response_dict["hits"]["hits"] if "hits" in response_dict["hits"] else response_dict["hits"]
Expand Down Expand Up @@ -140,7 +148,7 @@ def get_trapi_endpoints(self, version=None, whitelist=None, blacklist=None):

try:
smartapi_url = "https://smart-api.info/ui/" + hit["_id"]
except:
except Exception:
smartapi_url = None

endpoints.append({
Expand All @@ -153,6 +161,11 @@ def get_trapi_endpoints(self, version=None, whitelist=None, blacklist=None):
"smartapi_url": smartapi_url
})

# Apply ARAX hard blacklist first (absolute exclusion)
blocklisted_endpoints = [ep for ep in endpoints if ep["infores_name"] in BLOCKLISTED_KPS]
self.kps_excluded_by_black_list = {ep["infores_name"] for ep in blocklisted_endpoints}
endpoints = [ep for ep in endpoints if ep["infores_name"] not in BLOCKLISTED_KPS]

if whitelist:
endpoints = [ep for ep in endpoints if ep["infores_name"] in whitelist]
if blacklist:
Expand All @@ -177,7 +190,7 @@ def collate_and_print(self, endpoints):
for ep in endpoints:
infores_name = str(ep["infores_name"])
component = str(ep["component"])
maturities = {server["maturity"] for server in ep["servers"] if server["maturity"] != None}
maturities = {server["maturity"] for server in ep["servers"] if server["maturity"] is not None}
n_entries = 1
# if new entry, start with n_entries = 1
if infores_name not in entries:
Expand Down Expand Up @@ -247,7 +260,7 @@ def get_all_trapi_kp_registrations(self, log=None, trapi_version=None, req_matur
all_KPs = [ep for ep in endpoints if ep["component"] == "KP"]

if req_maturity:
if hierarchy == None:
if hierarchy is None:
hierarchy = ["development","staging","testing","production"]
if req_maturity not in hierarchy:
raise ValueError("Invalid maturity passed to get_kps")
Expand Down Expand Up @@ -343,10 +356,10 @@ def main():
output = smartapi.get_operations_endpoints(whitelist=args.whitelist, blacklist=args.blacklist)

elif args.results_type == "get_kps":
if (args.hierarchy or args.flexible) and (args.req_maturity == None):
if (args.hierarchy or args.flexible) and (args.req_maturity is None):
argparser.print_help()
return
if args.hierarchy and args.flexible == None:
if args.hierarchy and args.flexible is None:
argparser.print_help()
return
output = smartapi.get_all_trapi_kp_registrations(trapi_version=args.version, req_maturity=args.req_maturity, flexible=args.flexible, hierarchy=args.hierarchy, whitelist=args.whitelist, blacklist=args.blacklist)
Expand Down
Loading