From 17cfcbd7d0824693f5073aa7cddf2f6bb610b893 Mon Sep 17 00:00:00 2001 From: Jacob Domagala Date: Tue, 16 Apr 2024 11:53:44 +0200 Subject: [PATCH] #2240: Code cleanup and make Rabenseifner work with any Op type --- .../reduce/allreduce/rabenseifner.h | 388 +++++++++++------- .../reduce/allreduce/recursive_doubling.h | 95 ++--- src/vt/objgroup/manager.impl.h | 1 - tests/perf/allreduce.cc | 52 ++- 4 files changed, 312 insertions(+), 224 deletions(-) diff --git a/src/vt/collective/reduce/allreduce/rabenseifner.h b/src/vt/collective/reduce/allreduce/rabenseifner.h index 8af07bff6a..fa5ea0a557 100644 --- a/src/vt/collective/reduce/allreduce/rabenseifner.h +++ b/src/vt/collective/reduce/allreduce/rabenseifner.h @@ -2,7 +2,7 @@ //@HEADER // ***************************************************************************** // -// reduce.h +// rabenseifner.h // DARMA/vt => Virtual Transport // // Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC @@ -90,16 +90,21 @@ template < typename DataT, template class Op, typename ObjT, auto finalHandler> struct Rabenseifner { - void initialize( - const DataT& data, vt::objgroup::proxy::Proxy proxy, - vt::objgroup::proxy::Proxy parentProxy, uint32_t num_nodes) { - this_node_ = vt::theContext()->getNode(); - is_even_ = this_node_ % 2 == 0; - val_ = data; - proxy_ = proxy; - num_steps_ = static_cast(log2(num_nodes)); - nprocs_pof2_ = 1 << num_steps_; - nprocs_rem_ = num_nodes - nprocs_pof2_; + template + Rabenseifner( + vt::objgroup::proxy::Proxy parentProxy, NodeType num_nodes, + Args&&... args) + : parent_proxy_(parentProxy), + val_(std::forward(args)...), + num_nodes_(num_nodes), + this_node_(vt::theContext()->getNode()), + is_even_(this_node_ % 2 == 0), + num_steps_(static_cast(log2(num_nodes_))), + nprocs_pof2_(1 << num_steps_), + nprocs_rem_(num_nodes_ - nprocs_pof2_), + gather_step_(num_steps_ - 1), + gather_mask_(nprocs_pof2_ >> 1), + finished_adjustment_part_(nprocs_rem_ == 0) { is_part_of_adjustment_group_ = this_node_ < (2 * nprocs_rem_); if (is_part_of_adjustment_group_) { if (is_even_) { @@ -111,15 +116,21 @@ struct Rabenseifner { vrt_node_ = this_node_ - nprocs_rem_; } + scatter_messages_.resize(num_steps_, nullptr); + scatter_steps_recv_.resize(num_steps_, false); + scatter_steps_reduced_.resize(num_steps_, false); + + gather_messages_.resize(num_steps_, nullptr); + gather_steps_recv_.resize(num_steps_, false); + gather_steps_reduced_.resize(num_steps_, false); + r_index_.resize(num_steps_, 0); r_count_.resize(num_steps_, 0); s_index_.resize(num_steps_, 0); s_count_.resize(num_steps_, 0); - w_size_ = data.size(); - int step = 0; - size_t wsize = data.size(); + size_t wsize = val_.size(); for (int mask = 1; mask < nprocs_pof2_; mask <<= 1) { auto vdest = vrt_node_ ^ mask; auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; @@ -142,137 +153,206 @@ struct Rabenseifner { } } - steps_sent_.resize(num_steps_, false); - steps_recv_.resize(num_steps_, false); + scatter_steps_recv_.resize(num_steps_, false); + } - if constexpr (debug) { - fmt::print( - "[{}] Initialize with size = {} num_steps {} \n", this_node_, w_size_, - num_steps_); + void allreduce() { + if (nprocs_rem_) { + adjustForPowerOfTwo(); + } else { + scatterReduceIter(); } } - void partOne() { + void adjustForPowerOfTwo() { if (is_part_of_adjustment_group_) { auto const partner = is_even_ ? this_node_ + 1 : this_node_ - 1; if (is_even_) { - proxy_[partner].template send<&Rabenseifner::partOneRightHalf>( - DataT{val_.begin() + (val_.size() / 2), val_.end()}); + proxy_[partner] + .template send<&Rabenseifner::adjustForPowerOfTwoRightHalf>( + DataT{val_.begin() + (val_.size() / 2), val_.end()}); } else { - proxy_[partner].template send<&Rabenseifner::partOneLeftHalf>( - DataT{val_.begin(), val_.end() - (val_.size() / 2)}); + proxy_[partner] + .template send<&Rabenseifner::adjustForPowerOfTwoLeftHalf>( + DataT{val_.begin(), val_.end() - (val_.size() / 2)}); } } } - void partOneRightHalf(AllreduceRbnMsg* msg) { + void adjustForPowerOfTwoRightHalf(AllreduceRbnMsg* msg) { for (int i = 0; i < msg->val_.size(); i++) { val_[(val_.size() / 2) + i] += msg->val_[i]; } // Send to left node proxy_[theContext()->getNode() - 1] - .template send<&Rabenseifner::partOneFinalPart>( + .template send<&Rabenseifner::adjustForPowerOfTwoFinalPart>( DataT{val_.begin() + (val_.size() / 2), val_.end()}); } - void partOneLeftHalf(AllreduceRbnMsg* msg) { + void adjustForPowerOfTwoLeftHalf(AllreduceRbnMsg* msg) { for (int i = 0; i < msg->val_.size(); i++) { val_[i] += msg->val_[i]; } } - void partOneFinalPart(AllreduceRbnMsg* msg) { + void adjustForPowerOfTwoFinalPart(AllreduceRbnMsg* msg) { for (int i = 0; i < msg->val_.size(); i++) { val_[(val_.size() / 2) + i] = msg->val_[i]; } - partTwo(); + finished_adjustment_part_ = true; + + scatterReduceIter(); } - void partTwo() { + void printValues() { + if constexpr (debug) { + std::string printer(1024, 0x0); + for (auto val : val_) { + printer.append(fmt::format("{} ", val)); + } + fmt::print("[{}] Values = {} \n", this_node_, printer); + } + } + + bool scatterAllMessagesReceived() { + return std::all_of( + scatter_steps_recv_.cbegin(), + scatter_steps_recv_.cbegin() + scatter_step_, + [](const auto val) { return val; }); + } + + bool scatterIsDone() { + return scatter_step_ == num_steps_ and scatter_num_recv_ == num_steps_; + } + + bool scatterIsReady() { + return (is_part_of_adjustment_group_ and finished_adjustment_part_) and + scatter_step_ == 0 or + scatterAllMessagesReceived(); + } + + void scatterTryReduce(int32_t step) { if ( - vrt_node_ == -1 or (step_ >= num_steps_) or - (not std::all_of( - steps_recv_.cbegin(), steps_recv_.cbegin() + step_, - [](const auto val) { return val; }))) { + (step < scatter_step_) and not scatter_steps_reduced_[step] and + scatter_steps_recv_[step] and + std::all_of( + scatter_steps_reduced_.cbegin(), scatter_steps_reduced_.cbegin() + step, + [](const auto val) { return val; })) { + auto& in_msg = scatter_messages_.at(step); + auto& in_val = in_msg->val_; + for (int i = 0; i < in_val.size(); i++) { + Op()( + val_[r_index_[in_msg->step_] + i], in_val[i]); + } + + scatter_steps_reduced_[step] = true; + } + } + + void scatterReduceIter() { + if (not scatterIsReady()) { return; } - auto vdest = vrt_node_ ^ mask_; + auto vdest = vrt_node_ ^ scatter_mask_; auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; if constexpr (debug) { fmt::print( "[{}] Part2 Step {}: Sending to Node {} starting with idx = {} and " "count " "{} \n", - this_node_, step_, dest, s_index_[step_], s_count_[step_]); + this_node_, scatter_step_, dest, s_index_[scatter_step_], + s_count_[scatter_step_]); } - proxy_[dest].template send<&Rabenseifner::partTwoHandler>( + proxy_[dest].template send<&Rabenseifner::scatterReduceIterHandler>( DataT{ - val_.begin() + (s_index_[step_]), - val_.begin() + (s_index_[step_]) + s_count_[step_]}, - step_); - - mask_ <<= 1; - num_send_++; - steps_sent_[step_] = true; - step_++; - - if (std::all_of( - steps_recv_.cbegin(), steps_recv_.cbegin() + step_, - [](const auto val) { return val; })) { - partTwo(); + val_.begin() + (s_index_[scatter_step_]), + val_.begin() + (s_index_[scatter_step_]) + s_count_[scatter_step_]}, + scatter_step_); + + scatter_mask_ <<= 1; + scatter_step_++; + + scatterTryReduce(scatter_step_ - 1); + + if (scatterIsDone()) { + printValues(); + finished_scatter_part_ = true; + gatherIter(); + } else if (scatterAllMessagesReceived()) { + scatterReduceIter(); } } - void partTwoHandler(AllreduceRbnMsg* msg) { - for (int i = 0; i < msg->val_.size(); i++) { - val_[r_index_[msg->step_] + i] += msg->val_[i]; + void scatterReduceIterHandler(AllreduceRbnMsg* msg) { + scatter_messages_[msg->step_] = promoteMsg(msg); + scatter_steps_recv_[msg->step_] = true; + scatter_num_recv_++; + + if (not finished_adjustment_part_) { + return; } + + scatterTryReduce(msg->step_); + if constexpr (debug) { fmt::print( - "[{}] Part2 Step {} mask_= {} nprocs_pof2_ = {}: " + "[{}] Part2 Step {} scatter_mask_= {} nprocs_pof2_ = {}: " "idx = {} from {}\n", - this_node_, msg->step_, mask_, nprocs_pof2_, r_index_[msg->step_], - theContext()->getFromNodeCurrentTask()); + this_node_, msg->step_, scatter_mask_, nprocs_pof2_, + r_index_[msg->step_], theContext()->getFromNodeCurrentTask()); } - steps_recv_[msg->step_] = true; - num_recv_++; - if (mask_ < nprocs_pof2_) { - if (std::all_of( - steps_recv_.cbegin(), steps_recv_.cbegin() + step_, - [](const auto val) { return val; })) { - partTwo(); - } - } else { - // step_ = num_steps_ - 1; - // mask_ = nprocs_pof2_ >> 1; - // partThree(); + + if ((scatter_mask_ < nprocs_pof2_) and scatterAllMessagesReceived()) { + scatterReduceIter(); + } else if (scatterIsDone()) { + printValues(); + finished_scatter_part_ = true; + gatherIter(); } } - void partThree() { - if ( - vrt_node_ == -1 or - (not std::all_of( - steps_recv_.cbegin() + step_ + 1, steps_recv_.cend(), - [](const auto val) { return val; }))) { - return; + bool gatherAllMessagesReceived() { + return std::all_of( + gather_steps_recv_.cbegin() + gather_step_ + 1, gather_steps_recv_.cend(), + [](const auto val) { return val; }); + } + + bool gatherIsDone() { + return (gather_step_ < 0) and (gather_num_recv_ == num_steps_); + } + + bool gatherIsReady() { + return (gather_step_ == num_steps_ - 1) or gatherAllMessagesReceived(); + } + + void gatherTryReduce(int32_t step) { + const auto doRed = (step > gather_step_) and + not gather_steps_reduced_[step] and gather_steps_recv_[step] and + std::all_of(gather_steps_reduced_.cbegin() + step + 1, + gather_steps_reduced_.cend(), + [](const auto val) { return val; }); + + if (doRed) { + auto& in_msg = gather_messages_.at(step); + auto& in_val = in_msg->val_; + for (int i = 0; i < in_val.size(); i++) { + val_[s_index_[in_msg->step_] + i] = in_val[i]; + } + + gather_steps_reduced_[step] = true; } + } - if (not startedPartThree_) { - step_ = num_steps_ - 1; - mask_ = nprocs_pof2_ >> 1; - num_send_ = 0; - num_recv_ = 0; - startedPartThree_ = true; - std::fill(steps_sent_.begin(), steps_sent_.end(), false); - std::fill(steps_recv_.begin(), steps_recv_.end(), false); + void gatherIter() { + if (not gatherIsReady()) { + return; } - auto vdest = vrt_node_ ^ mask_; + auto vdest = vrt_node_ ^ gather_mask_; auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; if constexpr (debug) { @@ -280,43 +360,29 @@ struct Rabenseifner { "[{}] Part3 Step {}: Sending to Node {} starting with idx = {} and " "count " "{} \n", - this_node_, step_, dest, r_index_[step_], r_count_[step_]); + this_node_, gather_step_, dest, r_index_[gather_step_], + r_count_[gather_step_]); } - proxy_[dest].template send<&Rabenseifner::partThreeHandler>( + proxy_[dest].template send<&Rabenseifner::gatherIterHandler>( DataT{ - val_.begin() + (r_index_[step_]), - val_.begin() + (r_index_[step_]) + r_count_[step_]}, - step_); - - steps_sent_[step_] = true; - num_send_++; - mask_ >>= 1; - step_--; - if ( - step_ >= 0 and - std::all_of( - steps_recv_.cbegin() + step_ + 1, steps_recv_.cend(), - [](const auto val) { return val; })) { - partThree(); - } - } + val_.begin() + (r_index_[gather_step_]), + val_.begin() + (r_index_[gather_step_]) + r_count_[gather_step_]}, + gather_step_); - void partThreeHandler(AllreduceRbnMsg* msg) { - for (int i = 0; i < msg->val_.size(); i++) { - val_[s_index_[msg->step_] + i] = msg->val_[i]; - } + gather_mask_ >>= 1; + gather_step_--; + + gatherTryReduce(gather_step_ + 1); + printValues(); - if (not startedPartThree_) { - step_ = num_steps_ - 1; - mask_ = nprocs_pof2_ >> 1; - num_send_ = 0; - num_recv_ = 0; - startedPartThree_ = true; - std::fill(steps_sent_.begin(), steps_sent_.end(), false); - std::fill(steps_recv_.begin(), steps_recv_.end(), false); + if (gatherIsDone()) { + finalPart(); + } else if (gatherIsReady()) { + gatherIter(); } + } - num_recv_++; + void gatherIterHandler(AllreduceRbnMsg* msg) { if constexpr (debug) { fmt::print( "[{}] Part3 Step {}: Received idx = {} from {}\n", this_node_, @@ -324,55 +390,93 @@ struct Rabenseifner { theContext()->getFromNodeCurrentTask()); } - steps_recv_[msg->step_] = true; + gather_messages_[msg->step_] = promoteMsg(msg); + gather_steps_recv_[msg->step_] = true; + gather_num_recv_++; - if ( - mask_ > 0 and - ((step_ == num_steps_ - 1) or - std::all_of( - steps_recv_.cbegin() + step_ + 1, steps_recv_.cend(), - [](const auto val) { return val; }))) { - partThree(); + if (not finished_scatter_part_) { + return; + } + + gatherTryReduce(msg->step_); + printValues(); + + if (gather_mask_ > 0 and gatherIsReady()) { + gatherIter(); + } else if (gatherIsDone()) { + finalPart(); } } - void partFour() { + void finalPart() { + if (completed_) { + return; + } + + if (nprocs_rem_) { + sendToExcludedNodes(); + } + + parent_proxy_[this_node_].template invoke(val_); + completed_ = true; + } + + void sendToExcludedNodes() { if (is_part_of_adjustment_group_ and is_even_) { if constexpr (debug) { fmt::print( "[{}] Part4 : Sending to Node {} \n", this_node_, this_node_ + 1); } - proxy_[this_node_ + 1].template send<&Rabenseifner::partFourHandler>( - val_, 0); + proxy_[this_node_ + 1] + .template send<&Rabenseifner::sendToExcludedNodesHandler>(val_, 0); } } - void partFourHandler(AllreduceRbnMsg* msg) { val_ = msg->val_; } + void sendToExcludedNodesHandler(AllreduceRbnMsg* msg) { + val_ = msg->val_; + + parent_proxy_[this_node_].template invoke(val_); + completed_ = true; + } - NodeType this_node_ = {}; - bool is_even_ = false; vt::objgroup::proxy::Proxy proxy_ = {}; - vt::objgroup::proxy::Proxy parentProxy_ = {}; + vt::objgroup::proxy::Proxy parent_proxy_ = {}; + DataT val_ = {}; - NodeType vrt_node_ = {}; - bool is_part_of_adjustment_group_ = false; + NodeType this_node_ = {}; + NodeType num_nodes_ = {}; + bool is_even_ = false; int32_t num_steps_ = {}; int32_t nprocs_pof2_ = {}; int32_t nprocs_rem_ = {}; - int32_t mask_ = 1; - bool startedPartThree_ = false; - - size_t w_size_ = {}; - int32_t step_ = 0; - int32_t num_send_ = 0; - int32_t num_recv_ = 0; - std::vector steps_recv_ = {}; - std::vector steps_sent_ = {}; std::vector r_index_ = {}; std::vector r_count_ = {}; std::vector s_index_ = {}; std::vector s_count_ = {}; + + NodeType vrt_node_ = {}; + bool is_part_of_adjustment_group_ = false; + bool finished_adjustment_part_ = false; + + bool completed_ = false; + + // Scatter + int32_t scatter_mask_ = 1; + int32_t scatter_step_ = 0; + int32_t scatter_num_recv_ = 0; + std::vector scatter_steps_recv_ = {}; + std::vector scatter_steps_reduced_ = {}; + std::vector>> scatter_messages_ = {}; + bool finished_scatter_part_ = false; + + // Gather + int32_t gather_mask_ = 1; + int32_t gather_step_ = 0; + int32_t gather_num_recv_ = 0; + std::vector gather_steps_recv_ = {}; + std::vector gather_steps_reduced_ = {}; + std::vector>> gather_messages_ = {}; }; } // namespace vt::collective::reduce::allreduce diff --git a/src/vt/collective/reduce/allreduce/recursive_doubling.h b/src/vt/collective/reduce/allreduce/recursive_doubling.h index e1ac7873d6..917196a4d3 100644 --- a/src/vt/collective/reduce/allreduce/recursive_doubling.h +++ b/src/vt/collective/reduce/allreduce/recursive_doubling.h @@ -2,7 +2,7 @@ //@HEADER // ***************************************************************************** // -// reduce.h +// recursive_doubling.h // DARMA/vt => Virtual Transport // // Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC @@ -41,8 +41,8 @@ //@HEADER */ -#if !defined INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_DISTANCE_DOUBLING_H -#define INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_DISTANCE_DOUBLING_H +#if !defined INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RECURSIVE_DOUBLING_H +#define INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RECURSIVE_DOUBLING_H #include "vt/config.h" #include "vt/context/context.h" @@ -68,10 +68,11 @@ struct AllreduceDblMsg AllreduceDblMsg(AllreduceDblMsg const&) = default; AllreduceDblMsg(AllreduceDblMsg&&) = default; - explicit AllreduceDblMsg(DataT&& in_val) + AllreduceDblMsg(DataT&& in_val, int step = 0) : MessageParentType(), - val_(std::forward(in_val)) { } - explicit AllreduceDblMsg(DataT const& in_val, int step = 0) + val_(std::forward(in_val)), + step_(step) { } + AllreduceDblMsg(DataT const& in_val, int step = 0) : MessageParentType(), val_(in_val), step_(step) { } @@ -92,18 +93,18 @@ template < auto finalHandler> struct DistanceDoubling { template - DistanceDoubling(NodeType num_nodes, Args&&... args) - : val_(std::forward(args)...), - num_nodes_(num_nodes) { } - - void initialize() { - this_node_ = vt::theContext()->getNode(); - is_even_ = this_node_ % 2 == 0; - num_steps_ = static_cast(log2(num_nodes_)); - messages.resize(num_steps_, nullptr); - - nprocs_pof2_ = 1 << num_steps_; - nprocs_rem_ = num_nodes_ - nprocs_pof2_; + DistanceDoubling( + vt::objgroup::proxy::Proxy parentProxy, NodeType num_nodes, + Args&&... args) + : parent_proxy_(parentProxy), + val_(std::forward(args)...), + num_nodes_(num_nodes), + this_node_(vt::theContext()->getNode()), + is_even_(this_node_ % 2 == 0), + num_steps_(static_cast(log2(num_nodes_))), + nprocs_pof2_(1 << num_steps_), + nprocs_rem_(num_nodes_ - nprocs_pof2_), + finished_adjustment_part_(nprocs_rem_ == 0) { is_part_of_adjustment_group_ = this_node_ < (2 * nprocs_rem_); if (is_part_of_adjustment_group_) { if (is_even_) { @@ -115,22 +116,12 @@ struct DistanceDoubling { vrt_node_ = this_node_ - nprocs_rem_; } + messages_.resize(num_steps_, nullptr); steps_recv_.resize(num_steps_, false); steps_reduced_.resize(num_steps_, false); - - initialized_ = true; } - void allreduce( - vt::objgroup::proxy::Proxy proxy, - vt::objgroup::proxy::Proxy parentProxy) { - if (not initialized_) { - initialize(); - } - - proxy_ = proxy; - parent_proxy_ = parentProxy; - + void allreduce() { if (nprocs_rem_) { adjustForPowerOfTwo(); } else { @@ -157,9 +148,9 @@ struct DistanceDoubling { data.append(fmt::format("{} ", val)); } fmt::print( - "[{}] Part1 Handler initialized_ = {}: Received data ({}) " + "[{}] Part1 Handler: Received data ({}) " "from {}\n", - this_node_, initialized_, data, theContext()->getFromNodeCurrentTask()); + this_node_, data, theContext()->getFromNodeCurrentTask()); } Op()(val_, msg->val_); @@ -216,7 +207,7 @@ struct DistanceDoubling { std::all_of( steps_reduced_.cbegin(), steps_reduced_.cbegin() + step, [](const auto val) { return val; })) { - Op()(val_, messages.at(step)->val_); + Op()(val_, messages_.at(step)->val_); steps_reduced_[step] = true; } } @@ -228,28 +219,21 @@ struct DistanceDoubling { data.append(fmt::format("{} ", val)); } fmt::print( - "[{}] Part2 Step {} initialized_ = {} mask_= {} nprocs_pof2_ = {}: " + "[{}] Part2 Step {} mask_= {} nprocs_pof2_ = {}: " "Received data ({}) " "from {}\n", - this_node_, msg->step_, initialized_, mask_, nprocs_pof2_, data, + this_node_, msg->step_, mask_, nprocs_pof2_, data, theContext()->getFromNodeCurrentTask()); } + messages_.at(msg->step_) = promoteMsg(msg); + steps_recv_[msg->step_] = true; + // Special case when we receive step 2 message before step 1 is done on this node if (not finished_adjustment_part_) { - if (not initialized_) { - initialize(); - } - - messages.at(msg->step_) = promoteMsg(msg); - steps_recv_[msg->step_] = true; - return; } - messages.at(msg->step_) = promoteMsg(msg); - steps_recv_[msg->step_] = true; - tryReduce(msg->step_); if ((mask_ < nprocs_pof2_) and isReady()) { @@ -275,6 +259,7 @@ struct DistanceDoubling { val_ = msg->val_; parent_proxy_[this_node_].template invoke(val_); + completed_ = true; } void finalPart() { @@ -290,19 +275,21 @@ struct DistanceDoubling { completed_ = true; } - NodeType this_node_ = {}; - uint32_t num_nodes_ = {}; - bool is_even_ = false; vt::objgroup::proxy::Proxy proxy_ = {}; vt::objgroup::proxy::Proxy parent_proxy_ = {}; + DataT val_ = {}; - NodeType vrt_node_ = {}; - bool initialized_ = false; - bool is_part_of_adjustment_group_ = false; - bool finished_adjustment_part_ = false; + NodeType this_node_ = {}; + NodeType num_nodes_ = {}; + bool is_even_ = false; int32_t num_steps_ = {}; int32_t nprocs_pof2_ = {}; int32_t nprocs_rem_ = {}; + + NodeType vrt_node_ = {}; + bool is_part_of_adjustment_group_ = false; + bool finished_adjustment_part_ = false; + int32_t mask_ = 1; int32_t step_ = 0; @@ -311,9 +298,9 @@ struct DistanceDoubling { std::vector steps_recv_ = {}; std::vector steps_reduced_ = {}; - std::vector>> messages = {}; + std::vector>> messages_ = {}; }; } // namespace vt::collective::reduce::allreduce -#endif /*INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H*/ +#endif /*INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RECURSIVE_DOUBLING_H*/ diff --git a/src/vt/objgroup/manager.impl.h b/src/vt/objgroup/manager.impl.h index 719b574d4b..51567f61b5 100644 --- a/src/vt/objgroup/manager.impl.h +++ b/src/vt/objgroup/manager.impl.h @@ -57,7 +57,6 @@ #include "vt/collective/collective_alg.h" #include "vt/messaging/active.h" #include "vt/elm/elm_id_bits.h" -#include "vt/collective/reduce/allreduce/rabenseifner.h" #include "vt/messaging/message/smart_ptr.h" #include diff --git a/tests/perf/allreduce.cc b/tests/perf/allreduce.cc index 645f83136d..49bc107788 100644 --- a/tests/perf/allreduce.cc +++ b/tests/perf/allreduce.cc @@ -2,7 +2,7 @@ //@HEADER // ***************************************************************************** // -// reduce.cc +// allreduce.cc // DARMA/vt => Virtual Transport // // Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC @@ -61,7 +61,7 @@ static constexpr int num_iters = 1; struct MyTest : PerfTestHarness { void SetUp() override { PerfTestHarness::SetUp(); - data.resize(1 << 4); + data.resize(1 << 5); for (auto& val : data) { val = theContext()->getNode() + 1; } @@ -97,25 +97,20 @@ struct NodeObj { // } // fmt::print("\n"); - // const auto p = theContext()->getNumNodes(); - // const auto expected = (p * (p + 1)) / 2; - // for (auto val : in) { - // vtAssert(val == expected, "FAILURE!"); - // } + const auto p = theContext()->getNumNodes(); + const auto expected = (p * (p + 1)) / 2; + for (auto val : in) { + vtAssert(val == expected, "FAILURE!"); + } } void newReduceComplete(std::vector in) { - // fmt::print( - // "\n[{}]: allreduce_h done! (Size == {}) Results are ...\n", - // theContext()->getNode(), in.size()); - // const auto p = theContext()->getNumNodes(); - // const auto expected = (p * (p + 1)) / 2; - // for (auto val : in) { - // vtAssert(val == expected, "FAILURE!"); - // } + // std::string printer(1024, 0x0); + // printer.append(fmt::format("\n[{}]: allreduce_rabenseifner done! ", theContext()->getNode())); + // for (int node = 0; node < theContext()->getNumNodes(); ++node) { // if (node == theContext()->getNode()) { - // std::string printer(128, 0x0); + // for (auto val : in) { // printer.append(fmt::format("{} ", val)); // } @@ -127,6 +122,11 @@ struct NodeObj { // } // fmt::print("\n"); + const auto p = theContext()->getNumNodes(); + const auto expected = (p * (p + 1)) / 2; + for (auto val : in) { + vtAssert(val == expected, "FAILURE!"); + } } void reduceComplete(std::vector in) { @@ -161,13 +161,11 @@ VT_PERF_TEST(MyTest, test_allreduce_rabenseifner) { using Reducer = collective::reduce::allreduce::Rabenseifner< DataT, collective::PlusOp, NodeObj, &NodeObj::newReduceComplete>; - auto grp_proxy = - vt::theObjGroup()->makeCollective("allreduce_rabenseifner"); - vt::runInEpochCollective([=] { - grp_proxy[my_node_].template invoke<&Reducer::initialize>( - data, grp_proxy, proxy, num_nodes_); - grp_proxy[my_node_].template invoke<&Reducer::partOne>(); - }); + auto grp_proxy = vt::theObjGroup()->makeCollective( + "allreduce_rabenseifner", proxy, num_nodes_, data); + grp_proxy[my_node_].get()->proxy_ = grp_proxy; + vt::runInEpochCollective( + [=] { grp_proxy[my_node_].template invoke<&Reducer::allreduce>(); }); } VT_PERF_TEST(MyTest, test_allreduce_recursive_doubling) { @@ -179,10 +177,10 @@ VT_PERF_TEST(MyTest, test_allreduce_recursive_doubling) { DataT, collective::PlusOp, NodeObj, &NodeObj::recursiveDoubling>; auto grp_proxy = vt::theObjGroup()->makeCollective( - "allreduce_recursive_doubling", num_nodes_, data); - vt::runInEpochCollective([=] { - grp_proxy[my_node_].template invoke<&Reducer::allreduce>(grp_proxy, proxy); - }); + "allreduce_recursive_doubling", proxy, num_nodes_, data); + grp_proxy[my_node_].get()->proxy_ = grp_proxy; + vt::runInEpochCollective( + [=] { grp_proxy[my_node_].template invoke<&Reducer::allreduce>(); }); } VT_PERF_TEST_MAIN()