Skip to content

Commit

Permalink
#2240: Fix failing unit and performance tests for multiple allreduce …
Browse files Browse the repository at this point in the history
…in flight
  • Loading branch information
JacobDomagala committed Oct 10, 2024
1 parent da46b94 commit 641c9dd
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 60 deletions.
9 changes: 4 additions & 5 deletions src/vt/collective/reduce/allreduce/data_handler.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

/*
//@HEADER
// *****************************************************************************
Expand Down Expand Up @@ -47,9 +46,9 @@

#include <vector>

#ifdef KOKKOS_ENABLED_CHECKPOINT
#ifdef MAGISTRATE_KOKKOS_ENABLED
#include <Kokkos_Core.hpp>
#endif
#endif // MAGISTRATE_KOKKOS_ENABLED

namespace vt::collective::reduce::allreduce {

Expand Down Expand Up @@ -101,7 +100,7 @@ class DataHandler<std::vector<T>> {
// }
};

#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED

template <typename T, typename... Props>
class DataHandler<Kokkos::View<T*, Kokkos::HostSpace, Props...>> {
Expand Down Expand Up @@ -146,7 +145,7 @@ class DataHandler<Kokkos::View<T*, Kokkos::HostSpace, Props...>> {
// }
};

#endif // KOKKOS_ENABLED_CHECKPOINT
#endif // MAGISTRATE_KOKKOS_ENABLED

} // namespace vt::collective::reduce::allreduce

Expand Down
3 changes: 3 additions & 0 deletions src/vt/collective/reduce/allreduce/rabenseifner.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "vt/registry/auto/auto_registry.h"
#include "vt/pipe/pipe_manager.h"
#include "data_handler.h"
#include "type.h"

#include <cstdint>

Expand Down Expand Up @@ -358,6 +359,8 @@ struct Rabenseifner {

NodeType vrt_node_ = {};
bool is_part_of_adjustment_group_ = false;
static inline const std::string name_ = "Rabenseifner";
static const ReducerType type_ = ReducerType::Rabenseifner;
};

} // namespace vt::collective::reduce::allreduce
Expand Down
21 changes: 14 additions & 7 deletions src/vt/collective/reduce/allreduce/rabenseifner.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ void Rabenseifner<DataT, Op, ObjT, finalHandler>::initialize(size_t id, Args&&..
auto& state = states_[id];
state.val_ = DataType::toVec(std::forward<Args>(data)...);

initializeState(id);
if(not state.initialized_){
initializeState(id);
}

int step = 0;
state.size_ = state.val_.size();
Expand Down Expand Up @@ -347,11 +349,7 @@ void Rabenseifner<DataT, Op, ObjT, finalHandler>::scatterReduceIter(size_t id) {
auto& state = states_.at(id);
auto vdest = vrt_node_ ^ state.scatter_mask_;
auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_;
vtAssert(dest < num_nodes_, fmt::format("Rabenseifner Part2 (Send step {}): To Node {} starting with idx = {} and "
"count "
"{} ID = {}\n",
state.scatter_step_, dest, state.s_index_[state.scatter_step_],
state.s_count_[state.scatter_step_], id));

vt_debug_print(
terse, allreduce,
"Rabenseifner Part2 (Send step {}): To Node {} starting with idx = {} and "
Expand Down Expand Up @@ -383,7 +381,16 @@ template <
>
void Rabenseifner<DataT, Op, ObjT, finalHandler>::scatterReduceIterHandler(
AllreduceRbnRawMsg<Scalar>* msg) {
auto& state = states_.at(msg->id_);
auto& state = states_[msg->id_];

if(not state.initialized_){
initializeState(msg->id_);
state.scatter_messages_[msg->step_] = promoteMsg(msg);
state.scatter_steps_recv_[msg->step_] = true;
state.scatter_num_recv_++;

return;
}

state.scatter_messages_[msg->step_] = promoteMsg(msg);
state.scatter_steps_recv_[msg->step_] = true;
Expand Down
2 changes: 2 additions & 0 deletions src/vt/collective/reduce/allreduce/recursive_doubling.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "vt/configs/error/config_assert.h"
#include "vt/messaging/message/smart_ptr.h"
#include "data_handler.h"
#include "type.h"

#include <tuple>
#include <cstdint>
Expand Down Expand Up @@ -254,6 +255,7 @@ struct RecursiveDoubling {

NodeType vrt_node_ = {};
bool is_part_of_adjustment_group_ = false;
static inline ReducerType type_ = ReducerType::RedursiveDoubling;
};

} // namespace vt::collective::reduce::allreduce
Expand Down
4 changes: 4 additions & 0 deletions src/vt/collective/reduce/allreduce/recursive_doubling.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ void RecursiveDoubling<DataT, Op, ObjT, finalHandler>::finalPart(size_t id) {
parent_proxy_[this_node_].template invoke<finalHandler>(state.val_);

state.completed_ = true;

state.adjust_message_ = nullptr;
state.messages_.clear();

states_.erase(id);
// std::fill(state.messages_.begin(), state.messages_.end(), nullptr);

Expand Down
63 changes: 63 additions & 0 deletions src/vt/collective/reduce/allreduce/type.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
//@HEADER
// *****************************************************************************
//
// type.h
// DARMA/vt => Virtual Transport
//
// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC
// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S.
// Government retains certain rights in this software.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from this
// software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
//
// Questions? Contact darma@sandia.gov
//
// *****************************************************************************
//@HEADER
*/

#if !defined INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_TYPE_H
#define INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_TYPE_H

#include <string>

namespace vt::collective::reduce::allreduce {

/// Identifies which allreduce algorithm a reducer type implements.
enum class ReducerType {
  Rabenseifner,
  /// Historical misspelling; kept so that existing references keep compiling.
  RedursiveDoubling,
  /// Correctly spelled alias with the same value as RedursiveDoubling.
  RecursiveDoubling = RedursiveDoubling
};

/**
 * \brief Return a human-readable name for an allreduce algorithm.
 *
 * \param[in] t the reducer type to name
 *
 * \return "Rabenseifner" for ReducerType::Rabenseifner and
 * "RecursiveDoubling" for every other value
 */
inline std::string TypeToString(ReducerType t) {
  // The enumerator is still spelled "Redursive" for compatibility, but the
  // label handed back to callers uses the correct spelling.
  return t == ReducerType::Rabenseifner ? "Rabenseifner" : "RecursiveDoubling";
}

} // namespace vt::collective::reduce::allreduce

#endif /*INCLUDED_VT_COLLECTIVE_REDUCE_ALLREDUCE_TYPE_H*/
4 changes: 2 additions & 2 deletions src/vt/collective/reduce/operators/functors/plus_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct PlusOp<std::tuple<Params...>> {
}
};

#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED
template <typename T>
struct PlusOp<Kokkos::View<T*, Kokkos::HostSpace>> {
void operator()(
Expand All @@ -76,7 +76,7 @@ struct PlusOp<Kokkos::View<T*, Kokkos::HostSpace>> {
KOKKOS_LAMBDA(const int i) { v1(i) += v2(i); });
}
};
#endif
#endif // MAGISTRATE_KOKKOS_ENABLED

template <typename T>
struct PlusOp< std::vector<T> > {
Expand Down
4 changes: 2 additions & 2 deletions src/vt/objgroup/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
//@HEADER
*/

#include "vt/configs/types/types_type.h"
#if !defined INCLUDED_VT_OBJGROUP_MANAGER_H
#define INCLUDED_VT_OBJGROUP_MANAGER_H

Expand Down Expand Up @@ -509,7 +508,8 @@ ObjGroupManager::PendingSendType allreduce(ProxyType<ObjT> proxy, Args&&... data
/// Map of object groups' labels
std::unordered_map<ObjGroupProxyType, std::string> labels_;

std::unordered_map<ObjGroupProxyType, ObjGroupProxyType> reducers_;
std::unordered_map<ObjGroupProxyType, ObjGroupProxyType> reducersRD_;
std::unordered_map<ObjGroupProxyType, ObjGroupProxyType> reducersR_;
};

}} /* end namespace vt::objgroup */
Expand Down
55 changes: 30 additions & 25 deletions src/vt/objgroup/manager.impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include "vt/messaging/message/smart_ptr.h"
#include "vt/collective/reduce/allreduce/rabenseifner.h"
#include "vt/collective/reduce/allreduce/recursive_doubling.h"
#include "vt/collective/reduce/allreduce/type.h"
#include <utility>
#include <array>

Expand Down Expand Up @@ -270,32 +271,36 @@ ObjGroupManager::PendingSendType ObjGroupManager::broadcast(MsgSharedPtr<MsgT> m
/**
 * \brief Start an allreduce for an object group using the algorithm \c Reducer.
 *
 * A reducer object group is looked up per target proxy, in a map selected by
 * \c Reducer::type_. If one is already registered it is reused: a new id is
 * generated and the reducer is re-initialized with \c data. Otherwise a new
 * collective reducer is created (labelled with \c TypeToString(Reducer::type_))
 * and registered for later calls.
 *
 * \param[in] proxy proxy of the object group taking part in the allreduce
 * \param[in] data arguments forwarded to the reducer (its \c initialize on
 * reuse, its construction via \c makeCollective otherwise)
 *
 * \return a \c PendingSendType built from the current epoch and an action that
 * invokes \c Reducer::allreduce with the selected id on this node
 */
template <typename Reducer, typename ObjT, typename... Args>
ObjGroupManager::PendingSendType ObjGroupManager::allreduce(
  ProxyType<ObjT> proxy, Args&&... data) {
  using namespace vt::collective::reduce::allreduce;

  auto const this_node = vt::theContext()->getNode();
  auto const num_nodes = theContext()->getNumNodes();
  auto const target = proxy.getProxy();
  size_t id = 0;

  proxy::Proxy<Reducer> grp_proxy = {};

  // Each algorithm keeps its own registry of reducer object groups.
  auto& reducers =
    Reducer::type_ == ReducerType::Rabenseifner ? reducersR_ : reducersRD_;

  if (auto const iter = reducers.find(target); iter != reducers.end()) {
    // A reducer already exists for this proxy: reuse it under a fresh id.
    auto* obj = reinterpret_cast<Reducer*>(objs_.at(iter->second)->getPtr());
    id = obj->generateNewId();
    obj->initialize(id, std::forward<Args>(data)...);
    grp_proxy = obj->proxy_;
  } else {
    // First allreduce of this kind for the proxy: create and register it.
    grp_proxy = vt::theObjGroup()->makeCollective<Reducer>(
      TypeToString(Reducer::type_), proxy, num_nodes,
      std::forward<Args>(data)...
    );
    grp_proxy[this_node].get()->proxy_ = grp_proxy;
    reducers[target] = grp_proxy.getProxy();
    id = grp_proxy[this_node].get()->id_ - 1;
  }

  // The action travels with the returned PendingSendType, so it must own
  // copies of everything it uses; capturing these locals by reference would
  // leave dangling references once this function returns.
  return PendingSendType{
    theTerm()->getEpoch(),
    [grp_proxy, this_node, id] {
      grp_proxy[this_node].template invoke<&Reducer::allreduce>(id);
    }
  };
}

template <auto f, typename ObjT, template <typename Arg> class Op, typename DataT, typename... Args>
Expand Down
16 changes: 8 additions & 8 deletions tests/perf/allreduce.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include <vt/collective/reduce/allreduce/recursive_doubling.h>

#include <fmt-vt/core.h>
#ifdef KOKKOS_ENABLED_CHECKPOINT
#ifdef MAGISTRATE_KOKKOS_ENABLED
#include <Kokkos_Core.hpp>
#endif

Expand Down Expand Up @@ -92,13 +92,13 @@ struct NodeObj {
allreduce_done_ = true;
}

#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED
template <typename Scalar>
void handlerView(Kokkos::View<Scalar*, Kokkos::HostSpace> view) {
test_obj_->StopTimer(timer_names_.at(view.extent(0)));
allreduce_done_ = true;
}
#endif // KOKKOS_ENABLED_CHECKPOINT
#endif // MAGISTRATE_KOKKOS_ENABLED


std::string base_name_ = {};
Expand Down Expand Up @@ -126,7 +126,7 @@ VT_PERF_TEST(MyTest, test_reduce) {
}


#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED

struct MyTestKokkos : PerfTestHarness {
MyTestKokkos() {
Expand Down Expand Up @@ -182,7 +182,7 @@ VT_PERF_TEST(MyTest, test_allreduce_rabenseifner) {
}
}

#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED
VT_PERF_TEST(MyTestKokkos, test_allreduce_rabenseifner_kokkos) {
auto proxy = vt::theObjGroup()->makeCollective<NodeObj<MyTestKokkos>>(
"test_allreduce_rabenseifner", this, "Rabenseifner view"
Expand All @@ -206,7 +206,7 @@ VT_PERF_TEST(MyTestKokkos, test_allreduce_rabenseifner_kokkos) {
obj_ptr->allreduce_done_ = false;
}
}
#endif // KOKKOS_ENABLED_CHECKPOINT
#endif // MAGISTRATE_KOKKOS_ENABLED

VT_PERF_TEST(MyTest, test_allreduce_recursive_doubling) {
auto proxy = vt::theObjGroup()->makeCollective<NodeObj<MyTest>>(
Expand All @@ -231,7 +231,7 @@ VT_PERF_TEST(MyTest, test_allreduce_recursive_doubling) {
}
}

#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED
VT_PERF_TEST(MyTestKokkos, test_allreduce_recursive_doubling_kokkos) {
auto proxy = vt::theObjGroup()->makeCollective<NodeObj<MyTestKokkos>>(
"test_allreduce_rabenseifner", this, "Recursive doubling view"
Expand All @@ -255,6 +255,6 @@ VT_PERF_TEST(MyTestKokkos, test_allreduce_recursive_doubling_kokkos) {
obj_ptr->allreduce_done_ = false;
}
}
#endif // KOKKOS_ENABLED_CHECKPOINT
#endif // MAGISTRATE_KOKKOS_ENABLED

VT_PERF_TEST_MAIN()
12 changes: 6 additions & 6 deletions tests/perf/common/test_harness_macros.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@

#include "test_harness_base.h"

#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED
#include "Kokkos_Core.hpp"
#endif
#endif // MAGISTRATE_KOKKOS_ENABLED

#include <vector>
#include <memory>
Expand Down Expand Up @@ -109,15 +109,15 @@ struct PerfTestRegistry{
void StructName##TestName::TestFunc()

/// Calls Kokkos::initialize() when the preprocessor guards below are
/// satisfied; otherwise the body is empty and the call is a no-op.
inline void tryInitializeKokkos() {
#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED
Kokkos::initialize();
#endif
#endif // MAGISTRATE_KOKKOS_ENABLED
}

/// Calls Kokkos::finalize() when the preprocessor guards below are
/// satisfied; otherwise the body is empty and the call is a no-op.
inline void tryFinalizeKokkos() {
#if KOKKOS_ENABLED_CHECKPOINT
#if MAGISTRATE_KOKKOS_ENABLED
Kokkos::finalize();
#endif
#endif // MAGISTRATE_KOKKOS_ENABLED
}

#define VT_PERF_TEST_MAIN() \
Expand Down
Loading

0 comments on commit 641c9dd

Please sign in to comment.