|
| 1 | +# Copyright (c) 2022, NVIDIA CORPORATION. |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | +# |
| 15 | + |
| 16 | +from dask.distributed import wait |
| 17 | +import cugraph.dask.comms.comms as Comms |
| 18 | +import dask_cudf |
| 19 | +import cudf |
| 20 | +from cugraph.dask.common.input_utils import get_distributed_data |
| 21 | +from cugraph.utilities import renumber_vertex_pair |
| 22 | + |
| 23 | +from pylibcugraph.experimental import ( |
| 24 | + jaccard_coefficients as pylibcugraph_jaccard_coefficients, |
| 25 | +) |
| 26 | +from pylibcugraph import ResourceHandle |
| 27 | + |
| 28 | + |
| 29 | +def convert_to_cudf(cp_arrays): |
| 30 | + """ |
| 31 | + Creates a cudf DataFrame from cupy arrays from pylibcugraph wrapper |
| 32 | + """ |
| 33 | + |
| 34 | + cupy_source, cupy_destination, cupy_similarity = cp_arrays |
| 35 | + |
| 36 | + df = cudf.DataFrame() |
| 37 | + df["source"] = cupy_source |
| 38 | + df["destination"] = cupy_destination |
| 39 | + df["jaccard_coeff"] = cupy_similarity |
| 40 | + |
| 41 | + return df |
| 42 | + |
| 43 | + |
| 44 | +def _call_plc_jaccard( |
| 45 | + sID, mg_graph_x, vertex_pair, use_weight, do_expensive_check, vertex_pair_col_name |
| 46 | +): |
| 47 | + |
| 48 | + first = vertex_pair[vertex_pair_col_name[0]] |
| 49 | + second = vertex_pair[vertex_pair_col_name[1]] |
| 50 | + |
| 51 | + return pylibcugraph_jaccard_coefficients( |
| 52 | + resource_handle=ResourceHandle(Comms.get_handle(sID).getHandle()), |
| 53 | + graph=mg_graph_x, |
| 54 | + first=first, |
| 55 | + second=second, |
| 56 | + use_weight=use_weight, |
| 57 | + do_expensive_check=do_expensive_check, |
| 58 | + ) |
| 59 | + |
| 60 | + |
| 61 | +def jaccard(input_graph, vertex_pair=None, use_weight=False): |
| 62 | + """ |
| 63 | + Compute the Jaccard similarity between each pair of vertices connected by |
| 64 | + an edge, or between arbitrary pairs of vertices specified by the user. |
| 65 | + Jaccard similarity is defined between two sets as the ratio of the volume |
| 66 | + of their intersection divided by the volume of their union. In the context |
| 67 | + of graphs, the neighborhood of a vertex is seen as a set. The Jaccard |
| 68 | + similarity weight of each edge represents the strength of connection |
| 69 | + between vertices based on the relative similarity of their neighbors. If |
| 70 | + first is specified but second is not, or vice versa, an exception will be |
| 71 | + thrown. |
| 72 | +
|
| 73 | + NOTE: If the vertex_pair parameter is not specified then the behavior |
| 74 | + of cugraph.jaccard is different from the behavior of |
| 75 | + networkx.jaccard_coefficient. |
| 76 | +
|
| 77 | + cugraph.dask.jaccard, in the absence of a specified vertex pair list, will |
| 78 | + compute the two_hop_neighbors of the entire graph to construct a vertex pair |
| 79 | + list and will return the jaccard coefficient for those vertex pairs. This is |
| 80 | + not advisable as the vertex_pairs can grow exponentially with respect to the |
| 81 | + size of the datasets |
| 82 | +
|
| 83 | + networkx.jaccard_coefficient, in the absence of a specified vertex |
| 84 | + pair list, will return an upper triangular dense matrix, excluding |
| 85 | + the diagonal as well as vertex pairs that are directly connected |
| 86 | + by an edge in the graph, of jaccard coefficients. Technically, networkx |
| 87 | + returns a lazy iterator across this upper triangular matrix where |
| 88 | + the actual jaccard coefficient is computed when the iterator is |
| 89 | + dereferenced. Computing a dense matrix of results is not feasible |
| 90 | + if the number of vertices in the graph is large (100,000 vertices |
| 91 | + would result in 4.9 billion values in that iterator). |
| 92 | +
|
| 93 | + If your graph is small enough (or you have enough memory and patience) |
| 94 | + you can get the interesting (non-zero) values that are part of the networkx |
| 95 | + solution by doing the following: |
| 96 | +
|
| 97 | + But please remember that cugraph will fill the dataframe with the entire |
| 98 | + solution you request, so you'll need enough memory to store the 2-hop |
| 99 | + neighborhood dataframe. |
| 100 | +
|
| 101 | +
|
| 102 | + Parameters |
| 103 | + ---------- |
| 104 | + input_graph : cugraph.Graph |
| 105 | + cuGraph Graph instance, should contain the connectivity information |
| 106 | + as an edge list (edge weights are not used for this algorithm). The |
| 107 | + graph should be undirected where an undirected edge is represented by a |
| 108 | + directed edge in both direction. The adjacency list will be computed if |
| 109 | + not already present. |
| 110 | +
|
| 111 | + This implementation only supports undirected, unweighted Graph. |
| 112 | +
|
| 113 | + vertex_pair : cudf.DataFrame, optional (default=None) |
| 114 | + A GPU dataframe consisting of two columns representing pairs of |
| 115 | + vertices. If provided, the jaccard coefficient is computed for the |
| 116 | + given vertex pairs. If the vertex_pair is not provided then the |
| 117 | + current implementation computes the jaccard coefficient for all |
| 118 | + adjacent vertices in the graph. |
| 119 | +
|
| 120 | + use_weight : bool, optional (default=False) |
| 121 | + Currently not supported |
| 122 | +
|
| 123 | + Returns |
| 124 | + ------- |
| 125 | + result : dask_cudf.DataFrame |
| 126 | + GPU distributed data frame containing 2 dask_cudf.Series |
| 127 | +
|
| 128 | + ddf['source']: dask_cudf.Series |
| 129 | + The source vertex ID (will be identical to first if specified) |
| 130 | + ddf['destination']: dask_cudf.Series |
| 131 | + The destination vertex ID (will be identical to second if |
| 132 | + specified) |
| 133 | + ddf['jaccard_coeff']: dask_cudf.Series |
| 134 | + The computed Jaccard coefficient between the source and destination |
| 135 | + vertices |
| 136 | + """ |
| 137 | + |
| 138 | + if input_graph.is_directed(): |
| 139 | + raise ValueError("input graph must be undirected") |
| 140 | + |
| 141 | + if vertex_pair is None: |
| 142 | + # Call two_hop neighbor of the entire graph |
| 143 | + vertex_pair = input_graph.get_two_hop_neighbors() |
| 144 | + |
| 145 | + vertex_pair_col_name = vertex_pair.columns |
| 146 | + |
| 147 | + if use_weight: |
| 148 | + raise ValueError( |
| 149 | + "'use_weight' is currently not supported and must be set to 'False'" |
| 150 | + ) |
| 151 | + |
| 152 | + # FIXME: Implement a better way to check if the graph is weighted similar |
| 153 | + # to 'simpleGraph' |
| 154 | + if len(input_graph.edgelist.edgelist_df.columns) == 3: |
| 155 | + raise RuntimeError("input graph must be unweighted") |
| 156 | + |
| 157 | + if isinstance(vertex_pair, (dask_cudf.DataFrame, cudf.DataFrame)): |
| 158 | + vertex_pair = renumber_vertex_pair(input_graph, vertex_pair) |
| 159 | + |
| 160 | + elif vertex_pair is not None: |
| 161 | + raise ValueError("vertex_pair must be a dask_cudf or cudf dataframe") |
| 162 | + |
| 163 | + if not isinstance(vertex_pair, (dask_cudf.DataFrame)): |
| 164 | + vertex_pair = dask_cudf.from_cudf( |
| 165 | + vertex_pair, npartitions=len(Comms.get_workers()) |
| 166 | + ) |
| 167 | + vertex_pair = get_distributed_data(vertex_pair) |
| 168 | + wait(vertex_pair) |
| 169 | + vertex_pair = vertex_pair.worker_to_parts |
| 170 | + |
| 171 | + # Initialize dask client |
| 172 | + client = input_graph._client |
| 173 | + |
| 174 | + do_expensive_check = False |
| 175 | + |
| 176 | + if vertex_pair is not None: |
| 177 | + result = [ |
| 178 | + client.submit( |
| 179 | + _call_plc_jaccard, |
| 180 | + Comms.get_session_id(), |
| 181 | + input_graph._plc_graph[w], |
| 182 | + vertex_pair[w][0], |
| 183 | + use_weight, |
| 184 | + do_expensive_check, |
| 185 | + vertex_pair_col_name, |
| 186 | + workers=[w], |
| 187 | + allow_other_workers=False, |
| 188 | + ) |
| 189 | + for w in Comms.get_workers() |
| 190 | + ] |
| 191 | + |
| 192 | + wait(result) |
| 193 | + |
| 194 | + cudf_result = [client.submit(convert_to_cudf, cp_arrays) for cp_arrays in result] |
| 195 | + |
| 196 | + wait(cudf_result) |
| 197 | + |
| 198 | + ddf = dask_cudf.from_delayed(cudf_result).persist() |
| 199 | + wait(ddf) |
| 200 | + |
| 201 | + # Wait until the inactive futures are released |
| 202 | + wait([(r.release(), c_r.release()) for r, c_r in zip(result, cudf_result)]) |
| 203 | + |
| 204 | + if input_graph.renumbered: |
| 205 | + ddf = input_graph.unrenumber(ddf, "source") |
| 206 | + ddf = input_graph.unrenumber(ddf, "destination") |
| 207 | + |
| 208 | + return ddf |
0 commit comments