From 5e79b09e68acc3b69efbff0ab7f8b6e6ddff8bd7 Mon Sep 17 00:00:00 2001 From: "Roy H. Stogner" Date: Wed, 6 May 2020 08:44:41 -0500 Subject: [PATCH 1/6] Clean up C++ headers in packing.h --- src/parallel/include/timpi/packing.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/parallel/include/timpi/packing.h b/src/parallel/include/timpi/packing.h index f578931..4ee8a89 100644 --- a/src/parallel/include/timpi/packing.h +++ b/src/parallel/include/timpi/packing.h @@ -25,11 +25,11 @@ #include "timpi/standard_type.h" // C++ includes -#include +#include // memcpy #include +#include // enable_if, is_same +#include // pair #include -#include -#include // FIXME: This *should* be in TIMPI namespace but we have libMesh From f42d11d34133c1c069339692a5cfc68673bb92b3 Mon Sep 17 00:00:00 2001 From: "Roy H. Stogner" Date: Wed, 6 May 2020 12:55:59 -0500 Subject: [PATCH 2/6] Remove atavistic variables Not sure why these aren't triggering unused variable warnings for me. --- src/algorithms/include/timpi/parallel_sync.h | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/algorithms/include/timpi/parallel_sync.h b/src/algorithms/include/timpi/parallel_sync.h index ebb55dc..830d75c 100644 --- a/src/algorithms/include/timpi/parallel_sync.h +++ b/src/algorithms/include/timpi/parallel_sync.h @@ -169,8 +169,6 @@ void push_parallel_vector_data(const Communicator & comm, // without confusing one for the other auto tag = comm.get_unique_tag(); - MapToVectors received_data; - // Post all of the sends, non-blocking and synchronous // Save off the old send_mode so we can restore it after this @@ -328,8 +326,6 @@ void push_parallel_packed_range(const Communicator & comm, // without confusing one for the other auto tag = comm.get_unique_tag(); - MapToVectors received_data; - // Post all of the sends, non-blocking and synchronous // Save off the old send_mode so we can restore it after this @@ -481,7 +477,7 @@ void pull_parallel_vector_data(const Communicator & comm, typedef typename MapToVectors::mapped_type query_type; std::multimap > - response_data, received_data; + response_data; #ifndef NDEBUG processor_id_type max_pid = 0; From 48fbe7fdd55584f087f85c7ca549a3f3c6f8598c Mon Sep 17 00:00:00 2001 From: "Roy H. Stogner" Date: Wed, 6 May 2020 12:57:09 -0500 Subject: [PATCH 3/6] Fix possibly-uninitialized variable warning --- src/algorithms/include/timpi/parallel_sync.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/algorithms/include/timpi/parallel_sync.h b/src/algorithms/include/timpi/parallel_sync.h index 830d75c..90acfdc 100644 --- a/src/algorithms/include/timpi/parallel_sync.h +++ b/src/algorithms/include/timpi/parallel_sync.h @@ -371,7 +371,7 @@ void push_parallel_packed_range(const Communicator & comm, std::multimap>> incoming_data; auto current_incoming_data = std::make_shared>(); - nonconst_nonref_type * output_type; + nonconst_nonref_type * output_type = nullptr; unsigned int current_src_proc = 0; From f88416fd07b3123cfff72486a205a792e1e06c55 Mon Sep 17 00:00:00 2001 From: "Roy H. Stogner" Date: Wed, 6 May 2020 13:02:07 -0500 Subject: [PATCH 4/6] Container-agnostic push_parallel_packed_range Since we have to pack and unpack *anyway*, there's no great reason to force users to put the things they're packing into vectors; we can now handle set/multiset/etc too. --- src/algorithms/include/timpi/parallel_sync.h | 26 ++++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/algorithms/include/timpi/parallel_sync.h b/src/algorithms/include/timpi/parallel_sync.h index 90acfdc..991d1a0 100644 --- a/src/algorithms/include/timpi/parallel_sync.h +++ b/src/algorithms/include/timpi/parallel_sync.h @@ -304,11 +304,11 @@ void push_parallel_vector_data(const Communicator & comm, } -template void push_parallel_packed_range(const Communicator & comm, - const MapToVectors & data, + const MapToContainers & data, Context * context, const ActionFunctor & act_on_data) { @@ -318,8 +318,9 @@ void push_parallel_packed_range(const Communicator & comm, // This function implements the "NBX" algorithm from // https://htor.inf.ethz.ch/publications/img/hoefler-dsde-protocols.pdf - typedef decltype(data.begin()->second.front()) ref_type; - typedef typename std::remove_reference::type nonref_type; + typedef typename MapToContainers::value_type map_pair_type; + typedef typename map_pair_type::second_type container_type; + typedef typename container_type::value_type nonref_type; typedef typename std::remove_const::type nonconst_nonref_type; // We'll grab a tag so we can overlap request sends and receives @@ -368,8 +369,8 @@ void push_parallel_packed_range(const Communicator & comm, std::list>> receive_reqs; auto current_request = std::make_shared(); - std::multimap>> incoming_data; - auto current_incoming_data = std::make_shared>(); + std::multimap> incoming_data; + auto current_incoming_data = std::make_shared(); nonconst_nonref_type * output_type = nullptr; @@ -382,19 +383,18 @@ void push_parallel_packed_range(const Communicator & comm, current_src_proc = TIMPI::any_source; // Check if there is a message and start receiving it - if (comm.possibly_receive_packed_range(current_src_proc, - context, - std::back_inserter(*current_incoming_data), - output_type, - *current_request, - tag)) + if (comm.possibly_receive_packed_range + (current_src_proc, context, + std::inserter(*current_incoming_data, + current_incoming_data->end()), + output_type, *current_request, tag)) { receive_reqs.emplace_back(current_src_proc, current_request); current_request = std::make_shared(); // current_src_proc will now hold the src pid for this receive incoming_data.emplace(current_src_proc, current_incoming_data); - current_incoming_data = std::make_shared>(); + current_incoming_data = std::make_shared(); } // Clean up outstanding receive requests From 4caf5337a8b09fb85959e3467db779a837edfcd4 Mon Sep 17 00:00:00 2001 From: "Roy H. Stogner" Date: Wed, 6 May 2020 13:17:38 -0500 Subject: [PATCH 5/6] No timpi_experimental on nonblocking packed_range We're now starting to get some test coverage on these, they're working, and they're uniquely named enough that even if we come up with a better API we'll be able to keep backward compatibility with these. --- src/parallel/include/timpi/parallel_implementation.h | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/parallel/include/timpi/parallel_implementation.h b/src/parallel/include/timpi/parallel_implementation.h index 62a0406..7c87b9b 100644 --- a/src/parallel/include/timpi/parallel_implementation.h +++ b/src/parallel/include/timpi/parallel_implementation.h @@ -257,8 +257,6 @@ inline Status Communicator::packed_range_probe (const unsigned int src_processor { TIMPI_LOG_SCOPE("packed_range_probe()", "Parallel"); - timpi_experimental(); - Status stat((StandardType::buffer_type>())); int int_flag; @@ -737,8 +735,6 @@ inline void Communicator::nonblocking_send_packed_range (const unsigned int dest Request & req, const MessageTag & tag) const { - timpi_experimental(); - // Allocate a buffer on the heap so we don't have to free it until // after the Request::wait() typedef typename std::iterator_traits::value_type T; @@ -1201,8 +1197,6 @@ inline void Communicator::nonblocking_receive_packed_range (const unsigned int s Status & stat, const MessageTag & tag) const { - timpi_experimental(); - typedef typename Packing::buffer_type buffer_t; // Receive serialized variable size objects as a sequence of @@ -1421,8 +1415,6 @@ inline void Communicator::nonblocking_send_packed_range (const unsigned int dest std::shared_ptr::value_type>::buffer_type>> & buffer, const MessageTag & tag) const { - timpi_experimental(); - // Allocate a buffer on the heap so we don't have to free it until // after the Request::wait() typedef typename std::iterator_traits::value_type T; @@ -2014,8 +2006,6 @@ inline void Communicator::nonblocking_receive_packed_range (const unsigned int s std::shared_ptr::buffer_type>> & buffer, const MessageTag & tag) const { - timpi_experimental(); - // If they didn't pass in a buffer - let's make one if (buffer == nullptr) buffer = std::make_shared::buffer_type>>(); From 21b67daf6ecca547eeed065867597bf0422a4fa5 Mon Sep 17 00:00:00 2001 From: "Roy H. Stogner" Date: Wed, 6 May 2020 13:18:57 -0500 Subject: [PATCH 6/6] Unit tests for push_parallel_packed_range --- test/packed_range_unit.C | 92 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/test/packed_range_unit.C b/test/packed_range_unit.C index 6f8c7c4..3f700f9 100644 --- a/test/packed_range_unit.C +++ b/test/packed_range_unit.C @@ -1,6 +1,10 @@ +#include #include #include +#include +#include +#include #include #define TIMPI_UNIT_ASSERT(expr) \ @@ -197,6 +201,92 @@ Communicator *TestCommWorld; TIMPI_UNIT_ASSERT(recv[0] == check); } + void testPushPackedImpl(int M) + { + const int size = TestCommWorld->size(), + rank = TestCommWorld->rank(); + + std::map> + data, received_data; + + auto stringy_number = [] (int number) + { + std::string digit_strings [10] = {"zero", "one", "two", + "three", "four", "five", "six", "seven", "eight", "nine"}; + + std::string returnval = "done"; + while (number) + { + returnval = digit_strings[number%10]+" "+returnval; + number = number/10; + }; + + return returnval; + }; + + for (int d=0; d != M; ++d) + { + int diffsize = std::abs(d-rank); + int diffsqrt = std::sqrt(diffsize); + if (diffsqrt*diffsqrt == diffsize) + for (int i=-1; i != diffsqrt; ++i) + data[d].insert(stringy_number(d)); + } + + auto collect_data = + [&received_data] + (processor_id_type pid, + const typename std::multiset & data) + { + auto & received = received_data[pid]; + received.insert(data.begin(), data.end()); + }; + + void * context = nullptr; + TIMPI::push_parallel_packed_range(*TestCommWorld, data, context, collect_data); + + // Test the received results, for each processor id p we're in + // charge of. + std::vector checked_sizes(size, 0); + for (int p=rank; p < M; p += size) + for (int srcp=0; srcp != size; ++srcp) + { + int diffsize = std::abs(srcp-p); + int diffsqrt = std::sqrt(diffsize); + if (diffsqrt*diffsqrt != diffsize) + { + if (received_data.count(srcp)) + { + const std::multiset & datum = received_data[srcp]; + TIMPI_UNIT_ASSERT + (std::count(datum.begin(), datum.end(), + stringy_number(p)) == std::ptrdiff_t(0)); + } + continue; + } + + TIMPI_UNIT_ASSERT(received_data.count(srcp) == std::size_t(1)); + const std::multiset & datum = received_data[srcp]; + TIMPI_UNIT_ASSERT + (std::count(datum.begin(), datum.end(), stringy_number(p)) == + std::ptrdiff_t(diffsqrt+1)); + checked_sizes[srcp] += diffsqrt+1; + } + + for (int srcp=0; srcp != size; ++srcp) + TIMPI_UNIT_ASSERT(checked_sizes[srcp] == received_data[srcp].size()); + + } + + void testPushPacked() + { + testPushPackedImpl(TestCommWorld->size()); + } + + void testPushPackedOversized() + { + testPushPackedImpl((TestCommWorld->size() + 4) * 2); + } int main(int argc, const char * const * argv) { @@ -207,6 +297,8 @@ int main(int argc, const char * const * argv) testNullSendReceive(); testContainerAllGather(); testContainerSendReceive(); + testPushPacked(); + testPushPackedOversized(); return 0; }