Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 42 additions & 33 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,28 @@

import dask_cudf
import cudf
import cupy as cp

from pylibcugraph import ResourceHandle

from pylibcugraph import uniform_neighbor_sample as pylibcugraph_uniform_neighbor_sample

from cugraph.dask.comms import comms as Comms
from cugraph.dask.common.input_utils import get_distributed_data

# Column names shared by the sampling-result DataFrame; used both when
# building worker results (convert_to_cudf) and the empty meta DataFrame.
src_n = "sources"
dst_n = "destinations"
indices_n = "indices"


def create_empty_df(indices_t, weight_t):
    """Return a zero-row cudf.DataFrame with the sampling-result schema.

    The frame carries the "sources"/"destinations"/"indices" columns with
    the given dtypes but no rows, so it can serve as the ``meta`` argument
    of ``dask_cudf.from_delayed`` without computing any partition.

    Parameters
    ----------
    indices_t : dtype for the source/destination vertex columns.
    weight_t : dtype for the indices (edge weight) column.
    """
    schema = ((src_n, indices_t), (dst_n, indices_t), (indices_n, weight_t))
    return cudf.DataFrame(
        {name: numpy.empty(shape=0, dtype=dtype) for name, dtype in schema}
    )


def convert_to_cudf(cp_arrays, weight_t):
Expand All @@ -33,9 +48,9 @@ def convert_to_cudf(cp_arrays, weight_t):
cupy_sources, cupy_destinations, cupy_indices = cp_arrays

df = cudf.DataFrame()
df["sources"] = cupy_sources
df["destinations"] = cupy_destinations
df["indices"] = cupy_indices
df[src_n] = cupy_sources
df[dst_n] = cupy_destinations
df[indices_n] = cupy_indices

if weight_t == "int32":
df.indices = df.indices.astype("int32")
Expand All @@ -46,17 +61,17 @@ def convert_to_cudf(cp_arrays, weight_t):


def _call_plc_uniform_neighbor_sample(
    sID, mg_graph_x, st_x, fanout_vals, with_replacement, weight_t
):
    """Run uniform neighbor sampling on one worker's local graph partition.

    Parameters
    ----------
    sID : Comms session id used to look up this worker's handle.
    mg_graph_x : pylibcugraph graph object local to this worker.
    st_x : start vertices assigned to this worker.
    fanout_vals : per-hop fanout values passed to the sampler.
    with_replacement : whether neighbors are sampled with replacement.
    weight_t : dtype name of the edge weights; controls the dtype of the
        returned "indices" column (see convert_to_cudf).

    Returns
    -------
    cudf.DataFrame with "sources", "destinations" and "indices" columns.
    """
    cp_arrays = pylibcugraph_uniform_neighbor_sample(
        resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()),
        input_graph=mg_graph_x,
        start_list=st_x,
        h_fan_out=fanout_vals,
        with_replacement=with_replacement,
        # FIXME: should we add this parameter as an option?
        do_expensive_check=False,
    )
    # Convert to cudf on the worker so the driver can build a dask_cudf
    # collection directly from these futures.
    return convert_to_cudf(cp_arrays, weight_t)


def uniform_neighbor_sample(
Expand Down Expand Up @@ -99,7 +114,6 @@ def uniform_neighbor_sample(
Contains the indices from the sampling result for path
reconstruction
"""

if isinstance(start_list, int):
start_list = [start_list]

Expand All @@ -123,47 +137,42 @@ def uniform_neighbor_sample(
else:
weight_t = "float32"

# start_list uses "external" vertex IDs, but if the graph has been
# renumbered, the start vertex IDs must also be renumbered.
if "_SRC_" in input_graph.edgelist.edgelist_df:
indices_t = input_graph.edgelist.edgelist_df["_SRC_"].dtype
elif src_n in input_graph.edgelist.edgelist_df:
indices_t = input_graph.edgelist.edgelist_df[src_n].dtype
else:
indices_t = numpy.int32

if input_graph.renumbered:
start_list = input_graph.lookup_internal_vertex_id(start_list).compute()

start_list = dask_cudf.from_cudf(
start_list, npartitions=min(input_graph._npartitions, len(start_list))
)
start_list = get_distributed_data(start_list)
wait(start_list)
start_list = start_list.worker_to_parts

client = input_graph._client

session_id = Comms.get_session_id()
start_list = cp.array_split(start_list.values, input_graph._npartitions)

result = [
client.submit(
_call_plc_uniform_neighbor_sample,
Comms.get_session_id(),
session_id,
input_graph._plc_graph[w],
start_list[w][0],
start_list[i],
fanout_vals,
with_replacement,
weight_t=weight_t,
workers=[w],
allow_other_workers=False,
pure=False,
)
for w in Comms.get_workers()
]

wait(result)

cudf_result = [
client.submit(convert_to_cudf, cp_arrays, weight_t) for cp_arrays in result
for i, w in enumerate(Comms.get_workers())
]

wait(cudf_result)

ddf = dask_cudf.from_delayed(cudf_result).persist()
ddf = dask_cudf.from_delayed(
result, meta=create_empty_df(indices_t, weight_t), verify_meta=False
).persist()
wait(ddf)

# Wait until the inactive futures are released
wait([(r.release(), c_r.release()) for r, c_r in zip(result, cudf_result)])
wait([r.release() for r in result])

if input_graph.renumbered:
ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,23 @@
from cugraph.testing import utils
from cugraph.dask import uniform_neighbor_sample

# If the rapids-pytest-benchmark plugin is installed, the "gpubenchmark"
# fixture will be available automatically. Check that this fixture is available
# by trying to import rapids_pytest_benchmark, and if that fails, set
# "gpubenchmark" to the standard "benchmark" fixture provided by
# pytest-benchmark.
try:
import rapids_pytest_benchmark # noqa: F401
except ImportError:
import pytest_benchmark

gpubenchmark = pytest_benchmark.plugin.benchmark

# =============================================================================
# Pytest Setup / Teardown - called for each test function
# =============================================================================


def setup_function():
    """Pytest per-test setup: force a garbage-collection pass so leftover
    objects from the previous test are released before this one runs."""
    gc.collect()

Expand Down Expand Up @@ -300,3 +313,42 @@ def test_mg_uniform_neighbor_sample_ensure_no_duplicates(dask_client):
)

assert len(output_df.compute()) == 3


# =============================================================================
# Benchmarks
# =============================================================================


@pytest.mark.slow
@pytest.mark.parametrize("n_samples", [1_000, 5_000, 10_000])
def bench_uniform_neigbour_sample_email_eu_core(gpubenchmark, dask_client, n_samples):
    """Benchmark multi-GPU uniform_neighbor_sample on the email-Eu-core
    dataset, sampling 10 neighbors for each of the first ``n_samples``
    source vertices.

    NOTE(review): "neigbour" in the function name is a typo ("neighbour");
    kept as-is since benchmark tooling may select benchmarks by name.
    """
    dataset_path = utils.RAPIDS_DATASET_ROOT_DIR_PATH / "email-Eu-core.csv"

    edge_ddf = dask_cudf.read_csv(
        dataset_path,
        chunksize=dcg.get_chunksize(dataset_path),
        delimiter=" ",
        names=["src", "dst", "value"],
        dtype=["int32", "int32", "int32"],
    )

    graph = cugraph.Graph(directed=False)
    graph.from_dask_cudf_edgelist(
        edge_ddf,
        source="src",
        destination="dst",
        edge_attr="value",
        store_transposed=False,
        legacy_renum_only=True,
    )

    # Take the first n_samples source vertices as the sampling seeds.
    start_list = graph.input_df["src"][:n_samples].compute()

    def run_sample():
        result = cugraph.dask.uniform_neighbor_sample(graph, start_list, [10])
        del result

    gpubenchmark(run_sample)