Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
1270f57
initial plugin for katanagraph
wkyu2kg Jan 19, 2022
431d0f1
initial addition of tests
wkyu2kg Jan 19, 2022
4bb8edc
correct the path name
wkyu2kg Jan 19, 2022
5c32c2e
enable runslow option
wkyu2kg Jan 19, 2022
b9dd26a
fix the format errors reported from black
wkyu2kg Jan 19, 2022
43b0845
initialize katana.local upon plugin registration
wkyu2kg Jan 19, 2022
35d52cf
have to import katana.local inside the function only
wkyu2kg Jan 19, 2022
f9a1543
fix the KeyError on unweighted edge lists
wkyu2kg Jan 20, 2022
6665dff
various fixes on the translators, yet to clean up
wkyu2kg Jan 24, 2022
2572d54
add additional type conversion
wkyu2kg Jan 24, 2022
03c2702
commit various fixes for now. float accuracy not addressed yet
wkyu2kg Jan 24, 2022
dcb4b44
remove duplicated self-edge, add a testcase and various cleanup
wkyu2kg Jan 27, 2022
f3188d8
include data directly as string array
wkyu2kg Jan 27, 2022
312ab32
fix the precision error introduced by accident
wkyu2kg Jan 27, 2022
3b3a392
add one-way node relabeling from nx to katana
wkyu2kg Jan 27, 2022
6c9830a
tweak the order of node addition and weight addition for pagerank
wkyu2kg Jan 28, 2022
1824dec
update the node schema with a new property
wkyu2kg Jan 28, 2022
9b1b1d6
revert the change to metegraph core
wkyu2kg Jan 28, 2022
319878d
remove the pytest mark
wkyu2kg Jan 28, 2022
2c7a258
remove edge1.csv to avoid fixed path and revert changes to test_graph.py
wkyu2kg Jan 29, 2022
01877af
rename file paths for consistency between plugin and internal copy
wkyu2kg Jan 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/psf/black
rev: stable
rev: 19.10b0
hooks:
- id: black
language_version: python3.7
language_version: python3.8
2 changes: 2 additions & 0 deletions dev-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ dependencies:
- conda-forge::grblas
- conda-forge::python-louvain
- conda-forge::websockets
- katanagraph/label/dev::katana-cpp
- katanagraph/label/dev::katana-python
Comment on lines +35 to +36
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably actually release something before becoming a dependency of an external project. ;-) @wkyu2kg maybe talk to the release people to see if we should tag a release in the open package repo.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good point. I also found that katanagraph/label/dev is a bit inconsistent with the other channels.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@roshandathathri and @tylershunt Arthur suggested having a release of katana-cpp and katana-python in the conda repo so that they can be dependencies of the metagraph plugin. What do you think?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No objections but please start a discussion on slack. This may need inputs from many people including Rayees and approval from Chris/Barry.

10 changes: 9 additions & 1 deletion metagraph/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
except ImportError: # pragma: no cover
has_grblas = False

try:
import katanagraph as _

has_katanagraph = True
except ImportError: # pragma: no cover
has_katanagraph = False

try:
import numba as _

Expand All @@ -66,11 +73,12 @@


def find_plugins():
from . import core, graphblas, networkx, numpy, pandas, python, scipy
from . import core, graphblas, katanagraph, networkx, numpy, pandas, python, scipy

# Default Plugins
registry.register_from_modules(core)
registry.register_from_modules(graphblas, name="core_graphblas")
registry.register_from_modules(katanagraph, name="core_katanagraph")
registry.register_from_modules(networkx, name="core_networkx")
registry.register_from_modules(numpy, name="core_numpy")
registry.register_from_modules(pandas, name="core_pandas")
Expand Down
1 change: 1 addition & 0 deletions metagraph/plugins/katanagraph/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from . import algorithms, translators, types
117 changes: 117 additions & 0 deletions metagraph/plugins/katanagraph/algorithms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from typing import Tuple

import numpy as np
from metagraph import NodeID, abstract_algorithm, concrete_algorithm
from metagraph.plugins.core.types import Graph, Vector
from metagraph.plugins.networkx.types import NetworkXGraph
from metagraph.plugins.numpy.types import NumpyNodeMap, NumpyVectorType

from katana.local.analytics import bfs, jaccard, local_clustering_coefficient

from .types import KatanaGraph


def has_node_prop(kg, node_prop_name):
    """
    Report whether the graph's loaded node schema contains a given property.

    :param kg: graph object exposing ``loaded_node_schema()``; the returned
        schema is iterated and each entry's ``name`` attribute is compared
    :param str node_prop_name: the property name to look for
    :return: True if an entry with that name exists, False otherwise
    :rtype: bool
    """
    # Iterate the schema entries directly instead of indexing by position.
    return any(field.name == node_prop_name for field in kg.loaded_node_schema())


# breadth-first search,
@concrete_algorithm("traversal.bfs_iter")
def kg_bfs_iter(
    graph: KatanaGraph, source_node: NodeID, depth_limit: int
) -> NumpyVectorType:
    """
    .. py:function:: metagraph.algos.traversal.bfs_iter(graph, source_node, depth_limit)

    Use BFS to traverse a graph given a source node and BFS depth limit (implemented by a Katana Graph API)

    The per-node BFS result is stored on the graph as a node property named
    ``bfs_prop_start_from_<source_node>``; if that property is already loaded
    it is reused instead of running BFS again.

    :param KatanaGraph graph: The original graph to traverse
    :param NodeID source_node: The starting node for BFS
    :param int depth_limit: Only nodes whose BFS value is strictly below this limit are returned; -1 means no limit
    :return: the selected node indices ordered by BFS value, ties broken by ascending node index
    :rtype: NumpyVectorType
    """
    g = graph.value
    bfs_prop_name = "bfs_prop_start_from_" + str(source_node)
    # The default depth_limit (-1) means "return every reachable node", which
    # is expressed here as a very large internal limit.
    depth_limit_internal = 2 ** 30 - 1 if depth_limit == -1 else depth_limit
    if not has_node_prop(g, bfs_prop_name):
        bfs(g, source_node, bfs_prop_name)
    distances = np.asarray(g.get_node_property(bfs_prop_name).to_numpy())
    # Candidate indices come out in ascending order, so a stable sort on the
    # BFS value alone yields the (value, node index) ordering.
    within_limit = np.flatnonzero(distances < depth_limit_internal)
    order = np.argsort(distances[within_limit], kind="stable")
    return within_limit[order]


# TODO(pengfei):
# single-source shortest path
# connected components
# PageRank
# betweenness centrality
# triangle counting
# Louvain community detection
# subgraph extraction
# community detection using label propagation


@abstract_algorithm("traversal.jaccard")
def jaccard_similarity(
    graph: Graph(
        is_directed=False,
        edge_type="map",
        edge_dtype={"int", "float"},
        edge_has_negative_weights=False,
    ),
    compare_node: NodeID,
) -> Vector:
    """
    Abstract signature registered under ``traversal.jaccard``.

    :param Graph graph: undirected graph whose edges carry int or float
        values with no negative weights
    :param NodeID compare_node: the node passed to the algorithm
    :return: a vector of results (presumably one Jaccard similarity per node
        relative to ``compare_node`` - confirm against the concrete
        implementations)
    :rtype: Vector
    """
    ...


@concrete_algorithm("traversal.jaccard")
def jaccard_similarity_kg(graph: KatanaGraph, compare_node: NodeID) -> NumpyVectorType:
    """
    Katana implementation of ``traversal.jaccard``.

    The result is stored on the graph as a node property named
    ``jaccard_prop_with_<compare_node>``; when a property with that name is
    already loaded, the Katana ``jaccard`` routine is not run again.

    :param KatanaGraph graph: wrapper whose ``value`` is the Katana graph
    :param NodeID compare_node: the node handed to the Katana ``jaccard`` call
    :return: the contents of the result node property as a NumPy array
    :rtype: NumpyVectorType
    """
    pg = graph.value
    result_prop = "jaccard_prop_with_" + str(compare_node)
    already_computed = has_node_prop(pg, result_prop)
    if not already_computed:
        jaccard(pg, compare_node, result_prop)
    return pg.get_node_property(result_prop).to_numpy()


@abstract_algorithm("clustering.local_clustering_coefficient")
def local_clustering(
    graph: Graph(
        is_directed=False,
        edge_type="map",
        edge_dtype={"int", "float"},
        edge_has_negative_weights=False,
    ),
    prop_name: str = "output",
) -> Vector:
    """
    Abstract signature registered under
    ``clustering.local_clustering_coefficient``.

    :param Graph graph: undirected graph whose edges carry int or float
        values with no negative weights
    :param str prop_name: name handed to the implementation; defaults to
        ``"output"``
    :return: a vector of results (presumably one coefficient per node -
        confirm against the concrete implementations)
    :rtype: Vector
    """
    ...


@concrete_algorithm("clustering.local_clustering_coefficient")
def local_clustering_kg(graph: KatanaGraph, prop_name: str) -> NumpyVectorType:
    """
    Katana implementation of ``clustering.local_clustering_coefficient``.

    The Katana ``local_clustering_coefficient`` routine is run only when the
    graph has no loaded node property called ``prop_name``; otherwise the
    existing property is returned as-is.

    :param KatanaGraph graph: wrapper whose ``value`` is the Katana graph
    :param str prop_name: name of the node property that holds the result
    :return: the values of that node property
    :rtype: NumpyVectorType
    """
    pg = graph.value
    needs_compute = not has_node_prop(pg, prop_name)
    if needs_compute:
        local_clustering_coefficient(pg, prop_name)
    return pg.get_node_property(prop_name).to_pandas().values
177 changes: 177 additions & 0 deletions metagraph/plugins/katanagraph/translators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from collections import OrderedDict

import metagraph as mg
import networkx as nx
import numpy as np
import pyarrow
import katana.local

from metagraph import translator
from metagraph.plugins.networkx.types import NetworkXGraph
from scipy.sparse import csr_matrix

from katana.local.import_data import from_csr

from .types import KatanaGraph


@translator
def networkx_to_katanagraph(x: NetworkXGraph, **props) -> KatanaGraph:
    """
    Translate a metagraph NetworkX graph wrapper into a KatanaGraph.

    Nodes are sorted and relabelled to consecutive ranks (on a copy, leaving
    the wrapped graph untouched), the edges are turned into a CSR structure
    and handed to Katana's ``from_csr``. Edge values, the original node ids
    and (when present) the node ``"weight"`` attributes are attached as
    properties named ``edge_value_from_translator``, ``node_id_reverse_map``
    and ``node_value_from_translator``.

    :param NetworkXGraph x: the wrapper to translate
    :return: a KatanaGraph wrapping the newly built Katana graph
    :rtype: KatanaGraph
    """
    # Rank the nodes by sorted order and relabel a copy of the graph.
    nodes = sorted(x.value.nodes())
    num_nodes = len(nodes)
    rank_of = dict(zip(nodes, np.arange(0, num_nodes)))
    relabelled = nx.relabel_nodes(x.value, rank_of)

    aprops = NetworkXGraph.Type.compute_abstract_properties(
        x,
        {
            "node_dtype",
            "node_type",
            "edge_dtype",
            "edge_type",
            "edge_has_negative_weights",
            "is_directed",
        },
    )
    directed = aprops["is_directed"]
    is_weighted = aprops["edge_type"] == "map"

    # Collect the edges; an undirected graph contributes both directions of
    # every edge, except self-loops which are kept only once.
    edges = list(relabelled.edges(data=True))
    if not directed:
        edges = edges + [(dst, src, attrs) for src, dst, attrs in edges if src != dst]
    edges.sort(key=lambda edge: edge[:2])

    # Build the CSR structure from the sorted (src, dst) pairs.
    row = np.array([edge[0] for edge in edges])
    col = np.array([edge[1] for edge in edges])
    if is_weighted:
        data = np.array([edge[2]["weight"] for edge in edges])
    else:
        # unweighted graphs get a placeholder value of 0 on every edge
        data = np.array([0] * len(edges))
    csr = csr_matrix((data, (row, col)), shape=(num_nodes, num_nodes))

    # Build the Katana graph from the CSR arrays; the leading 0 of
    # csr.indptr is excluded.
    katana.local.initialize()
    pg = from_csr(csr.indptr[1:], csr.indices)

    # Edge values go in as an edge property.
    pg.add_edge_property(pyarrow.table({"edge_value_from_translator": data}))

    # The original node ids are kept as a node property.
    node_id_map_prop_name = "node_id_reverse_map"
    original_ids = list(nodes)
    pg.add_node_property(pyarrow.table({"node_id_reverse_map": original_ids}))

    # Node weights, when the source graph has any "weight" node attributes.
    node_weight_prop_name = None
    weight_by_node = nx.get_node_attributes(x.value, "weight")
    if weight_by_node:
        node_weights = [weight_by_node[node] for node in original_ids]
        node_weight_prop_name = "node_value_from_translator"
        pg.add_node_property(
            pyarrow.table({"node_value_from_translator": node_weights})
        )

    # Wrap the katana.local.Graph in metagraph's Graph wrapper.
    return KatanaGraph(
        pg_graph=pg,
        is_weighted=is_weighted,
        edge_weight_prop_name="edge_value_from_translator",
        node_weight_prop_name=node_weight_prop_name,
        node_id_map_prop_name=node_id_map_prop_name,
        is_directed=directed,
        node_weight_index=0,
        node_dtype=aprops["node_dtype"],
        edge_dtype=aprops["edge_dtype"],
        node_type=aprops["node_type"],
        edge_type=aprops["edge_type"],
        has_neg_weight=aprops["edge_has_negative_weights"],
    )


def _as_python_scalar(value):
    """Convert a pyarrow or NumPy scalar to the matching built-in Python value."""
    if hasattr(value, "as_py"):  # pyarrow scalars
        return value.as_py()
    if isinstance(value, np.generic):  # NumPy scalars of any width
        return value.item()
    return value


@translator
def katanagraph_to_networkx(x: KatanaGraph, **props) -> NetworkXGraph:
    """
    Translate a KatanaGraph into a metagraph NetworkX graph wrapper.

    Every edge of the Katana graph becomes a NetworkX edge whose ``weight``
    attribute is read from the edge property ``x.edge_weight_prop_name``.
    When ``x.node_id_map_prop_name`` is set, nodes are relabelled with the
    ids stored in that node property; when ``x.node_weight_prop_name`` is
    set, its values are stored as the ``weight`` node attribute.

    :param KatanaGraph x: the wrapper to translate
    :return: a NetworkX graph wrapper (``DiGraph`` if ``x.is_directed``,
        otherwise ``Graph``)
    :rtype: NetworkXGraph
    :raises ValueError: if the same (source, destination) pair occurs on
        more than one edge
    """
    pg = x.value
    node_list = list(pg)

    # A single pass collects the edges and rejects duplicated
    # (source, destination) pairs.
    seen_pairs = set()
    edges = []
    for src in pg:
        for edge_id in pg.edge_ids(src):
            dest = pg.get_edge_dest(edge_id)
            if (src, dest) in seen_pairs:
                raise ValueError(
                    "NetworkX does not support graph with duplicated edges"
                )
            seen_pairs.add((src, dest))
            edges.append((src, dest, edge_id))

    # Convert each weight on its own so that any scalar type is accepted and
    # a graph without edges never indexes into an empty property.
    edge_weights = pg.get_edge_property(x.edge_weight_prop_name).to_pandas()
    elist = [
        (src, dest, _as_python_scalar(edge_weights[edge_id]))
        for src, dest, edge_id in edges
    ]

    graph = nx.DiGraph() if x.is_directed else nx.Graph()
    # Edges are added first; nodes not touched by any edge are added after.
    graph.add_weighted_edges_from(elist)
    graph.add_nodes_from(node_list)

    # Remap node ids if a reverse map is stored on the graph. Without one,
    # the Katana node ids are used directly.
    node_ids = node_list
    if x.node_id_map_prop_name:
        nodeid_map = pg.get_node_property(x.node_id_map_prop_name).to_pandas()
        graph = nx.relabel_nodes(graph, dict(enumerate(nodeid_map)))
        node_ids = nodeid_map

    # Retrieve node weights and set them on the graph.
    # Assumes the property rows are in node order - TODO confirm.
    if x.node_weight_prop_name:
        node_weights = pg.get_node_property(x.node_weight_prop_name).to_pandas()
        nx.set_node_attributes(
            graph,
            {
                node: _as_python_scalar(wgt)
                for node, wgt in zip(node_ids, node_weights)
            },
            name="weight",
        )

    return mg.wrappers.Graph.NetworkXGraph(graph)
Loading