Skip to content

Commit

Permalink
Merge pull request #101 from Azure/develop
Browse files Browse the repository at this point in the history
Changes to how amqp transactions are handled. (#100)
  • Loading branch information
xinchen10 authored Feb 14, 2018
2 parents 0090113 + 22db896 commit 38b170b
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 93 deletions.
2 changes: 2 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ protected bool SendDelivery(Delivery delivery)
transfer.DeliveryTag = delivery.DeliveryTag;
transfer.MessageFormat = delivery.MessageFormat ?? AmqpConstants.AmqpMessageFormat;
transfer.Batchable = delivery.Batchable;
transfer.State = delivery.State;
if (delivery.Settled)
{
transfer.Settled = true;
Expand Down Expand Up @@ -879,6 +880,7 @@ void OnReceiveTransfer(Transfer transfer, Frame rawFrame)
delivery.Settled = transfer.Settled();
delivery.Batchable = transfer.Batchable();
delivery.MessageFormat = transfer.MessageFormat;
delivery.State = transfer.State;
TransactionalState txnState = transfer.State as TransactionalState;
if (txnState != null)
{
Expand Down
66 changes: 52 additions & 14 deletions Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public sealed class ReceivingAmqpLink : AmqpLink

Action<AmqpMessage> messageListener;
SizeBasedFlowQueue messageQueue;
WorkCollection<ArraySegment<byte>, DisposeAsyncResult, Outcome> pendingDispositions;
WorkCollection<ArraySegment<byte>, DisposeAsyncResult, DeliveryState> pendingDispositions;
AmqpMessage currentMessage;
LinkedList<ReceiveAsyncResult> waiterList;

Expand Down Expand Up @@ -241,17 +241,27 @@ public bool EndReceiveMessages(IAsyncResult result, out IEnumerable<AmqpMessage>
}

public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, Outcome outcome, bool batchable, TimeSpan timeout)
{
return this.DisposeMessageAsync(deliveryTag, AmqpConstants.NullBinary, outcome, batchable, timeout);
}

public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, Outcome outcome, bool batchable, TimeSpan timeout)
{
return TaskHelpers.CreateTask(
(c, s) => this.BeginDisposeMessage(deliveryTag, outcome, batchable, timeout, c, s),
(c, s) => this.BeginDisposeMessage(deliveryTag, txnId, outcome, batchable, timeout, c, s),
a => ((ReceivingAmqpLink)a.AsyncState).EndDisposeMessage(a),
this);
}

public IAsyncResult BeginDisposeMessage(ArraySegment<byte> deliveryTag, Outcome outcome, bool batchable, TimeSpan timeout, AsyncCallback callback, object state)
{
return this.BeginDisposeMessage(deliveryTag, AmqpConstants.NullBinary, outcome, batchable, timeout, callback, state);
}

public IAsyncResult BeginDisposeMessage(ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, Outcome outcome, bool batchable, TimeSpan timeout, AsyncCallback callback, object state)
{
this.ThrowIfClosed();
return new DisposeAsyncResult(this, deliveryTag, outcome, batchable, timeout, callback, state);
return new DisposeAsyncResult(this, deliveryTag, txnId, outcome, batchable, timeout, callback, state);
}

public Outcome EndDisposeMessage(IAsyncResult result)
Expand Down Expand Up @@ -318,7 +328,7 @@ protected override bool OpenInternal()
{
this.messageQueue = new SizeBasedFlowQueue(this);
this.waiterList = new LinkedList<ReceiveAsyncResult>();
this.pendingDispositions = new WorkCollection<ArraySegment<byte>, DisposeAsyncResult, Outcome>(ByteArrayComparer.Instance);
this.pendingDispositions = new WorkCollection<ArraySegment<byte>, DisposeAsyncResult, DeliveryState>(ByteArrayComparer.Instance);
bool syncComplete = base.OpenInternal();
if (this.LinkCredit > 0)
{
Expand All @@ -334,14 +344,10 @@ protected override void OnDisposeDeliveryInternal(Delivery delivery)
// in the EO delivery scenario, and also in transaction case.
AmqpTrace.Provider.AmqpDispose(this, delivery.DeliveryId.Value, delivery.Settled, delivery.State);
DeliveryState deliveryState = delivery.State;
if (delivery.Transactional())
{
deliveryState = ((TransactionalState)delivery.State).Outcome;
}

if (deliveryState != null)
{
this.pendingDispositions.CompleteWork(delivery.DeliveryTag, false, (Outcome)deliveryState);
this.pendingDispositions.CompleteWork(delivery.DeliveryTag, false, deliveryState);
}
}

Expand Down Expand Up @@ -735,16 +741,18 @@ static void OnTimer(object state)
}
}

sealed class DisposeAsyncResult : AsyncResult, IWork<Outcome>
sealed class DisposeAsyncResult : AsyncResult, IWork<DeliveryState>
{
readonly ReceivingAmqpLink link;
readonly ArraySegment<byte> deliveryTag;
readonly bool batchable;
Outcome outcome;
ArraySegment<byte> txnId;

public DisposeAsyncResult(
ReceivingAmqpLink link,
ArraySegment<byte> deliveryTag,
ArraySegment<byte> deliveryTag,
ArraySegment<byte> txnId,
Outcome outcome,
bool batchable,
TimeSpan timeout,
Expand All @@ -756,6 +764,7 @@ public DisposeAsyncResult(
this.deliveryTag = deliveryTag;
this.batchable = batchable;
this.outcome = outcome;
this.txnId = txnId;
this.link.pendingDispositions.StartWork(deliveryTag, this);
}

Expand All @@ -766,16 +775,45 @@ public static Outcome End(IAsyncResult result)

public void Start()
{
if (!link.DisposeDelivery(deliveryTag, false, outcome, batchable))
DeliveryState deliveryState;
if (txnId.Array != null)
{
deliveryState = new TransactionalState()
{
Outcome = this.outcome,
TxnId = this.txnId
};
}
else
{
deliveryState = this.outcome;
}

if (!link.DisposeDelivery(deliveryTag, false, deliveryState, batchable))
{
// Delivery tag not found
link.pendingDispositions.CompleteWork(deliveryTag, true, AmqpConstants.RejectedNotFoundOutcome);
}
}

public void Done(bool completedSynchronously, Outcome outcome)
public void Done(bool completedSynchronously, DeliveryState state)
{
this.outcome = outcome;
if (state is Outcome outcome)
{
this.outcome = outcome;
}
else
{
if (state is TransactionalState transactionalState)
{
this.outcome = transactionalState.Outcome;
}
else
{
this.Complete(completedSynchronously, new AmqpException(AmqpErrorCode.IllegalState, $"DeliveryState '{state.GetType()}' is not valid for disposition."));
}
}

this.Complete(completedSynchronously);
}

Expand Down
14 changes: 10 additions & 4 deletions Microsoft.Azure.Amqp/Amqp/RequestResponseAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,12 @@ public Task<AmqpMessage> RequestAsync(AmqpMessage request, TimeSpan timeout)

public IAsyncResult BeginRequest(AmqpMessage request, TimeSpan timeout, AsyncCallback callback, object state)
{
return new RequestAsyncResult(this, request, timeout, callback, state);
return this.BeginRequest(request, AmqpConstants.NullBinary, timeout, callback, state);
}

public IAsyncResult BeginRequest(AmqpMessage request, ArraySegment<byte> txnId, TimeSpan timeout, AsyncCallback callback, object state)
{
return new RequestAsyncResult(this, request, txnId, timeout, callback, state);
}

public AmqpMessage EndRequest(IAsyncResult result)
Expand Down Expand Up @@ -252,15 +257,16 @@ sealed class RequestAsyncResult : TimeoutAsyncResult<MessageId>, IWork<AmqpMessa
{
readonly RequestResponseAmqpLink parent;
readonly MessageId requestId;
readonly ArraySegment<byte> transactionId;
AmqpMessage request;
AmqpMessage response;

public RequestAsyncResult(RequestResponseAmqpLink parent, AmqpMessage request, TimeSpan timeout, AsyncCallback callback, object state)
public RequestAsyncResult(RequestResponseAmqpLink parent, AmqpMessage request, ArraySegment<byte> txnId, TimeSpan timeout, AsyncCallback callback, object state)
: base(timeout, callback, state)
{
this.parent = parent;
this.request = request;

this.transactionId = txnId;
this.requestId = "request" + (ulong)Interlocked.Increment(ref this.parent.nextRequestId);
this.request.Properties.MessageId = this.requestId;
this.request.Properties.ReplyTo = this.parent.replyTo;
Expand All @@ -280,7 +286,7 @@ public static AmqpMessage End(IAsyncResult result)
public void Start()
{
this.SetTimer();
this.parent.sender.SendMessageNoWait(this.request, AmqpConstants.EmptyBinary, AmqpConstants.NullBinary);
this.parent.sender.SendMessageNoWait(this.request, AmqpConstants.EmptyBinary, this.transactionId);
this.request = null;
}

Expand Down
135 changes: 75 additions & 60 deletions Microsoft.Azure.Amqp/Amqp/Transaction/Controller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,96 +5,67 @@ namespace Microsoft.Azure.Amqp.Transaction
{
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;

public sealed class Controller
public sealed class Controller : AmqpObject
{
SendingAmqpLink sendLink;
readonly SendingAmqpLink controllerLink;
long messageTag;

public static AmqpMessage CreateCommandMessage(IAmqpSerializable command)
{
AmqpValue value = new AmqpValue() { Value = command };
return AmqpMessage.Create(value);
}
readonly TimeSpan operationTimeout;

public IAsyncResult BeginOpen(AmqpSession session, TimeSpan timeout, AsyncCallback callback, object state)
public Controller(AmqpSession amqpSession, TimeSpan operationTimeout)
: base("controller")
{
this.operationTimeout = operationTimeout;
string uniqueueName = Guid.NewGuid().ToString("N");
Source source = new Source();
source.Address = uniqueueName;
source.DistributionMode = DistributionMode.Move;

Coordinator coordinator = new Coordinator();
AmqpLinkSettings settings = new AmqpLinkSettings();
settings.Source = source;
settings.Target = coordinator;
settings.LinkName = uniqueueName;
settings.Role = false;

this.sendLink = new SendingAmqpLink(session, settings);
return this.sendLink.BeginOpen(timeout, callback, state);
}

public void EndOpen(IAsyncResult result)
{
this.sendLink.EndOpen(result);
}

public void Open(AmqpSession session, TimeSpan timeout)
{
this.EndOpen(this.BeginOpen(session, timeout, null, null));
}

public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
{
return this.sendLink.BeginClose(timeout, callback, state);
}

public void EndClose(IAsyncResult result)
{
this.sendLink.EndClose(result);
this.sendLink = null;
}
var source = new Source
{
Address = uniqueueName,
DistributionMode = DistributionMode.Move
};
var coordinator = new Coordinator();
var settings = new AmqpLinkSettings
{
Source = source,
Target = coordinator,
LinkName = uniqueueName,
Role = false
};

public void Close(TimeSpan timeout)
{
this.EndClose(this.BeginClose(timeout, null, null));
this.controllerLink = new SendingAmqpLink(amqpSession, settings);
}

public IAsyncResult BeginDeclare(TimeSpan timeout, AsyncCallback callback, object state)
public async Task<ArraySegment<byte>> DeclareAsync()
{
AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "BeginDeclare");
Declare declare = new Declare();

AmqpMessage message = Controller.CreateCommandMessage(declare);
return this.sendLink.BeginSendMessage(message, this.GetDeliveryTag(), AmqpConstants.NullBinary, timeout, callback, state);
}
DeliveryState deliveryState = await Task<DeliveryState>.Factory.FromAsync(
this.controllerLink.BeginSendMessage(message, this.GetDeliveryTag(), AmqpConstants.NullBinary, this.operationTimeout, null, null),
this.controllerLink.EndSendMessage);

public ArraySegment<byte> EndDeclare(IAsyncResult result)
{
DeliveryState deliveryState = this.sendLink.EndSendMessage(result);
this.ThrowIfRejected(deliveryState);
AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "EndDeclare");
return ((Declared)deliveryState).TxnId;
}

public IAsyncResult BeginDischange(ArraySegment<byte> txnId, bool fail, TimeSpan timeout, AsyncCallback callback, object state)
public async Task DischargeAsync(ArraySegment<byte> txnId, bool fail)
{
AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "BeginDischange");
Discharge discharge = new Discharge()
Discharge discharge = new Discharge
{
TxnId = txnId,
Fail = fail
};

AmqpMessage message = Controller.CreateCommandMessage(discharge);
return this.sendLink.BeginSendMessage(message, this.GetDeliveryTag(), AmqpConstants.NullBinary, timeout, callback, state);
}

public void EndDischarge(IAsyncResult result)
{
DeliveryState deliveryState = this.sendLink.EndSendMessage(result);
DeliveryState deliveryState = await Task<DeliveryState>.Factory.FromAsync(
this.controllerLink.BeginSendMessage(message, this.GetDeliveryTag(), AmqpConstants.NullBinary, this.operationTimeout, null, null),
this.controllerLink.EndSendMessage);
this.ThrowIfRejected(deliveryState);
AmqpTrace.Provider.AmqpLogOperationInformational(this, TraceOperation.Execute, "EndDischange");
}
Expand All @@ -104,6 +75,50 @@ public override string ToString()
return "controller";
}

protected override bool OpenInternal()
{
var result = this.controllerLink.BeginOpen(this.operationTimeout, OnLinkOpen, this);
return result.IsCompleted;
}

protected override bool CloseInternal()
{
this.controllerLink.SafeClose();
this.controllerLink.Session.SafeClose();
return true;
}

protected override void AbortInternal()
{
this.controllerLink.Abort();
this.controllerLink.Session.Abort();
}

static AmqpMessage CreateCommandMessage(IAmqpSerializable command)
{
AmqpValue value = new AmqpValue { Value = command };
return AmqpMessage.Create(value);
}

static void OnLinkOpen(IAsyncResult asyncResult)
{
var thisPtr = (Controller) asyncResult.AsyncState;
Exception ex = null;
try
{
thisPtr.controllerLink.EndOpen(asyncResult);
}
catch (Exception exception) when (!Fx.IsFatal(exception))
{
ex = exception;
}

if (!asyncResult.CompletedSynchronously)
{
thisPtr.CompleteOpen(false, ex);
}
}

void ThrowIfRejected(DeliveryState deliveryState)
{
if (deliveryState.DescriptorCode == Rejected.Code)
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Amqp/Amqp/Transaction/TxnCapabilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ static class TxnCapabilities
{
public static readonly AmqpSymbol LocalTransactions = "amqp:local-transactions";
public static readonly AmqpSymbol DistributedTxn = "amqp:distributed-transactions";
public static readonly AmqpSymbol PrototableTransactions = "amqp:prototable-transactions";
public static readonly AmqpSymbol PromotableTransactions = "amqp:promotable-transactions";
public static readonly AmqpSymbol MultiTxnsPerSsn = "amqp:multi-txns-per-ssn";
public static readonly AmqpSymbol MultiSsnsPerTxn = "amqp:multi-ssns-per-txn";
}
Expand Down
2 changes: 1 addition & 1 deletion Microsoft.Azure.Amqp/Microsoft.Azure.Amqp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<PropertyGroup>
<Description>Microsoft.Azure.Amqp Class Library</Description>
<VersionPrefix>2.0.0</VersionPrefix>
<VersionPrefix>2.2.0</VersionPrefix>
<Authors>microsoft</Authors>
<TargetFrameworks>net45;netstandard1.3</TargetFrameworks>
<NoWarn>CS1734,CS1591</NoWarn>
Expand Down
Loading

0 comments on commit 38b170b

Please sign in to comment.