Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
338a12d
update 'uniform neighbor sample' import, set the correct version of t…
Jun 3, 2022
8a77539
add features to simpleDistributedGraph, update bfs and sssp accordingly
Jun 7, 2022
bba35a9
Merge remote-tracking branch 'upstream/branch-22.08' into branch-22.0…
Jun 7, 2022
36c55a0
Merge remote-tracking branch 'upstream/branch-22.08' into branch-22.0…
Jun 8, 2022
1100ff2
update tests for mg neighbor sample, fix bug
Jun 8, 2022
26b644e
add test for int indices
Jun 8, 2022
668b19e
compute renumbering before calling the renumbered column names
Jun 8, 2022
59bdaa8
update the computation of the parameter 'alpha', implement 'vertex de…
Jun 8, 2022
86123f5
update katz centrality API and tests
Jun 8, 2022
049ed85
update tests for mg katz
Jun 8, 2022
567f288
update docstrings
Jun 10, 2022
77685db
update docstring, add tests for 'nodes' and 'has_node' functionality
Jun 12, 2022
e356496
add copyright
Jun 12, 2022
dedcfcf
add new algos for benchmark
Jun 13, 2022
b177983
remove unnecessary conditions
Jun 13, 2022
a15605e
fix flake8 errors
Jun 13, 2022
7506b57
skip legacy C++ renumbering for all algos following the pylibcugraph/…
Jun 13, 2022
fdb5495
update tests and functionalities
Jun 17, 2022
04b538d
skip mg property graph because it is a work in progress
Jun 17, 2022
29675a8
remove debug prints
Jun 17, 2022
6d6b7dc
update formatting
Jun 17, 2022
7f32f28
update copyright
Jun 17, 2022
e01f8d8
disable expensive check for bfs and sssp when running benchmarks
Jun 21, 2022
b6e69d7
skip legacy C++ renumbering for eigenvector centrality
Jun 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 40 additions & 27 deletions benchmarks/python_e2e/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@ def __init__(self,
# FIXME: need to accept and save individual algo args
self.construct_graph = benchmark(construct_graph_func)

#add starting node to algos: BFS and SSSP
# add starting node to algos: BFS and SSSP
# FIXME: Refactor BenchmarkRun __init__ because all the work
# done below should be done elsewhere
for i, algo in enumerate (algo_func_param_list):
if benchmark(algo).name in ["bfs", "sssp", "neighborhood_sampling"]:
if benchmark(algo).name in ["bfs", "sssp", "uniform_neighbor_sample"]:
param={}
param["start"]=self.input_dataframe['src'].head()[0]
if benchmark(algo).name in ["neighborhood_sampling"]:
if benchmark(algo).name in ["uniform_neighbor_sample"]:
start = [param.pop("start")]
labels = [0]
param["start_info_list"] = (start, labels)
param["start_list"] = start
param["fanout_vals"] = [1]
algo_func_param_list[i]=(algo,)+(param,)

Expand Down Expand Up @@ -128,32 +129,44 @@ def run(self):
self.__log("done.")
G = result.retval
self.results.append(result)

#algos with transposed=True : PageRank, Katz
#algos with transposed=False: BFS, SSSP, Louvain, HITS, Neighborhood_sampling
#algos supporting the legacy_renum_only: HITS, Neighborhood_sampling
#
# Algos with transposed=True : PageRank, Katz.
# Algos with transposed=False: BFS, SSSP, Louvain, HITS,
# Neighborhood_sampling.
# Algos supporting the legacy_renum_only: HITS, Neighborhood_sampling
#
for i in range(len(self.algos)):
if self.algos[i][0].name in ["pagerank", "katz"]: #set transpose=True when renumbering
if self.algos[i][0].name == "katz" and self.construct_graph.name == "from_dask_cudf_edgelist":
largest_out_degree = G.out_degree().compute().\
nlargest(n=1, columns="degree") #compute outdegree before renumbering because outdegree has transpose=False
largest_out_degree = largest_out_degree["degree"].iloc[0]
katz_alpha = 1 / (largest_out_degree + 1)
self.algos[i][1]["alpha"] = katz_alpha
elif self.algos[i][0].name == "katz" and self.construct_graph.name == "from_cudf_edgelist":
largest_out_degree = G.out_degree().nlargest(n=1, columns="degree")
largest_out_degree = largest_out_degree["degree"].iloc[0]
katz_alpha = 1 / (largest_out_degree + 1)
self.algos[i][1]["alpha"] = katz_alpha
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=True)
elif self.algos[i][0].name in ["neighborhood_sampling", "hits"]:
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=False, legacy_renum_only=True)
# set transpose=True when renumbering
if self.algos[i][0].name in ["pagerank", "katz"]:
if self.algos[i][0].name == "katz":
if self.construct_graph.name == "from_dask_cudf_edgelist":
# compute out_degree before renumbering because out_degree
# has transpose=False
degree_max = G.degree()['degree'].max().compute()
katz_alpha = 1 / (degree_max)
self.algos[i][1]["alpha"] = katz_alpha
elif self.construct_graph.name == "from_cudf_edgelist":
degree_max = G.degree()['degree'].max()
katz_alpha = 1 / (degree_max)
self.algos[i][1]["alpha"] = katz_alpha
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(
transposed=True, legacy_renum_only=True)
else:
# FIXME: Pagerank still follows the old path. Update this once it
# follows the pylibcugraph/C path
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=True)
else: #set transpose=False when renumbering
self.__log("running compute_renumber_edge_list...", end="")
if hasattr(G, "compute_renumber_edge_list"):
G.compute_renumber_edge_list(transposed=False)
if self.algos[i][0].name in ["wcc", "louvain"]:
# FIXME: Pagerank and Louvain still follow the old path.
# Update this once it follows the pylibcugraph/C path
G.compute_renumber_edge_list(transposed=False)
else:
G.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)
self.__log("done.")
# FIXME: need to handle individual algo args
for ((algo, params), validator) in zip(self.algos, self.validators):
Expand Down
36 changes: 22 additions & 14 deletions benchmarks/python_e2e/cugraph_dask_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
from cugraph.structure.symmetrize import symmetrize_ddf
from cugraph.dask.common.mg_utils import get_visible_devices
from dask_cuda.initialize import initialize
from cugraph.experimental.dask import uniform_neighborhood_sampling
import cudf

import cugraph
from cugraph.dask.comms import comms as Comms
from cugraph.dask.common.mg_utils import get_visible_devices
from cugraph.generators import rmat
import tempfile

Expand Down Expand Up @@ -109,10 +107,15 @@ def construct_graph(dask_dataframe, symmetric=False):
object must be symmetrized and have self loops removed.
"""

G = cugraph.DiGraph()
if symmetric:
G = cugraph.Graph(directed=False)
else:
G = cugraph.Graph(directed=True)

if len(dask_dataframe.columns) > 2:
if symmetric: #symmetrize dask dataframe
dask_dataframe = symmetrize_ddf(dask_dataframe, 'src', 'dst', 'weight')
dask_dataframe = symmetrize_ddf(
dask_dataframe, 'src', 'dst', 'weight')

G.from_dask_cudf_edgelist(
dask_dataframe, source="src", destination="dst", edge_attr="weight")
Expand All @@ -130,11 +133,12 @@ def construct_graph(dask_dataframe, symmetric=False):


def bfs(G, start):
return cugraph.dask.bfs(G, start=start, return_distances=True)
return cugraph.dask.bfs(
G, start=start, return_distances=True, check_start=False)


def sssp(G, start):
return cugraph.dask.sssp(G, source=start)
return cugraph.dask.sssp(G, source=start, check_start=False)


def wcc(G):
Expand All @@ -156,15 +160,19 @@ def katz(G, alpha=None):
# Benchmark wrapper: forwards the graph G to cugraph.dask.hits and returns
# that call's result unchanged.
def hits(G):
return cugraph.dask.hits(G)

def neighborhood_sampling(G, start_info_list=None, fanout_vals=None):
def uniform_neighbor_sample(G, start_list=None, fanout_vals=None):
# convert list to cudf.Series
start_info_list = (
cudf.Series(start_info_list[0], dtype="int32"),
cudf.Series(start_info_list[1], dtype="int32"),
)

return uniform_neighborhood_sampling(
G, start_info_list=start_info_list, fanout_vals=fanout_vals)
start_list = cudf.Series(start_list, dtype="int32")
return cugraph.dask.uniform_neighbor_sample(
G, start_list=start_list, fanout_vals=fanout_vals)

# Benchmark wrapper: forwards the graph G to cugraph.dask.triangle_count and
# returns that call's result unchanged.
def triangle_count(G):
# FIXME: Update this call once triangle_count is promoted
return cugraph.dask.triangle_count(G)

# Benchmark wrapper: forwards the graph G to
# cugraph.dask.eigenvector_centrality and returns that call's result unchanged.
def eigenvector_centrality(G):
# FIXME: Update this call once eigenvector_centrality is promoted
# NOTE(review): this comment previously named triangle_count, apparently a
# copy-paste from the triangle_count wrapper - confirm whether any update is
# still pending for this call.
return cugraph.dask.eigenvector_centrality(G)

################################################################################
# Session-wide setup and teardown
Expand Down
23 changes: 20 additions & 3 deletions benchmarks/python_e2e/cugraph_funcs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -15,6 +15,7 @@

import cugraph
from cugraph.generators import rmat
import cudf


def generate_edgelist(scale,
Expand Down Expand Up @@ -96,9 +97,9 @@ def construct_graph(dataframe, symmetric=False):
symmetrized and have self loops removed.
"""
if symmetric:
G = cugraph.Graph()
G = cugraph.Graph(directed=False)
else:
G = cugraph.DiGraph()
G = cugraph.Graph(directed=True)

if len(dataframe.columns) > 2:
G.from_cudf_edgelist(
Expand Down Expand Up @@ -137,6 +138,22 @@ def pagerank(G):
# Benchmark wrapper: calls cugraph.katz_centrality on graph G, forwarding
# alpha positionally (None when the caller does not supply it), and returns
# that call's result unchanged.
def katz(G, alpha=None):
return cugraph.katz_centrality(G, alpha)

# Benchmark wrapper: forwards the graph G to cugraph.hits and returns that
# call's result unchanged.
def hits(G):
return cugraph.hits(G)

# Benchmark wrapper around cugraph.uniform_neighbor_sample.
#   G           - graph passed straight through to the cugraph call
#   start_list  - starting vertices; wrapped in an int32 cudf.Series below
#   fanout_vals - forwarded unchanged as the fanout_vals keyword
# Returns whatever cugraph.uniform_neighbor_sample returns.
# NOTE(review): both defaults are None; presumably callers always supply
# real values - confirm.
def uniform_neighbor_sample(G, start_list=None, fanout_vals=None):
# convert the incoming list to an int32 cudf.Series before the call
start_list = cudf.Series(start_list, dtype="int32")
return cugraph.uniform_neighbor_sample(
G, start_list=start_list, fanout_vals=fanout_vals)

# Benchmark wrapper: forwards the graph G to
# cugraph.experimental.triangle_count and returns that call's result unchanged.
def triangle_count(G):
# FIXME: Update this call once triangle_count is promoted
# (it is currently reached through the cugraph.experimental namespace)
return cugraph.experimental.triangle_count(G)

# Benchmark wrapper: forwards the graph G to cugraph.eigenvector_centrality
# and returns that call's result unchanged.
def eigenvector_centrality(G):
# FIXME: Update this call once eigenvector_centrality is promoted
# NOTE(review): this comment previously named triangle_count, apparently a
# copy-paste from the triangle_count wrapper; the call here already uses the
# top-level cugraph namespace - confirm whether the FIXME is still needed.
return cugraph.eigenvector_centrality(G)

################################################################################
# Session-wide setup and teardown
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/python_e2e/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ def run(algos,
"katz": funcs.katz,
"wcc": funcs.wcc,
"hits": funcs.hits,
"neighborhood_sampling": funcs.neighborhood_sampling,
"uniform_neighbor_sample": funcs.uniform_neighbor_sample,
"triangle_count": funcs.triangle_count,
"eigenvector_centrality": funcs.eigenvector_centrality,
}

if algos:
Expand Down
8 changes: 4 additions & 4 deletions benchmarks/python_e2e/reporting.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand Down Expand Up @@ -43,15 +43,15 @@ def generate_console_report(benchmark_result_list):
# the graph_create run, then a run of each algo.
r = benchmark_result_list[0]
name = f"{r.name}({__namify_dict(r.params)})"
space = " " * (30 - len(name))
space = " " * (70 - len(name))
retstring += f"{name}{space}{r.runtime:.6}\n"

remaining_results = benchmark_result_list[1:]

for r in remaining_results:
retstring += f"{'-'*60}\n"
retstring += f"{'-'*80}\n"
name = f"{r.name}({__namify_dict(r.params)})"
space = " " * (30 - len(name))
space = " " * (70 - len(name))
retstring += f"{name}{space}{r.runtime:.6}\n"

return retstring
Expand Down
18 changes: 8 additions & 10 deletions python/cugraph/cugraph/centrality/katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


def katz_centrality(
G, alpha=None, beta=None, max_iter=100, tol=1.0e-6,
G, alpha=None, beta=1.0, max_iter=100, tol=1.0e-6,
nstart=None, normalized=True
):
"""
Expand Down Expand Up @@ -114,11 +114,16 @@ def katz_centrality(
>>> kc = cugraph.katz_centrality(G)

"""
G, isNx = ensure_cugraph_obj_for_nx(G)

if alpha is None:
degree_max = G.degree()['degree'].max()
alpha = 1 / (degree_max)

if (alpha is not None) and (alpha <= 0.0):
raise ValueError(f"'alpha' must be a positive float or None, "
f"got: {alpha}")
if beta is None:
beta = 1.0

elif (not isinstance(beta, float)) or (beta <= 0.0):
raise ValueError(f"'beta' must be a positive float or None, "
f"got: {beta}")
Expand All @@ -128,8 +133,6 @@ def katz_centrality(
if (not isinstance(tol, float)) or (tol <= 0.0):
raise ValueError(f"'tol' must be a positive float, got: {tol}")

G, isNx = ensure_cugraph_obj_for_nx(G)

srcs = G.edgelist.edgelist_df['src']
dsts = G.edgelist.edgelist_df['dst']
if 'weights' in G.edgelist.edgelist_df.columns:
Expand All @@ -139,11 +142,6 @@ def katz_centrality(
# with type hardcoded to float32 is passed into wrapper
weights = cudf.Series((srcs + 1) / (srcs + 1), dtype="float32")

if alpha is None:
largest_out_degree = G.degrees().nlargest(n=1, columns="out_degree")
largest_out_degree = largest_out_degree["out_degree"].iloc[0]
alpha = 1 / (largest_out_degree + 1)

if nstart is not None:
if G.renumbered is True:
if len(G.renumber_map.implementation.col_names) > 1:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ def eigenvector_centrality(
"""
client = default_client()
# Calling renumbering results in data that is sorted by degree
input_graph.compute_renumber_edge_list(transposed=False)
input_graph.compute_renumber_edge_list(
transposed=False, legacy_renum_only=True)

graph_properties = GraphProperties(
is_multigraph=False)
Expand Down
19 changes: 15 additions & 4 deletions python/cugraph/cugraph/dask/centrality/katz_centrality.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,21 @@ def katz_centrality(
"""
client = default_client()

if alpha is None:
degree_max = input_graph.degree()['degree'].max().compute()
alpha = 1 / (degree_max)

if (alpha is not None) and (alpha <= 0.0):
raise ValueError(f"'alpha' must be a positive float or None, "
f"got: {alpha}")

# FIXME: 'legacy_renum_only' will not trigger the C++ renumbering
# In the future, once all the algos follow the C/Pylibcugraph path,
# compute_renumber_edge_list will only be used for multicolumn and
# string vertices since the renumbering will be done in pylibcugraph
input_graph.compute_renumber_edge_list(transposed=True,
legacy_renum_only=False)

graph_properties = GraphProperties(
is_multigraph=False)

Expand All @@ -188,10 +203,6 @@ def katz_centrality(
num_edges = len(ddf)
data = get_distributed_data(ddf)

# FIXME: Incorporate legacy_renum_only=True to only trigger the python
# renumbering when more support is added in the C/C++ API
input_graph.compute_renumber_edge_list(transposed=True,
legacy_renum_only=False)
vertex_partition_offsets = get_vertex_partition_offsets(input_graph)
num_verts = vertex_partition_offsets.iloc[-1]

Expand Down
14 changes: 9 additions & 5 deletions python/cugraph/cugraph/dask/common/part_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ async def _extract_partitions(dask_obj, client=None, batch_enabled=False):
persisted = [client.persist(
dask_obj.get_partition(p), workers=w) for p, w in enumerate(
worker_list[:dask_obj.npartitions])]
# Persist empty dataframe with the remaining workers if there are
# less partitions than workers
# Persist empty dataframe/series with the remaining workers if
# there are less partitions than workers
if dask_obj.npartitions < len(worker_list):
# The empty df should have the same column names and dtypes as
# dask_obj
empty_df = cudf.DataFrame(columns=list(dask_obj.columns))
empty_df = empty_df.astype(dict(zip(
dask_obj.columns, dask_obj.dtypes)))
if isinstance(dask_obj, dask_cudf.DataFrame):
empty_df = cudf.DataFrame(columns=list(dask_obj.columns))
empty_df = empty_df.astype(dict(zip(
dask_obj.columns, dask_obj.dtypes)))
else:
empty_df = cudf.Series(dtype=dask_obj.dtype)

for p, w in enumerate(worker_list[dask_obj.npartitions:]):
empty_ddf = dask_cudf.from_cudf(empty_df, npartitions=1)
persisted.append(client.persist(empty_ddf, workers=w))
Expand Down
Loading