#2240: Update allreduce perf tests to use array of payload sizes
JacobDomagala committed Oct 10, 2024
1 parent 5d79bd0 commit 74bea76
Showing 3 changed files with 65 additions and 93 deletions.
18 changes: 9 additions & 9 deletions src/vt/collective/reduce/allreduce/rabenseifner.impl.h
@@ -267,10 +267,10 @@ void Rabenseifner<DataT, Op, ObjT, finalHandler>::scatterReduceIter() {
auto dest = (vdest < nprocs_rem_) ? vdest * 2 : vdest + nprocs_rem_;
vt_debug_print(
terse, allreduce,
"[{}] Rabenseifer Part2 (step {}): Sending to Node {} starting with idx = {} and "
"Rabenseifner Part2 (step {}): Sending to Node {} starting with idx = {} and "
"count "
"{} \n",
this_node_, scatter_step_, dest, s_index_[scatter_step_],
scatter_step_, dest, s_index_[scatter_step_],
s_count_[scatter_step_]
);

@@ -310,9 +310,9 @@ void Rabenseifner<DataT, Op, ObjT, finalHandler>::scatterReduceIterHandler(

vt_debug_print(
terse, allreduce,
"[{}] Rabenseifner Part2 (step {}): scatter_mask_= {} nprocs_pof2_ = {}: "
"Rabenseifner Part2 (step {}): scatter_mask_= {} nprocs_pof2_ = {}: "
"idx = {} from {}\n",
this_node_, msg->step_, scatter_mask_, nprocs_pof2_, r_index_[msg->step_],
msg->step_, scatter_mask_, nprocs_pof2_, r_index_[msg->step_],
theContext()->getFromNodeCurrentTask()
);

@@ -382,10 +382,10 @@ void Rabenseifner<DataT, Op, ObjT, finalHandler>::gatherIter() {

vt_debug_print(
terse, allreduce,
"[{}] Rabenseifner Part3 (step {}): Sending to Node {} starting with idx = {} and "
"Rabenseifner Part3 (step {}): Sending to Node {} starting with idx = {} and "
"count "
"{} \n",
this_node_, gather_step_, dest, r_index_[gather_step_],
gather_step_, dest, r_index_[gather_step_],
r_count_[gather_step_]
);

@@ -413,8 +413,8 @@ template <
void Rabenseifner<DataT, Op, ObjT, finalHandler>::gatherIterHandler(
AllreduceRbnMsg<DataT>* msg) {
vt_debug_print(
terse, allreduce, "[{}] Rabenseifner Part3 (step {}): Received idx = {} from {}\n",
this_node_, msg->step_, s_index_[msg->step_],
terse, allreduce, "Rabenseifner Part3 (step {}): Received idx = {} from {}\n",
msg->step_, s_index_[msg->step_],
theContext()->getFromNodeCurrentTask()
);

@@ -456,7 +456,7 @@ template <
void Rabenseifner<DataT, Op, ObjT, finalHandler>::sendToExcludedNodes() {
if (is_part_of_adjustment_group_ and is_even_) {
vt_debug_print(
terse, allreduce, "[{}] Rabenseifner Part4: Sending to Node {} \n", this_node_,
terse, allreduce, "Rabenseifner Part4: Sending to Node {} \n",
this_node_ + 1
);
proxy_[this_node_ + 1]
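
Aside from routing these prints through the allreduce debug category (whose command-line flag is added in args.cc below), the changed lines drop the manual "[{}]" / this_node_ prefix from each format string. The following is an illustrative sketch only, not VT code: it assumes the real vt_debug_print machinery supplies the node prefix itself, and current_node() is a hypothetical stand-in for theContext()->getNode().

#include <fmt/core.h>
#include <fmt/format.h>
#include <utility>

int current_node() { return 0; }  // hypothetical stand-in for theContext()->getNode()

template <typename... Args>
void debug_print(fmt::format_string<Args...> fmt_str, Args&&... args) {
  // Node prefix is added once, centrally, so callers format only their payload.
  fmt::print("[{}] {}", current_node(),
             fmt::format(fmt_str, std::forward<Args>(args)...));
}

int main() {
  int scatter_step = 1, dest = 3, idx = 128, count = 64;
  debug_print(
    "Rabenseifner Part2 (step {}): Sending to Node {} starting with idx = {} and count {} \n",
    scatter_step, dest, idx, count);
}
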
3 changes: 3 additions & 0 deletions src/vt/configs/arguments/args.cc
@@ -816,6 +816,7 @@ void addDebugPrintArgs(CLI::App& app, AppConfig& appConfig) {
auto ddp = "Enable debug_context = \"" debug_pp(context) "\"";
auto dep = "Enable debug_epoch = \"" debug_pp(epoch) "\"";
auto dfp = "Enable debug_replay = \"" debug_pp(replay) "\"";
auto dgp = "Enable debug_allreduce = \"" debug_pp(allreduce) "\"";

auto r1 = app.add_option("--vt_debug_level", appConfig.vt_debug_level, rq);

@@ -853,6 +854,7 @@ void addDebugPrintArgs(CLI::App& app, AppConfig& appConfig) {
auto dd = app.add_flag("--vt_debug_context", appConfig.vt_debug_context, ddp);
auto de = app.add_flag("--vt_debug_epoch", appConfig.vt_debug_epoch, dep);
auto df = app.add_flag("--vt_debug_replay", appConfig.vt_debug_replay, dfp);
auto dg = app.add_flag("--vt_debug_allreduce", appConfig.vt_debug_allreduce, dgp);

auto debugGroup = "Debug Print Configuration (must be compile-time enabled)";
r->group(debugGroup);
@@ -890,6 +892,7 @@ void addDebugPrintArgs(CLI::App& app, AppConfig& appConfig) {
dd->group(debugGroup);
de->group(debugGroup);
df->group(debugGroup);
dg->group(debugGroup);

auto dbq = "Always flush VT runtime prints";
auto eb = app.add_flag("--vt_debug_print_flush", appConfig.vt_debug_print_flush, dbq);
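
The args.cc additions follow the surrounding pattern: a help string, an add_flag call bound to a new appConfig field, and membership in the shared debug group. Below is a self-contained CLI11 sketch of that pattern; only the option name and group text mirror the diff, while the app, variable, and return value are illustrative stand-ins.

#include <CLI/CLI.hpp>

int main(int argc, char** argv) {
  CLI::App app{"debug-flag registration sketch"};

  // Stand-in for appConfig.vt_debug_allreduce.
  bool vt_debug_allreduce = false;

  auto dg = app.add_flag(
    "--vt_debug_allreduce", vt_debug_allreduce,
    "Enable debug_allreduce = \"allreduce\"");
  dg->group("Debug Print Configuration (must be compile-time enabled)");

  CLI11_PARSE(app, argc, argv);
  return vt_debug_allreduce ? 0 : 1;
}
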
137 changes: 53 additions & 84 deletions tests/perf/allreduce.cc
@@ -58,136 +58,105 @@
using namespace vt;
using namespace vt::tests::perf::common;

static constexpr std::array<size_t, 8> const payloadSizes = {
64, 128, 2048, 16384, 32768, 524288, 1048576, 2097152};

struct MyTest : PerfTestHarness {
void SetUp() override {
PerfTestHarness::SetUp();
data.resize(1 << 16);
for (auto& val : data) {
val = theContext()->getNode() + 1;
}
MyTest() {
DisableGlobalTimer();
}

std::vector<int32_t> data;
};

struct NodeObj {
explicit NodeObj(MyTest* test_obj, const std::string& name) : test_obj_(test_obj), timer_name_(name) { }
explicit NodeObj(MyTest* test_obj, const std::string& name)
: base_name_(name),
test_obj_(test_obj) {
for (auto const payload_size : payloadSizes) {
timer_names_[payload_size] =
fmt::format("{} {}", base_name_, payload_size);
}
}

void initialize() {
proxy_ = vt::theObjGroup()->getProxy<NodeObj>(this);
// data_["Node"] = theContext()->getNode(); }
}
struct MyMsg : vt::Message { };

void recursiveDoubling(std::vector<int32_t> in) {
// std::string printer(1024, 0x0);
// printer.append(fmt::format("\n[{}]: recursiveDoubling done! ", theContext()->getNode()));

// for (int node = 0; node < theContext()->getNumNodes(); ++node) {
// if (node == theContext()->getNode()) {

// for (auto val : in) {
// printer.append(fmt::format("{} ", val));
// }

// fmt::print("{}\n", printer);

// theCollective()->barrier();
// }
// }

// fmt::print("\n");
// const auto p = theContext()->getNumNodes();
// const auto expected = (p * (p + 1)) / 2;
// for (auto val : in) {
// vtAssert(val == expected, "FAILURE!");
// }
test_obj_->StopTimer(timer_name_);
void handlerVec(std::vector<int32_t> vec) {
test_obj_->StopTimer(timer_names_.at(vec.size()));
}

void newReduceComplete(std::vector<int32_t> in) {
// std::string printer(1024, 0x0);
// printer.append(fmt::format("\n[{}]: allreduce_rabenseifner done! ", theContext()->getNode()));

// for (int node = 0; node < theContext()->getNumNodes(); ++node) {
// if (node == theContext()->getNode()) {

// for (auto val : in) {
// printer.append(fmt::format("{} ", val));
// }

// fmt::print("{}\n", printer);

// theCollective()->barrier();
// }
// }

// fmt::print("\n");
// const auto p = theContext()->getNumNodes();
// const auto expected = (p * (p + 1)) / 2;
// for (auto val : in) {
// vtAssert(val == expected, "FAILURE!");
// }
test_obj_->StopTimer(timer_name_);
#if KOKKOS_ENABLED_CHECKPOINT
template <typename Scalar>
void handlerView(Kokkos::View<Scalar*, Kokkos::HostSpace> view) {
test_obj_->StopTimer(timer_names_.at(view.extent(0)));
}
#endif // KOKKOS_ENABLED_CHECKPOINT

void reduceComplete(std::vector<int32_t> in) {
// fmt::print(
// "[{}]: allreduce done! Results are ...\n", theContext()->getNode());
// for (auto val : in) {
// fmt::print("{} ", val);
// }

// fmt::print("\n");
test_obj_->StopTimer(timer_name_);
}

std::string timer_name_ = {};
std::string base_name_ = {};
std::unordered_map<size_t, std::string> timer_names_= {};
MyTest* test_obj_ = nullptr;
vt::objgroup::proxy::Proxy<NodeObj> proxy_ = {};
};

VT_PERF_TEST(MyTest, test_reduce) {
auto grp_proxy =
vt::theObjGroup()->makeCollective<NodeObj>("test_allreduce", this, "Reduce -> Bcast");
vt::theObjGroup()->makeCollective<NodeObj>("test_allreduce", this, "Reduce -> Bcast vector");

for (auto payload_size : payloadSizes) {
data.resize(payload_size, theContext()->getNode() + 1);

theCollective()->barrier();

theCollective()->barrier();
StartTimer(grp_proxy[theContext()->getNode()].get()->timer_name_);
grp_proxy.allreduce<&NodeObj::reduceComplete, collective::PlusOp>(data);
StartTimer(grp_proxy[my_node_].get()->timer_names_.at(payload_size));
grp_proxy.allreduce<&NodeObj::handlerVec, collective::PlusOp>(data);
}
}

VT_PERF_TEST(MyTest, test_allreduce_rabenseifner) {
auto proxy =
vt::theObjGroup()->makeCollective<NodeObj>("test_allreduce_new", this, "Rabenseifner");
auto proxy = vt::theObjGroup()->makeCollective<NodeObj>(
"test_allreduce_rabenseifner", this, "Rabenseifner vector"
);

using DataT = decltype(data);
using Reducer = collective::reduce::allreduce::Rabenseifner<
DataT, collective::PlusOp, NodeObj, &NodeObj::newReduceComplete>;
DataT, collective::PlusOp, NodeObj, &NodeObj::handlerVec>;

auto grp_proxy = vt::theObjGroup()->makeCollective<Reducer>(
"allreduce_rabenseifner", proxy, num_nodes_, data);
grp_proxy[my_node_].get()->proxy_ = grp_proxy;

theCollective()->barrier();
StartTimer(proxy[theContext()->getNode()].get()->timer_name_);
grp_proxy[my_node_].template invoke<&Reducer::allreduce>();
for (auto payload_size : payloadSizes) {
data.resize(payload_size, theContext()->getNode() + 1);

theCollective()->barrier();
StartTimer(proxy[my_node_].get()->timer_names_.at(payload_size));
grp_proxy[my_node_].template invoke<&Reducer::allreduce>();
}
}

VT_PERF_TEST(MyTest, test_allreduce_recursive_doubling) {
auto proxy =
vt::theObjGroup()->makeCollective<NodeObj>("test_allreduce_new_2", this, "Recursive doubling");
auto proxy = vt::theObjGroup()->makeCollective<NodeObj>(
"test_allreduce_recursive_doubling", this, "Recursive doubling vector"
);

using DataT = decltype(data);
using Reducer = collective::reduce::allreduce::RecursiveDoubling<
DataT, collective::PlusOp, NodeObj, &NodeObj::recursiveDoubling>;
DataT, collective::PlusOp, NodeObj, &NodeObj::handlerVec>;

auto grp_proxy = vt::theObjGroup()->makeCollective<Reducer>(
"allreduce_recursive_doubling", proxy, num_nodes_, data);
grp_proxy[my_node_].get()->proxy_ = grp_proxy;

theCollective()->barrier();
StartTimer(proxy[theContext()->getNode()].get()->timer_name_);
grp_proxy[my_node_].template invoke<&Reducer::allreduce>();
for (auto payload_size : payloadSizes) {
data.resize(payload_size, theContext()->getNode() + 1);

theCollective()->barrier();
StartTimer(proxy[my_node_].get()->timer_names_.at(payload_size));
grp_proxy[my_node_].template invoke<&Reducer::allreduce>();
}
}

VT_PERF_TEST_MAIN()
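
All three tests now share the same shape: for each entry of payloadSizes, resize data, barrier, start the per-size timer, and kick off the allreduce; the completion handler stops the timer keyed by the received vector's size (or, in the Kokkos variant, the view's extent). Below is a standalone sketch of just that bookkeeping; the payloadSizes array mirrors the test, while the timer names and prints are simplified stand-ins for the PerfTestHarness timer API.

#include <array>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

static constexpr std::array<std::size_t, 8> payloadSizes = {
  64, 128, 2048, 16384, 32768, 524288, 1048576, 2097152};

int main() {
  // One timer name per payload size, as NodeObj's constructor builds them.
  std::unordered_map<std::size_t, std::string> timer_names;
  for (auto const size : payloadSizes) {
    timer_names[size] = "Rabenseifner vector " + std::to_string(size);
  }

  std::vector<std::int32_t> data;
  for (auto const size : payloadSizes) {
    data.resize(size, 1);  // payload for this iteration
    std::cout << "start: " << timer_names.at(size) << "\n";
    // ... allreduce would run here ...
    // The completion handler keys the stop on the result's size, which is why
    // the lookup below works without passing the payload size around explicitly.
    std::cout << "stop:  " << timer_names.at(data.size()) << "\n";
  }
}
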
