From 9af5c2afbb1f73b1a1f970931b54130f7981be21 Mon Sep 17 00:00:00 2001 From: Gaurav Vaidya Date: Fri, 17 Mar 2023 17:14:58 -0400 Subject: [PATCH] Remove try blocks that make debugging harder. These try blocks catch all the exceptions and report them to the logger, but I'm pretty sure that will happen anyway. By doing this, they get rid of the line-number information that would help debug what is causing the errors. --- node_normalizer/normalizer.py | 625 ++++++++++++++++------------------ node_normalizer/server.py | 55 ++- 2 files changed, 326 insertions(+), 354 deletions(-) diff --git a/node_normalizer/normalizer.py b/node_normalizer/normalizer.py index 27f8e47..a0ec458 100644 --- a/node_normalizer/normalizer.py +++ b/node_normalizer/normalizer.py @@ -29,27 +29,24 @@ async def normalize_message(app: FastAPI, message: Message) -> Message: Given a TRAPI message, updates the message to include a normalized qgraph, kgraph, and results """ - try: - ret = Message() + ret = Message() - logger.debug(f"message.query_graph is None: {message.query_graph is None}") - if message.query_graph is not None: - merged_qgraph = await normalize_qgraph(app, message.query_graph) - ret.query_graph = merged_qgraph + logger.debug(f"message.query_graph is None: {message.query_graph is None}") + if message.query_graph is not None: + merged_qgraph = await normalize_qgraph(app, message.query_graph) + ret.query_graph = merged_qgraph - logger.debug(f"message.knowledge_graph is None: {message.knowledge_graph is None}") - if message.knowledge_graph is not None: - merged_kgraph, node_id_map, edge_id_map = await normalize_kgraph(app, message.knowledge_graph) - ret.knowledge_graph = merged_kgraph + logger.debug(f"message.knowledge_graph is None: {message.knowledge_graph is None}") + if message.knowledge_graph is not None: + merged_kgraph, node_id_map, edge_id_map = await normalize_kgraph(app, message.knowledge_graph) + ret.knowledge_graph = merged_kgraph - logger.debug(f"message.results is None: {message.results is None}") - if message.results is not None: - merged_results = await normalize_results(app, message.results, node_id_map, edge_id_map) - ret.results = merged_results + logger.debug(f"message.results is None: {message.results is None}") + if message.results is not None: + merged_results = await normalize_results(app, message.results, node_id_map, edge_id_map) + ret.results = merged_results - return ret - except Exception as e: - logger.error(f'normalize_message Exception: {e}') + return ret async def normalize_results(app, @@ -72,84 +69,77 @@ async def normalize_results(app, node_binding_seen = set() - try: - for node_code, node_bindings in result.node_bindings.items(): - merged_node_bindings = [] - for n_bind in node_bindings: - merged_binding = n_bind.dict() - # merged_binding['id'] = node_id_map[n_bind.id.__root__] - merged_binding['id'] = node_id_map[n_bind.id] - - # get the information content value - ic_attrib = await get_info_content_attribute(app, merged_binding['id']) - - # did we get a good attribute dict - if ic_attrib: - if 'attributes' in merged_binding: - merged_binding['attributes'].append(ic_attrib) - else: - merged_binding['attributes'] = [ic_attrib] - - node_binding_information = [ - "atts" if k == 'attributes' - else (k, tuple(v)) if isinstance(v, list) - else (k, v) - for k, v in merged_binding.items() - ] + for node_code, node_bindings in result.node_bindings.items(): + merged_node_bindings = [] + for n_bind in node_bindings: + merged_binding = n_bind.dict() + # merged_binding['id'] = 
node_id_map[n_bind.id.__root__] + merged_binding['id'] = node_id_map[n_bind.id] - # if there are attributes in the node binding - if 'attributes' in merged_binding: - # storage for the pydantic Attributes - attribs = [] - - # the items in list of attributes must be of type Attribute - # in order to reuse hash method - if merged_binding['attributes'] is not None: - for attrib in merged_binding['attributes']: - new_attrib = Attribute.parse_obj(attrib) - - # add the new Attribute to the list - attribs.append(new_attrib) - - # call to get the hash - atty_hash = _hash_attributes(attribs) - node_binding_information.append(atty_hash) - node_binding_hash = frozenset(node_binding_information) + # get the information content value + ic_attrib = await get_info_content_attribute(app, merged_binding['id']) - if node_binding_hash in node_binding_seen: - continue + # did we get a good attribute dict + if ic_attrib: + if 'attributes' in merged_binding: + merged_binding['attributes'].append(ic_attrib) else: - node_binding_seen.add(node_binding_hash) - merged_node_bindings.append(merged_binding) - - merged_result['node_bindings'][node_code] = merged_node_bindings + merged_binding['attributes'] = [ic_attrib] + + node_binding_information = [ + "atts" if k == 'attributes' + else (k, tuple(v)) if isinstance(v, list) + else (k, v) + for k, v in merged_binding.items() + ] + + # if there are attributes in the node binding + if 'attributes' in merged_binding: + # storage for the pydantic Attributes + attribs = [] + + # the items in list of attributes must be of type Attribute + # in order to reuse hash method + if merged_binding['attributes'] is not None: + for attrib in merged_binding['attributes']: + new_attrib = Attribute.parse_obj(attrib) + + # add the new Attribute to the list + attribs.append(new_attrib) + + # call to get the hash + atty_hash = _hash_attributes(attribs) + node_binding_information.append(atty_hash) + node_binding_hash = frozenset(node_binding_information) + + if node_binding_hash in node_binding_seen: + continue + else: + node_binding_seen.add(node_binding_hash) + merged_node_bindings.append(merged_binding) - except Exception as e: - logger.exception(e) + merged_result['node_bindings'][node_code] = merged_node_bindings edge_binding_seen = set() - try: - for edge_code, edge_bindings in result.edge_bindings.items(): - merged_edge_bindings = [] - for e_bind in edge_bindings: - merged_binding = e_bind.dict() - merged_binding['id'] = edge_id_map[e_bind.id] - - edge_binding_hash = frozenset([ - (k, freeze(v)) - for k, v in merged_binding.items() - ]) - - if edge_binding_hash in edge_binding_seen: - continue - else: - edge_binding_seen.add(edge_binding_hash) - merged_edge_bindings.append(merged_binding) + for edge_code, edge_bindings in result.edge_bindings.items(): + merged_edge_bindings = [] + for e_bind in edge_bindings: + merged_binding = e_bind.dict() + merged_binding['id'] = edge_id_map[e_bind.id] - merged_result['edge_bindings'][edge_code] = merged_edge_bindings - except Exception as e: - logger.exception(e) + edge_binding_hash = frozenset([ + (k, freeze(v)) + for k, v in merged_binding.items() + ]) + + if edge_binding_hash in edge_binding_seen: + continue + else: + edge_binding_seen.add(edge_binding_hash) + merged_edge_bindings.append(merged_binding) + + merged_result['edge_bindings'][edge_code] = merged_edge_bindings try: # This used to have some list comprehension based on types. But in TRAPI 1.1 the list/dicts get pretty deep. 
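The commit message's point about lost line numbers comes down to how the removed handlers logged: `logger.error(f'... {e}')` records only the exception's message, while `logger.exception(e)` (or simply letting the exception propagate to the caller or the framework's error handler) records the full traceback. A minimal illustrative sketch, not part of this patch; the `normalize` helper below is made up:

```python
# Illustrative sketch only -- not part of the patch. normalize() is a stand-in
# for the normalizer code paths touched by this change.
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


def normalize(payload: dict) -> str:
    # Fails with KeyError when the expected key is missing.
    return payload["id"].upper()


try:
    normalize({"identifier": "MONDO:0005148"})
except Exception as e:
    # What the removed blocks did: only the message is kept, so the log line
    # reads "normalize Exception: 'id'" with no file or line number.
    logger.error(f"normalize Exception: {e}")
    # logger.exception(e) would keep the traceback; so would removing the
    # try block entirely and letting the error propagate.
```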
@@ -189,28 +179,25 @@ async def normalize_qgraph(app: FastAPI, qgraph: QueryGraph) -> QueryGraph: node_code_map: Dict[str, Union[str, List]] = {} for node_code, node in qgraph.nodes.items(): - try: - merged_nodes[node_code] = node.dict() + merged_nodes[node_code] = node.dict() - # as of TRAPI 1.1, node.id must be none or a list. - # node.id can be none, a string, or a list - if not node.ids: - # do nothing - continue - else: - if not isinstance(node.ids.__root__, list): - raise Exception("node.ids must be a list") - primary_ids = set() - for nr in node.ids.__root__: - equivalent_curies = await get_equivalent_curies(app, nr) - if equivalent_curies[nr]: - primary_ids.add(equivalent_curies[nr]['id']['identifier']) - else: - primary_ids.add(nr) - merged_nodes[node_code]['ids'] = list(primary_ids) - node_code_map[node_code] = list(primary_ids) - except Exception as e: - logger.error(f'normalize_qgraph Exception: {e}') + # as of TRAPI 1.1, node.id must be none or a list. + # node.id can be none, a string, or a list + if not node.ids: + # do nothing + continue + else: + if not isinstance(node.ids.__root__, list): + raise Exception("node.ids must be a list") + primary_ids = set() + for nr in node.ids.__root__: + equivalent_curies = await get_equivalent_curies(app, nr) + if equivalent_curies[nr]: + primary_ids.add(equivalent_curies[nr]['id']['identifier']) + else: + primary_ids.add(nr) + merged_nodes[node_code]['ids'] = list(primary_ids) + node_code_map[node_code] = list(primary_ids) return QueryGraph.parse_obj({ 'nodes': merged_nodes, @@ -241,151 +228,148 @@ async def normalize_kgraph( node_id_map: Dict[str, str] = {} edge_id_map: Dict[str, str] = {} - try: - # Map for each node id (curie) and its primary id - node_id_map: Dict[str, str] = {} + # Map for each node id (curie) and its primary id + node_id_map: Dict[str, str] = {} - # Map for each edge id and its primary id - edge_id_map: Dict[str, str] = {} + # Map for each edge id and its primary id + edge_id_map: Dict[str, str] = {} - # Map for each edge to its s,p,r,o signature - primary_edges: Dict[Tuple[str, str, Optional[str], str, Union[UUID, int]], str] = {} + # Map for each edge to its s,p,r,o signature + primary_edges: Dict[Tuple[str, str, Optional[str], str, Union[UUID, int]], str] = {} - # cache for primary node ids - primary_nodes_seen = set() + # cache for primary node ids + primary_nodes_seen = set() - # Count of times a node has been merged for attribute merging - node_merge_count: Dict[str, int] = {} + # Count of times a node has been merged for attribute merging + node_merge_count: Dict[str, int] = {} - # cache for nodes - nodes_seen = set() + # cache for nodes + nodes_seen = set() - # cache for subject, predicate, relation, object, attribute hash tuples - edges_seen: Set[Tuple[str, str, str, str, Union[UUID, int]]] = set() + # cache for subject, predicate, relation, object, attribute hash tuples + edges_seen: Set[Tuple[str, str, str, str, Union[UUID, int]]] = set() - for node_id, node in kgraph.nodes.items(): - if node_id in nodes_seen: - continue + for node_id, node in kgraph.nodes.items(): + if node_id in nodes_seen: + continue - nodes_seen.add(node_id) - node_id_map[node_id] = node_id # expected to overridden by primary id + nodes_seen.add(node_id) + node_id_map[node_id] = node_id # expected to overridden by primary id - merged_node = node.dict() + merged_node = node.dict() - equivalent_curies = await get_equivalent_curies(app, node_id) + equivalent_curies = await get_equivalent_curies(app, node_id) - if 
equivalent_curies[node_id]: - primary_id = equivalent_curies[node_id]['id']['identifier'] - node_id_map[node_id] = primary_id + if equivalent_curies[node_id]: + primary_id = equivalent_curies[node_id]['id']['identifier'] + node_id_map[node_id] = primary_id - if primary_id in primary_nodes_seen: - merged_node = _merge_node_attributes( - node_a=merged_kgraph['nodes'][primary_id], - node_b=node.dict(), - merged_count=node_merge_count[primary_id] - ) - merged_kgraph['nodes'][primary_id] = merged_node - node_merge_count[primary_id] += 1 - continue - else: - node_merge_count[primary_id] = 0 + if primary_id in primary_nodes_seen: + merged_node = _merge_node_attributes( + node_a=merged_kgraph['nodes'][primary_id], + node_b=node.dict(), + merged_count=node_merge_count[primary_id] + ) + merged_kgraph['nodes'][primary_id] = merged_node + node_merge_count[primary_id] += 1 + continue + else: + node_merge_count[primary_id] = 0 - primary_nodes_seen.add(primary_id) + primary_nodes_seen.add(primary_id) - if 'label' in equivalent_curies[node_id]['id']: - primary_label = equivalent_curies[node_id]['id']['label'] - elif 'name' in merged_node: - primary_label = merged_node['name'] + if 'label' in equivalent_curies[node_id]['id']: + primary_label = equivalent_curies[node_id]['id']['label'] + elif 'name' in merged_node: + primary_label = merged_node['name'] + else: + primary_label = '' + + merged_node['name'] = primary_label + + # Even if there's already a same_as attribute we add another + # since it is coming from a new source + if 'equivalent_identifiers' in equivalent_curies[node_id]: + same_as_attribute = { + 'attribute_type_id': 'biolink:same_as', + 'value': [ + node['identifier'] + for node in equivalent_curies[node_id]['equivalent_identifiers'] + ], + 'original_attribute_name': 'equivalent_identifiers', + "value_type_id": "EDAM:data_0006", + + # TODO, should we add the app version as the source + # or perhaps the babel/redis cache version + # This will make unit testing a little more tricky + # see https://stackoverflow.com/q/57624731 + + # 'source': f'{app.title} {app.version}', + } + if 'attributes' in merged_node and merged_node['attributes']: + merged_node['attributes'].append(same_as_attribute) else: - primary_label = '' - - merged_node['name'] = primary_label - - # Even if there's already a same_as attribute we add another - # since it is coming from a new source - if 'equivalent_identifiers' in equivalent_curies[node_id]: - same_as_attribute = { - 'attribute_type_id': 'biolink:same_as', - 'value': [ - node['identifier'] - for node in equivalent_curies[node_id]['equivalent_identifiers'] - ], - 'original_attribute_name': 'equivalent_identifiers', - "value_type_id": "EDAM:data_0006", - - # TODO, should we add the app version as the source - # or perhaps the babel/redis cache version - # This will make unit testing a little more tricky - # see https://stackoverflow.com/q/57624731 - - # 'source': f'{app.title} {app.version}', - } - if 'attributes' in merged_node and merged_node['attributes']: - merged_node['attributes'].append(same_as_attribute) - else: - merged_node['attributes'] = [same_as_attribute] + merged_node['attributes'] = [same_as_attribute] - if 'type' in equivalent_curies[node_id]: - if type(equivalent_curies[node_id]['type']) is list: - merged_node['categories'] = equivalent_curies[node_id]['type'] - else: - merged_node['categories'] = [equivalent_curies[node_id]['type']] + if 'type' in equivalent_curies[node_id]: + if type(equivalent_curies[node_id]['type']) is list: + 
merged_node['categories'] = equivalent_curies[node_id]['type'] + else: + merged_node['categories'] = [equivalent_curies[node_id]['type']] - # get the information content value - ic_attrib = await get_info_content_attribute(app, node_id) + # get the information content value + ic_attrib = await get_info_content_attribute(app, node_id) - # did we get a good attribute dict - if ic_attrib: - # add the attribute to the node - merged_node['attributes'].append(ic_attrib) + # did we get a good attribute dict + if ic_attrib: + # add the attribute to the node + merged_node['attributes'].append(ic_attrib) - merged_kgraph['nodes'][primary_id] = merged_node - else: - merged_kgraph['nodes'][node_id] = merged_node - - for edge_id, edge in kgraph.edges.items(): - # Accessing __root__ directly seems wrong, - # https://github.com/samuelcolvin/pydantic/issues/730 - # could also do str(edge.subject) - if edge.subject in node_id_map: - primary_subject = node_id_map[edge.subject] - else: - # should we throw a validation error here? - primary_subject = edge.subject + merged_kgraph['nodes'][primary_id] = merged_node + else: + merged_kgraph['nodes'][node_id] = merged_node + + for edge_id, edge in kgraph.edges.items(): + # Accessing __root__ directly seems wrong, + # https://github.com/samuelcolvin/pydantic/issues/730 + # could also do str(edge.subject) + if edge.subject in node_id_map: + primary_subject = node_id_map[edge.subject] + else: + # should we throw a validation error here? + primary_subject = edge.subject - if edge.object in node_id_map: - primary_object = node_id_map[edge.object] - else: - primary_object = edge.object + if edge.object in node_id_map: + primary_object = node_id_map[edge.object] + else: + primary_object = edge.object - hashed_attributes = _hash_attributes(edge.attributes) + hashed_attributes = _hash_attributes(edge.attributes) - if hashed_attributes is False: - # we couldn't hash the attribute so assume unique - hashed_attributes = uuid.uuid4() + if hashed_attributes is False: + # we couldn't hash the attribute so assume unique + hashed_attributes = uuid.uuid4() - triple = ( - primary_subject, - edge.predicate, - primary_object, - hashed_attributes - ) + triple = ( + primary_subject, + edge.predicate, + primary_object, + hashed_attributes + ) - if triple in edges_seen: - edge_id_map[edge_id] = primary_edges[triple] - continue - else: - primary_edges[triple] = edge_id - edge_id_map[edge_id] = edge_id + if triple in edges_seen: + edge_id_map[edge_id] = primary_edges[triple] + continue + else: + primary_edges[triple] = edge_id + edge_id_map[edge_id] = edge_id - edges_seen.add(triple) - merged_edge = edge.dict() + edges_seen.add(triple) + merged_edge = edge.dict() - merged_edge['subject'] = primary_subject - merged_edge['object'] = primary_object - merged_kgraph['edges'][edge_id] = merged_edge - except Exception as e: - logger.error(f'normalize_kgraph Exception: {e}') + merged_edge['subject'] = primary_subject + merged_edge['object'] = primary_object + merged_kgraph['edges'][edge_id] = merged_edge return KnowledgeGraph.parse_obj(merged_kgraph), node_id_map, edge_id_map @@ -486,71 +470,68 @@ async def get_normalized_nodes( # conflation_redis = 5 upper_curies = [c.upper() for c in curies] - try: - canonical_ids = await app.state.redis_connection0.mget(*upper_curies, encoding='utf-8') - canonical_nonan = [canonical_id for canonical_id in canonical_ids if canonical_id is not None] - info_contents = {} + canonical_ids = await app.state.redis_connection0.mget(*upper_curies, encoding='utf-8') + 
canonical_nonan = [canonical_id for canonical_id in canonical_ids if canonical_id is not None] + info_contents = {} - # did we get some canonical ids - if canonical_nonan: - # get the information content values - info_contents = await get_info_content(app, canonical_nonan) + # did we get some canonical ids + if canonical_nonan: + # get the information content values + info_contents = await get_info_content(app, canonical_nonan) - # Get the equivalent_ids and types - eqids, types = await get_eqids_and_types(app, canonical_nonan) + # Get the equivalent_ids and types + eqids, types = await get_eqids_and_types(app, canonical_nonan) - # are we looking for conflated values - if conflate: - # TODO: filter to just types that have Gene or Protein? I'm not sure it's worth it when we have pipelining - other_ids = await app.state.redis_connection5.mget(*canonical_nonan, encoding='utf8') + # are we looking for conflated values + if conflate: + # TODO: filter to just types that have Gene or Protein? I'm not sure it's worth it when we have pipelining + other_ids = await app.state.redis_connection5.mget(*canonical_nonan, encoding='utf8') - # if there are other ids, then we want to rebuild eqids and types. That's because even though we have them, - # they're not necessarily first. For instance if what came in and got canonicalized was a protein id - # and we want gene first, then we're relying on the order of the other_ids to put it back in the right place. - other_ids = [json.loads(oids) if oids is not None else [] for oids in other_ids] - dereference_others = dict(zip(canonical_nonan, other_ids)) + # if there are other ids, then we want to rebuild eqids and types. That's because even though we have them, + # they're not necessarily first. For instance if what came in and got canonicalized was a protein id + # and we want gene first, then we're relying on the order of the other_ids to put it back in the right place. 
+ other_ids = [json.loads(oids) if oids is not None else [] for oids in other_ids] + dereference_others = dict(zip(canonical_nonan, other_ids)) - all_other_ids = sum(other_ids, []) - eqids2, types2 = await get_eqids_and_types(app, all_other_ids) + all_other_ids = sum(other_ids, []) + eqids2, types2 = await get_eqids_and_types(app, all_other_ids) - final_eqids = [] - final_types = [] + final_eqids = [] + final_types = [] - deref_others_eqs = dict(zip(all_other_ids, eqids2)) - deref_others_typ = dict(zip(all_other_ids, types2)) + deref_others_eqs = dict(zip(all_other_ids, eqids2)) + deref_others_typ = dict(zip(all_other_ids, types2)) - zipped = zip(canonical_nonan, eqids, types) + zipped = zip(canonical_nonan, eqids, types) - for canonical_id, e, t in zipped: - # here's where we replace the eqids, types - if len(dereference_others[canonical_id]) > 0: - e = [] - t = [] + for canonical_id, e, t in zipped: + # here's where we replace the eqids, types + if len(dereference_others[canonical_id]) > 0: + e = [] + t = [] - for other in dereference_others[canonical_id]: - e += deref_others_eqs[other] - t += deref_others_typ[other] + for other in dereference_others[canonical_id]: + e += deref_others_eqs[other] + t += deref_others_typ[other] - final_eqids.append(e) - final_types.append(uniquify_list(t)) + final_eqids.append(e) + final_types.append(uniquify_list(t)) - dereference_ids = dict(zip(canonical_nonan, final_eqids)) - dereference_types = dict(zip(canonical_nonan, final_types)) - else: - dereference_ids = dict(zip(canonical_nonan, eqids)) - dereference_types = dict(zip(canonical_nonan, types)) + dereference_ids = dict(zip(canonical_nonan, final_eqids)) + dereference_types = dict(zip(canonical_nonan, final_types)) else: - dereference_ids = dict() - dereference_types = dict() + dereference_ids = dict(zip(canonical_nonan, eqids)) + dereference_types = dict(zip(canonical_nonan, types)) + else: + dereference_ids = dict() + dereference_types = dict() - # output the final result - normal_nodes = { - input_curie: await create_node(canonical_id, dereference_ids, dereference_types, info_contents) - for input_curie, canonical_id in zip(curies, canonical_ids) - } + # output the final result + normal_nodes = { + input_curie: await create_node(canonical_id, dereference_ids, dereference_types, info_contents) + for input_curie, canonical_id in zip(curies, canonical_ids) + } - except Exception as e: - logger.error(f'Exception: {e}') return normal_nodes @@ -620,38 +601,35 @@ async def get_curie_prefixes( """ ret_val: dict = {} # storage for the returned data - try: - # was an arg passed in - if semantic_types: - for item in semantic_types: - # get the curies for this type - curies = await app.state.redis_connection3.get(item, encoding='utf-8') + # was an arg passed in + if semantic_types: + for item in semantic_types: + # get the curies for this type + curies = await app.state.redis_connection3.get(item, encoding='utf-8') - # did we get any data - if not curies: - curies = '{' + f'"{item}"' + ': "Not found"}' + # did we get any data + if not curies: + curies = '{' + f'"{item}"' + ': "Not found"}' - curies = json.loads(curies) + curies = json.loads(curies) - # set the return data - ret_val[item] = {'curie_prefix': curies} - else: - types = await app.state.redis_connection3.lrange('semantic_types', 0, -1, encoding='utf-8') + # set the return data + ret_val[item] = {'curie_prefix': curies} + else: + types = await app.state.redis_connection3.lrange('semantic_types', 0, -1, encoding='utf-8') - for item in types: - # 
get the curies for this type - curies = await app.state.redis_connection3.get(item, encoding='utf-8') + for item in types: + # get the curies for this type + curies = await app.state.redis_connection3.get(item, encoding='utf-8') - # did we get any data - if not curies: - curies = '{' + f'"{item}"' + ': "Not found"}' + # did we get any data + if not curies: + curies = '{' + f'"{item}"' + ': "Not found"}' - curies = json.loads(curies) + curies = json.loads(curies) - # set the return data - ret_val[item] = {'curie_prefix': curies} - except Exception as e: - logger.error(f'get_curie_prefixes Exception: {e}') + # set the return data + ret_val[item] = {'curie_prefix': curies} return ret_val @@ -663,34 +641,31 @@ def _merge_node_attributes(node_a: Dict, node_b, merged_count: int) -> Dict: :param merged_count: the number of nodes merged into node_a **upon entering this fx** """ - try: - if not ('attributes' in node_b and node_b['attributes']): - return node_a - - if merged_count == 0: - if 'attributes' in node_a and node_a['attributes']: - new_attribute_list = [] - for attribute in node_a['attributes']: - new_dict = {} - for k, v in attribute.items(): - new_dict[f"{k}.1"] = v - new_attribute_list.append(new_dict) - - node_a['attributes'] = new_attribute_list - - # Need to DRY this off - b_attr_id = merged_count + 2 - if 'attributes' in node_b and node_b['attributes']: + if not ('attributes' in node_b and node_b['attributes']): + return node_a + + if merged_count == 0: + if 'attributes' in node_a and node_a['attributes']: new_attribute_list = [] - for attribute in node_b['attributes']: + for attribute in node_a['attributes']: new_dict = {} for k, v in attribute.items(): - new_dict[f"{k}.{b_attr_id}"] = v + new_dict[f"{k}.1"] = v new_attribute_list.append(new_dict) - node_a['attributes'] = node_a['attributes'] + new_attribute_list - except Exception as e: - logger.error(f'_merge_node_attributes Exception {e}') + node_a['attributes'] = new_attribute_list + + # Need to DRY this off + b_attr_id = merged_count + 2 + if 'attributes' in node_b and node_b['attributes']: + new_attribute_list = [] + for attribute in node_b['attributes']: + new_dict = {} + for k, v in attribute.items(): + new_dict[f"{k}.{b_attr_id}"] = v + new_attribute_list.append(new_dict) + + node_a['attributes'] = node_a['attributes'] + new_attribute_list return node_a diff --git a/node_normalizer/server.py b/node_normalizer/server.py index ffcec48..1d21f61 100644 --- a/node_normalizer/server.py +++ b/node_normalizer/server.py @@ -117,35 +117,32 @@ async def async_query(async_query: reasoner_pydantic.AsyncQuery): async def async_query_task(async_query: reasoner_pydantic.AsyncQuery): - try: - async_query.message = await normalize_message(app, async_query.message) - session = requests.Session() - retries = Retry( - total=3, - backoff_factor=3, - status_forcelist=[429, 500, 502, 503, 504], - method_whitelist=[ - "HEAD", - "GET", - "PUT", - "DELETE", - "OPTIONS", - "TRACE", - "POST", - ], - ) - session.mount("http://", HTTPAdapter(max_retries=retries)) - session.mount("https://", HTTPAdapter(max_retries=retries)) - logger.info(f"sending callback to: {async_query.callback}") - - post_response = session.post( - url=async_query.callback, - headers={"Content-Type": "application/json", "Accept": "application/json"}, - data=async_query.json(), - ) - logger.info(f"async_query post status code: {post_response.status_code}") - except BaseException as e: - logger.exception(e) + async_query.message = await normalize_message(app, async_query.message) + 
session = requests.Session() + retries = Retry( + total=3, + backoff_factor=3, + status_forcelist=[429, 500, 502, 503, 504], + method_whitelist=[ + "HEAD", + "GET", + "PUT", + "DELETE", + "OPTIONS", + "TRACE", + "POST", + ], + ) + session.mount("http://", HTTPAdapter(max_retries=retries)) + session.mount("https://", HTTPAdapter(max_retries=retries)) + logger.info(f"sending callback to: {async_query.callback}") + + post_response = session.post( + url=async_query.callback, + headers={"Content-Type": "application/json", "Accept": "application/json"}, + data=async_query.json(), + ) + logger.info(f"async_query post status code: {post_response.status_code}") @app.get(
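A side note on the `async_query_task` hunk above: the retry configuration passes `method_whitelist`, which urllib3 deprecated in 1.26 and removed in 2.0 in favour of `allowed_methods`. A small sketch of an equivalent setup on newer urllib3, version-dependent and not part of this patch:

```python
# Sketch of the same retry-mounted session on urllib3 >= 1.26, where
# method_whitelist has been renamed to allowed_methods. Not part of the patch.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

retries = Retry(
    total=3,
    backoff_factor=3,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE", "POST"],
)
session = requests.Session()
session.mount("http://", HTTPAdapter(max_retries=retries))
session.mount("https://", HTTPAdapter(max_retries=retries))
```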