From 6448ac4293b157ef70ea06d6ef0f3704dc1bee61 Mon Sep 17 00:00:00 2001 From: Roy Stogner Date: Thu, 25 Jun 2020 12:06:49 -0500 Subject: [PATCH 1/2] More packed_range approx_buffer_size options In particular this should be useful for gather/allgather, where concatenation of buffers can cause overflows with the default buffer size on sufficiently large-scale problems --- src/parallel/include/timpi/communicator.h | 58 +++++++++++++++++-- .../include/timpi/parallel_implementation.h | 33 ++++++----- .../include/timpi/serial_implementation.h | 9 ++- 3 files changed, 77 insertions(+), 23 deletions(-) diff --git a/src/parallel/include/timpi/communicator.h b/src/parallel/include/timpi/communicator.h index fc65685..0a787e0 100644 --- a/src/parallel/include/timpi/communicator.h +++ b/src/parallel/include/timpi/communicator.h @@ -638,6 +638,12 @@ class Communicator * unsigned int TIMPI::packable_size(const T *, const Context *) is * used to allow data vectors to reserve memory, and for additional * error checking + * + * The approximate maximum size (in *entries*; number of bytes will + * likely be 4x or 8x larger) to use in a single data vector buffer + * can be specified for performance or memory usage reasons; if the + * range cannot be packed into a single buffer of this size then + * multiple buffers and messages will be used. */ template inline @@ -645,7 +651,8 @@ class Communicator const Context * context, Iter range_begin, const Iter range_end, - const MessageTag & tag=no_tag) const; + const MessageTag & tag=no_tag, + std::size_t approx_buffer_size = 1000000) const; /** * Nonblocking-send range-of-pointers to one processor. 
This @@ -659,6 +666,12 @@ class Communicator * unsigned int TIMPI::packable_size(const T *, const Context *) is * used to allow data vectors to reserve memory, and for additional * error checking + * + * The approximate maximum size (in *entries*; number of bytes will + * likely be 4x or 8x larger) to use in a single data vector buffer + * can be specified for performance or memory usage reasons; if the + * range cannot be packed into a single buffer of this size then + * multiple buffers and messages will be used. */ template inline @@ -667,7 +680,8 @@ class Communicator Iter range_begin, const Iter range_end, Request & req, - const MessageTag & tag=no_tag) const; + const MessageTag & tag=no_tag, + std::size_t approx_buffer_size = 1000000) const; /** * Similar to the above Nonblocking send_packed_range with a few important differences: @@ -846,7 +860,8 @@ class Communicator OutputIter out, const T * output_type, // used only to infer T const MessageTag & send_tag = no_tag, - const MessageTag & recv_tag = any_tag) const; + const MessageTag & recv_tag = any_tag, + std::size_t approx_buffer_size = 1000000) const; /** * Send data \p send to one processor while simultaneously receiving @@ -1024,23 +1039,47 @@ class Communicator /** * Take a range of local variables, combine it with ranges from all * processors, and write the output to the output iterator on rank root. + * + * The approximate maximum size (in *entries*; number of bytes will + * likely be 4x or 8x larger) to use in a single data vector buffer + * to send can be specified for performance or memory usage reasons; + * if the range cannot be packed into a single buffer of this size + * then multiple buffers and messages will be used. + * + * Note that the received data vector sizes will be the *sum* of the + * sent vector sizes; a smaller-than-default size may be useful for + * users on many processors, in cases where all-to-one communication + * cannot be avoided entirely. 
*/ template inline void gather_packed_range (const unsigned int root_id, Context * context, Iter range_begin, const Iter range_end, - OutputIter out) const; + OutputIter out, + std::size_t approx_buffer_size = 1000000) const; /** * Take a range of local variables, combine it with ranges from all * processors, and write the output to the output iterator. + * + * The approximate maximum size (in *entries*; number of bytes will + * likely be 4x or 8x larger) to use in a single data vector buffer + * to send can be specified for performance or memory usage reasons; + * if the range cannot be packed into a single buffer of this size + * then multiple buffers and messages will be used. + * + * Note that the received data vector sizes will be the *sum* of the + * sent vector sizes; a smaller-than-default size may be useful for + * users on many processors, in cases where all-to-all communication + * cannot be avoided entirely. */ template inline void allgather_packed_range (Context * context, Iter range_begin, const Iter range_end, - OutputIter out) const; + OutputIter out, + std::size_t approx_buffer_size = 1000000) const; /** * Effectively transposes the input vector across all processors. @@ -1079,6 +1118,12 @@ class Communicator * unsigned int TIMPI::packed_size(const T *, * vector::const_iterator) * is used to advance to the beginning of the next object's data. + * + * The approximate maximum size (in *entries*; number of bytes will + * likely be 4x or 8x larger) to use in a single data vector buffer + * can be specified for performance or memory usage reasons; if the + * range cannot be packed into a single buffer of this size then + * multiple buffers and messages will be used. 
*/ template inline void broadcast_packed_range (const Context * context1, @@ -1086,7 +1131,8 @@ class Communicator const Iter range_end, OutputContext * context2, OutputIter out, - const unsigned int root_id = 0) const; + const unsigned int root_id = 0, + std::size_t approx_buffer_size = 1000000) const; /** * C++ doesn't let us partially specialize functions (we're really diff --git a/src/parallel/include/timpi/parallel_implementation.h b/src/parallel/include/timpi/parallel_implementation.h index f7b7156..3eb9070 100644 --- a/src/parallel/include/timpi/parallel_implementation.h +++ b/src/parallel/include/timpi/parallel_implementation.h @@ -586,7 +586,8 @@ inline void Communicator::send_packed_range (const unsigned int dest_processor_i const Context * context, Iter range_begin, const Iter range_end, - const MessageTag & tag) const + const MessageTag & tag, + std::size_t approx_buffer_size) const { // We will serialize variable size objects from *range_begin to // *range_end as a sequence of plain data (e.g. 
ints) in this buffer @@ -608,7 +609,7 @@ inline void Communicator::send_packed_range (const unsigned int dest_processor_i std::vector::buffer_type> buffer; const Iter next_range_begin = pack_range - (context, range_begin, range_end, buffer); + (context, range_begin, range_end, buffer, approx_buffer_size); timpi_assert_greater (std::distance(range_begin, next_range_begin), 0); @@ -634,7 +635,8 @@ inline void Communicator::send_packed_range (const unsigned int dest_processor_i Iter range_begin, const Iter range_end, Request & req, - const MessageTag & tag) const + const MessageTag & tag, + std::size_t approx_buffer_size) const { // Allocate a buffer on the heap so we don't have to free it until // after the Request::wait() @@ -670,9 +672,8 @@ inline void Communicator::send_packed_range (const unsigned int dest_processor_i std::vector * buffer = new std::vector(); - const Iter next_range_begin = - pack_range(context, range_begin, range_end, - *buffer); + const Iter next_range_begin = pack_range + (context, range_begin, range_end, *buffer, approx_buffer_size); timpi_assert_greater (std::distance(range_begin, next_range_begin), 0); @@ -1367,14 +1368,15 @@ Communicator::send_receive_packed_range (const unsigned int dest_processor_id, OutputIter out_iter, const T * output_type, const MessageTag & send_tag, - const MessageTag & recv_tag) const + const MessageTag & recv_tag, + std::size_t approx_buffer_size) const { TIMPI_LOG_SCOPE("send_receive()", "Parallel"); Request req; this->send_packed_range (dest_processor_id, context1, send_begin, send_end, - req, send_tag); + req, send_tag, approx_buffer_size); this->receive_packed_range (source_processor_id, context2, out_iter, output_type, recv_tag); @@ -1937,7 +1939,8 @@ inline void Communicator::broadcast_packed_range(const Context * context1, const Iter range_end, OutputContext * context2, OutputIter out_iter, - const unsigned int root_id) const + const unsigned int root_id, + std::size_t approx_buffer_size) const { typedef 
typename std::iterator_traits::value_type T; typedef typename Packing::buffer_type buffer_t; @@ -1950,7 +1953,7 @@ inline void Communicator::broadcast_packed_range(const Context * context1, if (this->rank() == root_id) range_begin = pack_range - (context1, range_begin, range_end, buffer); + (context1, range_begin, range_end, buffer, approx_buffer_size); // this->broadcast(vector) requires the receiving vectors to // already be the appropriate size @@ -3426,7 +3429,8 @@ inline void Communicator::gather_packed_range(const unsigned int root_id, Context * context, Iter range_begin, const Iter range_end, - OutputIter out_iter) const + OutputIter out_iter, + std::size_t approx_buffer_size) const { typedef typename std::iterator_traits::value_type T; typedef typename Packing::buffer_type buffer_t; @@ -3441,7 +3445,7 @@ inline void Communicator::gather_packed_range(const unsigned int root_id, std::vector buffer; range_begin = pack_range - (context, range_begin, range_end, buffer); + (context, range_begin, range_end, buffer, approx_buffer_size); this->gather(root_id, buffer); @@ -3458,7 +3462,8 @@ template inline void Communicator::allgather_packed_range(Context * context, Iter range_begin, const Iter range_end, - OutputIter out_iter) const + OutputIter out_iter, + std::size_t approx_buffer_size) const { typedef typename std::iterator_traits::value_type T; typedef typename Packing::buffer_type buffer_t; @@ -3473,7 +3478,7 @@ inline void Communicator::allgather_packed_range(Context * context, std::vector buffer; range_begin = pack_range - (context, range_begin, range_end, buffer); + (context, range_begin, range_end, buffer, approx_buffer_size); this->allgather(buffer, false); diff --git a/src/parallel/include/timpi/serial_implementation.h b/src/parallel/include/timpi/serial_implementation.h index ccbad43..ebcb03b 100644 --- a/src/parallel/include/timpi/serial_implementation.h +++ b/src/parallel/include/timpi/serial_implementation.h @@ -94,7 +94,8 @@ inline void 
Communicator::send_packed_range(const unsigned int, const Context *, Iter, const Iter, - const MessageTag &) const + const MessageTag &, + std::size_t) const { timpi_not_implemented(); } template @@ -103,7 +104,8 @@ inline void Communicator::send_packed_range (const unsigned int, Iter, const Iter, Request &, - const MessageTag &) const + const MessageTag &, + std::size_t) const { timpi_not_implemented(); } template @@ -204,7 +206,8 @@ Communicator::send_receive_packed_range OutputIter out_iter, const T * output_type, const MessageTag &, - const MessageTag &) const + const MessageTag &, + std::size_t) const { // This makes no sense on one processor unless we're deliberately // sending to ourself. From 767e95d0874eb0818ffcd9ade627fbe566374fc8 Mon Sep 17 00:00:00 2001 From: Roy Stogner Date: Thu, 25 Jun 2020 13:37:13 -0500 Subject: [PATCH 2/2] Limit the total buffer size in allgather dispatch Otherwise our default buffer size can overflow MPI with only thousands of processors in some cases. 
--- src/parallel/include/timpi/parallel_implementation.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/parallel/include/timpi/parallel_implementation.h b/src/parallel/include/timpi/parallel_implementation.h index 3eb9070..865c018 100644 --- a/src/parallel/include/timpi/parallel_implementation.h +++ b/src/parallel/include/timpi/parallel_implementation.h @@ -3031,12 +3031,17 @@ inline void Communicator::allgather(const T & sendval, timpi_assert(this->size()); recv.resize(this->size()); + static const std::size_t approx_total_buffer_size = 1e8; + const std::size_t approx_each_buffer_size = + approx_total_buffer_size / this->size(); + unsigned int comm_size = this->size(); if (comm_size > 1) { std::vector range = {sendval}; - allgather_packed_range((void *)(NULL), range.begin(), range.end(), recv.begin()); + allgather_packed_range((void *)(NULL), range.begin(), range.end(), recv.begin(), + approx_each_buffer_size); } else if (comm_size > 0) recv[0] = sendval;