Skip to content

added parallel implementations for number_ algorithms #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions _nx_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,41 @@ def get_info():
'get_chunks : str, function (default = "chunks")': "A function that takes in an iterable of all the nodes as input and returns an iterable `node_chunks`. The default chunking is done by slicing the `G.nodes` (or `nodes`) into `n_jobs` number of chunks."
},
},
"number_attracting_components": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/components/attracting.py#L9",
"additional_docs": "The parallel computation is implemented by dividing the list of attracting components into chunks and then finding the length of each chunk in parallel and then adding all the lengths at the end.",
"additional_parameters": {
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of attracting components as input and returns an iterable `component_chunks`. The default chunking is done by slicing the list of attracting components into `n_jobs` number of chunks."
},
},
"number_connected_components": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/components/connected.py#L9",
"additional_docs": "The parallel computation is implemented by dividing the list of connected components into chunks and then finding the length of each chunk in parallel and then adding all the lengths at the end.",
"additional_parameters": {
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of connected components as input and returns an iterable `component_chunks`. The default chunking is done by slicing the list of connected components into `n_jobs` number of chunks."
},
},
"number_of_isolates": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/isolate.py#L9",
"additional_docs": "The parallel computation is implemented by dividing the list of isolated nodes into chunks and then finding the length of each chunk in parallel and then adding all the lengths at the end.",
"additional_parameters": {
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of all the isolated nodes as input and returns an iterable `isolate_chunks`. The default chunking is done by slicing the `isolates` into `n_jobs` number of chunks."
},
},
"number_strongly_connected_components": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/components/strongly_connected.py#L9",
"additional_docs": "The parallel computation is implemented by dividing the list of strongly connected components into chunks and then finding the length of each chunk in parallel and then adding all the lengths at the end.",
"additional_parameters": {
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of strongly connected components as input and returns an iterable `component_chunks`. The default chunking is done by slicing the list of strongly connected components into `n_jobs` number of chunks."
},
},
"number_weakly_connected_components": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/components/weakly_connected.py#L9",
"additional_docs": "The parallel computation is implemented by dividing the list of weakly connected components into chunks and then finding the length of each chunk in parallel and then adding all the lengths at the end.",
"additional_parameters": {
'get_chunks : str, function (default = "chunks")': "A function that takes in a list of weakly connected components as input and returns an iterable `component_chunks`. The default chunking is done by slicing the list of weakly connected components into `n_jobs` number of chunks."
},
},
"square_clustering": {
"url": "https://github.com/networkx/nx-parallel/blob/main/nx_parallel/algorithms/cluster.py#L11",
"additional_docs": "The nodes are chunked into `node_chunks` and then the square clustering coefficient for all `node_chunks` are computed in parallel over `n_jobs` number of CPU cores.",
Expand Down
1 change: 1 addition & 0 deletions nx_parallel/algorithms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .shortest_paths import *
from .approximation import *
from .connectivity import *
from .components import *

# modules
from .efficiency_measures import *
Expand Down
4 changes: 4 additions & 0 deletions nx_parallel/algorithms/components/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from .attracting import *
from .connected import *
from .strongly_connected import *
from .weakly_connected import *
35 changes: 35 additions & 0 deletions nx_parallel/algorithms/components/attracting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from networkx import attracting_components
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = ["number_attracting_components"]


@nxp._configure_if_nx_active()
def number_attracting_components(G, get_chunks="chunks"):
"""The parallel computation is implemented by dividing the list
of attracting components into chunks and then finding the length
of each chunk in parallel and then adding all the lengths at the end.

networkx.number_attracting_components : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.components.number_attracting_components.html

Parameters
----------
get_chunks : str, function (default = "chunks")
A function that takes in a list of attracting components as input and returns
an iterable `component_chunks`. The default chunking is done by slicing the
list of attracting components into `n_jobs` number of chunks.
"""
if hasattr(G, "graph_object"):
G = G.graph_object

n_jobs = nxp.get_n_jobs()

attracting_comp_list = list(attracting_components(G))
if get_chunks == "chunks":
component_chunks = nxp.chunks(attracting_comp_list, n_jobs)
else:
component_chunks = get_chunks(attracting_comp_list)

results = Parallel()(delayed(len)(chunk) for chunk in component_chunks)
return sum(results)
35 changes: 35 additions & 0 deletions nx_parallel/algorithms/components/connected.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from networkx import connected_components
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = ["number_connected_components"]


@nxp._configure_if_nx_active()
def number_connected_components(G, get_chunks="chunks"):
"""The parallel computation is implemented by dividing the list
of connected components into chunks and then finding the length
of each chunk in parallel and then adding all the lengths at the end.

networkx.number_connected_components : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.components.number_connected_components.html

Parameters
----------
get_chunks : str, function (default = "chunks")
A function that takes in a list of connected components as input and returns
an iterable `component_chunks`. The default chunking is done by slicing the
list of connected components into `n_jobs` number of chunks.
"""
if hasattr(G, "graph_object"):
G = G.graph_object

n_jobs = nxp.get_n_jobs()

connected_comp_list = list(connected_components(G))
if get_chunks == "chunks":
component_chunks = nxp.chunks(connected_comp_list, n_jobs)
else:
component_chunks = get_chunks(connected_comp_list)

results = Parallel()(delayed(len)(chunk) for chunk in component_chunks)
return sum(results)
36 changes: 36 additions & 0 deletions nx_parallel/algorithms/components/strongly_connected.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from networkx import strongly_connected_components
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = ["number_strongly_connected_components"]


@nxp._configure_if_nx_active()
def number_strongly_connected_components(G, get_chunks="chunks"):
"""The parallel computation is implemented by dividing the list
of strongly connected components into chunks and then finding the length
of each chunk in parallel and then adding all the lengths at the end.

networkx.number_strongly_connected_components : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.components.number_strongly_connected_components.html

Parameters
----------
get_chunks : str, function (default = "chunks")
A function that takes in a list of strongly connected components as input
and returns an iterable `component_chunks`. The default chunking is done
by slicing the list of strongly connected components into `n_jobs` number
of chunks.
"""
if hasattr(G, "graph_object"):
G = G.graph_object

n_jobs = nxp.get_n_jobs()

strongly_connected_comp_list = list(strongly_connected_components(G))
if get_chunks == "chunks":
component_chunks = nxp.chunks(strongly_connected_comp_list, n_jobs)
else:
component_chunks = get_chunks(strongly_connected_comp_list)

results = Parallel()(delayed(len)(chunk) for chunk in component_chunks)
return sum(results)
36 changes: 36 additions & 0 deletions nx_parallel/algorithms/components/weakly_connected.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from networkx import weakly_connected_components
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = ["number_weakly_connected_components"]


@nxp._configure_if_nx_active()
def number_weakly_connected_components(G, get_chunks="chunks"):
"""The parallel computation is implemented by dividing the list
of weakly connected components into chunks and then finding the length
of each chunk in parallel and then adding all the lengths at the end.

networkx.number_weakly_connected_components : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.components.number_weakly_connected_components.html

Parameters
----------
get_chunks : str, function (default = "chunks")
A function that takes in a list of weakly connected components as input
and returns an iterable `component_chunks`. The default chunking is done
by slicing the list of weakly connected components into `n_jobs` number
of chunks.
"""
if hasattr(G, "graph_object"):
G = G.graph_object

n_jobs = nxp.get_n_jobs()

weakly_connected_comp_list = list(weakly_connected_components(G))
if get_chunks == "chunks":
component_chunks = nxp.chunks(weakly_connected_comp_list, n_jobs)
else:
component_chunks = get_chunks(weakly_connected_comp_list)

results = Parallel()(delayed(len)(chunk) for chunk in component_chunks)
return sum(results)
8 changes: 8 additions & 0 deletions nx_parallel/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@
# Centrality
"betweenness_centrality",
"edge_betweenness_centrality",
# Components : attracting
"number_attracting_components",
# Components : connected
"number_connected_components",
# Components : strongly connected
"number_strongly_connected_components",
# Components : weakly connected
"number_weakly_connected_components",
# Efficiency
"local_efficiency",
# Shortest Paths : generic
Expand Down
13 changes: 12 additions & 1 deletion nx_parallel/tests/test_get_chunks.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ def test_get_functions_with_get_chunks():
"local_efficiency",
"node_redundancy",
"number_of_isolates",
"number_connected_components",
"number_attracting_components",
"number_weakly_connected_components",
"number_strongly_connected_components",
"square_clustering",
"tournament_is_strongly_connected",
}
Expand Down Expand Up @@ -79,6 +83,11 @@ def random_chunking(nodes):
"betweenness_centrality",
"edge_betweenness_centrality",
]
not_implemented_undirected = [
"number_attracting_components",
"number_weakly_connected_components",
"number_strongly_connected_components",
]

if func in tournament_funcs:
G = nx.tournament.random_tournament(15, seed=42)
Expand All @@ -87,7 +96,9 @@ def random_chunking(nodes):
c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
assert c1 == c2
else:
G = nx.fast_gnp_random_graph(40, 0.6, seed=42)
G = nx.fast_gnp_random_graph(
40, 0.6, seed=42, directed=func in not_implemented_undirected
)
H = nxp.ParallelGraph(G)
c1 = getattr(nxp, func)(H)
c2 = getattr(nxp, func)(H, get_chunks=random_chunking)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ packages = [
'nx_parallel.algorithms.approximation',
'nx_parallel.algorithms.bipartite',
'nx_parallel.algorithms.centrality',
'nx_parallel.algorithms.components',
'nx_parallel.algorithms.connectivity',
'nx_parallel.algorithms.shortest_paths',
'nx_parallel.utils',
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading