Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions src/algorithms/include/timpi/parallel_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,6 @@ void push_parallel_vector_data(const Communicator & comm,
// without confusing one for the other
auto tag = comm.get_unique_tag();

MapToVectors received_data;

// Post all of the sends, non-blocking and synchronous

// Save off the old send_mode so we can restore it after this
Expand Down Expand Up @@ -306,11 +304,11 @@ void push_parallel_vector_data(const Communicator & comm,
}


template <typename MapToVectors,
template <typename MapToContainers,
typename ActionFunctor,
typename Context>
void push_parallel_packed_range(const Communicator & comm,
const MapToVectors & data,
const MapToContainers & data,
Context * context,
const ActionFunctor & act_on_data)
{
Expand All @@ -320,16 +318,15 @@ void push_parallel_packed_range(const Communicator & comm,
// This function implements the "NBX" algorithm from
// https://htor.inf.ethz.ch/publications/img/hoefler-dsde-protocols.pdf

typedef decltype(data.begin()->second.front()) ref_type;
typedef typename std::remove_reference<ref_type>::type nonref_type;
typedef typename MapToContainers::value_type map_pair_type;
typedef typename map_pair_type::second_type container_type;
typedef typename container_type::value_type nonref_type;
typedef typename std::remove_const<nonref_type>::type nonconst_nonref_type;

// We'll grab a tag so we can overlap request sends and receives
// without confusing one for the other
auto tag = comm.get_unique_tag();

MapToVectors received_data;

// Post all of the sends, non-blocking and synchronous

// Save off the old send_mode so we can restore it after this
Expand Down Expand Up @@ -372,10 +369,10 @@ void push_parallel_packed_range(const Communicator & comm,
std::list<std::pair<unsigned int, std::shared_ptr<Request>>> receive_reqs;
auto current_request = std::make_shared<Request>();

std::multimap<processor_id_type, std::shared_ptr<std::vector<nonconst_nonref_type>>> incoming_data;
auto current_incoming_data = std::make_shared<std::vector<nonconst_nonref_type>>();
std::multimap<processor_id_type, std::shared_ptr<container_type>> incoming_data;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really good switch.

auto current_incoming_data = std::make_shared<container_type>();

nonconst_nonref_type * output_type;
nonconst_nonref_type * output_type = nullptr;

unsigned int current_src_proc = 0;

Expand All @@ -386,19 +383,18 @@ void push_parallel_packed_range(const Communicator & comm,
current_src_proc = TIMPI::any_source;

// Check if there is a message and start receiving it
if (comm.possibly_receive_packed_range(current_src_proc,
context,
std::back_inserter(*current_incoming_data),
output_type,
*current_request,
tag))
if (comm.possibly_receive_packed_range
(current_src_proc, context,
std::inserter(*current_incoming_data,
current_incoming_data->end()),
output_type, *current_request, tag))
{
receive_reqs.emplace_back(current_src_proc, current_request);
current_request = std::make_shared<Request>();

// current_src_proc will now hold the src pid for this receive
incoming_data.emplace(current_src_proc, current_incoming_data);
current_incoming_data = std::make_shared<std::vector<nonconst_nonref_type>>();
current_incoming_data = std::make_shared<container_type>();
}

// Clean up outstanding receive requests
Expand Down Expand Up @@ -481,7 +477,7 @@ void pull_parallel_vector_data(const Communicator & comm,
typedef typename MapToVectors::mapped_type query_type;

std::multimap<processor_id_type, std::vector<datum> >
response_data, received_data;
response_data;

#ifndef NDEBUG
processor_id_type max_pid = 0;
Expand Down
6 changes: 3 additions & 3 deletions src/parallel/include/timpi/packing.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
#include "timpi/standard_type.h"

// C++ includes
#include <cstddef>
#include <cstring> // memcpy
#include <iterator>
#include <type_traits> // enable_if, is_same
#include <utility> // pair
#include <vector>
#include <utility>
#include <cstring>


// FIXME: This *should* be in TIMPI namespace but we have libMesh
Expand Down
10 changes: 0 additions & 10 deletions src/parallel/include/timpi/parallel_implementation.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,6 @@ inline Status Communicator::packed_range_probe (const unsigned int src_processor
{
TIMPI_LOG_SCOPE("packed_range_probe()", "Parallel");

timpi_experimental();

Status stat((StandardType<typename Packing<T>::buffer_type>()));

int int_flag;
Expand Down Expand Up @@ -737,8 +735,6 @@ inline void Communicator::nonblocking_send_packed_range (const unsigned int dest
Request & req,
const MessageTag & tag) const
{
timpi_experimental();

// Allocate a buffer on the heap so we don't have to free it until
// after the Request::wait()
typedef typename std::iterator_traits<Iter>::value_type T;
Expand Down Expand Up @@ -1201,8 +1197,6 @@ inline void Communicator::nonblocking_receive_packed_range (const unsigned int s
Status & stat,
const MessageTag & tag) const
{
timpi_experimental();

typedef typename Packing<T>::buffer_type buffer_t;

// Receive serialized variable size objects as a sequence of
Expand Down Expand Up @@ -1421,8 +1415,6 @@ inline void Communicator::nonblocking_send_packed_range (const unsigned int dest
std::shared_ptr<std::vector<typename Packing<typename std::iterator_traits<Iter>::value_type>::buffer_type>> & buffer,
const MessageTag & tag) const
{
timpi_experimental();

// Allocate a buffer on the heap so we don't have to free it until
// after the Request::wait()
typedef typename std::iterator_traits<Iter>::value_type T;
Expand Down Expand Up @@ -2014,8 +2006,6 @@ inline void Communicator::nonblocking_receive_packed_range (const unsigned int s
std::shared_ptr<std::vector<typename Packing<T>::buffer_type>> & buffer,
const MessageTag & tag) const
{
timpi_experimental();

// If they didn't pass in a buffer - let's make one
if (buffer == nullptr)
buffer = std::make_shared<std::vector<typename Packing<T>::buffer_type>>();
Expand Down
92 changes: 92 additions & 0 deletions test/packed_range_unit.C
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#include <timpi/parallel_sync.h>
#include <timpi/timpi.h>

#include <iterator>
#include <map>
#include <set>
#include <string>
#include <vector>

#define TIMPI_UNIT_ASSERT(expr) \
Expand Down Expand Up @@ -197,6 +201,92 @@ Communicator *TestCommWorld;
TIMPI_UNIT_ASSERT(recv[0] == check);
}

void testPushPackedImpl(int M)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, this should have been done a long time ago. Whoops.

{
const int size = TestCommWorld->size(),
rank = TestCommWorld->rank();

std::map<processor_id_type, std::multiset<std::string>>
data, received_data;

auto stringy_number = [] (int number)
{
std::string digit_strings [10] = {"zero", "one", "two",
"three", "four", "five", "six", "seven", "eight", "nine"};

std::string returnval = "done";
while (number)
{
returnval = digit_strings[number%10]+" "+returnval;
number = number/10;
};

return returnval;
};

for (int d=0; d != M; ++d)
{
int diffsize = std::abs(d-rank);
int diffsqrt = std::sqrt(diffsize);
if (diffsqrt*diffsqrt == diffsize)
for (int i=-1; i != diffsqrt; ++i)
data[d].insert(stringy_number(d));
}

auto collect_data =
[&received_data]
(processor_id_type pid,
const typename std::multiset<std::string> & data)
{
auto & received = received_data[pid];
received.insert(data.begin(), data.end());
};

void * context = nullptr;
TIMPI::push_parallel_packed_range(*TestCommWorld, data, context, collect_data);

// Test the received results, for each processor id p we're in
// charge of.
std::vector<std::size_t> checked_sizes(size, 0);
for (int p=rank; p < M; p += size)
for (int srcp=0; srcp != size; ++srcp)
{
int diffsize = std::abs(srcp-p);
int diffsqrt = std::sqrt(diffsize);
if (diffsqrt*diffsqrt != diffsize)
{
if (received_data.count(srcp))
{
const std::multiset<std::string> & datum = received_data[srcp];
TIMPI_UNIT_ASSERT
(std::count(datum.begin(), datum.end(),
stringy_number(p)) == std::ptrdiff_t(0));
}
continue;
}

TIMPI_UNIT_ASSERT(received_data.count(srcp) == std::size_t(1));
const std::multiset<std::string> & datum = received_data[srcp];
TIMPI_UNIT_ASSERT
(std::count(datum.begin(), datum.end(), stringy_number(p)) ==
std::ptrdiff_t(diffsqrt+1));
checked_sizes[srcp] += diffsqrt+1;
}

for (int srcp=0; srcp != size; ++srcp)
TIMPI_UNIT_ASSERT(checked_sizes[srcp] == received_data[srcp].size());

}

void testPushPacked()
{
testPushPackedImpl(TestCommWorld->size());
}

void testPushPackedOversized()
{
testPushPackedImpl((TestCommWorld->size() + 4) * 2);
}

int main(int argc, const char * const * argv)
{
Expand All @@ -207,6 +297,8 @@ int main(int argc, const char * const * argv)
testNullSendReceive();
testContainerAllGather();
testContainerSendReceive();
testPushPacked();
testPushPackedOversized();

return 0;
}