diff --git a/src/vt/collective/reduce/allreduce/allreduce.h b/src/vt/collective/reduce/allreduce/allreduce.h index 83f8f16472..db92fc890f 100644 --- a/src/vt/collective/reduce/allreduce/allreduce.h +++ b/src/vt/collective/reduce/allreduce/allreduce.h @@ -47,11 +47,12 @@ #include "vt/config.h" #include "vt/context/context.h" #include "vt/messaging/message/message.h" +#include "vt/objgroup/proxy/proxy_objgroup.h" #include #include -namespace vt::collective::reduce::alleduce { +namespace vt::collective::reduce::allreduce { template struct AllreduceMsg @@ -66,65 +67,244 @@ struct AllreduceMsg explicit AllreduceMsg(DataT&& in_val) : MessageParentType(), val_(std::forward(in_val)) { } - explicit AllreduceMsg(DataT const& in_val) + explicit AllreduceMsg(DataT const& in_val, int step = 0) : MessageParentType(), - val_(in_val) { } + val_(in_val), + step_(step) { } template void serialize(SerializeT& s) { MessageParentType::serialize(s); s | val_; + s | step_; } DataT val_ = {}; + int32_t step_ = {}; }; template struct Allreduce { - void rightHalf(AllreduceMsg* msg) { - for (int i = 0; i < msg->vec_.size(); i++) { - val_[(val_.size() / 2) + i] += msg->vec_[i]; + void initialize( + const DataT& data, vt::objgroup::proxy::Proxy proxy, + uint32_t num_nodes) { + this_node_ = vt::theContext()->getNode(); + is_even_ = this_node_ % 2 == 0; + val_ = data; + proxy_ = proxy; + num_steps_ = static_cast(log2(num_nodes)); + nprocs_pof2_ = 1 << num_steps_; + nprocs_rem_ = num_nodes - nprocs_pof2_; + is_part_of_adjustment_group_ = this_node_ < (2 * nprocs_rem_); + if (is_part_of_adjustment_group_) { + if (is_even_) { + vrt_node_ = this_node_ / 2; + } else { + vrt_node_ = -1; + } + } else { + vrt_node_ = this_node_ - nprocs_rem_; + } + + r_index_.resize(num_steps_, 0); + r_count_.resize(num_steps_, 0); + s_index_.resize(num_steps_, 0); + s_count_.resize(num_steps_, 0); + + w_size_ = data.size(); + + int step = 0; + size_t wsize = data.size(); + for (int mask = 1; mask < nprocs_pof2_; mask <<= 1) { + auto vdest = vrt_node_ ^ mask; + auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; + + if (this_node_ < dest) { + r_count_[step] = wsize / 2; + s_count_[step] = wsize - r_count_[step]; + s_index_[step] = r_index_[step] + r_count_[step]; + } else { + s_count_[step] = wsize / 2; + r_count_[step] = wsize - s_count_[step]; + r_index_[step] = s_index_[step] + s_count_[step]; + } + + if (step + 1 < num_steps_) { + r_index_[step + 1] = r_index_[step]; + s_index_[step + 1] = r_index_[step]; + wsize = r_count_[step]; + step++; + } + } + + // std::string str(1024, 0x0); + // for (int i = 0; i < num_steps_; ++i) { + // str.append(fmt::format( + // "Step{}: send_idx = {} send_count = {} recieve_idx = {} recieve_count " + // "= {}\n", + // i, s_index_[i], s_count_[i], r_index_[i], r_count_[i])); + // } + // fmt::print( + // "[{}] Initialize with size = {} num_steps {} \n {}", this_node_, w_size_, + // num_steps_, str); + } + + void partOneCollective() { + if (is_part_of_adjustment_group_) { + auto const partner = is_even_ ? this_node_ + 1 : this_node_ - 1; + + if (is_even_) { + proxy_[partner].template send<&Allreduce::partOneRightHalf>( + std::vector{val_.begin() + (val_.size() / 2), val_.end()}); + vrt_node_ = this_node_ / 2; + } else { + proxy_[partner].template send<&Allreduce::partOneLeftHalf>( + std::vector{val_.begin(), val_.end() - (val_.size() / 2)}); + vrt_node_ = -1; + } + } else { + vrt_node_ = this_node_ - nprocs_rem_; + } + + if (nprocs_rem_ == 0) { + partTwo(); } } - void rightHalfComplete(AllreduceMsg* msg) { - for (int i = 0; i < msg->vec_.size(); i++) { - val_[(val_.size() / 2) + i] = msg->vec_[i]; + void partOneRightHalf(AllreduceMsg* msg) { + for (int i = 0; i < msg->val_.size(); i++) { + val_[(val_.size() / 2) + i] += msg->val_[i]; } + + // Send to left node + proxy_[theContext()->getNode() - 1] + .template send<&Allreduce::partOneFinalPart>( + std::vector{val_.begin() + (val_.size() / 2), val_.end()}); } - void leftHalf(AllreduceMsg* msg) { - for (int i = 0; i < msg->vec_.size(); i++) { - val_[i] += msg->vec_[i]; + void partOneLeftHalf(AllreduceMsg* msg) { + for (int i = 0; i < msg->val_.size(); i++) { + val_[i] += msg->val_[i]; } } - void leftHalfComplete(AllreduceMsg* msg) { - for (int i = 0; i < msg->vec_.size(); i++) { - val_[i] = msg->vec_[i]; + void partOneFinalPart(AllreduceMsg* msg) { + for (int i = 0; i < msg->val_.size(); i++) { + val_[(val_.size() / 2) + i] = msg->val_[i]; } + + partTwo(); } - void sendHandler(AllreduceMsg* msg) { - uint32_t start = is_even_ ? 0 : val_.size() / 2; - uint32_t end = is_even_ ? val_.size() / 2 : val_.size(); - for (int i = 0; start < end; start++) { - val_[start] += msg->vec_[i++]; + void partTwo() { + auto vdest = vrt_node_ ^ mask_; + auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; + + // fmt::print( + // "[{}] Part2 Step {}: Sending to Node {} starting with idx = {} and count " + // "{} \n", + // this_node_, step_, dest, s_index_[step_], s_count_[step_]); + proxy_[dest].template send<&Allreduce::partTwoHandler>( + std::vector{ + val_.begin() + (s_index_[step_]), + val_.begin() + (s_index_[step_]) + s_count_[step_]}, + step_); + + mask_ <<= 1; + if (step_ + 1 < num_steps_) { + step_++; } } - void reducedHan(AllreduceMsg* msg) { - for (int i = 0; i < msg->vec_.size(); i++) { - val_[val_.size() / 2 + i] = msg->vec_[i]; + void partTwoHandler(AllreduceMsg* msg) { + for (int i = 0; i < msg->val_.size(); i++) { + val_[r_index_[msg->step_] + i] += msg->val_[i]; + } + + // std::string data(128, 0x0); + // for (auto val : msg->val_) { + // data.append(fmt::format("{} ", val)); + // } + // fmt::print( + // "[{}] Part2 Step {}: Received data ({}) idx = {} from {}\n", this_node_, + // msg->step_, data, r_index_[msg->step_], + // theContext()->getFromNodeCurrentTask()); + + if (mask_ < nprocs_pof2_) { + partTwo(); + } else { + step_ = num_steps_ - 1; + mask_ = nprocs_pof2_ >> 1; + partThree(); } } - Allreduce() { is_even_ = theContext()->getNode() % 2 == 0; } + void partThree() { + auto vdest = vrt_node_ ^ mask_; + auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_; + + // std::string data(128, 0x0); + // auto subV = std::vector{ + // val_.begin() + (r_index_[step_]), + // val_.begin() + (r_index_[step_]) + r_count_[step_]}; + // for (auto val : subV) { + // data.append(fmt::format("{} ", val)); + // } + + // fmt::print( + // "[{}] Part3 Step {}: Sending to Node {} starting with idx = {} and count " + // "{} " + // "data={} \n", + // this_node_, step_, dest, r_index_[step_], r_count_[step_], data); + + proxy_[dest].template send<&Allreduce::partThreeHandler>( + std::vector{ + val_.begin() + (r_index_[step_]), + val_.begin() + (r_index_[step_]) + r_count_[step_]}, + step_); + + mask_ >>= 1; + step_--; + } + + void partThreeHandler(AllreduceMsg* msg) { + for (int i = 0; i < msg->val_.size(); i++) { + val_[s_index_[msg->step_] + i] = msg->val_[i]; + } + + // std::string data(128, 0x0); + // for (auto val : msg->val_) { + // data.append(fmt::format("{} ", val)); + // } + // fmt::print( + // "[{}] Part3 Step {}: Received data ({}) idx = {} from {}\n", this_node_, + // msg->step_, data, s_index_[msg->step_], + // theContext()->getFromNodeCurrentTask()); + if (mask_ > 0) { + partThree(); + } + } + + NodeType this_node_ = {}; bool is_even_ = false; + vt::objgroup::proxy::Proxy proxy_ = {}; DataT val_ = {}; + NodeType vrt_node_ = {}; + bool is_part_of_adjustment_group_ = false; + int32_t num_steps_ = {}; + int32_t nprocs_pof2_ = {}; + int32_t nprocs_rem_ = {}; + int32_t mask_ = 1; + + size_t w_size_ = {}; + int32_t step_ = 0; + std::vector r_index_ = {}; + std::vector r_count_ = {}; + std::vector s_index_ = {}; + std::vector s_count_ = {}; }; -} // namespace vt::collective::reduce::alleduce +} // namespace vt::collective::reduce::allreduce #endif /*INCLUDED_VT_COLLECTIVE_REDUCE_REDUCE_H*/ diff --git a/src/vt/collective/reduce/allreduce/rabenseifner.h b/src/vt/collective/reduce/allreduce/rabenseifner.h index f15c522a80..bc5275352f 100644 --- a/src/vt/collective/reduce/allreduce/rabenseifner.h +++ b/src/vt/collective/reduce/allreduce/rabenseifner.h @@ -9,11 +9,10 @@ #include -namespace vt::collective::reduce::alleduce { +namespace vt::collective::reduce::allreduce { template class Op, typename... Args> -void allreduce(Args&&... data) { - +void allreduce_r(Args&&... data) { auto msg = vt::makeMessage(std::forward(data)...); auto const this_node = vt::theContext()->getNode(); auto const num_nodes = theContext()->getNumNodes(); @@ -39,7 +38,8 @@ void allreduce(Args&&... data) { vt::runInEpochCollective([=] { if (is_part_of_adjustment_group) { auto const partner = is_even ? this_node + 1 : this_node - 1; - grp_proxy[partner].send<&Reducer::sendHandler>(std::forward(data...)); + grp_proxy[partner].send<&Reducer::sendHandler>( + std::forward(data...)); } }); @@ -123,6 +123,6 @@ void allreduce(Args&&... data) { */ } -} // namespace vt::collective::reduce::alleduce +} // namespace vt::collective::reduce::allreduce -#endif // INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H \ No newline at end of file +#endif // INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_RABENSEIFNER_H diff --git a/src/vt/objgroup/manager.h b/src/vt/objgroup/manager.h index 447cf2b6e0..f84eddb835 100644 --- a/src/vt/objgroup/manager.h +++ b/src/vt/objgroup/manager.h @@ -291,6 +291,9 @@ struct ObjGroupManager : runtime::component::Component { ProxyType proxy, std::string const& name, std::string const& parent = "" ); +template class Op, typename DataT> +ObjGroupManager::PendingSendType allreduce_r(ProxyType proxy, const DataT& data); + /** * \brief Perform a reduction over an objgroup * diff --git a/src/vt/objgroup/manager.impl.h b/src/vt/objgroup/manager.impl.h index 26ae714e04..f1f64877d2 100644 --- a/src/vt/objgroup/manager.impl.h +++ b/src/vt/objgroup/manager.impl.h @@ -41,6 +41,8 @@ //@HEADER */ +#include "vt/messaging/message/smart_ptr.h" +#include #if !defined INCLUDED_VT_OBJGROUP_MANAGER_IMPL_H #define INCLUDED_VT_OBJGROUP_MANAGER_IMPL_H @@ -57,6 +59,7 @@ #include "vt/collective/collective_alg.h" #include "vt/messaging/active.h" #include "vt/elm/elm_id_bits.h" +#include "vt/collective/reduce/allreduce/allreduce.h" #include @@ -262,6 +265,32 @@ ObjGroupManager::PendingSendType ObjGroupManager::broadcast(MsgSharedPtr m return objgroup::broadcast(msg,han); } +template < + auto f, typename ObjT, template class Op, typename DataT> +ObjGroupManager::PendingSendType +ObjGroupManager::allreduce_r(ProxyType proxy, const DataT& data) { + // check payload size and choose appropriate algorithm + + auto const this_node = vt::theContext()->getNode(); + auto const num_nodes = theContext()->getNumNodes(); + + using Reducer = collective::reduce::allreduce::Allreduce; + + auto grp_proxy = + vt::theObjGroup()->makeCollective("allreduce_rabenseifner"); + + grp_proxy[this_node].template invoke<&Reducer::initialize>( + data, grp_proxy, num_nodes); + + vt::runInEpochCollective([=] { + grp_proxy[this_node].template invoke<&Reducer::partOneCollective>(); + }); + + proxy[this_node].template invoke(grp_proxy.get()->val_); + + return PendingSendType{nullptr}; +} + template *f> ObjGroupManager::PendingSendType ObjGroupManager::reduce( ProxyType proxy, MsgSharedPtr msg, diff --git a/src/vt/objgroup/proxy/proxy_objgroup.h b/src/vt/objgroup/proxy/proxy_objgroup.h index 825c652df0..eeb37e5eff 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup.h +++ b/src/vt/objgroup/proxy/proxy_objgroup.h @@ -198,6 +198,15 @@ struct Proxy { Args&&... args ) const; + template < + auto f, + template class Op = collective::NoneOp, + typename... Args + > + PendingSendType allreduce_h( + Args&&... args + ) const; + /** * \brief Reduce back to a point target. Performs a reduction using operator * `Op` followed by a send to `f` with the result. diff --git a/src/vt/objgroup/proxy/proxy_objgroup.impl.h b/src/vt/objgroup/proxy/proxy_objgroup.impl.h index 337f9fcaab..911f420b0e 100644 --- a/src/vt/objgroup/proxy/proxy_objgroup.impl.h +++ b/src/vt/objgroup/proxy/proxy_objgroup.impl.h @@ -203,6 +203,24 @@ Proxy::allreduce( >(proxy, msg.get(), stamp); } +template +template < + auto f, + template class Op, + typename... Args +> +typename Proxy::PendingSendType +Proxy::allreduce_h( + Args&&... args +) const { + auto proxy = Proxy(*this); + return theObjGroup()->allreduce_r< + f, + ObjT, + Op + >(proxy, std::forward(args)...); +} + template template < auto f, diff --git a/tests/perf/reduce.cc b/tests/perf/reduce.cc index ef469ffc3f..478890d2c9 100644 --- a/tests/perf/reduce.cc +++ b/tests/perf/reduce.cc @@ -41,12 +41,13 @@ //@HEADER */ #include "common/test_harness.h" +#include "vt/collective/collective_alg.h" #include "vt/context/context.h" #include #include #include #include -#include +#include #include INCLUDE_FMT_CORE @@ -62,51 +63,81 @@ struct MyTest : PerfTestHarness { struct NodeObj { explicit NodeObj(MyTest* test_obj) : test_obj_(test_obj) { } - void initialize() { proxy_ = vt::theObjGroup()->getProxy(this); -// data_["Node"] = theContext()->getNode(); } + void initialize() { + proxy_ = vt::theObjGroup()->getProxy(this); + // data_["Node"] = theContext()->getNode(); } } - struct MyMsg : vt::Message {}; + struct MyMsg : vt::Message { }; - void reduceComplete(std::vector in) { - reduce_counter_++; - test_obj_->StopTimer(fmt::format("{} reduce", i)); - test_obj_->GetMemoryUsage(); - if (i < num_iters) { - i++; - auto this_node = theContext()->getNode(); - proxy_[this_node].send(); - } else if (theContext()->getNode() == 0) { - theTerm()->enableTD(); - } + void newReduceComplete(std::vector in) { + // fmt::print( + // "\n[{}]: allreduce_h done! (Size == {}) Results are ...\n", + // theContext()->getNode(), in.size()); + + // for (int node = 0; node < theContext()->getNumNodes(); ++node) { + // if (node == theContext()->getNode()) { + // std::string printer(128, 0x0); + // for (auto val : in) { + // printer.append(fmt::format("{} ", val)); + // } + + // fmt::print("{}\n", printer); + + // theCollective()->barrier(); + // } + // } + + // fmt::print("\n"); } - void perfReduce(MyMsg* in_msg) { - test_obj_->StartTimer(fmt::format("{} reduce", i)); + void reduceComplete(std::vector in) { + // fmt::print( + // "[{}]: allreduce done! Results are ...\n", theContext()->getNode()); + // for (auto val : in) { + // fmt::print("{} ", val); + // } - proxy_.allreduce<&NodeObj::reduceComplete, collective::PlusOp>(data_); + // fmt::print("\n"); } private: MyTest* test_obj_ = nullptr; vt::objgroup::proxy::Proxy proxy_ = {}; - int reduce_counter_ = -1; - int i = 0; - std::vector data_ = {}; }; VT_PERF_TEST(MyTest, test_reduce) { - auto grp_proxy = vt::theObjGroup()->makeCollective( - "test_reduce", this - ); + auto grp_proxy = + vt::theObjGroup()->makeCollective("test_allreduce", this); + + if (theContext()->getNode() == 0) { + theTerm()->disableTD(); + } + + vt::runInEpochCollective([=] { + grp_proxy.allreduce<&NodeObj::reduceComplete, collective::PlusOp>(data); + }); + + if (theContext()->getNode() == 0) { + theTerm()->enableTD(); + } +} + +VT_PERF_TEST(MyTest, test_allreduce) { + auto grp_proxy = + vt::theObjGroup()->makeCollective("test_allreduce", this); if (theContext()->getNode() == 0) { theTerm()->disableTD(); } - grp_proxy[my_node_].invoke<&NodeObj::initialize>(); + vt::runInEpochCollective([=] { + grp_proxy.allreduce_h<&NodeObj::newReduceComplete, collective::PlusOp>( + data); + }); - using MsgType = typename NodeObj::MyMsg; - grp_proxy[my_node_].send(); + if (theContext()->getNode() == 0) { + theTerm()->enableTD(); + } } VT_PERF_TEST_MAIN() diff --git a/tests/perf/send_cost.cc b/tests/perf/send_cost.cc index 34692eaaa8..3ad7e90822 100644 --- a/tests/perf/send_cost.cc +++ b/tests/perf/send_cost.cc @@ -40,6 +40,357 @@ // ***************************************************************************** //@HEADER */ +#include "common/test_harness.h" +#include "vt/collective/collective_alg.h" +#include "vt/configs/error/config_assert.h" +#include "vt/configs/error/hard_error.h" +#include "vt/context/context.h" +#include "vt/messaging/message/shared_message.h" +#include "vt/scheduler/scheduler.h" +#include +#include +#include +#include +#include + +#include + +#include + +using namespace vt; +using namespace vt::tests::perf::common; + +// static constexpr std::array const payloadSizes = { +// 1, 64, 128, 2048, 16384, 524288, 268435456}; + +static constexpr std::array const payloadSizes = {1, 64, 128}; + +vt::EpochType the_epoch = vt::no_epoch; + +struct SendTest : PerfTestHarness { }; + +//////////////////////////////////////// +//////////////// RAW MPI /////////////// +//////////////////////////////////////// + +// VT_PERF_TEST(SendTest, test_send) { +// auto const thisNode = vt::theContext()->getNode(); + +// if (thisNode == 0) { +// vt::theTerm()->disableTD(); +// } + +// auto const lastNode = theContext()->getNumNodes() - 1; + +// auto const prevNode = (thisNode - 1 + num_nodes_) % num_nodes_; +// auto const nextNode = (thisNode + 1) % num_nodes_; +// int data = thisNode; + +// for (auto size : payloadSizes) { +// std::vector dataVec(size, data); +// std::vector recvData(size, data); + +// StartTimer(fmt::format("Payload size {}", size)); + +// MPI_Request request; +// MPI_Irecv( +// &recvData[0], size, MPI_INT, prevNode, 0, MPI_COMM_WORLD, &request); +// MPI_Send(&dataVec[0], size, MPI_INT, nextNode, 0, MPI_COMM_WORLD); + +// MPI_Wait(&request, MPI_STATUS_IGNORE); + +// StopTimer(fmt::format("Payload size {}", size)); +// } + +// if (vt::theContext()->getNode() == 0) { +// vt::theTerm()->enableTD(); +// } +// } + +//////////////////////////////////////// +///////////// OBJECT GROUP ///////////// +//////////////////////////////////////// + +struct NodeObj { + struct PingMsg : Message { + using MessageParentType = vt::Message; + vt_msg_serialize_required(); + std::vector vec_; + + PingMsg() : Message() { } + explicit PingMsg(std::vector data) : Message() { vec_ = data; } + + explicit PingMsg(size_t size) : Message() { + vec_.resize(size, vt::theContext()->getNode() + 1); + } + PingMsg(size_t size, int32_t val) : Message() { vec_.resize(size, val); } + + template + void serialize(SerializerT& s) { + MessageParentType::serialize(s); + s | vec_; + } + }; + + void rightHalf(NodeObj::PingMsg* msg) { + for (int i = 0; i < msg->vec_.size(); i++) { + data_[(data_.size() / 2) + i] += msg->vec_[i]; + } + } + + void rightHalfComplete(NodeObj::PingMsg* msg) { + for (int i = 0; i < msg->vec_.size(); i++) { + data_[(data_.size() / 2) + i] = msg->vec_[i]; + } + } + + void leftHalf(NodeObj::PingMsg* msg) { + for (int i = 0; i < msg->vec_.size(); i++) { + data_[i] += msg->vec_[i]; + } + } + + void leftHalfComplete(NodeObj::PingMsg* msg) { + for (int i = 0; i < msg->vec_.size(); i++) { + data_[i] = msg->vec_[i]; + } + } + + void sendHandler(NodeObj::PingMsg* msg) { + uint32_t start = isEven_ ? 0 : data_.size() / 2; + uint32_t end = isEven_ ? data_.size() / 2 : data_.size(); + for (int i = 0; start < end; start++) { + data_[start] += msg->vec_[i++]; + } + } + + void reducedHan(NodeObj::PingMsg* msg) { + for (int i = 0; i < msg->vec_.size(); i++) { + data_[data_.size() / 2 + i] = msg->vec_[i]; + } + } + + explicit NodeObj(SendTest* test_obj) : test_obj_(test_obj) { + data_.resize(268435456, theContext()->getNode() + 1); + isEven_ = theContext()->getNode() % 2 == 0; + } + + void initialize() { proxy_ = vt::theObjGroup()->getProxy(this); } + void printData() { + for (auto v : data_) { + fmt::print("[{}] {}\n", theContext()->getNode(), v); + } + } + + void printDataFinal(std::vector vec) { + for (auto v : vec) { + // fmt::print("[{}] {}\n", theContext()->getNode(), v); + } + handled_ = true; + } + + bool isEven_ = false; + bool handled_ = false; + SendTest* test_obj_ = nullptr; + vt::objgroup::proxy::Proxy proxy_ = {}; + + std::vector data_ = {}; +}; + +static inline int opal_hibit(int value, int start) { + unsigned int mask; + --start; + mask = 1 << start; + + for (; start >= 0; --start, mask >>= 1) { + if (value & mask) { + break; + } + } + + return start; +} + +// VT_PERF_TEST(SendTest, test_allreduce) { +// auto grp_proxy = +// vt::theObjGroup()->makeCollective("test_objgroup_send", this); +// grp_proxy[my_node_].invoke<&NodeObj::initialize>(); + +// if (theContext()->getNode() == 0) { +// theTerm()->disableTD(); +// } + +// vt::runInEpochCollective([=] { +// grp_proxy.allreduce<&NodeObj::printDataFinal, vt::collective::PlusOp>( +// std::vector(268435456, theContext()->getNode() + 1)); +// }); + +// vtAssert(grp_proxy[theContext()->getNode()].get()->handled_, ""); +// if (vt::theContext()->getNode() == 0) { +// vt::theTerm()->enableTD(); +// } +// } + +VT_PERF_TEST(SendTest, test_objgroup_send) { + auto grp_proxy = + vt::theObjGroup()->makeCollective("test_objgroup_send", this); + grp_proxy[my_node_].invoke<&NodeObj::initialize>(); + + if (theContext()->getNode() == 0) { + theTerm()->disableTD(); + } + + auto const thisNode = vt::theContext()->getNode(); + auto const lastNode = theContext()->getNumNodes() - 1; + + int nsteps = 2; + auto nprocs_rem = 0; + size_t count = 32; //1 << 6; + auto* buf = (int32_t*)malloc(sizeof(int32_t) * count); + auto nprocs_pof2 = 1 << nsteps; + auto rank = theContext()->getNode(); + auto vrank = theContext()->getNode(); + int *rindex = NULL, *rcount = NULL, *sindex = NULL, *scount = NULL; + rindex = (int*)malloc(sizeof(*rindex) * nsteps); + sindex = (int*)malloc(sizeof(*sindex) * nsteps); + rcount = (int*)malloc(sizeof(*rcount) * nsteps); + scount = (int*)malloc(sizeof(*scount) * nsteps); + + int step = 0; + auto wsize = count; + sindex[0] = rindex[0] = 0; + + fmt::print( + "[{}] Starting with numNodes = {} dataSize = {} \n", rank, nprocs_pof2, + count); + for (int mask = 1; mask < nprocs_pof2; mask <<= 1) { + /* + * On each iteration: rindex[step] = sindex[step] -- beginning of the + * current window. Length of the current window is storded in wsize. + */ + int vdest = vrank ^ mask; + /* Translate vdest virtual rank to real rank */ + int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem; + + if (rank < dest) { + /* + * Recv into the left half of the current window, send the right + * half of the window to the peer (perform reduce on the left + * half of the current window) + */ + rcount[step] = wsize / 2; + scount[step] = wsize - rcount[step]; + sindex[step] = rindex[step] + rcount[step]; + } else { + /* + * Recv into the right half of the current window, send the left + * half of the window to the peer (perform reduce on the right + * half of the current window) + */ + scount[step] = wsize / 2; + rcount[step] = wsize - scount[step]; + rindex[step] = sindex[step] + scount[step]; + } + + /* Send part of data from the rbuf, recv into the tmp_buf */ + // err = ompi_coll_base_sendrecv( + // (char*)rbuf + (ptrdiff_t)sindex[step] * extent, scount[step], dtype, dest, + // MCA_COLL_BASE_TAG_ALLREDUCE, + // (char*)tmp_buf + (ptrdiff_t)rindex[step] * extent, rcount[step], dtype, + // dest, MCA_COLL_BASE_TAG_ALLREDUCE, comm, MPI_STATUS_IGNORE, rank); + + fmt::print( + "[{}] Sending to rank {} data + offset({}) with count = {}\nPerforming " + "reduce on data starting with offset {} and count {}\n", + rank, dest, sindex[step], scount[step], rindex[step], rcount[step]); + + theCollective()->barrier(); + /* Local reduce: rbuf[] = tmp_buf[] rbuf[] */ + // ompi_op_reduce( + // op, (char*)tmp_buf + (ptrdiff_t)rindex[step] * extent, + // (char*)rbuf + (ptrdiff_t)rindex[step] * extent, rcount[step], dtype); + + /* Move the current window to the received message */ + if (step + 1 < nsteps) { + rindex[step + 1] = rindex[step]; + sindex[step + 1] = rindex[step]; + wsize = rcount[step]; + step++; + } + } + step = nsteps - 1; + + for (int mask = nprocs_pof2 >> 1; mask > 0; mask >>= 1) { + int vdest = vrank ^ mask; + /* Translate vdest virtual rank to real rank */ + int dest = (vdest < nprocs_rem) ? vdest * 2 : vdest + nprocs_rem; + + /* + * Send rcount[step] elements from rbuf[rindex[step]...] + * Recv scount[step] elements to rbuf[sindex[step]...] + */ + // err = ompi_coll_base_sendrecv((char *)rbuf + (ptrdiff_t)rindex[step] * extent, + // rcount[step], dtype, dest, + // MCA_COLL_BASE_TAG_ALLREDUCE, + // (char *)rbuf + (ptrdiff_t)sindex[step] * extent, + // scount[step], dtype, dest, + // MCA_COLL_BASE_TAG_ALLREDUCE, comm, + // MPI_STATUS_IGNORE, rank); + fmt::print( + "[{}] Sending to rank {} data + offset({}) with count = {}\nReceiving " + "data starting with offset {} and count {}\n", + rank, dest, rindex[step], rcount[step], sindex[step], scount[step]); + step--; + } + + if (vt::theContext()->getNode() == 0) { + vt::theTerm()->enableTD(); + } +} + +VT_PERF_TEST_MAIN() +/* +//@HEADER +// ***************************************************************************** +// +// send_cost.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ #include "common/test_harness.h" #include "vt/collective/collective_alg.h"