Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 53 additions & 19 deletions python/cugraph-pyg/cugraph_pyg/data/cugraph_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def __init__(

self.__infer_offsets(num_nodes_dict, num_edges_dict)
self.__infer_existing_tensors(F)
self.__infer_edge_types(num_edges_dict)
self.__infer_edge_types(num_nodes_dict, num_edges_dict)

self._edge_attr_cls = CuGraphEdgeAttr

Expand Down Expand Up @@ -462,6 +462,9 @@ def _is_delayed(self):
return False
return self.__graph.is_multi_gpu()

def _numeric_vertex_type_from_name(self, vertex_type_name: str) -> int:
return np.searchsorted(self.__vertex_type_offsets["type"], vertex_type_name)

def get_vertex_index(self, vtypes) -> TensorType:
if isinstance(vtypes, str):
vtypes = [vtypes]
Expand Down Expand Up @@ -559,12 +562,12 @@ def _get_edge_index(self, attr: CuGraphEdgeAttr) -> Tuple[TensorType, TensorType
src_type, _, dst_type = attr.edge_type
src_offset = int(
self.__vertex_type_offsets["start"][
np.searchsorted(self.__vertex_type_offsets["type"], src_type)
self._numeric_vertex_type_from_name(src_type)
]
)
dst_offset = int(
self.__vertex_type_offsets["start"][
np.searchsorted(self.__vertex_type_offsets["type"], dst_type)
self._numeric_vertex_type_from_name(dst_type)
]
)
coli = np.searchsorted(
Expand Down Expand Up @@ -693,6 +696,29 @@ def _get_vertex_groups_from_sample(

return noi_index

def _get_sample_from_vertex_groups(
self, vertex_groups: Dict[str, TensorType]
) -> TensorType:
"""
Inverse of _get_vertex_groups_from_sample() (although with de-offsetted ids).
Given a dictionary of node types and de-offsetted node ids, return
the global (non-renumbered) vertex ids.

Example Input: {'horse': [1, 3, 5], 'duck': [1, 2]}
Output: [1, 3, 5, 14, 15]
"""
t = torch.tensor([], dtype=torch.int64, device="cuda")

for group_name, ix in vertex_groups.items():
type_id = self._numeric_vertex_type_from_name(group_name)
if not ix.is_cuda:
ix = ix.cuda()
offset = self.__vertex_type_offsets["start"][type_id]
u = ix + offset
t = torch.concatenate([t, u])

return t

def _get_renumbered_edge_groups_from_sample(
self, sampling_results: cudf.DataFrame, noi_index: dict
) -> Tuple[dict, dict]:
Expand Down Expand Up @@ -823,16 +849,21 @@ def create_named_tensor(
)
)

def __infer_edge_types(self, num_edges_dict) -> None:
def __infer_edge_types(
self,
num_nodes_dict: Dict[str, int],
num_edges_dict: Dict[Tuple[str, str, str], int],
) -> None:
self.__edge_types_to_attrs = {}

for pyg_can_edge_type in sorted(num_edges_dict.keys()):
sz = num_edges_dict[pyg_can_edge_type]
sz_src = num_nodes_dict[pyg_can_edge_type[0]]
sz_dst = num_nodes_dict[pyg_can_edge_type[-1]]
self.__edge_types_to_attrs[pyg_can_edge_type] = CuGraphEdgeAttr(
edge_type=pyg_can_edge_type,
layout=EdgeLayout.COO,
is_sorted=False,
size=(sz, sz),
size=(sz_src, sz_dst),
)

def __infer_existing_tensors(self, F) -> None:
Expand Down Expand Up @@ -862,22 +893,25 @@ def _get_tensor(self, attr: CuGraphTensorAttr) -> TensorType:
cols = attr.properties

idx = attr.index
if feature_backend == "torch":
if not isinstance(idx, torch.Tensor):
raise TypeError(
f"Type {type(idx)} invalid"
f" for feature store backend {feature_backend}"
)
idx = idx.cpu()
elif feature_backend == "numpy":
# allow indexing through cupy arrays
if isinstance(idx, cupy.ndarray):
idx = idx.get()
elif isinstance(idx, torch.Tensor):
idx = np.asarray(idx.cpu())
if idx is not None:
if feature_backend == "torch":
if not isinstance(idx, torch.Tensor):
raise TypeError(
f"Type {type(idx)} invalid"
f" for feature store backend {feature_backend}"
)
idx = idx.cpu()
elif feature_backend == "numpy":
# allow feature indexing through cupy arrays
if isinstance(idx, cupy.ndarray):
idx = idx.get()
elif isinstance(idx, torch.Tensor):
idx = np.asarray(idx.cpu())

if cols is None:
t = self.__features.get_data(idx, attr.group_name, attr.attr_name)
if idx is None:
t = t[-1]

if isinstance(t, np.ndarray):
t = torch.as_tensor(t, device="cuda")
Expand Down
68 changes: 49 additions & 19 deletions python/cugraph-pyg/cugraph_pyg/loader/cugraph_node_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
from cugraph_pyg.loader.filter import _filter_cugraph_store
from cugraph_pyg.sampler.cugraph_sampler import _sampler_output_from_sampling_results

from typing import Union, Tuple, Sequence, List
from typing import Union, Tuple, Sequence, List, Dict

torch_geometric = import_optional("torch_geometric")
InputNodes = (
Sequence
if isinstance(torch_geometric, MissingModule)
else torch_geometric.typing.InputNodes
)


class EXPERIMENTAL__BulkSampleLoader:
Expand All @@ -39,15 +44,15 @@ def __init__(
self,
feature_store: CuGraphStore,
graph_store: CuGraphStore,
all_indices: Union[Sequence, int],
input_nodes: Union[InputNodes, int] = None,
batch_size: int = 0,
shuffle=False,
edge_types: Sequence[Tuple[str]] = None,
directory=None,
starting_batch_id=0,
batches_per_partition=100,
# Sampler args
num_neighbors: List[int] = [1, 1],
num_neighbors: Union[List[int], Dict[Tuple[str, str, str], List[int]]] = None,
replace: bool = True,
# Other kwargs for the BulkSampler
**kwargs,
Expand All @@ -64,9 +69,9 @@ def __init__(
graph_store: CuGraphStore
The graph store containing the graph structure.

all_indices: Union[Tensor, int]
input_nodes: Union[InputNodes, int]
The input nodes associated with this sampler.
If this is an integer N , this loader will load N batches
If this is an integer N, this loader will load N batches
from disk rather than performing sampling in memory.

batch_size: int
Expand Down Expand Up @@ -97,6 +102,16 @@ def __init__(
Defaults to 100. Gets passed to the bulk
sampler if there is one; otherwise, this argument
is used to determine which files to read.

num_neighbors: Union[List[int],
Dict[Tuple[str, str, str], List[int]]] (required)
The number of neighbors to sample for each node in each iteration.
If an entry is set to -1, all neighbors will be included.
In heterogeneous graphs, may also take in a dictionary denoting
the number of neighbors to sample for each individual edge type.

Note: in cuGraph, only one value of num_neighbors is currently supported.
Passing in a dictionary will result in an exception.
"""

self.__feature_store = feature_store
Expand All @@ -106,18 +121,29 @@ def __init__(
self.__batches_per_partition = batches_per_partition
self.__starting_batch_id = starting_batch_id

if isinstance(all_indices, int):
if isinstance(input_nodes, int):
# Will be loading from disk
self.__num_batches = all_indices
self.__num_batches = input_nodes
self.__directory = directory
iter(os.listdir(self.__directory))
return

input_type, input_nodes = torch_geometric.loader.utils.get_input_nodes(
(feature_store, graph_store), input_nodes
)
if input_type is not None:
input_nodes = graph_store._get_sample_from_vertex_groups(
{input_type: input_nodes}
)

if batch_size is None or batch_size < 1:
raise ValueError("Batch size must be >= 1")

self.__directory = tempfile.TemporaryDirectory(dir=directory)

if isinstance(num_neighbors, dict):
raise ValueError("num_neighbors dict is currently unsupported!")

bulk_sampler = BulkSampler(
batch_size,
self.__directory.name,
Expand All @@ -129,21 +155,21 @@ def __init__(
)

# Make sure indices are in cupy
all_indices = cupy.asarray(all_indices)
input_nodes = cupy.asarray(input_nodes)

# Shuffle
if shuffle:
cupy.random.shuffle(all_indices)
cupy.random.shuffle(input_nodes)

# Truncate if we can't evenly divide the input array
stop = (len(all_indices) // batch_size) * batch_size
all_indices = all_indices[:stop]
stop = (len(input_nodes) // batch_size) * batch_size
input_nodes = input_nodes[:stop]

# Split into batches
all_indices = cupy.split(all_indices, len(all_indices) // batch_size)
input_nodes = cupy.split(input_nodes, len(input_nodes) // batch_size)

self.__num_batches = 0
for batch_num, batch_i in enumerate(all_indices):
for batch_num, batch_i in enumerate(input_nodes):
self.__num_batches += 1
bulk_sampler.add_batches(
cudf.DataFrame(
Expand Down Expand Up @@ -246,8 +272,8 @@ class EXPERIMENTAL__CuGraphNeighborLoader:
def __init__(
self,
data: Union[CuGraphStore, Tuple[CuGraphStore, CuGraphStore]],
input_nodes: Sequence,
batch_size: int,
input_nodes: Union[InputNodes, int] = None,
batch_size: int = None,
**kwargs,
):
"""
Expand All @@ -256,19 +282,23 @@ def __init__(
data: CuGraphStore or (CuGraphStore, CuGraphStore)
The CuGraphStore or stores where the graph/feature data is held.

batch_size: int
batch_size: int (required)
The number of input nodes in each batch.

input_nodes: Tensor
The input nodes for *this* loader. If there are multiple loaders,
the appropriate split should be given for this loader.
input_nodes: Union[InputNodes, int] (required)
The input nodes associated with this sampler.

**kwargs: kwargs
Keyword arguments to pass through for sampling.
i.e. "shuffle", "fanout"
See BulkSampleLoader.
"""

if input_nodes is None:
raise ValueError("input_nodes is required")
if batch_size is None:
raise ValueError("batch_size is required")

# Allow passing in a feature store and graph store as a tuple, as
# in the standard PyG API. If only one is passed, it is assumed
# it is behaving as both a graph store and a feature store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,30 @@ def test_cugraph_loader_basic(dask_client, karate_gnn):
if "type1" in sample:
for prop in sample["type1"]["prop0"].tolist():
assert prop % 41 == 0


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_cugraph_loader_hetero(dask_client, karate_gnn):
F, G, N = karate_gnn
cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)
loader = CuGraphNeighborLoader(
(cugraph_store, cugraph_store),
input_nodes=("type1", torch.tensor([0, 1, 2, 5], device="cuda")),
batch_size=2,
num_neighbors=[4, 4],
random_state=62,
replace=False,
)

samples = [s for s in loader]

assert len(samples) == 2
for sample in samples:
print(sample)
if "type0" in sample:
for prop in sample["type0"]["prop0"].tolist():
assert prop % 31 == 0

if "type1" in sample:
for prop in sample["type1"]["prop0"].tolist():
assert prop % 41 == 0
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@


torch = import_optional("torch")
torch_geometric = import_optional("torch_geometric")


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
Expand Down Expand Up @@ -152,7 +153,10 @@ def test_edge_types(graph, dask_client):
assert eta.keys() == G.keys()

for attr_name, attr_repr in eta.items():
assert len(G[attr_name][0]) == attr_repr.size[-1]
src_size = N[attr_name[0]]
dst_size = N[attr_name[-1]]
assert src_size == attr_repr.size[0]
assert dst_size == attr_repr.size[-1]
assert attr_name == attr_repr.edge_type


Expand Down Expand Up @@ -311,6 +315,17 @@ def test_get_tensor(graph, dask_client):
assert tsr == base_series


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_get_tensor_empty_idx(karate_gnn, dask_client):
F, G, N = karate_gnn
cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)

t = cugraph_store.get_tensor(
CuGraphTensorAttr(group_name="type0", attr_name="prop0", index=None)
)
assert t.tolist() == (torch.arange(17, dtype=torch.float32) * 31).tolist()


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_multi_get_tensor(graph, dask_client):
F, G, N = graph
Expand Down Expand Up @@ -397,6 +412,22 @@ def test_get_tensor_size(graph, dask_client):
assert cugraph_store.get_tensor_size(tensor_attr) == torch.Size((sz,))


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
@pytest.mark.skipif(
isinstance(torch_geometric, MissingModule), reason="pyg not available"
)
def test_get_input_nodes(karate_gnn, dask_client):
F, G, N = karate_gnn
cugraph_store = CuGraphStore(F, G, N, multi_gpu=True)

node_type, input_nodes = torch_geometric.loader.utils.get_input_nodes(
(cugraph_store, cugraph_store), "type0"
)

assert node_type == "type0"
assert input_nodes.tolist() == torch.arange(17, dtype=torch.int32).tolist()


@pytest.mark.skipif(isinstance(torch, MissingModule), reason="torch not available")
def test_mg_frame_handle(graph, dask_client):
F, G, N = graph
Expand Down
Loading