From da46b94468584a154fb15f8a8247cf8ba963b69a Mon Sep 17 00:00:00 2001 From: Jacob Domagala Date: Tue, 18 Jun 2024 18:35:02 +0200 Subject: [PATCH] #2240: Update Rabenseifner to use ID for each allreduce and update tests --- .../reduce/allreduce/data_handler.h | 14 +- .../reduce/allreduce/rabenseifner.h | 138 +++--- .../reduce/allreduce/rabenseifner.impl.h | 400 +++++++++++------- .../reduce/allreduce/recursive_doubling.h | 2 - .../allreduce/recursive_doubling.impl.h | 8 +- src/vt/objgroup/manager.h | 8 +- src/vt/objgroup/manager.impl.h | 65 ++- src/vt/objgroup/proxy/proxy_objgroup.impl.h | 9 +- tests/perf/allreduce.cc | 88 ++-- tests/unit/objgroup/test_objgroup.cc | 32 +- tests/unit/objgroup/test_objgroup_common.h | 12 +- 11 files changed, 417 insertions(+), 359 deletions(-) diff --git a/src/vt/collective/reduce/allreduce/data_handler.h b/src/vt/collective/reduce/allreduce/data_handler.h index 70b68815f5..3b9813cc01 100644 --- a/src/vt/collective/reduce/allreduce/data_handler.h +++ b/src/vt/collective/reduce/allreduce/data_handler.h @@ -47,7 +47,7 @@ #include -#ifdef VT_KOKKOS_ENABLED +#ifdef KOKKOS_ENABLED_CHECKPOINT #include #endif @@ -57,21 +57,22 @@ template class DataHandler { public: using Scalar = void; + static size_t size(void) { return 0; } }; -template -class DataHandler::value>::type> { +template +class DataHandler::value>::type> { public: - using ScalarType = Scalar; + using Scalar = ScalarType; static std::vector toVec(const ScalarType& data) { return std::vector{data}; } static ScalarType fromVec(const std::vector& data) { return data[0]; } - static ScalarType fromMemory(ScalarType* data, size_t count) { + static ScalarType fromMemory(ScalarType* data, size_t) { return *data; } // static const ScalarType* data(const ScalarType& data) { return &data; } - // static size_t size(const ScalarType&) { return 1; } + static size_t size(const ScalarType&) { return 1; } // static ScalarType& at(ScalarType& data, size_t) { return data; } // static void set(ScalarType& data, size_t, const ScalarType& value) { data = value; } // static ScalarType split(ScalarType&, size_t, size_t) { return ScalarType{}; } @@ -80,7 +81,6 @@ class DataHandler::va template class DataHandler> { public: - using UnderlyingType = std::vector; using Scalar = T; static const std::vector& toVec(const std::vector& data) { return data; } diff --git a/src/vt/collective/reduce/allreduce/rabenseifner.h b/src/vt/collective/reduce/allreduce/rabenseifner.h index aad1b0a414..8bea78fb66 100644 --- a/src/vt/collective/reduce/allreduce/rabenseifner.h +++ b/src/vt/collective/reduce/allreduce/rabenseifner.h @@ -56,36 +56,6 @@ namespace vt::collective::reduce::allreduce { -template -struct AllreduceRbnMsg - : SerializeIfNeeded, DataT> { - using MessageParentType = - SerializeIfNeeded<::vt::Message, AllreduceRbnMsg, DataT>; - - AllreduceRbnMsg() = default; - AllreduceRbnMsg(AllreduceRbnMsg const&) = default; - AllreduceRbnMsg(AllreduceRbnMsg&&) = default; - - AllreduceRbnMsg(DataT&& in_val, int step = 0) - : MessageParentType(), - val_(std::forward(in_val)), - step_(step) { } - AllreduceRbnMsg(DataT const& in_val, int step = 0) - : MessageParentType(), - val_(in_val), - step_(step) { } - - template - void serialize(SerializeT& s) { - MessageParentType::serialize(s); - s | val_; - s | step_; - } - - DataT val_ = {}; - int32_t step_ = {}; -}; - template struct AllreduceRbnRawMsg : Message { @@ -102,10 +72,11 @@ struct AllreduceRbnRawMsg } } - AllreduceRbnRawMsg(Scalar* in_val, size_t size, int step = 0) + AllreduceRbnRawMsg(Scalar* in_val, size_t size, size_t id, int step = 0) : MessageParentType(), val_(in_val), size_(size), + id_(id), step_(step) { } template @@ -121,11 +92,13 @@ struct AllreduceRbnRawMsg checkpoint::dispatch::serializeArray(s, val_, size_); + s | id_; s | step_; } Scalar* val_ = {}; size_t size_ = {}; + size_t id_ = {}; int32_t step_ = {}; bool owning_ = false; }; @@ -169,19 +142,22 @@ struct Rabenseifner { * \param args Additional arguments for initializing the data value. */ template - void initialize(Args&&... args); + void initialize(size_t id, Args&&... args); + + void initializeState(size_t id); + size_t generateNewId() { return id_++; } /** * \brief Execute the final handler callback with the reduced result. */ - void executeFinalHan(); + void executeFinalHan(size_t id); /** * \brief Perform the allreduce operation. * * This function starts the allreduce operation, adjusting for non-power-of-two process counts if necessary. */ - void allreduce(); + void allreduce(size_t id); /** * \brief Adjust the process count to the nearest power-of-two. @@ -189,7 +165,7 @@ struct Rabenseifner { * This function performs additional steps to handle non-power-of-two process counts, ensuring that the * main scatter-reduce and gather-allgather phases can proceed with a power-of-two number of processes. */ - void adjustForPowerOfTwo(); + void adjustForPowerOfTwo(size_t id); /** * \brief Handler for adjusting the right half of the process group. @@ -223,35 +199,35 @@ struct Rabenseifner { * * \return True if all scatter messages have been received, false otherwise. */ - bool scatterAllMessagesReceived(); + bool scatterAllMessagesReceived(size_t id); /** * \brief Check if the scatter phase is complete. * * \return True if the scatter phase is complete, false otherwise. */ - bool scatterIsDone(); + bool scatterIsDone(size_t id); /** * \brief Check if the scatter phase is ready to proceed. * * \return True if the scatter phase is ready to proceed, false otherwise. */ - bool scatterIsReady(); + bool scatterIsReady(size_t id); /** * \brief Try to reduce the received scatter messages. * * \param step The current step in the scatter phase. */ - void scatterTryReduce(int32_t step); + void scatterTryReduce(size_t id, int32_t step); /** * \brief Perform the scatter-reduce iteration. * * This function sends data to the appropriate partner process and proceeds to the next step in the scatter phase. */ - void scatterReduceIter(); + void scatterReduceIter(size_t id); /** * \brief Handler for receiving scatter-reduce messages. @@ -267,35 +243,35 @@ struct Rabenseifner { * * \return True if all gather messages have been received, false otherwise. */ - bool gatherAllMessagesReceived(); + bool gatherAllMessagesReceived(size_t id); /** * \brief Check if the gather phase is complete. * * \return True if the gather phase is complete, false otherwise. */ - bool gatherIsDone(); + bool gatherIsDone(size_t id); /** * \brief Check if the gather phase is ready to proceed. * * \return True if the gather phase is ready to proceed, false otherwise. */ - bool gatherIsReady(); + bool gatherIsReady(size_t id); /** * \brief Try to reduce the received gather messages. * * \param step The current step in the gather phase. */ - void gatherTryReduce(int32_t step); + void gatherTryReduce(size_t id, int32_t step); /** * \brief Perform the gather iteration. * * This function sends data to the appropriate partner process and proceeds to the next step in the gather phase. */ - void gatherIter(); + void gatherIter(size_t id); /** * \brief Handler for receiving gather messages. @@ -311,14 +287,14 @@ struct Rabenseifner { * * This function completes the allreduce operation, handling any remaining steps and invoking the final handler. */ - void finalPart(); + void finalPart(size_t id); /** * \brief Send the result to excluded nodes. * * This function handles the final step for non-power-of-two process counts, sending the reduced result to excluded nodes. */ - void sendToExcludedNodes(); + void sendToExcludedNodes(size_t id); /** * \brief Handler for receiving the final result on excluded nodes. @@ -332,9 +308,46 @@ struct Rabenseifner { vt::objgroup::proxy::Proxy proxy_ = {}; vt::objgroup::proxy::Proxy parent_proxy_ = {}; - // DataT val_ = {}; - std::vector val_; - size_t size_ = {}; + struct State { + std::vector val_ = {}; + size_t size_ = {}; + + bool finished_adjustment_part_ = false; + MsgSharedPtr> left_adjust_message_ = nullptr; + MsgSharedPtr> right_adjust_message_ = nullptr; + + int32_t mask_ = 1; + int32_t step_ = 0; + bool initialized_ = false; + bool completed_ = false; + + // Scatter + int32_t scatter_mask_ = 1; + int32_t scatter_step_ = 0; + int32_t scatter_num_recv_ = 0; + std::vector scatter_steps_recv_ = {}; + std::vector scatter_steps_reduced_ = {}; + std::vector>> scatter_messages_ = + {}; + bool finished_scatter_part_ = false; + + // Gather + int32_t gather_step_ = 0; + int32_t gather_mask_ = 1; + int32_t gather_num_recv_ = 0; + std::vector gather_steps_recv_ = {}; + std::vector gather_steps_reduced_ = {}; + std::vector>> gather_messages_ = + {}; + + std::vector r_index_ = {}; + std::vector r_count_ = {}; + std::vector s_index_ = {}; + std::vector s_count_ = {}; + }; + + size_t id_ = 0; + std::unordered_map states_ = {}; NodeType num_nodes_ = {}; NodeType this_node_ = {}; @@ -343,33 +356,8 @@ struct Rabenseifner { int32_t nprocs_pof2_ = {}; int32_t nprocs_rem_ = {}; - std::vector r_index_ = {}; - std::vector r_count_ = {}; - std::vector s_index_ = {}; - std::vector s_count_ = {}; - NodeType vrt_node_ = {}; bool is_part_of_adjustment_group_ = false; - bool finished_adjustment_part_ = false; - - bool completed_ = false; - - // Scatter - int32_t scatter_mask_ = 1; - int32_t scatter_step_ = 0; - int32_t scatter_num_recv_ = 0; - std::vector scatter_steps_recv_ = {}; - std::vector scatter_steps_reduced_ = {}; - std::vector>> scatter_messages_ = {}; - bool finished_scatter_part_ = false; - - // Gather - int32_t gather_step_ = 0; - int32_t gather_mask_ = 1; - int32_t gather_num_recv_ = 0; - std::vector gather_steps_recv_ = {}; - std::vector gather_steps_reduced_ = {}; - std::vector>> gather_messages_ = {}; }; } // namespace vt::collective::reduce::allreduce diff --git a/src/vt/collective/reduce/allreduce/rabenseifner.impl.h b/src/vt/collective/reduce/allreduce/rabenseifner.impl.h index b717b4e374..ad40160965 100644 --- a/src/vt/collective/reduce/allreduce/rabenseifner.impl.h +++ b/src/vt/collective/reduce/allreduce/rabenseifner.impl.h @@ -74,62 +74,74 @@ Rabenseifner::Rabenseifner( vrt_node_ = this_node_ - nprocs_rem_; } - scatter_messages_.resize(num_steps_, nullptr); - scatter_steps_recv_.resize(num_steps_, false); - scatter_steps_reduced_.resize(num_steps_, false); + initialize(generateNewId(), std::forward(data)...); +} + +template < + typename DataT, template class Op, typename ObjT, auto finalHandler +>void Rabenseifner::initializeState(size_t id) +{ + auto& state = states_[id]; + + state.scatter_messages_.resize(num_steps_, nullptr); + state.scatter_steps_recv_.resize(num_steps_, false); + state.scatter_steps_reduced_.resize(num_steps_, false); - gather_messages_.resize(num_steps_, nullptr); - gather_steps_recv_.resize(num_steps_, false); - gather_steps_reduced_.resize(num_steps_, false); + state.gather_messages_.resize(num_steps_, nullptr); + state.gather_steps_recv_.resize(num_steps_, false); + state.gather_steps_reduced_.resize(num_steps_, false); - r_index_.resize(num_steps_, 0); - r_count_.resize(num_steps_, 0); - s_index_.resize(num_steps_, 0); - s_count_.resize(num_steps_, 0); + state.finished_adjustment_part_ = not is_part_of_adjustment_group_; + state.completed_ = false; - initialize(std::forward(data)...); + state.scatter_mask_ = 1; + state.scatter_step_ = 0; + state.scatter_num_recv_ = 0; + state.finished_scatter_part_ = false; + + state.gather_step_ = num_steps_ - 1; + state.gather_mask_ = nprocs_pof2_ >> 1; + state.gather_num_recv_ = 0; + + state.r_index_.resize(num_steps_, 0); + state.r_count_.resize(num_steps_, 0); + state.s_index_.resize(num_steps_, 0); + state.s_count_.resize(num_steps_, 0); + + state.initialized_ = true; } template < typename DataT, template class Op, typename ObjT, auto finalHandler > template -void Rabenseifner::initialize(Args&&... data) { - val_ = DataType::toVec(std::forward(data)...); - - finished_adjustment_part_ = not is_part_of_adjustment_group_; - completed_ = false; - - scatter_mask_ = 1; - scatter_step_ = 0; - scatter_num_recv_ = 0; - finished_scatter_part_ = false; +void Rabenseifner::initialize(size_t id, Args&&... data) { + auto& state = states_[id]; + state.val_ = DataType::toVec(std::forward(data)...); - gather_step_ = num_steps_ - 1; - gather_mask_ = nprocs_pof2_ >> 1; - gather_num_recv_ = 0; + initializeState(id); int step = 0; - size_ = val_.size(); - auto size = size_; + state.size_ = state.val_.size(); + auto size = state.size_; for (int mask = 1; mask < nprocs_pof2_; mask <<= 1) { auto vdest = vrt_node_ ^ mask; auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; if (this_node_ < dest) { - r_count_[step] = size / 2; - s_count_[step] = size - r_count_[step]; - s_index_[step] = r_index_[step] + r_count_[step]; + state.r_count_[step] = size / 2; + state.s_count_[step] = size - state.r_count_[step]; + state.s_index_[step] = state.r_index_[step] + state.r_count_[step]; } else { - s_count_[step] = size / 2; - r_count_[step] = size - s_count_[step]; - r_index_[step] = s_index_[step] + s_count_[step]; + state.s_count_[step] = size / 2; + state.r_count_[step] = size - state.s_count_[step]; + state.r_index_[step] = state.s_index_[step] + state.s_count_[step]; } if (step + 1 < num_steps_) { - r_index_[step + 1] = r_index_[step]; - s_index_[step + 1] = r_index_[step]; - size = r_count_[step]; + state.r_index_[step + 1] = state.r_index_[step]; + state.s_index_[step + 1] = state.r_index_[step]; + size = state.r_count_[step]; step++; } } @@ -138,51 +150,61 @@ void Rabenseifner::initialize(Args&&... data) { terse, allreduce, "Rabenseifner initialize: size_ = {} num_steps_ = {} nprocs_pof2_ = {} nprocs_rem_ = " "{} " - "is_part_of_adjustment_group_ = {} vrt_node_ = {} \n", - size_, num_steps_, nprocs_pof2_, nprocs_rem_, is_part_of_adjustment_group_, - vrt_node_ + "is_part_of_adjustment_group_ = {} vrt_node_ = {} ID = {}\n", + state.size_, num_steps_, nprocs_pof2_, nprocs_rem_, is_part_of_adjustment_group_, + vrt_node_, id ); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -void Rabenseifner::executeFinalHan() { +void Rabenseifner::executeFinalHan(size_t id) { // theCB()->makeSend(parent_proxy_[this_node_]).sendTuple(std::make_tuple(val_)); - vt_debug_print(terse, allreduce, "Rabenseifner executing final handler\n"); - parent_proxy_[this_node_].template invoke(val_); - completed_ = true; + auto& state = states_.at(id); + vt_debug_print(terse, allreduce, "Rabenseifner executing final handler ID = {}\n", id); + parent_proxy_[this_node_].template invoke(state.val_); + state.completed_ = true; } template < typename DataT, template class Op, typename ObjT, auto finalHandler> -void Rabenseifner::allreduce() { +void Rabenseifner::allreduce(size_t id) { if (is_part_of_adjustment_group_) { - adjustForPowerOfTwo(); + adjustForPowerOfTwo(id); } else { - scatterReduceIter(); + scatterReduceIter(id); } } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -void Rabenseifner::adjustForPowerOfTwo() { +void Rabenseifner::adjustForPowerOfTwo(size_t id) { if (is_part_of_adjustment_group_) { + auto& state = states_.at(id); auto const partner = is_even_ ? this_node_ + 1 : this_node_ - 1; if (is_even_) { proxy_[partner] .template send<&Rabenseifner::adjustForPowerOfTwoRightHalf>( - val_.data() + (size_ / 2), size_ - (size_ / 2)); + state.val_.data() + (state.size_ / 2), state.size_ - (state.size_ / 2), id); + + if(state.left_adjust_message_ != nullptr){ + adjustForPowerOfTwoLeftHalf(state.left_adjust_message_.get()); + } } else { proxy_[partner].template send<&Rabenseifner::adjustForPowerOfTwoLeftHalf>( - val_.data(), size_ / 2); + state.val_.data(), state.size_ / 2, id); + + if(state.right_adjust_message_ != nullptr){ + adjustForPowerOfTwoRightHalf(state.right_adjust_message_.get()); + } } vt_debug_print( - terse, allreduce, "Rabenseifner Part1: Sending to Node {}\n", partner + terse, allreduce, "Rabenseifner (Send Part1): To Node {} ID = {}\n", partner, id ); } } @@ -193,14 +215,28 @@ template < void Rabenseifner::adjustForPowerOfTwoRightHalf( AllreduceRbnRawMsg* msg) { + auto& state = states_[msg->id_]; + + if(not state.initialized_){ + initializeState(msg->id_); + state.right_adjust_message_ = promoteMsg(msg); + + return; + } + + vt_debug_print( + terse, allreduce, "Rabenseifner (Recv Part1): From Node {} ID = {}\n", + theContext()->getFromNodeCurrentTask(), msg->id_ + ); + for (uint32_t i = 0; i < msg->size_; i++) { - Op()(val_[(size_ / 2) + i], msg->val_[i]); + Op()(state.val_[(state.size_ / 2) + i], msg->val_[i]); } // Send to left node proxy_[theContext()->getNode() - 1] .template send<&Rabenseifner::adjustForPowerOfTwoFinalPart>( - val_.data() + (size_ / 2), size_ - (size_ / 2)); + state.val_.data() + (state.size_ / 2), state.size_ - (state.size_ / 2), msg->id_); } template < @@ -209,8 +245,21 @@ template < void Rabenseifner::adjustForPowerOfTwoLeftHalf( AllreduceRbnRawMsg* msg) { + auto& state = states_[msg->id_]; + if(not state.initialized_){ + initializeState(msg->id_); + state.left_adjust_message_ = promoteMsg(msg); + + return; + } + + vt_debug_print( + terse, allreduce, "Rabenseifner (Recv Part1): From Node {} ID = {}\n", + theContext()->getFromNodeCurrentTask(), msg->id_ + ); + for (uint32_t i = 0; i < msg->size_; i++) { - Op()(val_[i], msg->val_[i]); + Op()(state.val_[i], msg->val_[i]); } } @@ -219,94 +268,113 @@ template < > void Rabenseifner::adjustForPowerOfTwoFinalPart( AllreduceRbnRawMsg* msg) { + + vt_debug_print( + terse, allreduce, "Rabenseifner (Recv Part2): From Node {} ID = {}\n", + theContext()->getFromNodeCurrentTask(), msg->id_ + ); + + auto& state = states_[msg->id_]; + for (uint32_t i = 0; i < msg->size_; i++) { - val_[(size_ / 2) + i] = msg->val_[i]; + state.val_[(state.size_ / 2) + i] = msg->val_[i]; } - finished_adjustment_part_ = true; + state.finished_adjustment_part_ = true; - scatterReduceIter(); + scatterReduceIter(msg->id_); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -bool Rabenseifner::scatterAllMessagesReceived() { +bool Rabenseifner::scatterAllMessagesReceived(size_t id) { + auto& state = states_.at(id); + return std::all_of( - scatter_steps_recv_.cbegin(), scatter_steps_recv_.cbegin() + scatter_step_, + state.scatter_steps_recv_.cbegin(), state.scatter_steps_recv_.cbegin() + state.scatter_step_, [](auto const val) { return val; }); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -bool Rabenseifner::scatterIsDone() { - return scatter_step_ == num_steps_ and scatter_num_recv_ == num_steps_; +bool Rabenseifner::scatterIsDone(size_t id) { + auto& state = states_.at(id); + return (state.scatter_step_ == num_steps_) and (state.scatter_num_recv_ == num_steps_); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -bool Rabenseifner::scatterIsReady() { - return ((is_part_of_adjustment_group_ and finished_adjustment_part_) and - scatter_step_ == 0) or - scatterAllMessagesReceived(); +bool Rabenseifner::scatterIsReady(size_t id) { + auto& state = states_.at(id); + return ((is_part_of_adjustment_group_ and state.finished_adjustment_part_) and + state.scatter_step_ == 0) or + ((state.scatter_mask_ < nprocs_pof2_) and scatterAllMessagesReceived(id)); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > void Rabenseifner::scatterTryReduce( - int32_t step) { + size_t id, int32_t step) { + auto& state = states_.at(id); if ( - (step < scatter_step_) and not scatter_steps_reduced_[step] and - scatter_steps_recv_[step] and + (step < state.scatter_step_) and not state.scatter_steps_reduced_[step] and + state.scatter_steps_recv_[step] and std::all_of( - scatter_steps_reduced_.cbegin(), scatter_steps_reduced_.cbegin() + step, + state.scatter_steps_reduced_.cbegin(), state.scatter_steps_reduced_.cbegin() + step, [](auto const val) { return val; })) { - auto& in_msg = scatter_messages_.at(step); + auto& in_msg = state.scatter_messages_.at(step); auto& in_val = in_msg->val_; for (uint32_t i = 0; i < in_msg->size_; i++) { - Op()(val_[r_index_[in_msg->step_] + i], in_val[i]); + Op()(state.val_[state.r_index_[in_msg->step_] + i], in_val[i]); } - scatter_steps_reduced_[step] = true; + state.scatter_steps_reduced_[step] = true; } } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -void Rabenseifner::scatterReduceIter() { - if (not scatterIsReady()) { +void Rabenseifner::scatterReduceIter(size_t id) { + if (not scatterIsReady(id)) { return; } - auto vdest = vrt_node_ ^ scatter_mask_; + auto& state = states_.at(id); + auto vdest = vrt_node_ ^ state.scatter_mask_; auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; + vtAssert(dest < num_nodes_, fmt::format("Rabenseifner Part2 (Send step {}): To Node {} starting with idx = {} and " + "count " + "{} ID = {}\n", + state.scatter_step_, dest, state.s_index_[state.scatter_step_], + state.s_count_[state.scatter_step_], id)); vt_debug_print( terse, allreduce, "Rabenseifner Part2 (Send step {}): To Node {} starting with idx = {} and " "count " - "{} \n", - scatter_step_, dest, s_index_[scatter_step_], - s_count_[scatter_step_] + "{} ID = {}\n", + state.scatter_step_, dest, state.s_index_[state.scatter_step_], + state.s_count_[state.scatter_step_], id ); proxy_[dest].template send<&Rabenseifner::scatterReduceIterHandler>( - val_.data() + s_index_[scatter_step_], s_count_[scatter_step_], scatter_step_ + state.val_.data() + state.s_index_[state.scatter_step_], state.s_count_[state.scatter_step_], id, state.scatter_step_ ); - scatter_mask_ <<= 1; - scatter_step_++; + state.scatter_mask_ <<= 1; + state.scatter_step_++; - scatterTryReduce(scatter_step_ - 1); + scatterTryReduce(id, state.scatter_step_ - 1); - if (scatterIsDone()) { - finished_scatter_part_ = true; - gatherIter(); - } else if (scatterAllMessagesReceived()) { - scatterReduceIter(); + if (scatterIsDone(id)) { + state.finished_scatter_part_ = true; + gatherIter(id); + } else { + scatterReduceIter(id); } } @@ -315,110 +383,118 @@ template < > void Rabenseifner::scatterReduceIterHandler( AllreduceRbnRawMsg* msg) { - scatter_messages_[msg->step_] = promoteMsg(msg); - scatter_steps_recv_[msg->step_] = true; - scatter_num_recv_++; + auto& state = states_.at(msg->id_); - if (not finished_adjustment_part_) { + state.scatter_messages_[msg->step_] = promoteMsg(msg); + state.scatter_steps_recv_[msg->step_] = true; + state.scatter_num_recv_++; + + if (not state.finished_adjustment_part_) { return; } - scatterTryReduce(msg->step_); + scatterTryReduce(msg->id_, msg->step_); vt_debug_print( terse, allreduce, "Rabenseifner Part2 (Recv step {}): scatter_mask_= {} nprocs_pof2_ = {}: " - "idx = {} from {}\n", - msg->step_, scatter_mask_, nprocs_pof2_, r_index_[msg->step_], - theContext()->getFromNodeCurrentTask() + "idx = {} from {} ID = {}\n", + msg->step_, state.scatter_mask_, nprocs_pof2_, state.r_index_[msg->step_], + theContext()->getFromNodeCurrentTask(), msg->id_ ); - if ((scatter_mask_ < nprocs_pof2_) and scatterAllMessagesReceived()) { - scatterReduceIter(); - } else if (scatterIsDone()) { - finished_scatter_part_ = true; - gatherIter(); + if ((state.scatter_mask_ < nprocs_pof2_) and scatterAllMessagesReceived(msg->id_)) { + scatterReduceIter(msg->id_); + } else if (scatterIsDone(msg->id_)) { + state.finished_scatter_part_ = true; + gatherIter(msg->id_); } } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -bool Rabenseifner::gatherAllMessagesReceived() { +bool Rabenseifner::gatherAllMessagesReceived(size_t id) { + auto& state = states_.at(id); return std::all_of( - gather_steps_recv_.cbegin() + gather_step_ + 1, gather_steps_recv_.cend(), + state.gather_steps_recv_.cbegin() + state.gather_step_ + 1, state.gather_steps_recv_.cend(), [](auto const val) { return val; }); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -bool Rabenseifner::gatherIsDone() { - return (gather_step_ < 0) and (gather_num_recv_ == num_steps_); +bool Rabenseifner::gatherIsDone(size_t id) { + auto& state = states_.at(id); + return (state.gather_step_ < 0) and (state.gather_num_recv_ == num_steps_); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -bool Rabenseifner::gatherIsReady() { - return (gather_step_ == num_steps_ - 1) or gatherAllMessagesReceived(); +bool Rabenseifner::gatherIsReady(size_t id) { + auto& state = states_.at(id); + return (state.gather_step_ == num_steps_ - 1) or gatherAllMessagesReceived(id); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > void Rabenseifner::gatherTryReduce( - int32_t step) { - auto const doRed = (step > gather_step_) and - not gather_steps_reduced_[step] and gather_steps_recv_[step] and - std::all_of(gather_steps_reduced_.cbegin() + step + 1, - gather_steps_reduced_.cend(), + size_t id, int32_t step) { + auto& state = states_.at(id); + + auto const doRed = (step > state.gather_step_) and + not state.gather_steps_reduced_[step] and state.gather_steps_recv_[step] and + std::all_of(state.gather_steps_reduced_.cbegin() + step + 1, + state.gather_steps_reduced_.cend(), [](auto const val) { return val; }); if (doRed) { - auto& in_msg = gather_messages_.at(step); + auto& in_msg = state.gather_messages_.at(step); auto& in_val = in_msg->val_; for (uint32_t i = 0; i < in_msg->size_; i++) { - val_[s_index_[in_msg->step_] + i] = in_val[i]; + state.val_[state.s_index_[in_msg->step_] + i] = in_val[i]; } - gather_steps_reduced_[step] = true; + state.gather_steps_reduced_[step] = true; } } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -void Rabenseifner::gatherIter() { - if (not gatherIsReady()) { +void Rabenseifner::gatherIter(size_t id) { + if (not gatherIsReady(id)) { return; } - auto vdest = vrt_node_ ^ gather_mask_; + auto& state = states_.at(id); + auto vdest = vrt_node_ ^ state.gather_mask_; auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; vt_debug_print( terse, allreduce, "Rabenseifner Part3 (step {}): Sending to Node {} starting with idx = {} and " "count " - "{} \n", - gather_step_, dest, r_index_[gather_step_], - r_count_[gather_step_] + "{} ID = {}\n", + state.gather_step_, dest, state.r_index_[state.gather_step_], + state.r_count_[state.gather_step_], id ); proxy_[dest].template send<&Rabenseifner::gatherIterHandler>( - val_.data() + r_index_[gather_step_], r_count_[gather_step_], gather_step_ + state.val_.data() + state.r_index_[state.gather_step_], state.r_count_[state.gather_step_], id, state.gather_step_ ); - gather_mask_ >>= 1; - gather_step_--; + state.gather_mask_ >>= 1; + state.gather_step_--; - gatherTryReduce(gather_step_ + 1); + gatherTryReduce(id, state.gather_step_ + 1); - if (gatherIsDone()) { - finalPart(); - } else if (gatherIsReady()) { - gatherIter(); + if (gatherIsDone(id)) { + finalPart(id); + } else if (gatherIsReady(id)) { + gatherIter(id); } } @@ -427,78 +503,81 @@ template < > void Rabenseifner::gatherIterHandler( AllreduceRbnRawMsg* msg) { + auto& state = states_.at(msg->id_); vt_debug_print( - terse, allreduce, "Rabenseifner Part3 (step {}): Received idx = {} from {}\n", - msg->step_, s_index_[msg->step_], - theContext()->getFromNodeCurrentTask() + terse, allreduce, "Rabenseifner Part3 (step {}): Received idx = {} from {} ID = {}\n", + msg->step_, state.s_index_[msg->step_], + theContext()->getFromNodeCurrentTask(), msg->id_ ); - gather_messages_[msg->step_] = promoteMsg(msg); - gather_steps_recv_[msg->step_] = true; - gather_num_recv_++; + state.gather_messages_[msg->step_] = promoteMsg(msg); + state.gather_steps_recv_[msg->step_] = true; + state.gather_num_recv_++; - if (not finished_scatter_part_) { + if (not state.finished_scatter_part_) { return; } - gatherTryReduce(msg->step_); + gatherTryReduce(msg->id_, msg->step_); - if (gather_mask_ > 0 and gatherIsReady()) { - gatherIter(); - } else if (gatherIsDone()) { - finalPart(); + if (state.gather_mask_ > 0 and gatherIsReady(msg->id_)) { + gatherIter(msg->id_); + } else if (gatherIsDone(msg->id_)) { + finalPart(msg->id_); } } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -void Rabenseifner::finalPart() { - if (completed_) { +void Rabenseifner::finalPart(size_t id) { + auto& state = states_.at(id); + if (state.completed_) { return; } if (nprocs_rem_) { - sendToExcludedNodes(); + sendToExcludedNodes(id); } vt_debug_print( terse, allreduce, - "Rabenseifner Part4: Executing final handler with size {}\n", val_.size() + "Rabenseifner Part4: Executing final handler with size {} ID = {}\n", state.val_.size(), id ); parent_proxy_[this_node_].template invoke( - DataType::fromVec(val_) + DataType::fromVec(state.val_) ); - completed_ = true; + state.completed_ = true; - std::fill(scatter_messages_.begin(), scatter_messages_.end(), nullptr); - std::fill(gather_messages_.begin(), gather_messages_.end(), nullptr); + std::fill(state.scatter_messages_.begin(), state.scatter_messages_.end(), nullptr); + std::fill(state.gather_messages_.begin(), state.gather_messages_.end(), nullptr); - scatter_steps_recv_.assign(num_steps_, false); - gather_steps_recv_.assign(num_steps_, false); + state.scatter_steps_recv_.assign(num_steps_, false); + state.gather_steps_recv_.assign(num_steps_, false); - scatter_steps_reduced_.assign(num_steps_, false); - gather_steps_reduced_.assign(num_steps_, false); + state.scatter_steps_reduced_.assign(num_steps_, false); + state.gather_steps_reduced_.assign(num_steps_, false); - r_index_.assign(num_steps_, 0); - r_count_.assign(num_steps_, 0); - s_index_.assign(num_steps_, 0); - s_count_.assign(num_steps_, 0); + state.r_index_.assign(num_steps_, 0); + state.r_count_.assign(num_steps_, 0); + state.s_index_.assign(num_steps_, 0); + state.s_count_.assign(num_steps_, 0); } template < typename DataT, template class Op, typename ObjT, auto finalHandler > -void Rabenseifner::sendToExcludedNodes() { +void Rabenseifner::sendToExcludedNodes(size_t id) { + auto& state = states_.at(id); if (is_part_of_adjustment_group_ and is_even_) { vt_debug_print( - terse, allreduce, "Rabenseifner Part4: Sending to Node {} \n", - this_node_ + 1 + terse, allreduce, "Rabenseifner Part4: Sending to Node {} ID = {}\n", + this_node_ + 1, id ); proxy_[this_node_ + 1] - .template send<&Rabenseifner::sendToExcludedNodesHandler>(val_.data(), size_); + .template send<&Rabenseifner::sendToExcludedNodesHandler>(state.val_.data(), state.size_, id); } } @@ -507,15 +586,16 @@ template < > void Rabenseifner::sendToExcludedNodesHandler( AllreduceRbnRawMsg* msg) { + auto& state = states_.at(msg->id_); vt_debug_print( terse, allreduce, - "Rabenseifner Part4: Received allreduce result with size {}\n", msg->size_ + "Rabenseifner Part4: Received allreduce result with size {} ID = {}\n", msg->size_, msg->id_ ); parent_proxy_[this_node_].template invoke( DataType::fromMemory(msg->val_, msg->size_) ); - completed_ = true; + state.completed_ = true; } } // namespace vt::collective::reduce::allreduce diff --git a/src/vt/collective/reduce/allreduce/recursive_doubling.h b/src/vt/collective/reduce/allreduce/recursive_doubling.h index 69807a084f..02eca76ee6 100644 --- a/src/vt/collective/reduce/allreduce/recursive_doubling.h +++ b/src/vt/collective/reduce/allreduce/recursive_doubling.h @@ -226,8 +226,6 @@ struct RecursiveDoubling { vt::objgroup::proxy::Proxy proxy_ = {}; vt::objgroup::proxy::Proxy parent_proxy_ = {}; - // DataT val_ = {}; - struct State{ DataT val_ = {}; bool finished_adjustment_part_ = false; diff --git a/src/vt/collective/reduce/allreduce/recursive_doubling.impl.h b/src/vt/collective/reduce/allreduce/recursive_doubling.impl.h index 569ed51df3..b614ab8e34 100644 --- a/src/vt/collective/reduce/allreduce/recursive_doubling.impl.h +++ b/src/vt/collective/reduce/allreduce/recursive_doubling.impl.h @@ -341,11 +341,11 @@ void RecursiveDoubling::finalPart(size_t id) { parent_proxy_[this_node_].template invoke(state.val_); state.completed_ = true; + states_.erase(id); + // std::fill(state.messages_.begin(), state.messages_.end(), nullptr); - std::fill(state.messages_.begin(), state.messages_.end(), nullptr); - - state.steps_recv_.assign(num_steps_, false); - state.steps_reduced_.assign(num_steps_, false); + // state.steps_recv_.assign(num_steps_, false); + // state.steps_reduced_.assign(num_steps_, false); } } // namespace vt::collective::reduce::allreduce diff --git a/src/vt/objgroup/manager.h b/src/vt/objgroup/manager.h index 2b336f7acf..50538f8368 100644 --- a/src/vt/objgroup/manager.h +++ b/src/vt/objgroup/manager.h @@ -292,11 +292,11 @@ struct ObjGroupManager : runtime::component::Component { ProxyType proxy, std::string const& name, std::string const& parent = "" ); -template -ObjGroupManager::PendingSendType allreduce(ProxyType proxy, const DataT& data); +template +ObjGroupManager::PendingSendType allreduce(ProxyType proxy, Args&&... data); -template class Op, typename DataT> -ObjGroupManager::PendingSendType allreduce(ProxyType proxy, const DataT& data); +template class Op, typename DataT, typename... Args> +ObjGroupManager::PendingSendType allreduce(ProxyType proxy, Args&&... data); /** * \brief Perform a reduction over an objgroup diff --git a/src/vt/objgroup/manager.impl.h b/src/vt/objgroup/manager.impl.h index e1044ae3ff..c2105a2706 100644 --- a/src/vt/objgroup/manager.impl.h +++ b/src/vt/objgroup/manager.impl.h @@ -41,7 +41,6 @@ //@HEADER */ -#include "vt/configs/types/types_sentinels.h" #if !defined INCLUDED_VT_OBJGROUP_MANAGER_IMPL_H #define INCLUDED_VT_OBJGROUP_MANAGER_IMPL_H @@ -268,31 +267,11 @@ ObjGroupManager::PendingSendType ObjGroupManager::broadcast(MsgSharedPtr m return objgroup::broadcast(msg,han); } - -// Helper trait to detect if a type is a specialization of a given variadic template -template