Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions src/parallel/include/timpi/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -638,14 +638,21 @@ class Communicator
* unsigned int TIMPI::packable_size(const T *, const Context *) is
* used to allow data vectors to reserve memory, and for additional
* error checking
*
* The approximate maximum size (in *entries*; number of bytes will
* likely be 4x or 8x larger) to use in a single data vector buffer
* can be specified for performance or memory usage reasons; if the
* range cannot be packed into a single buffer of this size then
* multiple buffers and messages will be used.
*/
template <typename Context, typename Iter>
inline
void send_packed_range (const unsigned int dest_processor_id,
const Context * context,
Iter range_begin,
const Iter range_end,
const MessageTag & tag=no_tag) const;
const MessageTag & tag=no_tag,
std::size_t approx_buffer_size = 1000000) const;

/**
* Nonblocking-send range-of-pointers to one processor. This
Expand All @@ -659,6 +666,12 @@ class Communicator
* unsigned int TIMPI::packable_size(const T *, const Context *) is
* used to allow data vectors to reserve memory, and for additional
* error checking
*
* The approximate maximum size (in *entries*; number of bytes will
* likely be 4x or 8x larger) to use in a single data vector buffer
* can be specified for performance or memory usage reasons; if the
* range cannot be packed into a single buffer of this size then
* multiple buffers and messages will be used.
*/
template <typename Context, typename Iter>
inline
Expand All @@ -667,7 +680,8 @@ class Communicator
Iter range_begin,
const Iter range_end,
Request & req,
const MessageTag & tag=no_tag) const;
const MessageTag & tag=no_tag,
std::size_t approx_buffer_size = 1000000) const;

/**
* Similar to the above Nonblocking send_packed_range with a few important differences:
Expand Down Expand Up @@ -846,7 +860,8 @@ class Communicator
OutputIter out,
const T * output_type, // used only to infer T
const MessageTag & send_tag = no_tag,
const MessageTag & recv_tag = any_tag) const;
const MessageTag & recv_tag = any_tag,
std::size_t approx_buffer_size = 1000000) const;

/**
* Send data \p send to one processor while simultaneously receiving
Expand Down Expand Up @@ -1024,23 +1039,47 @@ class Communicator
/**
* Take a range of local variables, combine it with ranges from all
* processors, and write the output to the output iterator on rank root.
*
* The approximate maximum size (in *entries*; number of bytes will
* likely be 4x or 8x larger) to use in a single data vector buffer
* to send can be specified for performance or memory usage reasons;
* if the range cannot be packed into a single buffer of this size
* then multiple buffers and messages will be used.
*
* Note that the received data vector sizes will be the *sum* of the
* sent vector sizes; a smaller-than-default size may be useful for
* users on many processors, in cases where all-to-one communication
* cannot be avoided entirely.
*/
template <typename Context, typename Iter, typename OutputIter>
inline void gather_packed_range (const unsigned int root_id,
Context * context,
Iter range_begin,
const Iter range_end,
OutputIter out) const;
OutputIter out,
std::size_t approx_buffer_size = 1000000) const;

/**
* Take a range of local variables, combine it with ranges from all
* processors, and write the output to the output iterator.
*
* The approximate maximum size (in *entries*; number of bytes will
* likely be 4x or 8x larger) to use in a single data vector buffer
* to send can be specified for performance or memory usage reasons;
* if the range cannot be packed into a single buffer of this size
* then multiple buffers and messages will be used.
*
* Note that the received data vector sizes will be the *sum* of the
* sent vector sizes; a smaller-than-default size may be useful for
* users on many processors, in cases where all-to-one communication
* cannot be avoided entirely.
*/
template <typename Context, typename Iter, typename OutputIter>
inline void allgather_packed_range (Context * context,
Iter range_begin,
const Iter range_end,
OutputIter out) const;
OutputIter out,
std::size_t approx_buffer_size = 1000000) const;

/**
* Effectively transposes the input vector across all processors.
Expand Down Expand Up @@ -1079,14 +1118,21 @@ class Communicator
* unsigned int TIMPI::packed_size(const T *,
* vector<int>::const_iterator)
* is used to advance to the beginning of the next object's data.
*
* The approximate maximum size (in *entries*; number of bytes will
* likely be 4x or 8x larger) to use in a single data vector buffer
* can be specified for performance or memory usage reasons; if the
* range cannot be packed into a single buffer of this size then
* multiple buffers and messages will be used.
*/
template <typename Context, typename OutputContext, typename Iter, typename OutputIter>
inline void broadcast_packed_range (const Context * context1,
Iter range_begin,
const Iter range_end,
OutputContext * context2,
OutputIter out,
const unsigned int root_id = 0) const;
const unsigned int root_id = 0,
std::size_t approx_buffer_size = 1000000) const;

/**
* C++ doesn't let us partially specialize functions (we're really
Expand Down
40 changes: 25 additions & 15 deletions src/parallel/include/timpi/parallel_implementation.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,8 @@ inline void Communicator::send_packed_range (const unsigned int dest_processor_i
const Context * context,
Iter range_begin,
const Iter range_end,
const MessageTag & tag) const
const MessageTag & tag,
std::size_t approx_buffer_size) const
{
// We will serialize variable size objects from *range_begin to
// *range_end as a sequence of plain data (e.g. ints) in this buffer
Expand All @@ -608,7 +609,7 @@ inline void Communicator::send_packed_range (const unsigned int dest_processor_i
std::vector<typename Packing<T>::buffer_type> buffer;

const Iter next_range_begin = pack_range
(context, range_begin, range_end, buffer);
(context, range_begin, range_end, buffer, approx_buffer_size);

timpi_assert_greater (std::distance(range_begin, next_range_begin), 0);

Expand All @@ -634,7 +635,8 @@ inline void Communicator::send_packed_range (const unsigned int dest_processor_i
Iter range_begin,
const Iter range_end,
Request & req,
const MessageTag & tag) const
const MessageTag & tag,
std::size_t approx_buffer_size) const
{
// Allocate a buffer on the heap so we don't have to free it until
// after the Request::wait()
Expand Down Expand Up @@ -670,9 +672,8 @@ inline void Communicator::send_packed_range (const unsigned int dest_processor_i

std::vector<buffer_t> * buffer = new std::vector<buffer_t>();

const Iter next_range_begin =
pack_range(context, range_begin, range_end,
*buffer);
const Iter next_range_begin = pack_range
(context, range_begin, range_end, *buffer, approx_buffer_size);

timpi_assert_greater (std::distance(range_begin, next_range_begin), 0);

Expand Down Expand Up @@ -1367,14 +1368,15 @@ Communicator::send_receive_packed_range (const unsigned int dest_processor_id,
OutputIter out_iter,
const T * output_type,
const MessageTag & send_tag,
const MessageTag & recv_tag) const
const MessageTag & recv_tag,
std::size_t approx_buffer_size) const
{
TIMPI_LOG_SCOPE("send_receive()", "Parallel");

Request req;

this->send_packed_range (dest_processor_id, context1, send_begin, send_end,
req, send_tag);
req, send_tag, approx_buffer_size);

this->receive_packed_range (source_processor_id, context2, out_iter,
output_type, recv_tag);
Expand Down Expand Up @@ -1937,7 +1939,8 @@ inline void Communicator::broadcast_packed_range(const Context * context1,
const Iter range_end,
OutputContext * context2,
OutputIter out_iter,
const unsigned int root_id) const
const unsigned int root_id,
std::size_t approx_buffer_size) const
{
typedef typename std::iterator_traits<Iter>::value_type T;
typedef typename Packing<T>::buffer_type buffer_t;
Expand All @@ -1950,7 +1953,7 @@ inline void Communicator::broadcast_packed_range(const Context * context1,

if (this->rank() == root_id)
range_begin = pack_range
(context1, range_begin, range_end, buffer);
(context1, range_begin, range_end, buffer, approx_buffer_size);

// this->broadcast(vector) requires the receiving vectors to
// already be the appropriate size
Expand Down Expand Up @@ -3028,12 +3031,17 @@ inline void Communicator::allgather(const T & sendval,
timpi_assert(this->size());
recv.resize(this->size());

static const std::size_t approx_total_buffer_size = 1e8;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not know we can write an integer as 1e8. I thought is was a float type.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is, but C++ will silently convert float types to integral types, and if the compiler can prove that the float value fits in the integer exactly (as it can with a compile-time constant) then even -Wconversion won't complain about it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that there is a motivation for the choice of this number? Just at first glance it seems kind of arbitrary

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The total message should be less than 1G.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when considering today most the supercomputers will have 2G-4G per core

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking exactly, @fdkong. I basically divided 1GB by a typical sizeof(buffertype) of 8 bytes then shrunk it a little further to be safe.

Though to be sure, I still don't like leaving this out of user control. If you weren't happy with my still-pretty-arbitrary choice then my next thought would have been to move this from a static local into a Communicator member variable, and I'm still kind of considering it regardless.

const std::size_t approx_each_buffer_size =
approx_total_buffer_size / this->size();

unsigned int comm_size = this->size();
if (comm_size > 1)
{
std::vector<T> range = {sendval};

allgather_packed_range((void *)(NULL), range.begin(), range.end(), recv.begin());
allgather_packed_range((void *)(NULL), range.begin(), range.end(), recv.begin(),
approx_each_buffer_size);
}
else if (comm_size > 0)
recv[0] = sendval;
Expand Down Expand Up @@ -3426,7 +3434,8 @@ inline void Communicator::gather_packed_range(const unsigned int root_id,
Context * context,
Iter range_begin,
const Iter range_end,
OutputIter out_iter) const
OutputIter out_iter,
std::size_t approx_buffer_size) const
{
typedef typename std::iterator_traits<Iter>::value_type T;
typedef typename Packing<T>::buffer_type buffer_t;
Expand All @@ -3441,7 +3450,7 @@ inline void Communicator::gather_packed_range(const unsigned int root_id,
std::vector<buffer_t> buffer;

range_begin = pack_range
(context, range_begin, range_end, buffer);
(context, range_begin, range_end, buffer, approx_buffer_size);

this->gather(root_id, buffer);

Expand All @@ -3458,7 +3467,8 @@ template <typename Context, typename Iter, typename OutputIter>
inline void Communicator::allgather_packed_range(Context * context,
Iter range_begin,
const Iter range_end,
OutputIter out_iter) const
OutputIter out_iter,
std::size_t approx_buffer_size) const
{
typedef typename std::iterator_traits<Iter>::value_type T;
typedef typename Packing<T>::buffer_type buffer_t;
Expand All @@ -3473,7 +3483,7 @@ inline void Communicator::allgather_packed_range(Context * context,
std::vector<buffer_t> buffer;

range_begin = pack_range
(context, range_begin, range_end, buffer);
(context, range_begin, range_end, buffer, approx_buffer_size);

this->allgather(buffer, false);

Expand Down
9 changes: 6 additions & 3 deletions src/parallel/include/timpi/serial_implementation.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ inline void Communicator::send_packed_range(const unsigned int,
const Context *,
Iter,
const Iter,
const MessageTag &) const
const MessageTag &,
std::size_t) const
{ timpi_not_implemented(); }

template <typename Context, typename Iter>
Expand All @@ -103,7 +104,8 @@ inline void Communicator::send_packed_range (const unsigned int,
Iter,
const Iter,
Request &,
const MessageTag &) const
const MessageTag &,
std::size_t) const
{ timpi_not_implemented(); }

template <typename Context, typename Iter>
Expand Down Expand Up @@ -204,7 +206,8 @@ Communicator::send_receive_packed_range
OutputIter out_iter,
const T * output_type,
const MessageTag &,
const MessageTag &) const
const MessageTag &,
std::size_t) const
{
// This makes no sense on one processor unless we're deliberately
// sending to ourself.
Expand Down