Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 110 additions & 84 deletions python/cugraph/cugraph/gnn/graph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ def add_node_data(
self,
df,
node_col_name,
feat_name=None,
ntype=None,
is_single_vector_feature=True,
feat_name=None,
contains_vector_features=False,
):
Comment on lines +58 to 60
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to this to keep the API cleaner and intuitive as you would want to provide feat_name together with contains_vector_features

"""
Add a dataframe describing node properties to the PropertyGraph.
Expand All @@ -68,54 +68,41 @@ def add_node_data(
interface.
node_col_name : string
The column name that contains the values to be used as vertex IDs.
feat_name : string
The feature name under which we should save the added properties
(ignored if is_single_vector_feature=False and the col names of
the dataframe are treated as corresponding feature names)
ntype : string
The node type to be added.
For example, if dataframe contains data about users, ntype
might be "users".
If not specified, the type of properties will be added as
an empty string.
is_single_vector_feature : True
Whether to treat all the columns of the dataframe being added as
a single 2d feature
feat_name : {} or string
A map of feature names under which we should save the added
properties like {"feat_1":[f1, f2], "feat_2":[f3, f4]}
(ignored if contains_vector_features=False and the col names of
the dataframe are treated as corresponding feature names)
contains_vector_features : False
Whether to treat the columns of the dataframe being added
as 2d features
Returns
-------
None
"""
self.gdata.add_vertex_data(df, vertex_col_name=node_col_name, type_name=ntype)
columns = [col for col in list(df.columns) if col != node_col_name]

if is_single_vector_feature:
if feat_name is None:
raise ValueError(
"feature name must be provided when wrapping"
+ " multiple columns under a single feature name"
)

elif feat_name:
raise ValueError(
"feat_name is only valid when wrapping"
+ " multiple columns under a single feature name"
)

if is_single_vector_feature:
self.ndata_feat_col_d[feat_name] = columns
else:
for col in columns:
self.ndata_feat_col_d[col] = [col]
_update_feature_map(
self.ndata_feat_col_d, feat_name, contains_vector_features, columns
)
# Clear properties if set as data has changed

self.__clear_cached_properties()

def add_edge_data(
self,
df,
node_col_names,
canonical_etype=None,
feat_name=None,
etype=None,
is_single_vector_feature=True,
contains_vector_features=False,
):
"""
Add a dataframe describing edge properties to the PropertyGraph.
Expand All @@ -128,47 +115,34 @@ def add_edge_data(
node_col_names : string
The column names that contain the values to be used as the source
and destination vertex IDs for the edges.
feat_name : string
The feature name under which we should save the added properties
(ignored if is_single_vector_feature=False and the col names of
the dataframe are treated as corresponding feature names)
etype : string
canonical_etype : string
The edge type to be added. This should follow the string format
'(src_type),(edge_type),(dst_type)'
If not specified, the type of properties will be added as
an empty string.
is_single_vector_feature : True
Whether to treat all the columns of the dataframe being
added as a single 2d feature
feat_name : string or dict {}
The feature name under which we should save the added properties
(ignored if contains_vector_features=False and the col names of
the dataframe are treated as corresponding feature names)
contains_vector_features : False
Whether to treat the columns of the dataframe being added
as 2d features
Returns
-------
None
"""
self.gdata.add_edge_data(df, vertex_col_names=node_col_names, type_name=etype)
self.gdata.add_edge_data(
df, vertex_col_names=node_col_names, type_name=canonical_etype
)
columns = [col for col in list(df.columns) if col not in node_col_names]
if is_single_vector_feature:
if feat_name is None:
raise ValueError(
"feature name must be provided when wrapping"
+ " multiple columns under a single feature name"
)

elif feat_name:
raise ValueError(
"feat_name is only valid when wrapping"
+ " multiple columns under a single feature name"
)

if is_single_vector_feature:
self.edata_feat_col_d[feat_name] = columns
else:
for col in columns:
self.edata_feat_col_d[col] = [col]
_update_feature_map(
self.edata_feat_col_d, feat_name, contains_vector_features, columns
)

# Clear properties if set as data has changed
self.__clear_cached_properties()

def get_node_storage(self, feat_name, ntype=None):
def get_node_storage(self, key, ntype=None, indices_offset=0):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to key to match DGL.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if ntype is None:
ntypes = self.ntypes
if len(self.ntypes) > 1:
Expand All @@ -179,22 +153,23 @@ def get_node_storage(self, feat_name, ntype=None):
)
)
ntype = ntypes[0]
if feat_name not in self.ndata_feat_col_d:
if key not in self.ndata_feat_col_d:
raise ValueError(
f"feat_name {feat_name} not found in CuGraphStore" " node features",
f"key {key} not found in CuGraphStore node features",
f" {list(self.ndata_feat_col_d.keys())}",
)

columns = self.ndata_feat_col_d[feat_name]
columns = self.ndata_feat_col_d[key]

return CuFeatureStorage(
pg=self.gdata,
columns=columns,
storage_type="node",
indices_offset=indices_offset,
backend_lib=self.backend_lib,
)

def get_edge_storage(self, feat_name, etype=None):
def get_edge_storage(self, key, etype=None, indices_offset=0):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to key to match DGL.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if etype is None:
etypes = self.etypes
if len(self.etypes) > 1:
Expand All @@ -206,18 +181,19 @@ def get_edge_storage(self, feat_name, etype=None):
)

etype = etypes[0]
if feat_name not in self.edata_feat_col_d:
if key not in self.edata_feat_col_d:
raise ValueError(
f"feat_name {feat_name} not found in CuGraphStore" " edge features",
f"key {key} not found in CuGraphStore" " edge features",
f" {list(self.edata_feat_col_d.keys())}",
)
columns = self.edata_feat_col_d[feat_name]
columns = self.edata_feat_col_d[key]

return CuFeatureStorage(
pg=self.gdata,
columns=columns,
storage_type="edge",
backend_lib=self.backend_lib,
indices_offset=indices_offset,
)

def num_nodes(self, ntype=None):
Expand Down Expand Up @@ -500,25 +476,27 @@ def node_subgraph(
return _subg

def __clear_cached_properties(self):
if hasattr(self, "has_multiple_etypes"):
# Check for cached properties using self.__dict__ because calling
# hasattr() accesses the attribute and forces computation
if "has_multiple_etypes" in self.__dict__:
del self.has_multiple_etypes

if hasattr(self, "num_nodes_dict"):
if "num_nodes_dict" in self.__dict__:
del self.num_nodes_dict

if hasattr(self, "num_edges_dict"):
if "num_edges_dict" in self.__dict__:
del self.num_edges_dict

if hasattr(self, "extracted_subgraph"):
if "extracted_subgraph" in self.__dict__:
del self.extracted_subgraph

if hasattr(self, "extracted_reverse_subgraph"):
if "extracted_reverse_subgraph" in self.__dict__:
del self.extracted_reverse_subgraph

if hasattr(self, "extracted_subgraphs_per_type"):
if "extracted_subgraphs_per_type" in self.__dict__:
del self.extracted_subgraphs_per_type

if hasattr(self, "extracted_reverse_subgraphs_per_type"):
if "extracted_reverse_subgraphs_per_type" in self.__dict__:
del self.extracted_reverse_subgraphs_per_type


Expand All @@ -529,7 +507,9 @@ class CuFeatureStorage:
is fine. DGL simply uses duck-typing to implement its sampling pipeline.
"""

def __init__(self, pg, columns, storage_type, backend_lib="torch"):
def __init__(
self, pg, columns, storage_type, backend_lib="torch", indices_offset=0
):
self.pg = pg
self.columns = columns
if backend_lib == "torch":
Expand All @@ -548,6 +528,7 @@ def __init__(self, pg, columns, storage_type, backend_lib="torch"):
self.storage_type = storage_type

self.from_dlpack = from_dlpack
self.indices_offset = indices_offset
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting offset makes upstream code cleaner


def fetch(self, indices, device=None, pin_memory=False, **kwargs):
"""Fetch the features of the given node/edge IDs to the
Expand Down Expand Up @@ -575,6 +556,9 @@ def fetch(self, indices, device=None, pin_memory=False, **kwargs):
indices = indices.get()
else:
indices = cudf.Series(indices)

indices = indices + self.indices_offset

if self.storage_type == "node":
subset_df = self.pg.get_vertex_data(
vertex_ids=indices, columns=self.columns
Expand All @@ -586,20 +570,17 @@ def fetch(self, indices, device=None, pin_memory=False, **kwargs):

if isinstance(subset_df, dask_cudf.DataFrame):
subset_df = subset_df.compute()

if len(subset_df) == 0:
raise ValueError(f"{indices=} not found in FeatureStorage")
else:
tensor = self.from_dlpack(subset_df.to_dlpack())

if isinstance(tensor, cp.ndarray):
# can not transfer to
# a different device for cupy
return tensor
else:
if device:
cap = subset_df.to_dlpack()
tensor = self.from_dlpack(cap)
del cap
if device:
if not isinstance(tensor, cp.ndarray):
# Cant transfer to different device for cupy
tensor = tensor.to(device)
else:
return tensor
return tensor
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed an error here when a user provides device, cant test this as that needs Pytorch installed here.



def return_dlpack_d(d):
Expand Down Expand Up @@ -632,7 +613,7 @@ def sample_single_sg(
fanout_vals=[fanout],
with_replacement=with_replacement,
# FIXME: is_edge_ids=True does not seem to do anything
# issue https://github.com/rapidsai/cugraph/issues/2562
# issue: https://github.com/rapidsai/cugraph/issues/2562
)
return sampled_df

Expand Down Expand Up @@ -721,3 +702,48 @@ def get_subgraph_from_edgelist(edge_list, is_mg, reverse_edges=False):
)

return subgraph


def _update_feature_map(
pg_feature_map, feat_name_obj, contains_vector_features, columns
):
if contains_vector_features:
if feat_name_obj is None:
raise ValueError(
"feature name must be provided when wrapping"
+ " multiple columns under a single feature name"
+ " or a feature map"
)

if isinstance(feat_name_obj, str):
pg_feature_map[feat_name_obj] = columns

elif isinstance(feat_name_obj, dict):
covered_columns = []
for col in feat_name_obj.keys():
current_cols = feat_name_obj[col]
# Handle strings too
if isinstance(current_cols, str):
current_cols = [current_cols]
covered_columns = covered_columns + current_cols

if set(covered_columns) != set(columns):
raise ValueError(
f"All the columns {columns} not covered in {covered_columns} "
f"Please check the feature_map {feat_name_obj} provided"
)

for key, cols in feat_name_obj.items():
if isinstance(cols, str):
cols = [cols]
pg_feature_map[key] = cols
else:
raise ValueError(f"{feat_name_obj} should be str or dict")
else:
if feat_name_obj:
raise ValueError(
f"feat_name {feat_name_obj} is only valid when "
"wrapping multiple columns under feature names"
)
for col in columns:
pg_feature_map[col] = [col]
Loading