DrainAsync does not wait for messages to be released #252
Comments
That's beyond the purpose of the DrainAsync API, which is to use up the credits and then stop the message flow. It does not change the behavior of in-flight messages that arrive before the drain completes. To satisfy your requirement, we would need to introduce a new API like the following:
The desired API wouldn't have to return the messages. It could just release the received messages. Something like DrainAndReleaseAsync. But we could also probably implement the suggested workaround in the Service Bus library.
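To make the "drain, then release" semantics concrete, here is a minimal sketch. It does not use the real Microsoft.Azure.Amqp types: `IDrainableLink<TMessage>`, `TryTakeBuffered`, `ReleaseAsync`, and `FakeLink` are all hypothetical names invented for illustration, and only the name `DrainAndReleaseAsync` comes from the suggestion above. The point it tries to show is that the returned task completes only after every buffered message has been released.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the parts of a receiving link used here.
// This is NOT the Microsoft.Azure.Amqp API surface.
public interface IDrainableLink<TMessage>
{
    // Uses up outstanding credit and stops the message flow.
    Task DrainAsync(CancellationToken cancellationToken);

    // Returns a message already buffered on the link, if any, without waiting.
    bool TryTakeBuffered(out TMessage message);

    // Gives the message back to the broker (the "released" outcome).
    Task ReleaseAsync(TMessage message, CancellationToken cancellationToken);
}

public static class DrainExtensions
{
    // Drains the link, then releases every message still buffered on it.
    // Completes only after all releases have finished; returns how many were released.
    public static async Task<int> DrainAndReleaseAsync<TMessage>(
        this IDrainableLink<TMessage> link,
        CancellationToken cancellationToken = default)
    {
        await link.DrainAsync(cancellationToken).ConfigureAwait(false);

        int released = 0;
        while (link.TryTakeBuffered(out TMessage message))
        {
            await link.ReleaseAsync(message, cancellationToken).ConfigureAwait(false);
            released++;
        }
        return released;
    }
}

// In-memory fake used only to exercise the sketch.
public sealed class FakeLink : IDrainableLink<string>
{
    private readonly Queue<string> buffered = new Queue<string>();

    public List<string> Released { get; } = new List<string>();
    public bool Drained { get; private set; }

    public FakeLink(params string[] inflight)
    {
        foreach (var m in inflight) buffered.Enqueue(m);
    }

    public Task DrainAsync(CancellationToken cancellationToken)
    {
        Drained = true;
        return Task.CompletedTask;
    }

    public bool TryTakeBuffered(out string message) => buffered.TryDequeue(out message);

    public async Task ReleaseAsync(string message, CancellationToken cancellationToken)
    {
        await Task.Yield(); // simulate an asynchronous disposition
        Released.Add(message);
    }
}
```

With this shape, a caller that awaits `DrainAndReleaseAsync` knows that no buffered message is still pending release when the call returns, which is the deterministic behavior the issue asks for. Whether the real link can expose something like `TryTakeBuffered` is an open question in this thread.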
Related PR: Azure/azure-sdk-for-net#40457
@xinchen10 I tried replacing our call to DrainAsync with something like the following:
using var backgroundCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var drainTask = link.DrainAsync(cancellationToken).ConfigureAwait(false);
List<AmqpMessage> additionalMessages = new();
try
{
_ = Task.Run(async () =>
{
while (!backgroundCts.Token.IsCancellationRequested)
{
var additionalMessage = await link.ReceiveMessageAsync(maxWaitTime ?? timeout, cancellationToken).ConfigureAwait(false);
additionalMessages.Add(additionalMessage);
}
}, cancellationToken);
await drainTask;
backgroundCts.Cancel();
}
// add additional messages to messages to return
However, I haven't been able to do this in a way that guarantees the receive call does not outlast the drain call and effectively undo the credits being drained. The FIFO ordering issue in sessions is worse with this approach, and I often see Service Bus session lock lost exceptions. Is this the approach you had in mind? I wonder if a drain-and-release API needs some visibility into lower-level information, whether that means implementing it on the AMQP side or using information that is already exposed to decide whether another receive call is needed.
@xinchen10 mentioned that DrainAsync can be updated to reset the total link credits to 0 so that it works correctly when prefetch is enabled. Currently, we have to do the following to resume prefetching after draining:
link.Settings.TotalLinkCredit = 0;
link.SetTotalLinkCredit((uint)_prefetchCount, true, true);
The DrainAsync method on ReceivingAmqpLink does not wait for any received messages that will end up being released to actually be released. Messages will be released here if there is no active receive call. When draining, it would be better if this "cleanup" step could be awaited so that draining would provide deterministic behavior for consumers. In the Service Bus library, we rely on DrainAsync to ensure FIFO ordering of sessions.