Skip to content

Commit 2b95059

Browse files
authored
Updates to enable NumberMap to generate unique src/dst column names (#2050)
* Updates to enable `NumberMap` to generate unique src/dst column names instead of assuming the names `src` and `dst`. This is needed since cudf no longer allows for duplicate column names, and some user input may contain "src" and "dst" as pre-existing column names intended for other purposes. * This commit also cleans up various tech debt items (exception types, docstrings, etc.). Tested by ensuring the existing python unit tests for SG and MG (2 GPUs) passed, and also ran all notebooks as tests. New tests to emphasize support for specific column names that previously failed with the latest cudf were also added. Authors: - Rick Ratzel (https://github.com/rlratzel) Approvers: - Brad Rees (https://github.com/BradReesWork) - Chuck Hastings (https://github.com/ChuckHastings) URL: #2050
1 parent 42eaf3c commit 2b95059

25 files changed

+560
-206
lines changed

notebooks/structure/Renumber-2.ipynb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,9 +155,11 @@
155155
"gdf['order'] = gdf.index\n",
156156
"\n",
157157
"tmp_df, numbering = NumberMap.renumber(gdf, ['src_ip'], ['dst_ip'])\n",
158+
"new_src_col_name = numbering.renumbered_src_col_name\n",
159+
"new_dst_col_name = numbering.renumbered_dst_col_name\n",
158160
"\n",
159161
"gdf = gdf.merge(tmp_df, on='order').sort_values('order').set_index(keys='order', drop=True)\n",
160-
"gdf = gdf.rename(columns={'src': 'src_r', 'dst': 'dst_r'})"
162+
"gdf = gdf.rename(columns={new_src_col_name: 'src_r', new_dst_col_name: 'dst_r'})"
161163
]
162164
},
163165
{

notebooks/structure/Renumber.ipynb

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
"source": [
127127
"# Run renumbering\n",
128128
"\n",
129-
"Output from renumbering is a data frame and a NumberMap object. The data frame contains the renumbered sources and destinations. The NumberMap will allow you to translate from external to internal vertex identifiers.\n",
129+
"Output from renumbering is a data frame and a NumberMap object. The data frame contains the renumbered sources and destinations. The NumberMap will allow you to translate from external to internal vertex identifiers. The renumbering call will rename the specified source and destination columns to indicate they were renumbered and no longer contain the original data, and the new names are guaranteed to be unique and not collide with other column names.\n",
130130
"\n",
131131
"Note that renumbering does not guarantee that the output data frame is in the same order as the input data frame (although in our simple example it will match). To address this we will add the index as a column of gdf before renumbering.\n"
132132
]
@@ -140,6 +140,8 @@
140140
"gdf['order'] = gdf.index\n",
141141
"\n",
142142
"renumbered_df, numbering = NumberMap.renumber(gdf, ['source_as_int'], ['dest_as_int'])\n",
143+
"new_src_col_name = numbering.renumbered_src_col_name\n",
144+
"new_dst_col_name = numbering.renumbered_dst_col_name\n",
143145
"\n",
144146
"renumbered_df"
145147
]
@@ -204,10 +206,10 @@
204206
"for i in range(len(renumbered_df)):\n",
205207
" print(\" \", i,\n",
206208
" \": (\", source_as_int[i], \",\", dest_as_int[i],\n",
207-
" \"), renumbered: (\", renumbered_df['src'][i], \",\", renumbered_df['dst'][i], \n",
209+
" \"), renumbered: (\", renumbered_df[new_src_col_name][i], \",\", renumbered_df[new_dst_col_name][i], \n",
208210
" \"), translate back: (\",\n",
209-
" numbering.from_internal_vertex_id(cudf.Series([renumbered_df['src'][i]]))['0'][0], \",\",\n",
210-
" numbering.from_internal_vertex_id(cudf.Series([renumbered_df['dst'][i]]))['0'][0], \")\"\n",
211+
" numbering.from_internal_vertex_id(cudf.Series([renumbered_df[new_src_col_name][i]]))['0'][0], \",\",\n",
212+
" numbering.from_internal_vertex_id(cudf.Series([renumbered_df[new_dst_col_name][i]]))['0'][0], \")\"\n",
211213
" )\n"
212214
]
213215
},
@@ -230,8 +232,8 @@
230232
"source": [
231233
"G = cugraph.Graph()\n",
232234
"gdf_r = cudf.DataFrame()\n",
233-
"gdf_r[\"src\"] = renumbered_df[\"src\"]\n",
234-
"gdf_r[\"dst\"] = renumbered_df[\"dst\"]\n",
235+
"gdf_r[\"src\"] = renumbered_df[new_src_col_name]\n",
236+
"gdf_r[\"dst\"] = renumbered_df[new_dst_col_name]\n",
235237
"\n",
236238
"G.from_cudf_edgelist(gdf_r, source='src', destination='dst', renumber=False)\n",
237239
"\n",

python/cugraph/cugraph/dask/centrality/katz_centrality.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
def call_katz_centrality(sID,
2626
data,
27+
src_col_name,
28+
dst_col_name,
2729
num_verts,
2830
num_edges,
2931
vertex_partition_offsets,
@@ -40,6 +42,8 @@ def call_katz_centrality(sID,
4042
segment_offsets = \
4143
aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)]
4244
return mg_katz_centrality.mg_katz_centrality(data[0],
45+
src_col_name,
46+
dst_col_name,
4347
num_verts,
4448
num_edges,
4549
vertex_partition_offsets,
@@ -153,9 +157,14 @@ def katz_centrality(input_graph,
153157
num_edges = len(ddf)
154158
data = get_distributed_data(ddf)
155159

160+
src_col_name = input_graph.renumber_map.renumbered_src_col_name
161+
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
162+
156163
result = [client.submit(call_katz_centrality,
157164
Comms.get_session_id(),
158165
wf[1],
166+
src_col_name,
167+
dst_col_name,
159168
num_verts,
160169
num_edges,
161170
vertex_partition_offsets,

python/cugraph/cugraph/dask/centrality/mg_katz_centrality_wrapper.pyx

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
2+
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@ import numpy as np
2525

2626

2727
def mg_katz_centrality(input_df,
28+
src_col_name,
29+
dst_col_name,
2830
num_global_verts,
2931
num_global_edges,
3032
vertex_partition_offsets,
@@ -43,8 +45,8 @@ def mg_katz_centrality(input_df,
4345
cdef size_t handle_size_t = <size_t>handle.getHandle()
4446
handle_ = <c_katz_centrality.handle_t*>handle_size_t
4547

46-
src = input_df['src']
47-
dst = input_df['dst']
48+
src = input_df[src_col_name]
49+
dst = input_df[dst_col_name]
4850
vertex_t = src.dtype
4951
if num_global_edges > (2**31 - 1):
5052
edge_t = np.dtype("int64")
@@ -79,7 +81,7 @@ def mg_katz_centrality(input_df,
7981
cdef uintptr_t c_edge_weights = <uintptr_t>NULL
8082
if weights is not None:
8183
c_edge_weights = weights.__cuda_array_interface__['data'][0]
82-
84+
8385
# FIXME: data is on device, move to host (to_pandas()), convert to np array and access pointer to pass to C
8486
vertex_partition_offsets_host = vertex_partition_offsets.values_host
8587
cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets_host.__array_interface__['data'][0]
@@ -109,7 +111,7 @@ def mg_katz_centrality(input_df,
109111
num_global_verts, num_global_edges,
110112
is_weighted,
111113
False,
112-
True, True)
114+
True, True)
113115

114116
df = cudf.DataFrame()
115117
df['vertex'] = cudf.Series(np.arange(vertex_partition_offsets.iloc[rank], vertex_partition_offsets.iloc[rank+1]), dtype=vertex_t)

python/cugraph/cugraph/dask/common/input_utils.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
1+
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -229,7 +229,9 @@ def get_vertex_partition_offsets(input_graph):
229229
renumber_vertex_count = input_graph.renumber_map.implementation.ddf.\
230230
map_partitions(len).compute()
231231
renumber_vertex_cumsum = renumber_vertex_count.cumsum()
232-
vertex_dtype = input_graph.edgelist.edgelist_df['src'].dtype
232+
# Assume the input_graph edgelist was renumbered
233+
src_col_name = input_graph.renumber_map.renumbered_src_col_name
234+
vertex_dtype = input_graph.edgelist.edgelist_df[src_col_name].dtype
233235
vertex_partition_offsets = cudf.Series([0], dtype=vertex_dtype)
234236
vertex_partition_offsets = vertex_partition_offsets.append(cudf.Series(
235237
renumber_vertex_cumsum, dtype=vertex_dtype))

python/cugraph/cugraph/dask/community/louvain.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626

2727
def call_louvain(sID,
2828
data,
29+
src_col_name,
30+
dst_col_name,
2931
num_verts,
3032
num_edges,
3133
vertex_partition_offsets,
@@ -38,6 +40,8 @@ def call_louvain(sID,
3840
segment_offsets = \
3941
aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)]
4042
return c_mg_louvain.louvain(data[0],
43+
src_col_name,
44+
dst_col_name,
4145
num_verts,
4246
num_edges,
4347
vertex_partition_offsets,
@@ -130,9 +134,14 @@ def louvain(input_graph, max_iter=100, resolution=1.0):
130134
num_edges = len(ddf)
131135
data = get_distributed_data(ddf)
132136

137+
src_col_name = input_graph.renumber_map.renumbered_src_col_name
138+
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
139+
133140
futures = [client.submit(call_louvain,
134141
Comms.get_session_id(),
135142
wf[1],
143+
src_col_name,
144+
dst_col_name,
136145
num_verts,
137146
num_edges,
138147
vertex_partition_offsets,

python/cugraph/cugraph/dask/community/louvain_wrapper.pyx

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
1+
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
22
# Licensed under the Apache License, Version 2.0 (the "License");
33
# you may not use this file except in compliance with the License.
44
# You may obtain a copy of the License at
@@ -33,6 +33,8 @@ numberTypeMap = {np.dtype("int32") : <int>numberTypeEnum.int32Type,
3333

3434

3535
def louvain(input_df,
36+
src_col_name,
37+
dst_col_name,
3638
num_global_verts,
3739
num_global_edges,
3840
vertex_partition_offsets,
@@ -55,8 +57,8 @@ def louvain(input_df,
5557
# FIXME: much of this code is common to other algo wrappers, consider adding
5658
# this to a shared utility as well
5759

58-
src = input_df['src']
59-
dst = input_df['dst']
60+
src = input_df[src_col_name]
61+
dst = input_df[dst_col_name]
6062
num_local_edges = len(src)
6163

6264
if "value" in input_df.columns:

python/cugraph/cugraph/dask/components/connectivity.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
def call_wcc(sID,
2323
data,
24+
src_col_name,
25+
dst_col_name,
2426
num_verts,
2527
num_edges,
2628
vertex_partition_offsets,
@@ -31,6 +33,8 @@ def call_wcc(sID,
3133
segment_offsets = \
3234
aggregate_segment_offsets[local_size * wid: local_size * (wid + 1)]
3335
return mg_connectivity.mg_wcc(data[0],
36+
src_col_name,
37+
dst_col_name,
3438
num_verts,
3539
num_edges,
3640
vertex_partition_offsets,
@@ -62,9 +66,14 @@ def weakly_connected_components(input_graph):
6266
num_edges = len(ddf)
6367
data = get_distributed_data(ddf)
6468

69+
src_col_name = input_graph.renumber_map.renumbered_src_col_name
70+
dst_col_name = input_graph.renumber_map.renumbered_dst_col_name
71+
6572
result = [client.submit(call_wcc,
6673
Comms.get_session_id(),
6774
wf[1],
75+
src_col_name,
76+
dst_col_name,
6877
num_verts,
6978
num_edges,
7079
vertex_partition_offsets,

python/cugraph/cugraph/dask/components/mg_connectivity_wrapper.pyx

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2021, NVIDIA CORPORATION.
2+
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@ import numpy as np
2525

2626

2727
def mg_wcc(input_df,
28+
src_col_name,
29+
dst_col_name,
2830
num_global_verts,
2931
num_global_edges,
3032
vertex_partition_offsets,
@@ -35,8 +37,8 @@ def mg_wcc(input_df,
3537
cdef size_t handle_size_t = <size_t>handle.getHandle()
3638
handle_ = <c_connectivity.handle_t*>handle_size_t
3739

38-
src = input_df['src']
39-
dst = input_df['dst']
40+
src = input_df[src_col_name]
41+
dst = input_df[dst_col_name]
4042
vertex_t = src.dtype
4143
if num_global_edges > (2**31 - 1):
4244
edge_t = np.dtype("int64")
@@ -91,15 +93,15 @@ def mg_wcc(input_df,
9193
is_weighted,
9294
True,
9395
False,
94-
True)
96+
True)
9597

9698
df = cudf.DataFrame()
9799
df['vertex'] = cudf.Series(np.arange(vertex_partition_offsets.iloc[rank], vertex_partition_offsets.iloc[rank+1]), dtype=vertex_t)
98100
df['labels'] = cudf.Series(np.zeros(len(df['vertex']), dtype=vertex_t))
99101

100102
cdef uintptr_t c_labels_val = df['labels'].__cuda_array_interface__['data'][0];
101103

102-
if vertex_t == np.int32:
104+
if vertex_t == np.int32:
103105
c_connectivity.call_wcc[int, float](handle_[0],
104106
graph_container,
105107
<int*>c_labels_val)

python/cugraph/cugraph/dask/link_analysis/mg_pagerank_wrapper.pyx

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#
2-
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
2+
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
33
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@ import numpy as np
2525

2626

2727
def mg_pagerank(input_df,
28+
src_col_name,
29+
dst_col_name,
2830
num_global_verts,
2931
num_global_edges,
3032
vertex_partition_offsets,
@@ -42,8 +44,8 @@ def mg_pagerank(input_df,
4244
cdef size_t handle_size_t = <size_t>handle.getHandle()
4345
handle_ = <c_pagerank.handle_t*>handle_size_t
4446

45-
src = input_df['src']
46-
dst = input_df['dst']
47+
src = input_df[src_col_name]
48+
dst = input_df[dst_col_name]
4749
vertex_t = src.dtype
4850
if num_global_edges > (2**31 - 1):
4951
edge_t = np.dtype("int64")
@@ -74,7 +76,7 @@ def mg_pagerank(input_df,
7476
cdef uintptr_t c_edge_weights = <uintptr_t>NULL
7577
if weights is not None:
7678
c_edge_weights = weights.__cuda_array_interface__['data'][0]
77-
79+
7880
# FIXME: data is on device, move to host (to_pandas()), convert to np array and access pointer to pass to C
7981
vertex_partition_offsets_host = vertex_partition_offsets.values_host
8082
cdef uintptr_t c_vertex_partition_offsets = vertex_partition_offsets_host.__array_interface__['data'][0]
@@ -104,7 +106,7 @@ def mg_pagerank(input_df,
104106
num_global_verts, num_global_edges,
105107
is_weighted,
106108
False,
107-
True, True)
109+
True, True)
108110

109111
df = cudf.DataFrame()
110112
df['vertex'] = cudf.Series(np.arange(vertex_partition_offsets.iloc[rank], vertex_partition_offsets.iloc[rank+1]), dtype=vertex_t)

0 commit comments

Comments
 (0)