diff --git a/src/algorithms/include/timpi/parallel_sync.h b/src/algorithms/include/timpi/parallel_sync.h index ebb55dc..991d1a0 100644 --- a/src/algorithms/include/timpi/parallel_sync.h +++ b/src/algorithms/include/timpi/parallel_sync.h @@ -169,8 +169,6 @@ void push_parallel_vector_data(const Communicator & comm, // without confusing one for the other auto tag = comm.get_unique_tag(); - MapToVectors received_data; - // Post all of the sends, non-blocking and synchronous // Save off the old send_mode so we can restore it after this @@ -306,11 +304,11 @@ void push_parallel_vector_data(const Communicator & comm, } -template void push_parallel_packed_range(const Communicator & comm, - const MapToVectors & data, + const MapToContainers & data, Context * context, const ActionFunctor & act_on_data) { @@ -320,16 +318,15 @@ void push_parallel_packed_range(const Communicator & comm, // This function implements the "NBX" algorithm from // https://htor.inf.ethz.ch/publications/img/hoefler-dsde-protocols.pdf - typedef decltype(data.begin()->second.front()) ref_type; - typedef typename std::remove_reference::type nonref_type; + typedef typename MapToContainers::value_type map_pair_type; + typedef typename map_pair_type::second_type container_type; + typedef typename container_type::value_type nonref_type; typedef typename std::remove_const::type nonconst_nonref_type; // We'll grab a tag so we can overlap request sends and receives // without confusing one for the other auto tag = comm.get_unique_tag(); - MapToVectors received_data; - // Post all of the sends, non-blocking and synchronous // Save off the old send_mode so we can restore it after this @@ -372,10 +369,10 @@ void push_parallel_packed_range(const Communicator & comm, std::list>> receive_reqs; auto current_request = std::make_shared(); - std::multimap>> incoming_data; - auto current_incoming_data = std::make_shared>(); + std::multimap> incoming_data; + auto current_incoming_data = std::make_shared(); - nonconst_nonref_type * output_type; + nonconst_nonref_type * output_type = nullptr; unsigned int current_src_proc = 0; @@ -386,19 +383,18 @@ void push_parallel_packed_range(const Communicator & comm, current_src_proc = TIMPI::any_source; // Check if there is a message and start receiving it - if (comm.possibly_receive_packed_range(current_src_proc, - context, - std::back_inserter(*current_incoming_data), - output_type, - *current_request, - tag)) + if (comm.possibly_receive_packed_range + (current_src_proc, context, + std::inserter(*current_incoming_data, + current_incoming_data->end()), + output_type, *current_request, tag)) { receive_reqs.emplace_back(current_src_proc, current_request); current_request = std::make_shared(); // current_src_proc will now hold the src pid for this receive incoming_data.emplace(current_src_proc, current_incoming_data); - current_incoming_data = std::make_shared>(); + current_incoming_data = std::make_shared(); } // Clean up outstanding receive requests @@ -481,7 +477,7 @@ void pull_parallel_vector_data(const Communicator & comm, typedef typename MapToVectors::mapped_type query_type; std::multimap > - response_data, received_data; + response_data; #ifndef NDEBUG processor_id_type max_pid = 0; diff --git a/src/parallel/include/timpi/packing.h b/src/parallel/include/timpi/packing.h index f578931..4ee8a89 100644 --- a/src/parallel/include/timpi/packing.h +++ b/src/parallel/include/timpi/packing.h @@ -25,11 +25,11 @@ #include "timpi/standard_type.h" // C++ includes -#include +#include // memcpy #include +#include // enable_if, is_same +#include // pair #include -#include -#include // FIXME: This *should* be in TIMPI namespace but we have libMesh diff --git a/src/parallel/include/timpi/parallel_implementation.h b/src/parallel/include/timpi/parallel_implementation.h index 62a0406..7c87b9b 100644 --- a/src/parallel/include/timpi/parallel_implementation.h +++ b/src/parallel/include/timpi/parallel_implementation.h @@ -257,8 +257,6 @@ inline Status Communicator::packed_range_probe (const unsigned int src_processor { TIMPI_LOG_SCOPE("packed_range_probe()", "Parallel"); - timpi_experimental(); - Status stat((StandardType::buffer_type>())); int int_flag; @@ -737,8 +735,6 @@ inline void Communicator::nonblocking_send_packed_range (const unsigned int dest Request & req, const MessageTag & tag) const { - timpi_experimental(); - // Allocate a buffer on the heap so we don't have to free it until // after the Request::wait() typedef typename std::iterator_traits::value_type T; @@ -1201,8 +1197,6 @@ inline void Communicator::nonblocking_receive_packed_range (const unsigned int s Status & stat, const MessageTag & tag) const { - timpi_experimental(); - typedef typename Packing::buffer_type buffer_t; // Receive serialized variable size objects as a sequence of @@ -1421,8 +1415,6 @@ inline void Communicator::nonblocking_send_packed_range (const unsigned int dest std::shared_ptr::value_type>::buffer_type>> & buffer, const MessageTag & tag) const { - timpi_experimental(); - // Allocate a buffer on the heap so we don't have to free it until // after the Request::wait() typedef typename std::iterator_traits::value_type T; @@ -2014,8 +2006,6 @@ inline void Communicator::nonblocking_receive_packed_range (const unsigned int s std::shared_ptr::buffer_type>> & buffer, const MessageTag & tag) const { - timpi_experimental(); - // If they didn't pass in a buffer - let's make one if (buffer == nullptr) buffer = std::make_shared::buffer_type>>(); diff --git a/test/packed_range_unit.C b/test/packed_range_unit.C index 6f8c7c4..3f700f9 100644 --- a/test/packed_range_unit.C +++ b/test/packed_range_unit.C @@ -1,6 +1,10 @@ +#include #include #include +#include +#include +#include #include #define TIMPI_UNIT_ASSERT(expr) \ @@ -197,6 +201,92 @@ Communicator *TestCommWorld; TIMPI_UNIT_ASSERT(recv[0] == check); } + void testPushPackedImpl(int M) + { + const int size = TestCommWorld->size(), + rank = TestCommWorld->rank(); + + std::map> + data, received_data; + + auto stringy_number = [] (int number) + { + std::string digit_strings [10] = {"zero", "one", "two", + "three", "four", "five", "six", "seven", "eight", "nine"}; + + std::string returnval = "done"; + while (number) + { + returnval = digit_strings[number%10]+" "+returnval; + number = number/10; + }; + + return returnval; + }; + + for (int d=0; d != M; ++d) + { + int diffsize = std::abs(d-rank); + int diffsqrt = std::sqrt(diffsize); + if (diffsqrt*diffsqrt == diffsize) + for (int i=-1; i != diffsqrt; ++i) + data[d].insert(stringy_number(d)); + } + + auto collect_data = + [&received_data] + (processor_id_type pid, + const typename std::multiset & data) + { + auto & received = received_data[pid]; + received.insert(data.begin(), data.end()); + }; + + void * context = nullptr; + TIMPI::push_parallel_packed_range(*TestCommWorld, data, context, collect_data); + + // Test the received results, for each processor id p we're in + // charge of. + std::vector checked_sizes(size, 0); + for (int p=rank; p < M; p += size) + for (int srcp=0; srcp != size; ++srcp) + { + int diffsize = std::abs(srcp-p); + int diffsqrt = std::sqrt(diffsize); + if (diffsqrt*diffsqrt != diffsize) + { + if (received_data.count(srcp)) + { + const std::multiset & datum = received_data[srcp]; + TIMPI_UNIT_ASSERT + (std::count(datum.begin(), datum.end(), + stringy_number(p)) == std::ptrdiff_t(0)); + } + continue; + } + + TIMPI_UNIT_ASSERT(received_data.count(srcp) == std::size_t(1)); + const std::multiset & datum = received_data[srcp]; + TIMPI_UNIT_ASSERT + (std::count(datum.begin(), datum.end(), stringy_number(p)) == + std::ptrdiff_t(diffsqrt+1)); + checked_sizes[srcp] += diffsqrt+1; + } + + for (int srcp=0; srcp != size; ++srcp) + TIMPI_UNIT_ASSERT(checked_sizes[srcp] == received_data[srcp].size()); + + } + + void testPushPacked() + { + testPushPackedImpl(TestCommWorld->size()); + } + + void testPushPackedOversized() + { + testPushPackedImpl((TestCommWorld->size() + 4) * 2); + } int main(int argc, const char * const * argv) { @@ -207,6 +297,8 @@ int main(int argc, const char * const * argv) testNullSendReceive(); testContainerAllGather(); testContainerSendReceive(); + testPushPacked(); + testPushPackedOversized(); return 0; }