Skip to content

Commit

Permalink
Add link default operation timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed Jul 15, 2021
1 parent ba792e8 commit fa1b2be
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 26 deletions.
10 changes: 10 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpLinkSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public sealed class AmqpLinkSettings : Attach

public AmqpLinkSettings()
{
this.OperationTimeout = AmqpConstants.DefaultTimeout;
}

public uint TotalLinkCredit
Expand Down Expand Up @@ -71,6 +72,15 @@ public SettleMode SettleType
}
}

/// <summary>
/// Gets or sets the default operation timeout when not explicitly specified in an API.
/// </summary>
public TimeSpan OperationTimeout
{
get;
set;
}

public static AmqpLinkSettings Create(Attach attach)
{
AmqpLinkSettings settings = new AmqpLinkSettings();
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Amqp/Amqp/AmqpObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public Task OpenAsync(CancellationToken cancellationToken)
return Task.Factory.FromAsync(
(t, k, c, s) => ((AmqpObject)s).BeginOpen(t, k, c, s),
r => ((AmqpObject)r.AsyncState).EndOpen(r),
TimeSpan.MaxValue,
this.DefaultOpenTimeout,
cancellationToken,
this);
}
Expand All @@ -188,7 +188,7 @@ public Task CloseAsync(CancellationToken cancellationToken)
return Task.Factory.FromAsync(
(t, k, c, s) => ((AmqpObject) s).BeginClose(t, k, c, s),
r => ((AmqpObject) r.AsyncState).EndClose(r),
TimeSpan.MaxValue,
this.DefaultCloseTimeout,
cancellationToken,
this);
}
Expand Down
12 changes: 7 additions & 5 deletions Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public Task<AmqpMessage> ReceiveMessageAsync(TimeSpan timeout)

public Task<AmqpMessage> ReceiveMessageAsync(CancellationToken cancellationToken)
{
return this.ReceiveMessageAsync(TimeSpan.MaxValue, cancellationToken);
return this.ReceiveMessageAsync(this.Settings.OperationTimeout, cancellationToken);
}

public Task<AmqpMessage> ReceiveMessageAsync(TimeSpan timeout, CancellationToken cancellationToken)
Expand All @@ -144,12 +144,12 @@ public Task<AmqpMessage> ReceiveMessageAsync(TimeSpan timeout, CancellationToken

public Task<IEnumerable<AmqpMessage>> ReceiveMessagesAsync(int messageCount, TimeSpan batchWaitTimeout)
{
return this.ReceiveMessagesAsync(messageCount, batchWaitTimeout, AmqpConstants.DefaultTimeout, CancellationToken.None);
return this.ReceiveMessagesAsync(messageCount, batchWaitTimeout, this.Settings.OperationTimeout, CancellationToken.None);
}

public Task<IEnumerable<AmqpMessage>> ReceiveMessagesAsync(int messageCount, TimeSpan batchWaitTimeout, CancellationToken cancellationToken)
{
return this.ReceiveMessagesAsync(messageCount, batchWaitTimeout, TimeSpan.MaxValue, cancellationToken);
return this.ReceiveMessagesAsync(messageCount, batchWaitTimeout, this.Settings.OperationTimeout, cancellationToken);
}

public Task<IEnumerable<AmqpMessage>> ReceiveMessagesAsync(int messageCount, TimeSpan batchWaitTimeout, TimeSpan timeout, CancellationToken cancellationToken)
Expand Down Expand Up @@ -278,10 +278,11 @@ public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, Outcome
public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, Outcome outcome, bool batchable, TimeSpan timeout)
{
return Task.Factory.FromAsync(
(p, t, c, s) => ((ReceivingAmqpLink)s).BeginDisposeMessage(p.DeliveryTag, p.TxnId, p.Outcome, p.Batchable, t, CancellationToken.None, c, s),
(p, t, k, c, s) => ((ReceivingAmqpLink)s).BeginDisposeMessage(p.DeliveryTag, p.TxnId, p.Outcome, p.Batchable, t, k, c, s),
r => ((ReceivingAmqpLink)r.AsyncState).EndDisposeMessage(r),
new DisposeParam(deliveryTag, txnId, outcome, batchable),
timeout,
CancellationToken.None,
this);
}

Expand All @@ -293,9 +294,10 @@ public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, Outcome
public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, Outcome outcome, bool batchable, CancellationToken cancellationToken)
{
return Task.Factory.FromAsync(
(p, k, c, s) => ((ReceivingAmqpLink)s).BeginDisposeMessage(p.DeliveryTag, p.TxnId, p.Outcome, p.Batchable, TimeSpan.MaxValue, k, c, s),
(p, t, k, c, s) => ((ReceivingAmqpLink)s).BeginDisposeMessage(p.DeliveryTag, p.TxnId, p.Outcome, p.Batchable, t, k, c, s),
r => ((ReceivingAmqpLink)r.AsyncState).EndDisposeMessage(r),
new DisposeParam(deliveryTag, txnId, outcome, batchable),
this.Settings.OperationTimeout,
cancellationToken,
this);
}
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Amqp/Amqp/RequestResponseAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public Task<AmqpMessage> RequestAsync(AmqpMessage request, CancellationToken can
(r, t, k, c, s) => new RequestAsyncResult((RequestResponseAmqpLink)s, r, AmqpConstants.NullBinary, t, k, c, s),
(r) => RequestAsyncResult.End(r),
request,
TimeSpan.MaxValue,
this.sender.Settings.OperationTimeout,
cancellationToken,
this);
}
Expand Down
48 changes: 30 additions & 18 deletions Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,26 @@ public void SendMessageNoWait(
this.SendMessageInternal(message, deliveryTag, txnId);
}

public Task<Outcome> SendMessageAsync(
AmqpMessage message,
ArraySegment<byte> deliveryTag,
ArraySegment<byte> txnId,
TimeSpan timeout)
public Task<Outcome> SendMessageAsync(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, TimeSpan timeout)
{
return Task.Factory.FromAsync<Outcome>(
(c,s) => this.BeginSendMessage(message, deliveryTag, txnId, timeout, c, s),
this.EndSendMessage,
null);
return Task.Factory.FromAsync(
(p, t, k, c, s) => ((SendingAmqpLink)s).BeginSendMessage(p.Message, p.DeliveryTag, p.TxnId, t, k, c, s),
r => ((SendingAmqpLink)r.AsyncState).EndSendMessage(r),
new SendMessageParam(message, deliveryTag, txnId),
timeout,
CancellationToken.None,
this);
}

public Task<Outcome> SendMessageAsync(
AmqpMessage message,
ArraySegment<byte> deliveryTag,
ArraySegment<byte> txnId,
CancellationToken cancellationToken)
public Task<Outcome> SendMessageAsync(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, CancellationToken cancellationToken)
{
return Task.Factory.FromAsync<Outcome>(
(c, s) => this.BeginSendMessage(message, deliveryTag, txnId, TimeSpan.MaxValue, cancellationToken, c, s),
this.EndSendMessage,
null);
return Task.Factory.FromAsync(
(p, t, k, c, s) => ((SendingAmqpLink)s).BeginSendMessage(p.Message, p.DeliveryTag, p.TxnId, t, k, c, s),
r => ((SendingAmqpLink)r.AsyncState).EndSendMessage(r),
new SendMessageParam(message, deliveryTag, txnId),
this.Settings.OperationTimeout,
cancellationToken,
this);
}

public IAsyncResult BeginSendMessage(
Expand Down Expand Up @@ -251,6 +249,20 @@ bool IWorkDelegate<AmqpMessage>.Invoke(AmqpMessage message)
return success;
}

struct SendMessageParam
{
public SendMessageParam(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId)
{
this.Message = message;
this.DeliveryTag = deliveryTag;
this.TxnId = txnId;
}

public readonly AmqpMessage Message;
public readonly ArraySegment<byte> DeliveryTag;
public readonly ArraySegment<byte> TxnId;
}

sealed class SendAsyncResult : TimeoutAsyncResult<string>, IWork<Outcome>
{
readonly SendingAmqpLink link;
Expand Down

0 comments on commit fa1b2be

Please sign in to comment.