Skip to content

Commit

Permalink
Cancellation token for txn controller.
Browse files Browse the repository at this point in the history
Link open/close timeout from settings.
  • Loading branch information
xinchen10 committed Aug 18, 2021
1 parent 5c499b9 commit bff6105
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 7 deletions.
2 changes: 2 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ protected AmqpLink(string type, AmqpSession session, AmqpLinkSettings linkSettin
this.syncRoot = new object();
this.settings = linkSettings;
this.linkCredit = this.settings.TotalLinkCredit;
this.DefaultOpenTimeout = this.settings.OperationTimeout;
this.DefaultCloseTimeout = this.settings.OperationTimeout;

Source source = (Source)this.settings.Source;
if (source != null)
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public IAsyncResult BeginSendMessage(
return this.BeginSendMessage(message, deliveryTag, txnId, timeout, CancellationToken.None, callback, state);
}

IAsyncResult BeginSendMessage(
internal IAsyncResult BeginSendMessage(
AmqpMessage message,
ArraySegment<byte> deliveryTag,
ArraySegment<byte> txnId,
Expand Down
37 changes: 31 additions & 6 deletions Microsoft.Azure.Amqp/Amqp/Transaction/Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,39 @@ public Controller(AmqpSession amqpSession, TimeSpan operationTimeout)
this.controllerLink = new SendingAmqpLink(amqpSession, settings);
}

public async Task<ArraySegment<byte>> DeclareAsync()
public Task<ArraySegment<byte>> DeclareAsync()
{
return this.DeclareAsync(CancellationToken.None);
}

public async Task<ArraySegment<byte>> DeclareAsync(CancellationToken cancellationToken)
{
AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "BeginDeclare");
Declare declare = new Declare();

AmqpMessage message = Controller.CreateCommandMessage(declare);
DeliveryState deliveryState = await Task<DeliveryState>.Factory.FromAsync(
this.controllerLink.BeginSendMessage(message, this.GetDeliveryTag(), AmqpConstants.NullBinary, this.operationTimeout, null, null),
this.controllerLink.EndSendMessage);
(m, k, c, s) =>
{
var thisPtr = (Controller)s;
return thisPtr.controllerLink.BeginSendMessage(m, thisPtr.GetDeliveryTag(), AmqpConstants.NullBinary, thisPtr.operationTimeout, k, c, s);
},
r => ((Controller)r.AsyncState).controllerLink.EndSendMessage(r),
message,
cancellationToken,
this);

this.ThrowIfRejected(deliveryState);
AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "EndDeclare");
return ((Declared)deliveryState).TxnId;
}

public async Task DischargeAsync(ArraySegment<byte> txnId, bool fail)
public Task DischargeAsync(ArraySegment<byte> txnId, bool fail)
{
return this.DischargeAsync(txnId, fail, CancellationToken.None);
}

public async Task DischargeAsync(ArraySegment<byte> txnId, bool fail, CancellationToken cancellationToken)
{
AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "BeginDischange");
Discharge discharge = new Discharge
Expand All @@ -64,8 +81,16 @@ public async Task DischargeAsync(ArraySegment<byte> txnId, bool fail)

AmqpMessage message = Controller.CreateCommandMessage(discharge);
DeliveryState deliveryState = await Task<DeliveryState>.Factory.FromAsync(
this.controllerLink.BeginSendMessage(message, this.GetDeliveryTag(), AmqpConstants.NullBinary, this.operationTimeout, null, null),
this.controllerLink.EndSendMessage);
(m, k, c, s) =>
{
var thisPtr = (Controller)s;
return thisPtr.controllerLink.BeginSendMessage(m, thisPtr.GetDeliveryTag(), AmqpConstants.NullBinary, thisPtr.operationTimeout, k, c, s);
},
r => ((Controller)r.AsyncState).controllerLink.EndSendMessage(r),
message,
cancellationToken,
this);

this.ThrowIfRejected(deliveryState);
AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "EndDischange");
}
Expand Down

0 comments on commit bff6105

Please sign in to comment.