coll/base,tuned: introduce allgather_reduce allreduce algorithm #11871
Conversation
@hppritcha In case you are interested, you can try it out with the mca settings:
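A plausible invocation (a sketch only: the dynamic-rules parameters are the usual way to force a specific tuned algorithm, but the algorithm id registered for allgather_reduce by this PR is shown as a placeholder):

```
mpirun --mca coll_tuned_use_dynamic_rules 1 \
       --mca coll_tuned_allreduce_algorithm <allgather_reduce id> \
       ./your_app
```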
First round of review.
```c
tmp_recv = (char *) tmp_buf;

int req_index = 0;
for (int peer_rank = 0; peer_rank < comm_size; peer_rank++) {
```
An algorithmic comment:
With this loop rank 0 gets hammered by all processes at the same time in the beginning, then the load moves in sync to all ranks sending to 1, etc. I wonder if distributing the load would help at all. My idea would be to start each rank at a different offset, then wrap around during the loop.
I don't know if doing such a modulo would help though, since presumably these isends are helping to distribute the load by being non-blocking (i.e., perhaps I should consider them as all being sent simultaneously).
Edit: I forgot this is primarily targeted at intra. In that case this matters even less. Disregard.
Edit2: Wait, the collective name has intra in it, but I think it's really targeting inter.
Good point. Let me experiment with load-balancing tricks and see if that makes a difference.
But practically this algorithm should only be used on smallish communicators; I imagine load balancing will matter more at larger scales.
In general we solve this by starting each process from rank + 1 and iterating over all processes (with a modulo) until the peer becomes == rank.
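A minimal sketch of that pattern (illustrative only, not the exact PR code; sbuf, tmp_buf, incr, reqs, err and the error label are assumed names):

```c
/* Start each process at rank + 1 and wrap around, so the peers are spread
 * out instead of every process targeting rank 0 in the first iteration.
 * The loop also naturally skips the self-exchange. */
int req_index = 0;
for (int peer_rank = (rank + 1) % comm_size; peer_rank != rank;
     peer_rank = (peer_rank + 1) % comm_size) {
    err = MCA_PML_CALL(irecv(tmp_buf + (ptrdiff_t) peer_rank * incr, count, dtype,
                             peer_rank, MCA_COLL_BASE_TAG_ALLREDUCE,
                             comm, &reqs[req_index++]));
    if (MPI_SUCCESS != err) { goto err_hndl; }
    err = MCA_PML_CALL(isend(sbuf, count, dtype, peer_rank,
                             MCA_COLL_BASE_TAG_ALLREDUCE,
                             MCA_PML_BASE_SEND_STANDARD,
                             comm, &reqs[req_index++]));
    if (MPI_SUCCESS != err) { goto err_hndl; }
}
```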
@bosilca Thanks for the pointer, I have updated the code accordingly. I reran the benchmarks but did not see a difference in performance (32 nodes).
```c
char *inbuf;
for (int peer_rank = 0; peer_rank < comm_size; peer_rank++) {
    inbuf = tmp_buf + (peer_rank * incr);
    if (0 == peer_rank && !commutative) {
        /* Sort the data buffer for non-commutative operations */
        memcpy(rbuf, inbuf, incr);
        continue;
    }
    ompi_op_reduce(op, (void *) inbuf, rbuf, count, dtype);
}
```
I'm confused about this part. It seems that the if statement can only be hit on the first iteration of the loop (when peer_rank is 0), and its only effect is to prime the loop so the reduction starts from rbuf. However, isn't this required for both commutative and non-commutative operations? Also, this isn't sorting the data buffer; you've already collected the data in sorted order above, so I think the comment is misleading.
This is to ensure the order of reduction is the same across ranks. For example:
- Rank 0 sends 1.11
- Rank 1 sends 2.22
- Rank 2 sends 3.33
- ...

If we want to do MPI_SUM, where the result can depend on the order in which the partial sums are accumulated (floating-point addition is not associative), we need to ensure the order 1.11 + 2.22 + 3.33 + ...
Without L1333, we have something different:
- Rank 0 does 1.11 + 2.22 + 3.33 + ...
- Rank 1 does 2.22 + 1.11 + 3.33 + ...
- Rank 2 does 3.33 + 1.11 + 2.22 + ...
- ...

This means every rank could potentially get a different result.
But I sense there might be a more elegant/efficient way to ensure this. I'm open to better ideas.
But after the wait_all, and because all your recvs put their results into the buffer in the correct order, there is no ordering left to resolve... The buffer is already ordered by peer rank, or did I miss something?
The problem is with ompi_op_reduce(op, inbuf, rbuf, count, dtype), i.e.

> Perform a reduction operation with count elements of type dtype in
> the buffers source and target. The target buffer obtains the
> result (i.e., the original values in the target buffer are reduced
> with the values in the source buffer and the result is stored in
> the target buffer).

On L1292 rbuf is populated with the send buffer data, so the reduction order would be different for each rank.
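Put differently, here is a minimal sketch of why the priming step matters (illustrative only; it assumes tmp_buf holds the gathered contributions in rank order and incr is the per-rank extent in bytes):

```c
/* Overwrite rbuf with rank 0's contribution first, then fold in the
 * remaining peers in rank order. Since ompi_op_reduce() accumulates into
 * its target buffer, every rank now evaluates the identical chain
 * contribution(0) OP contribution(1) OP ..., instead of starting the
 * chain from its own send buffer. */
memcpy(rbuf, tmp_buf, incr);
for (int peer_rank = 1; peer_rank < comm_size; peer_rank++) {
    ompi_op_reduce(op, tmp_buf + (ptrdiff_t) peer_rank * incr, rbuf, count, dtype);
}
```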
The cost of this sequential algorithm is O((P-1) * (M+R)), where P is the number of participating processes, M is the latency of a message, and R is the cost of the operator for each fragment. This algorithm minimizes the potential overlap, if we assume each local send/recv reaches a significant fraction of the memory bandwidth. Also, because you separate the data movement from its use, you will get less benefit from reuse, which will also increase the memory traffic of your algorithm. Thus my question is: how large must the latency be for this algorithm to be beneficial compared with the usual non-linear algorithms for the reduction?
Second, one must be careful when this is used, as it demands a significant amount of memory (up to p^2 per node). The selection criteria must be very strict.
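To make that question concrete, here is a rough alpha-beta sketch (an illustration under assumed cost models, not measured data): let alpha be the per-message latency, beta the per-byte transfer time, gamma the per-byte reduction cost, and m the message size, and optimistically assume the (P-1) non-blocking exchanges collapse into a single latency term; recursive doubling is taken as the baseline with its textbook cost.

$$T_{\mathrm{allgather\_reduce}} \approx \alpha + (P-1)\,m\,(\beta + \gamma), \qquad T_{\mathrm{recursive\ doubling}} \approx \log_2 P \,\bigl(\alpha + m\,(\beta + \gamma)\bigr)$$

Under these assumptions the new algorithm only wins when $(\log_2 P - 1)\,\alpha > (P - 1 - \log_2 P)\,m\,(\beta + \gamma)$, i.e. for small communicators and small messages on high-latency networks, which is consistent with keeping the selection criteria very strict.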
@bosilca I agree with you. As you correctly pointed out, the key is to be very strict about when this algorithm can be applied. Contrasting it with the existing algorithms, it likely consumes more network bandwidth because of the allgather step, where every process sends its full buffer to every other process.
I think you meant p^2 for all nodes, i.e. p for each node. So overall the benefit is more pronounced for slower interconnects with high bandwidth. For EFA the nominal round-trip latency is ~15 us, while IB might achieve 1.2 us. As long as we keep communicator size * message size small enough, there should be a latency reduction of up to half a round trip.
This patch introduces a new allreduce algorithm implemented as an allgather followed by a local reduction.

The change is motivated by the longer latency of TCP/EFA traffic. Current allreduce algorithms require a round trip to and from a selected root process. This algorithm avoids the round trip over the network and therefore reduces total latency. However, this communication pattern is not scalable for large communicators, and should only be used for inter-node allreduce.

Co-authored-by: Matt Koop <[email protected]>
Co-authored-by: Wenduo Wang <[email protected]>
Signed-off-by: Wenduo Wang <[email protected]>