diff --git a/Microsoft.Azure.Amqp/Amqp/AmqpLink.cs b/Microsoft.Azure.Amqp/Amqp/AmqpLink.cs index 85aeeb6a..3e8a8407 100644 --- a/Microsoft.Azure.Amqp/Amqp/AmqpLink.cs +++ b/Microsoft.Azure.Amqp/Amqp/AmqpLink.cs @@ -50,6 +50,8 @@ protected AmqpLink(string type, AmqpSession session, AmqpLinkSettings linkSettin this.syncRoot = new object(); this.settings = linkSettings; this.linkCredit = this.settings.TotalLinkCredit; + this.DefaultOpenTimeout = this.settings.OperationTimeout; + this.DefaultCloseTimeout = this.settings.OperationTimeout; Source source = (Source)this.settings.Source; if (source != null) diff --git a/Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs b/Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs index b5bd7800..178f2582 100644 --- a/Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs +++ b/Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs @@ -89,7 +89,7 @@ public IAsyncResult BeginSendMessage( return this.BeginSendMessage(message, deliveryTag, txnId, timeout, CancellationToken.None, callback, state); } - IAsyncResult BeginSendMessage( + internal IAsyncResult BeginSendMessage( AmqpMessage message, ArraySegment deliveryTag, ArraySegment txnId, diff --git a/Microsoft.Azure.Amqp/Amqp/Transaction/Controller.cs b/Microsoft.Azure.Amqp/Amqp/Transaction/Controller.cs index 5ece5cd8..101bf29e 100644 --- a/Microsoft.Azure.Amqp/Amqp/Transaction/Controller.cs +++ b/Microsoft.Azure.Amqp/Amqp/Transaction/Controller.cs @@ -38,22 +38,39 @@ public Controller(AmqpSession amqpSession, TimeSpan operationTimeout) this.controllerLink = new SendingAmqpLink(amqpSession, settings); } - public async Task> DeclareAsync() + public Task> DeclareAsync() + { + return this.DeclareAsync(CancellationToken.None); + } + + public async Task> DeclareAsync(CancellationToken cancellationToken) { AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "BeginDeclare"); Declare declare = new Declare(); AmqpMessage message = Controller.CreateCommandMessage(declare); DeliveryState deliveryState = await Task.Factory.FromAsync( - this.controllerLink.BeginSendMessage(message, this.GetDeliveryTag(), AmqpConstants.NullBinary, this.operationTimeout, null, null), - this.controllerLink.EndSendMessage); + (m, k, c, s) => + { + var thisPtr = (Controller)s; + return thisPtr.controllerLink.BeginSendMessage(m, thisPtr.GetDeliveryTag(), AmqpConstants.NullBinary, thisPtr.operationTimeout, k, c, s); + }, + r => ((Controller)r.AsyncState).controllerLink.EndSendMessage(r), + message, + cancellationToken, + this); this.ThrowIfRejected(deliveryState); AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "EndDeclare"); return ((Declared)deliveryState).TxnId; } - public async Task DischargeAsync(ArraySegment txnId, bool fail) + public Task DischargeAsync(ArraySegment txnId, bool fail) + { + return this.DischargeAsync(txnId, fail, CancellationToken.None); + } + + public async Task DischargeAsync(ArraySegment txnId, bool fail, CancellationToken cancellationToken) { AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "BeginDischange"); Discharge discharge = new Discharge @@ -64,8 +81,16 @@ public async Task DischargeAsync(ArraySegment txnId, bool fail) AmqpMessage message = Controller.CreateCommandMessage(discharge); DeliveryState deliveryState = await Task.Factory.FromAsync( - this.controllerLink.BeginSendMessage(message, this.GetDeliveryTag(), AmqpConstants.NullBinary, this.operationTimeout, null, null), - this.controllerLink.EndSendMessage); + (m, k, c, s) => + { + var thisPtr = (Controller)s; + return thisPtr.controllerLink.BeginSendMessage(m, thisPtr.GetDeliveryTag(), AmqpConstants.NullBinary, thisPtr.operationTimeout, k, c, s); + }, + r => ((Controller)r.AsyncState).controllerLink.EndSendMessage(r), + message, + cancellationToken, + this); + this.ThrowIfRejected(deliveryState); AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "EndDischange"); }