Skip to content

Commit

Permalink
Port to 1.9.1
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed May 3, 2018
1 parent 00a167f commit 13236dd
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 23 deletions.
2 changes: 1 addition & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### Port
Aeron.NET has been ported against Java version:
- Agrona: 0.9.12-30-gec52107
- Aeron: 1.9.0
- Aeron: 1.9.1
Binary file modified driver/media-driver.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion driver/version.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Driver source:
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.0/aeron-all-1.9.0.jar
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.1/aeron-all-1.9.1.jar
7 changes: 5 additions & 2 deletions src/Adaptive.Cluster/Client/AeronCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,8 @@ private Publication ConnectToCluster()
}
else
{
publications[i].Dispose();

publications[i]?.Dispose();
}
}

Expand All @@ -290,7 +291,7 @@ private Publication ConnectToCluster()
{
for (int i = 0; i < memberCount; i++)
{
publications[i].Dispose();
CloseHelper.QuietDispose(publications[i]);
}

throw new TimeoutException("awaiting connection to cluster");
Expand All @@ -308,6 +309,8 @@ private Publication ConnectToCluster()
{
if (_nanoClock.NanoTime() > deadlineNs)
{
CloseHelper.QuietDispose(publication);

throw new TimeoutException("awaiting connection to cluster");
}

Expand Down
76 changes: 57 additions & 19 deletions src/Adaptive.Cluster/Client/EgressAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@ public class EgressAdapter : IFragmentHandler
/// </summary>
public static readonly int SESSION_HEADER_LENGTH = MessageHeaderDecoder.ENCODED_LENGTH + SessionHeaderDecoder.BLOCK_LENGTH;

private readonly long _clusterSessionId;
private readonly int _fragmentLimit;
private readonly MessageHeaderDecoder _messageHeaderDecoder = new MessageHeaderDecoder();
private readonly SessionEventDecoder _sessionEventDecoder = new SessionEventDecoder();
private readonly NewLeaderEventDecoder _newLeaderEventDecoder = new NewLeaderEventDecoder();
private readonly SessionHeaderDecoder _sessionHeaderDecoder = new SessionHeaderDecoder();
private readonly FragmentAssembler _fragmentAssembler;
private readonly IEgressListener _listener;
private readonly Subscription _subscription;
private readonly int _fragmentLimit;

public EgressAdapter(IEgressListener listener, Subscription subscription, int fragmentLimit)

public EgressAdapter(
IEgressListener listener,
long clusterSessionId,
Subscription subscription,
int fragmentLimit)
{
_clusterSessionId = clusterSessionId;
_fragmentAssembler = new FragmentAssembler(this);
_listener = listener;
_subscription = subscription;
Expand All @@ -43,42 +49,74 @@ public void OnFragment(IDirectBuffer buffer, int offset, int length, Header head
switch (templateId)
{
case SessionEventDecoder.TEMPLATE_ID:
{
_sessionEventDecoder.Wrap(
buffer,
offset + MessageHeaderDecoder.ENCODED_LENGTH,
_messageHeaderDecoder.BlockLength(),
_messageHeaderDecoder.Version());

_listener.SessionEvent(
_sessionEventDecoder.CorrelationId(),
_sessionEventDecoder.ClusterSessionId(),
_sessionEventDecoder.Code(),
_sessionEventDecoder.Detail());
var sessionId = _sessionEventDecoder.ClusterSessionId();
if (sessionId == _clusterSessionId)
{

_listener.SessionEvent(
_sessionEventDecoder.CorrelationId(),
_sessionEventDecoder.ClusterSessionId(),
_sessionEventDecoder.Code(),
_sessionEventDecoder.Detail());
}

break;
}

case NewLeaderEventDecoder.TEMPLATE_ID:
_newLeaderEventDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, _messageHeaderDecoder.BlockLength(), _messageHeaderDecoder.Version());
{
_newLeaderEventDecoder.Wrap(
buffer,
offset + MessageHeaderDecoder.ENCODED_LENGTH,
_messageHeaderDecoder.BlockLength(),
_messageHeaderDecoder.Version());

var sessionId = _newLeaderEventDecoder.ClusterSessionId();
if (sessionId == _clusterSessionId)
{
_listener.NewLeader(
_newLeaderEventDecoder.LastCorrelationId(),
_newLeaderEventDecoder.ClusterSessionId(),
_newLeaderEventDecoder.LastMessageTimestamp(),
_newLeaderEventDecoder.LeadershipTimestamp(),
_newLeaderEventDecoder.LeadershipTermId(),
_newLeaderEventDecoder.LeaderMemberId(),
_newLeaderEventDecoder.MemberEndpoints());
}

_listener.NewLeader(_newLeaderEventDecoder.LastCorrelationId(), _newLeaderEventDecoder.ClusterSessionId(), _newLeaderEventDecoder.LastMessageTimestamp(), _newLeaderEventDecoder.LeadershipTimestamp(), _newLeaderEventDecoder.LeadershipTermId(), _newLeaderEventDecoder.LeaderMemberId(), _newLeaderEventDecoder.MemberEndpoints());
break;
}

case SessionHeaderDecoder.TEMPLATE_ID:
{
_sessionHeaderDecoder.Wrap(
buffer,
offset + MessageHeaderDecoder.ENCODED_LENGTH,
_messageHeaderDecoder.BlockLength(),
_messageHeaderDecoder.Version());

_listener.OnMessage(
_sessionHeaderDecoder.CorrelationId(),
_sessionHeaderDecoder.ClusterSessionId(),
_sessionHeaderDecoder.Timestamp(),
buffer,
offset + SESSION_HEADER_LENGTH,
length - SESSION_HEADER_LENGTH,
header);
var sessionId = _sessionHeaderDecoder.ClusterSessionId();
if (sessionId == _clusterSessionId)
{
_listener.OnMessage(
_sessionHeaderDecoder.CorrelationId(),
_sessionHeaderDecoder.ClusterSessionId(),
_sessionHeaderDecoder.Timestamp(),
buffer,
offset + SESSION_HEADER_LENGTH,
length - SESSION_HEADER_LENGTH,
header);
}
break;

}

case ChallengeDecoder.TEMPLATE_ID:
break;

Expand Down

0 comments on commit 13236dd

Please sign in to comment.