Skip to content
37 changes: 21 additions & 16 deletions cpp/src/community/detail/common_methods.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,19 @@ rmm::device_uvector<typename graph_view_t::vertex_type> update_clustering_by_del
cugraph::detail::compute_gpu_id_from_ext_vertex_t<vertex_t> vertex_to_gpu_id_op{
handle.get_comms().get_size()};

vertex_cluster_weights_v =
cugraph::collect_values_for_keys(handle.get_comms(),
cluster_keys_v.begin(),
cluster_keys_v.end(),
cluster_weights_v.data(),
next_clusters_v.begin(),
next_clusters_v.end(),
vertex_to_gpu_id_op,
invalid_vertex_id<vertex_t>::value,
std::numeric_limits<weight_t>::max(),
handle.get_stream());
kv_store_t<vertex_t, weight_t, false> cluster_key_weight_map(
cluster_keys_v.begin(),
cluster_keys_v.end(),
cluster_weights_v.data(),
invalid_vertex_id<vertex_t>::value,
std::numeric_limits<weight_t>::max(),
handle.get_stream());
vertex_cluster_weights_v = cugraph::collect_values_for_keys(handle.get_comms(),
cluster_key_weight_map.view(),
next_clusters_v.begin(),
next_clusters_v.end(),
vertex_to_gpu_id_op,
handle.get_stream());

src_cluster_weights = edge_src_property_t<graph_view_t, weight_t>(handle, graph_view);
update_edge_src_property(
Expand Down Expand Up @@ -329,18 +331,21 @@ rmm::device_uvector<typename graph_view_t::vertex_type> update_clustering_by_del
decltype(cluster_old_sum_subtract_pair_first)>(
cluster_old_sum_subtract_pair_first));

kv_store_t<vertex_t, weight_t, false> cluster_key_weight_map(
cluster_keys_v.begin(),
cluster_keys_v.begin() + cluster_keys_v.size(),
cluster_weights_v.begin(),
invalid_vertex_id<vertex_t>::value,
std::numeric_limits<weight_t>::max(),
handle.get_stream());
per_v_transform_reduce_dst_key_aggregated_outgoing_e(
handle,
graph_view,
zipped_src_device_view,
graph_view_t::is_multi_gpu ? dst_clusters_cache.view()
: detail::edge_minor_property_view_t<vertex_t, vertex_t const*>(
next_clusters_v.data(), vertex_t{0}),
cluster_keys_v.begin(),
cluster_keys_v.end(),
cluster_weights_v.begin(),
invalid_vertex_id<vertex_t>::value,
std::numeric_limits<weight_t>::max(),
cluster_key_weight_map.view(),
detail::key_aggregated_edge_op_t<vertex_t, weight_t>{total_edge_weight, resolution},
thrust::make_tuple(vertex_t{-1}, weight_t{0}),
detail::reduce_op_t<vertex_t, weight_t>{},
Expand Down
69 changes: 20 additions & 49 deletions cpp/src/prims/detail/nbr_intersection.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
#pragma once

#include <prims/kv_store.cuh>

#include <cugraph/edge_partition_device_view.cuh>
#include <cugraph/graph.hpp>
#include <cugraph/partition_manager.hpp>
Expand All @@ -23,7 +25,6 @@
#include <cugraph/utilities/shuffle_comm.cuh>
#include <cugraph/utilities/thrust_tuple_utils.hpp>

#include <cuco/static_map.cuh>
#include <raft/core/device_span.hpp>
#include <raft/handle.hpp>
#include <rmm/device_uvector.hpp>
Expand Down Expand Up @@ -271,10 +272,7 @@ struct pick_min_degree_t {
edge_partition.local_degree(edge_partition.major_offset_from_major_nocheck(major0));
}
} else {
auto idx =
first_element_to_idx_map
.find(major0, cuco::detail::MurmurHash3_32<vertex_t>{}, thrust::equal_to<vertex_t>{})
->second.load(cuda::memory_order_relaxed);
auto idx = first_element_to_idx_map.find(major0);
local_degree0 =
static_cast<edge_t>(first_element_offsets[idx + 1] - first_element_offsets[idx]);
}
Expand Down Expand Up @@ -302,10 +300,7 @@ struct pick_min_degree_t {
edge_partition.local_degree(edge_partition.major_offset_from_major_nocheck(major1));
}
} else {
auto idx =
second_element_to_idx_map
.find(major1, cuco::detail::MurmurHash3_32<vertex_t>{}, thrust::equal_to<vertex_t>{})
->second.load(cuda::memory_order_relaxed);
auto idx = second_element_to_idx_map.find(major1);
local_degree1 =
static_cast<edge_t>(second_element_offsets[idx + 1] - second_element_offsets[idx]);
}
Expand Down Expand Up @@ -366,11 +361,7 @@ struct copy_intersecting_nbrs_and_update_intersection_size_t {
edge_partition.local_edges(edge_partition.major_offset_from_major_nocheck(major));
}
} else {
auto idx = first_element_to_idx_map
.find(thrust::get<0>(pair),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{})
->second.load(cuda::memory_order_relaxed);
auto idx = first_element_to_idx_map.find(thrust::get<0>(pair));
local_degree0 =
static_cast<edge_t>(first_element_offsets[idx + 1] - first_element_offsets[idx]);
indices0 = first_element_indices + first_element_offsets[idx];
Expand Down Expand Up @@ -400,11 +391,7 @@ struct copy_intersecting_nbrs_and_update_intersection_size_t {
edge_partition.local_edges(edge_partition.major_offset_from_major_nocheck(major));
}
} else {
auto idx = second_element_to_idx_map
.find(thrust::get<1>(pair),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{})
->second.load(cuda::memory_order_relaxed);
auto idx = second_element_to_idx_map.find(thrust::get<1>(pair));
local_degree1 =
static_cast<edge_t>(second_element_offsets[idx + 1] - second_element_offsets[idx]);
indices1 = second_element_indices + second_element_offsets[idx];
Expand Down Expand Up @@ -573,8 +560,6 @@ nbr_intersection(raft::handle_t const& handle,
using edge_t = typename GraphViewType::edge_type;
using weight_t = typename GraphViewType::weight_type;

double constexpr load_factor = 0.7;

static_assert(std::is_same_v<typename thrust::iterator_traits<VertexPairIterator>::value_type,
thrust::tuple<vertex_t, vertex_t>>);

Expand Down Expand Up @@ -606,13 +591,8 @@ nbr_intersection(raft::handle_t const& handle,
// range for this GPU); note that there is no need to collect for first pair elements
// as they already reside locally.

auto poly_alloc = rmm::mr::polymorphic_allocator<char>(rmm::mr::get_current_device_resource());
[[maybe_unused]] auto stream_adapter =
rmm::mr::make_stream_allocator_adaptor(poly_alloc, handle.get_stream());

std::optional<std::unique_ptr<
cuco::static_map<vertex_t, vertex_t, cuda::thread_scope_device, decltype(stream_adapter)>>>
major_to_idx_map_ptr{std::nullopt};
std::optional<std::unique_ptr<kv_store_t<vertex_t, vertex_t, false>>> major_to_idx_map_ptr{
std::nullopt};
std::optional<rmm::device_uvector<size_t>> major_nbr_offsets{std::nullopt};
std::optional<rmm::device_uvector<vertex_t>> major_nbr_indices{std::nullopt};

Expand Down Expand Up @@ -862,31 +842,20 @@ nbr_intersection(raft::handle_t const& handle,
std::tie(*major_nbr_indices, std::ignore) = shuffle_values(
row_comm, local_nbrs_for_rx_majors.begin(), local_nbr_counts, handle.get_stream());

major_to_idx_map_ptr = std::make_unique<
cuco::static_map<vertex_t, vertex_t, cuda::thread_scope_device, decltype(stream_adapter)>>(
// cuco::static_map requires at least one empty slot
std::max(static_cast<size_t>(static_cast<double>(unique_majors.size()) / load_factor),
static_cast<size_t>(unique_majors.size()) + 1),
cuco::sentinel::empty_key<vertex_t>{invalid_vertex_id<vertex_t>::value},
cuco::sentinel::empty_value<vertex_t>{invalid_vertex_id<vertex_t>::value},
stream_adapter,
major_to_idx_map_ptr = std::make_unique<kv_store_t<vertex_t, vertex_t, false>>(
unique_majors.begin(),
unique_majors.end(),
thrust::make_counting_iterator(vertex_t{0}),
invalid_vertex_id<vertex_t>::value,
invalid_vertex_id<vertex_t>::value,
handle.get_stream());
auto pair_first = thrust::make_zip_iterator(unique_majors.begin(),
thrust::make_counting_iterator(vertex_t{0}));
(*major_to_idx_map_ptr)
->insert(pair_first,
pair_first + unique_majors.size(),
cuco::detail::MurmurHash3_32<vertex_t>{},
thrust::equal_to<vertex_t>{},
handle.get_stream());
}
}

// 3. Collect neighbor list for minors (for the neighbors within the minor range for this GPU)

std::optional<std::unique_ptr<
cuco::static_map<vertex_t, vertex_t, cuda::thread_scope_device, decltype(stream_adapter)>>>
minor_to_idx_map_ptr{std::nullopt};
std::optional<std::unique_ptr<kv_store_t<vertex_t, vertex_t, false>>> minor_to_idx_map_ptr{
std::nullopt};
std::optional<rmm::device_uvector<size_t>> minor_nbr_offsets{std::nullopt};
std::optional<rmm::device_uvector<vertex_t>> minor_nbr_indices{std::nullopt};

Expand Down Expand Up @@ -985,7 +954,8 @@ nbr_intersection(raft::handle_t const& handle,
handle
.get_stream()); // initially store minimum degrees (upper bound for intersection sizes)
if (intersect_minor_nbr[0] && intersect_minor_nbr[1]) {
auto second_element_to_idx_map = (*major_to_idx_map_ptr)->get_device_view();
auto second_element_to_idx_map =
detail::kv_cuco_store_device_view_t((*major_to_idx_map_ptr)->view());
thrust::transform(handle.get_thrust_policy(),
get_dataframe_buffer_begin(vertex_pair_buffer),
get_dataframe_buffer_end(vertex_pair_buffer),
Expand Down Expand Up @@ -1017,7 +987,8 @@ nbr_intersection(raft::handle_t const& handle,
rx_v_pair_nbr_intersection_offsets.back_element(handle.get_stream()),
handle.get_stream());
if (intersect_minor_nbr[0] && intersect_minor_nbr[1]) {
auto second_element_to_idx_map = (*major_to_idx_map_ptr)->get_device_view();
auto second_element_to_idx_map =
detail::kv_cuco_store_device_view_t((*major_to_idx_map_ptr)->view());
thrust::tabulate(handle.get_thrust_policy(),
rx_v_pair_nbr_intersection_sizes.begin(),
rx_v_pair_nbr_intersection_sizes.end(),
Expand Down
Loading