diff --git a/src/pathpyG/core/temporal_graph.py b/src/pathpyG/core/temporal_graph.py index a019f12b..90182444 100644 --- a/src/pathpyG/core/temporal_graph.py +++ b/src/pathpyG/core/temporal_graph.py @@ -31,24 +31,20 @@ def __init__(self, data: Data, mapping: IndexMap | None = None) -> None: print(t) ``` """ - if not isinstance(data.edge_index, EdgeIndex): - data.edge_index = data.edge_index = EdgeIndex( - data=data.edge_index.contiguous(), sparse_size=(data.num_nodes, data.num_nodes) + self.data = data + if not isinstance(self.data.edge_index, EdgeIndex): + self.data.edge_index = EdgeIndex( + data=self.data.edge_index.contiguous(), sparse_size=(self.data.num_nodes, self.data.num_nodes) ) # reorder temporal data - # TODO: Fix in PyG - if data.num_nodes != data.num_edges: - self.data = data.sort_by_time() - else: - sorted_idx = torch.argsort(data.time) - data.time = data.time[sorted_idx] - for edge_attr in data.edge_attrs(): - if edge_attr == "edge_index": - data.edge_index = data.edge_index[:, sorted_idx] - else: - data[edge_attr] = data[edge_attr][sorted_idx] - self.data = data + # Note that we do not use `torch_geometric.self.data.Data.sort_by_time` because it cannot sort numpy arrays` + sorted_idx = torch.argsort(self.data.time) + for edge_attr in set(self.data.edge_attrs()).union(set(["time"])): + if edge_attr == "edge_index": + self.data.edge_index = self.data.edge_index[:, sorted_idx] + else: + data[edge_attr] = data[edge_attr][sorted_idx] if mapping is not None: self.mapping = mapping @@ -68,6 +64,12 @@ def from_edge_list(edge_list, num_nodes: Optional[int] = None) -> TemporalGraph: edge_array = np.array(edge_list) ts = edge_array[:, 2].astype(np.number) + # Convert timestamps to tensor + if np.issubdtype(ts.dtype, np.integer): + ts = torch.tensor(ts, dtype=torch.long) + else: + ts = torch.tensor(ts, dtype=torch.float32) + index_map = IndexMap(np.unique(edge_array[:, :2])) edge_index = index_map.to_idxs(edge_array[:, :2].T) @@ -77,7 +79,7 @@ def from_edge_list(edge_list, num_nodes: Optional[int] = None) -> TemporalGraph: return TemporalGraph( data=Data( edge_index=edge_index, - time=torch.Tensor(ts), + time=ts, num_nodes=num_nodes, ), mapping=index_map, diff --git a/src/pathpyG/io/netzschleuder.py b/src/pathpyG/io/netzschleuder.py index 2597479b..d4d911e8 100644 --- a/src/pathpyG/io/netzschleuder.py +++ b/src/pathpyG/io/netzschleuder.py @@ -174,9 +174,7 @@ def read_netzschleuder_graph( if timestamps: g = df_to_temporal_graph(df=edges, is_undirected=not is_directed, num_nodes=num_nodes) else: - g = df_to_graph(df=edges, multiedges=True, num_nodes=num_nodes) - if not is_directed: - g = g.to_undirected() + g = df_to_graph(df=edges, multiedges=multiedges, is_undirected=not is_directed, num_nodes=num_nodes) node_attrs = pd.read_csv( f"{temp_dir}/nodes.csv", header=0, sep=",", skip_blank_lines=True, skipinitialspace=True diff --git a/src/pathpyG/io/pandas.py b/src/pathpyG/io/pandas.py index d4b1a789..03d357bf 100644 --- a/src/pathpyG/io/pandas.py +++ b/src/pathpyG/io/pandas.py @@ -1,18 +1,32 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union +from typing import Any, Optional, Union -from collections import Counter +import ast +import re +import warnings import pandas as pd import torch import numpy as np - -import datetime -from time import mktime +from torch_geometric.data import Data from pathpyG.core.graph import Graph from pathpyG.core.temporal_graph import TemporalGraph -from pathpyG.utils.config import config +from pathpyG.core.index_map import IndexMap + +# Regex to check if the attribute is iterable (e.g., list, dict, etc.) +_iterable_re = re.compile(r"^\s*[\[\{\(].*[\]\}\)]\s*$") +_number_re = re.compile( + r"""^\s* # optional leading whitespace + [+-]? # optional sign + ( # start group + (\d+\.\d*)|(\.\d+)|(\d+) # float or int + ) + ([eE][+-]?\d+)? # optional exponent + \s*$ # optional trailing whitespace +""" +) +_integer_re = re.compile(r"^\s*[+-]?\d+\s*$") def _check_column_name(frame: pd.DataFrame, name: str, synonyms: list) -> pd.DataFrame: @@ -25,7 +39,34 @@ def _check_column_name(frame: pd.DataFrame, name: str, synonyms: list) -> pd.Dat return frame -def df_to_graph(df: pd.DataFrame, is_undirected: bool = False, multiedges: bool = False, **kwargs: Any) -> Graph: +def _parse_df_column(df: pd.DataFrame, data: Data, attr: str, idx: list | None = None, prefix: str = "") -> None: + """Helper function to parse a column in a DataFrame and add it as an attribute to the graph.""" + if idx is None: + idx = np.arange(len(df)) + + if df[attr].dtype == "object": + if _iterable_re.match(str(df[attr].values[0])): + data[prefix + attr] = torch.tensor( + [ast.literal_eval(x) for x in df[attr].values[idx]], device=data.edge_index.device + ) + elif _number_re.match(str(df[attr].values[0])): + # if the attribute is a number, convert it to a tensor + if _integer_re.match(str(df[attr].values[0])): + data[prefix + attr] = torch.tensor( + df[attr].values.astype(int)[idx], device=data.edge_index.device + ) + else: + data[prefix + attr] = torch.tensor( + df[attr].values.astype(float)[idx], device=data.edge_index.device + ) + else: + # if the attribute is not iterable, convert it to a string + data[prefix + attr] = np.array(df[attr].values.astype(str)[idx]) + else: + data[prefix + attr] = torch.tensor(df[attr].values[idx], device=data.edge_index.device) + + +def df_to_graph(df: pd.DataFrame, is_undirected: bool = False, multiedges: bool = False, num_nodes: int | None = None) -> Graph: """Reads a network from a pandas data frame. The data frame is expected to have a minimum of two columns @@ -65,7 +106,6 @@ def df_to_graph(df: pd.DataFrame, is_undirected: bool = False, multiedges: bool print(n) ``` """ - # assign column names if no header is present no_header = all(isinstance(x, int) for x in df.columns.values.tolist()) @@ -77,47 +117,46 @@ def df_to_graph(df: pd.DataFrame, is_undirected: bool = False, multiedges: bool col_names += ["edge_attr_{0}".format(i - 2)] df.columns = col_names - df["v"] = df["v"].astype(str) - df["w"] = df["w"].astype(str) + edge_df = df[["v", "w"]].drop_duplicates() + if not multiedges and (len(edge_df) != len(df)): + print("Data frame contains multiple edges, but multiedges is set to False. Removing duplicates.") + df = df.drop_duplicates(subset=["v", "w"]) - edges: list = [] - edge_set: set = set() - - # counter for multiple edges - counter: Counter = Counter() - - for row in df.to_dict(orient="records"): - _v, _w = row.pop("v"), row.pop("w") - - # check if edge was already generated - if (_v, _w) in edge_set and not multiedges: - counter[(_v, _w)] += 1 + mapping = IndexMap(node_ids=np.unique(df[["v", "w"]].values)) + data = Data( + edge_index=mapping.to_idxs(df[["v", "w"]].values.T), + num_nodes=num_nodes if num_nodes is not None else mapping.node_ids.shape[0], + ) + cols = df.columns.tolist() + cols.remove("v") + cols.remove("w") + for col in cols: + if col.startswith("edge_"): + prefix = "" else: - # add edge - edges.append((_v, _w)) - edge_set.add((_v, _w)) - - # check for multi-edges - if len(counter) > 0: - print( - "%i edges existed already " - "and were not considered. " - "To capture those edges, consider creating " - "a multiedge and/or directed network.", - sum(counter.values()), - ) + prefix = "edge_" - # create graph - g = Graph.from_edge_list(edges, is_undirected=is_undirected, **kwargs) - - # assign edge attributes - add_edge_attributes(df, g) + _parse_df_column( + df=df, + data=data, + attr=col, + prefix=prefix + ) + g = Graph(data=data, mapping=mapping) + if is_undirected: + g = g.to_undirected() return g def add_node_attributes(df: pd.DataFrame, g: Graph): - """Add node attributes from pandas data frame to existing graph, where node - IDs or indices are given in column `v` and node attributes x are given in columns `node_x` + """Add node attributes from pandas data frame to existing `Graph`. + + Add node attributes from pandas data frame to existing graph, where node + IDs or indices are given in column `v` and node attributes x are given in columns `node_x`. + + Args: + df: A DataFrame with rows containing nodes and optional node attributes. + g: The graph to which the node attributes should be added. """ if "v" in df: print("Mapping node attributes based on node names in column `v`") @@ -141,7 +180,7 @@ def add_node_attributes(df: pd.DataFrame, g: Graph): return # get indices of nodes in tensor - node_idx = [g.mapping.to_idx(x) for x in df["v"]] + node_idx = g.mapping.to_idxs(attributed_nodes) else: if set(attributed_nodes) != set([i for i in range(g.n)]): print("Mismatch between nodes in DataFrame and nodes in graph") @@ -158,96 +197,91 @@ def add_node_attributes(df: pd.DataFrame, g: Graph): continue # prefix attribute names that are not already prefixed - prefix = "" - if not attr.startswith("node_"): + if attr.startswith("node_"): + prefix = "" + else: prefix = "node_" - # eval values for array-valued attributes - try: - values = np.array([eval(x) for x in df[attr].values]) - g.data[prefix + attr] = torch.from_numpy(values[node_idx]).to(device=g.data.edge_index.device) - continue - except: - pass + _parse_df_column( + df=df, + data=g.data, + idx=node_idx, + attr=attr, + prefix=prefix, + ) - # try to directly construct tensor for scalar values - try: - g.data[prefix + attr] = torch.from_numpy(df[attr].values[node_idx]).to(device=g.data.edge_index.device) - continue - except: - pass - # numpy array of strings - try: - g.data[prefix + attr] = np.array(df[attr].values.astype(str)[node_idx]) - except: - t = df[attr].dtype - print(f"Could not assign node attribute {attr} of type {t}") +def add_edge_attributes(df: pd.DataFrame, g: Graph, time_attr: str | None = None) -> None: + """Add (temporal) edge attributes from pandas data frame to existing `Graph`. + Add edge attributes from `pandas.DataFrame` to existing `Graph`, where source/target node + IDs are given in columns `v` and `w` and edge attributes x are given in columns `edge_x`. + If `time_attr` is not None, the dataframe is expected to contain temporal data with a timestamp + in a column named as specified in `time_attr`. -def add_edge_attributes(df: pd.DataFrame, g: Graph) -> None: - """Add edge attributes from pandas data frame to existing graph, where source/target node - IDs are given in columns `v` and `w` and edge attributes x are given in columns `edge_x` + Args: + df: A DataFrame with rows containing edges and optional edge attributes. + g: The graph to which the edge attributes should be added. + time_attr: If not None, the name of the column containing time stamps for temporal edges. """ - - if "v" not in df or "w" not in df: - print("data frame must have columns `v` and `w`") - return - - attributed_edges = list(zip(df["v"], df["w"])) - - # check for duplicated edge attributes - if len(set(attributed_edges)) < len(attributed_edges): - print("data frame contains multiple attribute values for single edge") - return - - # check for difference between edges in graph and edges in attributes - if set(attributed_edges) != set([(v, w) for v, w in g.edges]): - print("Mismatch between edges in DataFrame and edges in graph") - return + assert "v" in df and "w" in df, "Data frame must have columns `v` and `w` for source and target nodes" # extract indices of source/target node of edges - src = [g.mapping.to_idx(x) for x in df["v"]] - tgt = [g.mapping.to_idx(x) for x in df["w"]] - - # find indices of edges in edge_index - edge_idx = [] - for i in range(len(src)): - x = torch.where((g.data.edge_index[0, :] == src[i]) & (g.data.edge_index[1, :] == tgt[i]))[0].item() - edge_idx.append(x) - for attr in df.columns: - if attr != "v" and attr != "w": + src = g.mapping.to_idxs(df["v"].tolist()) + tgt = g.mapping.to_idxs(df["w"].tolist()) + + edge_attrs = list(df.columns) + edge_attrs.remove("v") + edge_attrs.remove("w") + + if time_attr is not None: + assert time_attr in df, f"Data frame must have column `{time_attr}` for time stamps" + + time = df[time_attr].values + edge_attrs.remove(time_attr) + + # find indices of edges in edge_index + edge_idx = [] + for src_i, tgt_i, time_i in zip(src, tgt, time): + matching_idx = torch.where( + (g.data.edge_index[0, :] == src_i) & (g.data.edge_index[1, :] == tgt_i) & (g.data.time == time_i) + )[0] + if matching_idx.numel() == 1: + edge_idx.append(matching_idx.item()) + else: + # if the edge is not unique, raise a warning + if matching_idx.numel() > 1: + # if there are multiple edges, take the first one + edge_idx.append(matching_idx[0].item()) + warnings.warn(f"Edge ({src_i}, {tgt_i}) exists {matching_idx.numel()} times in the graph", stacklevel=2) + else: + # find indices of edges in edge_index + edge_idx = [] + for src_i, tgt_i in zip(src, tgt): + matching_idx = torch.where((g.data.edge_index[0, :] == src_i) & (g.data.edge_index[1, :] == tgt_i))[0] + assert ( + matching_idx.numel() == 1 + ), f"Edge ({src_i}, {tgt_i}) either does not exist or is duplicated in the graph" + edge_idx.append(matching_idx.item()) + + for attr in edge_attrs: + if attr.startswith("edge_"): prefix = "" - if not attr.startswith("edge_"): - prefix = "edge_" - - # eval values for array-valued attributes - try: - values = np.array([eval(x) for x in df[attr].values]) - g.data[prefix + attr] = torch.from_numpy(values[edge_idx]).to(device=g.data.edge_index.device) - continue - except: - pass - - # try to directly construct tensor for scalar values - try: - g.data[prefix + attr] = torch.from_numpy(df[attr].values[edge_idx]).to(device=g.data.edge_index.device) - continue - except: - pass - - # numpy array of strings - try: - g.data[prefix + attr] = np.array(df[attr].values.astype(str)[edge_idx]) - except: - t = df[attr].dtype - print(f"Could not assign edge attribute {attr} of type {t}") - - # g.data[prefix+attr] = df[attr].values[edge_idx] + else: + prefix = "edge_" + + # parse column and add to graph + _parse_df_column( + df=df, + data=g.data, + idx=edge_idx, + attr=attr, + prefix=prefix, + ) def df_to_temporal_graph( - df: pd.DataFrame, is_undirected: bool = False, timestamp_format="%Y-%m-%d %H:%M:%S", time_rescale=1, **kwargs: Any + df: pd.DataFrame, is_undirected: bool = False, timestamp_format="%Y-%m-%d %H:%M:%S", time_rescale=1, num_nodes: int | None = None ) -> TemporalGraph: """Reads a temporal graph from a pandas data frame. @@ -301,24 +335,50 @@ def df_to_temporal_graph( col_names += ["edge_attr_{0}".format(i - 2)] df.columns = col_names - tedges = [] - for row in df.to_dict(orient="records"): - _v, _w, _t = str(row.pop("v")), str(row.pop("w")), str(row.pop("t")) - try: - t = float(_t) - except: - # if time stamp is a string, use timestamp_format to convert - # it to UNIX timestamp - x = datetime.datetime.strptime(_t, timestamp_format) - t = int(mktime(x.timetuple())) - tedges.append((_v, _w, int(t / time_rescale))) + # optionally parse time stamps + if df["t"].dtype == "object": + # convert time stamps to seconds since epoch + df["t"] = pd.to_datetime(df["t"], format=timestamp_format) + # rescale time stamps + df["t"] = df["t"].astype("int64") // time_rescale + elif df["t"].dtype == "int64" or df["t"].dtype == "float64": + # rescale time stamps + df["t"] = df["t"] // time_rescale + elif pd.api.types.is_datetime64_any_dtype(df["t"]): + df["t"] = df["t"].astype("int64") // time_rescale + else: + raise ValueError( + "Column `t` must be of type `object`, `int64`, `float64`, or a datetime type. " + f"Found {df['t'].dtype} instead." + ) - g = TemporalGraph.from_edge_list(tedges, **kwargs) + mapping = IndexMap(node_ids=np.unique(df[["v", "w"]].values)) + data = Data( + edge_index=mapping.to_idxs(df[["v", "w"]].values.T), + time=torch.tensor(df["t"].values), + num_nodes=num_nodes if num_nodes is not None else mapping.node_ids.shape[0], + ) + cols = df.columns.tolist() + cols.remove("v") + cols.remove("w") + for col in cols: + if col.startswith("edge_"): + prefix = "" + else: + prefix = "edge_" + _parse_df_column( + df=df, + data=data, + attr=col, + prefix=prefix + ) + g = TemporalGraph(data=data, mapping=mapping) + if is_undirected: - return g.to_undirected() - else: - return g + g = g.to_undirected() + + return g def graph_to_df(graph: Graph, node_indices: Optional[bool] = False) -> pd.DataFrame: diff --git a/tests/io/test_netzschleuder.py b/tests/io/test_netzschleuder.py index 0c5d1eb8..ea080158 100644 --- a/tests/io/test_netzschleuder.py +++ b/tests/io/test_netzschleuder.py @@ -2,7 +2,7 @@ import pytest -from torch import tensor, equal +import torch from pathpyG import Graph, TemporalGraph from pathpyG.io import list_netzschleuder_records, read_netzschleuder_graph, read_netzschleuder_record @@ -32,11 +32,12 @@ def test_node_attrs(): def test_edge_attrs(): """Test the extraction of edge attributes""" - g = read_netzschleuder_graph("ambassador", "1985_1989") + g = read_netzschleuder_graph("ambassador", "1985_1989", multiedges=True) assert "edge_weight" in g.edge_attrs() - assert equal( + print(g.data.edge_weight) + assert torch.equal( g.data.edge_weight, - tensor( + torch.tensor( [ 1, 1, @@ -51,15 +52,15 @@ def test_edge_attrs(): 1, 1, 3, - 3, 1, 1, 1, + 3, 1, 3, 3, 1, - 3, + 1, 3, 2, 1, @@ -70,7 +71,7 @@ def test_edge_attrs(): 1, 1, 1, - 1, + 3, 1, 3, 3, @@ -103,6 +104,15 @@ def test_read_netzschleuder_record(): record = read_netzschleuder_record(record_name, url) +def test_read_netzschleuder_graph(): + """Test the read_netzschleuder_graph() function for timestamped data.""" + + g = read_netzschleuder_graph(name="email_company") + assert isinstance(g, Graph) + assert g.n == 167 + assert g.m == 5784 + + def test_read_netzschleuder_graph_temporal(): """Test the read_netzschleuder_graph() function for timestamped data.""" @@ -110,5 +120,6 @@ def test_read_netzschleuder_graph_temporal(): assert isinstance(g, TemporalGraph) assert g.n == 167 assert g.m == 82927 - assert g.start_time == 1262454016.0 - assert g.end_time == 1285884544.0 + assert g.start_time == 1262454010 + assert g.end_time == 1285884492 + assert "edge_weight" in g.edge_attrs()