API improvements for end-to-end MG sampling performance#3269
Conversation
seunghwak
left a comment
Few quick comments, let me think more about this and I will post additional questions.
std::optional<rmm::device_uvector<int32_t>>,
std::optional<rmm::device_uvector<edge_t>>>
uniform_neighbor_sample(
  raft::handle_t const& handle,
So, these are what this API should satisfy.
- If starting_vertices includes seeds from multiple batches, the caller should be able to separate sampling outputs from different batches, and the caller should be able to place sampling outputs on different GPUs based on a mapping between batch IDs and GPUs. If the caller is invoking this function for a single batch, the caller may want the sampling outputs for the local sampling_vertices to be stored locally as well.
- For the output stored on a single GPU, we should be able to sort it based on batch IDs.
- Once we sort based on the batch ID, some users may want to sort based on hops as a primary key (if they don't need to create a separate tree per seed within a batch) or seed IDs as a primary key and hops as a secondary key (if they need to create a separate tree per seed).
I am thinking about the following,
This function optionally takes rmm::device_uvector<label_t>&& labels and rmm::device_uvector<int>&& dst_comm_ranks (labels.size() == dst_comm_ranks.size() == start_vertices.size()).
Here labels don't necessarily coincide with batch IDs, but users may set labels to batch_id * batch_size + seed index within a batch (if the results need to be sorted within a batch using seed IDs as a primary key and hops as a secondary key) or just batch_id (if there is no need to create a tree per seed). The output results are shuffled based on dst_comm_ranks for each starting vertex (if dst_comm_ranks is valid), or all the results for local seeds will be stored locally.
We return optional labels and hops for each edge data (src, dst, optional (edge weight, ID, type)). Within each GPU, the output results will be sorted using labels as a primary key (if labels are provided in the input) and hops as a secondary key.
One concern is that computing global "seed index within a batch" might be challenging for the caller if they want to distribute seeds for a single batch to multiple GPUs, but I guess this is not a common use case. If there are 100 batches and 10 GPUs, users may assign batch [0-10) to GPU0 (so start_vertices for GPU0 holding all the seeds for batch [0-10)), batch [10-20) to GPU1, and so on.
We may be able to reduce the output size if we don't return label and hop per every edge datum and instead create offset arrays per label & hop, but I am not sure the benefit outweighs the increase in complexity. If the memory footprint is a major concern, we may just reduce the number of batches we process per call (the main reason for processing multiple batches in a single call is to saturate GPUs, but if we're hitting the memory limit, we might be going too far in this direction).
Am I missing anything, or any other thoughts?
This is a good point; there's a lot to unpack here. Let me create a few separate comments.
The original objective of processing multiple batches in a single call is to efficiently utilize the parallelism on the GPU. If we do individual calls with a small batch size then we don't get enough parallelism and the overhead of calling the function is large relative to the work accomplished. By processing multiple batches of seeds in the same call we increase the amount of parallel work to be done on the GPU and we reduce the number of times we face the overhead of calling the function, allocating memory, launching kernels, etc. This improves our overall efficiency.
I don't know where the threshold is, but at some point there are enough vertices in a call to get the efficiencies we are looking for. Doing larger numbers of batches in a single call should still drive down overhead, but the improvement should decrease dramatically once we reach saturation of the GPU parallelism. It seems (with the large graphs we are targeting) that this threshold is much less than "all seeds to be sampled in a training epoch".
In the current pipeline, the output from this processing is going to be moved out of GPU memory to be retrieved by the trainers as they work (currently written at the python layer to parquet files). As long as we process in the first call enough batches to keep the trainers busy while we do a second call, there would be no drop in pipeline throughput by making multiple calls to the sampling code with smaller numbers of batches. That would reduce the memory requirement, and we can overlap the sampling with the training a bit more. It might actually improve the overall latency for training an epoch (although perhaps only marginally, I don't really know the performance numbers you're dealing with).
This does suggest that the extra memory required to drive this function doesn't need to be a driving factor in the design of the API.
Regarding the labels discussion. Part of the reason I went with the name "label" rather than "batch" was this notion that batches might not be the only reason for labeling the data.
There's absolutely no reason that the sampling code needs to understand anything about how the caller assigns labels and uses them. If we keep the label as a vector (labels.size() == start_vertices.size()) rather than switching to a CSR-style representation, that label can literally be any arbitrary 32-bit integer value (it doesn't need to be contiguous values starting from 0). Either route for organizing the result (mapping label to GPU id, or providing a destination vector with dst_comm_ranks.size() == start_vertices.size()) allows us to shuffle the data to the correct GPU. We could talk about sorting options (beyond sorting by label) if that would be helpful, although that seems like an easy enough feature to add later when we need it.
It seems like many of the use cases you describe can be addressed by the caller creating the labels in a more sophisticated way and then grouping the results once they are returned. There's no reason the caller couldn't create labels such that the training batch is decomposed into k labels that all get mapped to the same GPU, with the results combined by the caller. That allows for construction of the trees, and it allows the same seed to be repeated within a batch while still differentiating which tree came from which seed.
The beauty there is that the responsibility for managing the sophistication lies in the caller rather than in the library function.
I don't know where the threshold is, but at some point there are enough vertices in a call to get the efficiencies we are looking for.
I guess if the memory footprint for the sampling is a significant percentage (say more than 10%) of the GPU's total memory, we may not be too far from saturating the GPU. And I agree that "This does suggest that the extra memory required to drive this function doesn't need to be a driving factor in the design of the API." We still need to be frugal with memory, but it should not be the #1 priority when we need to make trade-offs.
Regarding the labels discussion. Part of the reason I went with the name "label" rather than "batch" was this notion that batches might not be the only reason for labeling the data.
There's absolutely no reason that the sampling code needs to understand anything about how the caller assigns labels and uses them.
Yes, now I agree with this. And I just want to emphasize that this API should allow creating one tree per seed within a batch, so we need to provide a mechanism to distinguish sampled edges from different seeds, and labels can serve this purpose (32 bits might be sufficient for the foreseeable future, but in a few GPU generations, especially with Grace Hopper-like systems, we may exceed the 32-bit boundary, so we may keep it as label_t rather than int32_t).
I will add a label_t and only instantiate int32_t for now.
…od sampling, default to current behavior
template <typename vertex_t, typename edge_t, bool store_transposed, bool multi_gpu>
rmm::device_uvector<vertex_t> select_random_vertices(
  raft::handle_t const& handle,

No need for this empty space.
Removed in next push
labels->end(),
[output_label = std::get<0>(*label_to_output_comm_rank),
 output_rank  = std::get<1>(*label_to_output_comm_rank)] __device__(auto val) {
  auto pos = thrust::find(thrust::seq, output_label.begin(), output_label.end(), val);
thrust::find is a linear scan; this can be expensive if output_label.size() is large. Better to use binary search (if output_labels can be sorted) or a hash table. You may look at prims/kv_store.cuh as well.
I meant to go back and look at that, I will fix in next push.
thrust::count_if(handle.get_thrust_policy(),
                 thrust::make_counting_iterator<size_t>(0),
                 thrust::make_counting_iterator<size_t>(labels->size()),
                 [d_labels = labels->data()] __device__(size_t idx) {
You can use the predefined is_first_in_run_t:
auto num_unique_majors =
thrust::count_if(rmm::exec_policy(loop_stream),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(this_chunk_size),
is_first_in_run_t<vertex_t const*>{tmp_majors.data()});
Thanks for the link. I knew we did something like this in the renumbering code, but I wasn't able to find it.
Refactored in the next push to look like the linked renumbering code.
rmm::device_uvector<label_t> unique_labels(num_unique_labels, handle.get_stream());

thrust::copy_if(
  handle.get_thrust_policy(),
  thrust::make_zip_iterator(thrust::make_counting_iterator<size_t>(0), labels->begin()),
  thrust::make_zip_iterator(thrust::make_counting_iterator<size_t>(labels->size()),
                            labels->end()),
  thrust::make_zip_iterator(thrust::make_discard_iterator(), unique_labels.begin()),
  [d_labels = labels->data()] __device__(auto tuple) {
    size_t idx = thrust::get<0>(tuple);
    return (idx == 0) || (d_labels[idx - 1] != d_labels[idx]);
  });

thrust::transform(handle.get_thrust_policy(),
                  labels->begin(),
                  labels->end(),
                  labels->begin(),
                  [d_unique_labels = raft::device_span<label_t const>{
                    unique_labels.data(), unique_labels.size()}] __device__(label_t label) {
                    auto pos = thrust::lower_bound(
                      thrust::seq, d_unique_labels.begin(), d_unique_labels.end(), label);
                    return static_cast<label_t>(thrust::distance(d_unique_labels.begin(), pos));
                  });

offsets = detail::compute_sparse_offsets<size_t>(
  labels->begin(), labels->end(), size_t{0}, unique_labels.size(), handle.get_stream());
labels = std::move(unique_labels);
Can't you just use reduce_by_key? labels are already sorted and you want to compute the count of each label (you can compute offsets from the counts by running a scan).
Refactored in the next push to look like the linked renumbering code.
 * @return true if sorted, false if not sorted
 */
template <typename data_t>
bool is_span_sorted(raft::handle_t const& handle, raft::device_span<data_t> span);
Just a suggestion, but span in is_span_sorted sounds a bit redundant. This takes a device_span, so it is clear that it checks whether a device_span is sorted even without span in the function name.
Fixed in next push.
thrust::tabulate(
  handle_->get_thrust_policy(),
  batch_number.begin(),
  batch_number.end(),
  [rank = handle_->get_comms().get_rank(),
   batch_size = uniform_neighbor_sampling_usecase.batch_size] __device__(int32_t index) {
    return rank + (index / batch_size);
  });

size_t num_batches = 1 + handle_->get_comms().get_rank() +
                     (batch_number.size() / uniform_neighbor_sampling_usecase.batch_size);
num_batches = cugraph::host_scalar_allreduce(
  handle_->get_comms(), num_batches, raft::comms::op_t::MAX, handle_->get_stream());

rmm::device_uvector<int32_t> unique_batches(num_batches, handle_->get_stream());
rmm::device_uvector<int32_t> comm_ranks(num_batches, handle_->get_stream());

cugraph::detail::sequence_fill(
  handle_->get_stream(), unique_batches.data(), unique_batches.size(), int32_t{0});
thrust::tabulate(handle_->get_thrust_policy(),
                 batch_number.begin(),
                 batch_number.end(),
                 [batch_size = uniform_neighbor_sampling_usecase.batch_size] __device__(
                   int32_t index) { return index / batch_size; });
thrust::tabulate(handle_->get_thrust_policy(),
                 comm_ranks.begin(),
                 comm_ranks.end(),
                 [num_gpus = handle_->get_comms().get_size()] __device__(auto index) {
                   return index % num_gpus;
                 });
Is this logic correct?
After
thrust::tabulate(
handle_->get_thrust_policy(),
batch_number.begin(),
batch_number.end(),
[rank = handle_->get_comms().get_rank(),
batch_size = uniform_neighbor_sampling_usecase.batch_size] __device__(int32_t index) {
return rank + (index / batch_size);
});
Batch IDs in each GPU will be [rank, rank + (local_number_of_sources + (batch_size - 1)) / batch_size).
Say local_number_of_sources = 100 and batch_size = 10.
GPU 0: batch IDs will be [0, 10)
GPU 1: batch IDs will be [1, 11)
Isn't num_batches the number of unique batch IDs? In the case above, this should be 11 (0, 1, 2, ... , 10), right?
Not sure how
size_t num_batches = 1 + handle_->get_comms().get_rank() +
(batch_number.size() / uniform_neighbor_sampling_usecase.batch_size);
num_batches = cugraph::host_scalar_allreduce(
handle_->get_comms(), num_batches, raft::comms::op_t::MAX, handle_->get_stream());
computes the number of unique batches.
And shouldn't batch_size be globally enforced for each batch ID? Based on this logic, batch_size will be a local batch size, and the global batch size for each batch ID will grow with the number of GPUs. Am I missing something?
Yes, this logic is faulty. I will reconsider.
Restructured this code in latest push. Batch sizes should now be correct except for the last batch which might be small.
The new update looks good to me.
num_batches] __device__(int32_t index) { return (seed_offset + index) % num_batches; });

rmm::device_uvector<int32_t> unique_batches(num_batches, handle_->get_stream());
rmm::device_uvector<int32_t> comm_ranks(num_batches, handle_->get_stream());
So, are we replicating the same unique_batches and comm_ranks on every GPU?
I might be thinking too far, but this can be a problem if # labels is very large. May not need to address in this PR, but something we need to at least start thinking about.
if TYPE_CHECKING:
    from cugraph import Graph

src_n = "sources"
no code change
We really need a utility class with all the standard column names.
/merge |
Recent discussion has identified some performance issues in end-to-end testing of the MG sampling code.
This PR implements changes to the API to improve the end-to-end performance.