Skip to content

Commit

Permalink
Port to 1.15.0
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Jan 21, 2019
1 parent f235244 commit 32c13be
Show file tree
Hide file tree
Showing 36 changed files with 801 additions and 188 deletions.
4 changes: 2 additions & 2 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### Port
Aeron.NET has been ported against Java version:
- Agrona: 0.9.29
- Aeron: 1.14.0
- Agrona: 0.9.30
- Aeron: 1.15.0
2 changes: 1 addition & 1 deletion driver/Aeron.Driver.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package >
<metadata>
<id>Aeron.Driver</id>
<version>1.14.0</version>
<version>1.15.0</version>
<title>Aeron Driver</title>
<authors>Adaptive Financial Consulting Ltd.</authors>
<owners>Adaptive Financial Consulting Ltd.</owners>
Expand Down
Binary file modified driver/media-driver.jar
Binary file not shown.
49 changes: 48 additions & 1 deletion src/Adaptive.Aeron.Tests/ClientConductorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,13 @@ public class ClientConductorTest
private SubscriptionReadyFlyweight SubscriptionReady;
private OperationSucceededFlyweight OperationSuccess;
private ErrorResponseFlyweight ErrorResponse;
private ClientTimeoutFlyweight ClientTimeout;

private UnsafeBuffer PublicationReadyBuffer;
private UnsafeBuffer SubscriptionReadyBuffer;
private UnsafeBuffer OperationSuccessBuffer;
private UnsafeBuffer ErrorMessageBuffer;
private UnsafeBuffer ClientTimeoutBuffer;

private CopyBroadcastReceiver MockToClientReceiver;

Expand Down Expand Up @@ -98,11 +100,13 @@ public void SetUp()
SubscriptionReady = new SubscriptionReadyFlyweight();
OperationSuccess = new OperationSucceededFlyweight();
ErrorResponse = new ErrorResponseFlyweight();
ClientTimeout = new ClientTimeoutFlyweight();

PublicationReadyBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]);
SubscriptionReadyBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]);
OperationSuccessBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]);
ErrorMessageBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]);
ClientTimeoutBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]);

CounterValuesBuffer = new UnsafeBuffer(new byte[COUNTER_BUFFER_LENGTH]);
MockToClientReceiver = A.Fake<CopyBroadcastReceiver>();
Expand Down Expand Up @@ -144,6 +148,7 @@ public void SetUp()
SubscriptionReady.Wrap(SubscriptionReadyBuffer, 0);
OperationSuccess.Wrap(OperationSuccessBuffer, 0);
ErrorResponse.Wrap(ErrorMessageBuffer, 0);
ClientTimeout.Wrap(ClientTimeoutBuffer, 0);

PublicationReady.CorrelationId(CORRELATION_ID);
PublicationReady.RegistrationId(CORRELATION_ID);
Expand Down Expand Up @@ -497,9 +502,51 @@ public void ShouldTimeoutInterServiceIfTooLongBetweenDoWorkCalls()

A.CallTo(() => MockClientErrorHandler(A<ConductorServiceTimeoutException>._)).MustHaveHappened();

Assert.True(Conductor.IsClosed());
Assert.True(Conductor.IsTerminating());
}

[Test]
public void ShouldTerminateAndErrorOnClientTimeoutFromDriver()
{
SuppressPrintError = true;

Conductor.OnClientTimeout();

A.CallTo(() => MockClientErrorHandler.Invoke(A<Exception>._)).MustHaveHappened();

bool threwException = false;
try
{
Conductor.DoWork();
}
catch (AgentTerminationException)
{
threwException = true;
}

Assert.True(threwException);
Assert.True(Conductor.IsTerminating());
}

[Test]
public void ShouldNotCloseAndErrorOnClientTimeoutForAnotherClientIdFromDriver()
{
WhenReceiveBroadcastOnMessage(
ControlProtocolEvents.ON_CLIENT_TIMEOUT,
ClientTimeoutBuffer,
buffer =>
{
ClientTimeout.ClientId(Conductor.DriverListenerAdapter().ClientId + 1);
return ClientTimeoutFlyweight.LENGTH;
});

Conductor.DoWork();

A.CallTo(() => MockClientErrorHandler.Invoke(A<Exception>._)).MustNotHaveHappened();

Assert.False(Conductor.IsClosed());
}

private void WhenReceiveBroadcastOnMessage(int msgTypeId, IMutableDirectBuffer buffer, Func<IMutableDirectBuffer, int> filler)
{
A.CallTo(() => MockToClientReceiver.Receive(A<MessageHandler>._)).Invokes(() =>
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Adaptive.Aeron.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<PackageId>Aeron.Client</PackageId>
<VersionPrefix>1.14.0</VersionPrefix>
<VersionPrefix>1.15.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Aeron Client</Product>
Expand Down
32 changes: 17 additions & 15 deletions src/Adaptive.Aeron/Aeron.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,6 @@ public void Dispose()
{
_conductorInvoker.Dispose();
}

_conductor.ClientClose();
_ctx.Dispose();
}
}

Expand Down Expand Up @@ -380,8 +377,11 @@ public static long ResourceLingerDurationNs()
/// A number of the properties are for testing and should not be set by end users.
///
/// <b>Note:</b> Do not reuse instances of the context across different <seealso cref="Aeron"/> clients.
///
/// The context will be owned be <see cref="ClientConductor"/> after a successful
/// <see cref="Aeron.Connect(Context)"/> and closed via <see cref="Aeron.Dispose"/>
/// </summary>
public class Context : IDisposable
public class Context
{
private long _clientId;
private bool _useConductorAgentInvoker = false;
Expand Down Expand Up @@ -536,16 +536,6 @@ static Context()
/// </summary>
public const string MDC_CONTROL_MODE = "control-mode";

/// <summary>
/// Key for the session id for a publication or restricted subscription.
/// </summary>
public const string SESSION_ID_PARAM_NAME = "session-id";

/// <summary>
/// Key for the linger timeout for a publication to wait around after draining in nanoseconds.
/// </summary>
public const string LINGER_PARAM_NAME = "linger";

/// <summary>
/// Valid value for <seealso cref="MDC_CONTROL_MODE"/> when manual control is desired.
/// </summary>
Expand All @@ -555,7 +545,17 @@ static Context()
/// Valid value for <seealso cref="MDC_CONTROL_MODE_PARAM_NAME"/> when dynamic control is desired. Default value.
/// </summary>
public const string MDC_CONTROL_MODE_DYNAMIC = "dynamic";

/// <summary>
/// Key for the session id for a publication or restricted subscription.
/// </summary>
public const string SESSION_ID_PARAM_NAME = "session-id";

/// <summary>
/// Key for the linger timeout for a publication to wait around after draining in nanoseconds.
/// </summary>
public const string LINGER_PARAM_NAME = "linger";

/// <summary>
/// Parameter name for channel URI param to indicate if a subscribed must be reliable or not. Value is boolean.
/// </summary>
Expand Down Expand Up @@ -1223,8 +1223,10 @@ public IThreadFactory ThreadFactory()
/// </summary>
public void Dispose()
{
IoUtil.Unmap(_cncByteBuffer);
var cncByteBuffer = _cncByteBuffer;
_cncByteBuffer = null;
IoUtil.Unmap(cncByteBuffer);


_cncMetaDataBuffer?.Dispose();
_countersMetaDataBuffer?.Dispose();
Expand Down
5 changes: 5 additions & 0 deletions src/Adaptive.Aeron/ChannelUri.cs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ public override string ToString()
/// <param name="termLength"> for the stream. </param>
public void InitialPosition(long position, int initialTermId, int termLength)
{
if (position < 0 || 0 != (position & (FrameDescriptor.FRAME_ALIGNMENT - 1)))
{
throw new ArgumentException("invalid position: " + position);
}

int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength);
int termId = LogBufferDescriptor.ComputeTermIdFromPosition(position, bitsToShift, initialTermId);
int termOffset = (int)(position & (termLength - 1));
Expand Down
89 changes: 47 additions & 42 deletions src/Adaptive.Aeron/ChannelUriStringBuilder.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Text;
using Adaptive.Aeron.LogBuffer;
using static Adaptive.Aeron.LogBuffer.FrameDescriptor;

namespace Adaptive.Aeron
{
Expand All @@ -25,7 +26,6 @@ public class ChannelUriStringBuilder
private string _tags;
private string _alias;
private bool? _reliable;
private bool? _sparse;
private int? _ttl;
private int? _mtu;
private int? _termLength;
Expand All @@ -34,6 +34,7 @@ public class ChannelUriStringBuilder
private int? _termOffset;
private int? _sessionId;
private long? _linger;
private bool? _sparse;
private bool _isSessionIdTagged;

/// <summary>
Expand Down Expand Up @@ -277,30 +278,6 @@ public ChannelUriStringBuilder Reliable(bool? isReliable)
return _reliable;
}

/// <summary>
/// Set to indicate if a term log buffer should be sparse on disk or not. Sparse saves space at the potential
/// expense of latency.
/// </summary>
/// <param name="isSparse"> true if the term buffer log is sparse on disk. </param>
/// <returns> this for a fluent API. </returns>
/// <see cref="Aeron.Context.SPARSE_PARAM_NAME"/>
public ChannelUriStringBuilder Sparse(bool? isSparse)
{
_sparse = isSparse;
return this;
}

/// <summary>
/// Get if a term log buffer should be sparse on disk or not. Sparse saves space at the potential expense of latency.
/// </summary>
/// <returns> true if the term buffer log is sparse on disk. </returns>
/// <see cref="Aeron.Context.SPARSE_PARAM_NAME"/>
public bool? Sparse()
{
return _sparse;
}


/// <summary>
/// Set the Time To Live (TTL) for a multicast datagram. Valid values are 0-255 for the number of hops the datagram
/// can progress along.
Expand Down Expand Up @@ -345,7 +322,7 @@ public ChannelUriStringBuilder Mtu(int? mtu)
throw new ArgumentException("MTU not in range 32-65504: " + mtu);
}

if ((mtu & (FrameDescriptor.FRAME_ALIGNMENT - 1)) != 0)
if ((mtu & (FRAME_ALIGNMENT - 1)) != 0)
{
throw new ArgumentException("MTU not a multiple of FRAME_ALIGNMENT: mtu=" + mtu);
}
Expand Down Expand Up @@ -454,7 +431,7 @@ public ChannelUriStringBuilder TermOffset(int? termOffset)
throw new ArgumentException("term offset not in range 0-1g: " + termOffset);
}

if (0 != (termOffset & (FrameDescriptor.FRAME_ALIGNMENT - 1)))
if (0 != (termOffset & (FRAME_ALIGNMENT - 1)))
{
throw new ArgumentException("term offset not multiple of FRAME_ALIGNMENT: " + termOffset);
}
Expand Down Expand Up @@ -525,6 +502,29 @@ public ChannelUriStringBuilder Linger(long? lingerNs)
return _linger;
}

/// <summary>
/// Set to indicate if a term log buffer should be sparse on disk or not. Sparse saves space at the potential
/// expense of latency.
/// </summary>
/// <param name="isSparse"> true if the term buffer log is sparse on disk. </param>
/// <returns> this for a fluent API. </returns>
/// <see cref="Aeron.Context.SPARSE_PARAM_NAME"/>
public ChannelUriStringBuilder Sparse(bool? isSparse)
{
_sparse = isSparse;
return this;
}

/// <summary>
/// Get if a term log buffer should be sparse on disk or not. Sparse saves space at the potential expense of latency.
/// </summary>
/// <returns> true if the term buffer log is sparse on disk. </returns>
/// <see cref="Aeron.Context.SPARSE_PARAM_NAME"/>
public bool? Sparse()
{
return _sparse;
}

/// <summary>
/// Set the tags for a channel used by a publication or subscription. Tags can be used to identify or tag a
/// channel so that a configuration can be referenced and reused.
Expand Down Expand Up @@ -606,6 +606,11 @@ public string Alias()
/// <returns> this for a fluent API. </returns>
public ChannelUriStringBuilder InitialPosition(long position, int initialTermId, int termLength)
{
if (position < 0 || 0 != (position & (FRAME_ALIGNMENT - 1)))
{
throw new ArgumentException("invalid position: " + position);
}

int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength);

_initialTermId = initialTermId;
Expand Down Expand Up @@ -657,21 +662,6 @@ public string Build()
_sb.Append(Aeron.Context.MDC_CONTROL_MODE_PARAM_NAME).Append('=').Append(_controlMode).Append('|');
}

if (null != _reliable)
{
_sb.Append(Aeron.Context.RELIABLE_STREAM_PARAM_NAME).Append('=').Append(_reliable).Append('|');
}

if (null != _sparse)
{
_sb.Append(Aeron.Context.SPARSE_PARAM_NAME).Append('=').Append(_sparse).Append('|');
}

if (null != _ttl)
{
_sb.Append(Aeron.Context.TTL_PARAM_NAME).Append('=').Append(_ttl.Value).Append('|');
}

if (null != _mtu)
{
_sb.Append(Aeron.Context.MTU_LENGTH_PARAM_NAME).Append('=').Append(_mtu.Value).Append('|');
Expand Down Expand Up @@ -703,6 +693,16 @@ public string Build()
_sb.Append(Aeron.Context.SESSION_ID_PARAM_NAME).Append('=').Append(PrefixTag(_isSessionIdTagged, _sessionId.Value)).Append('|');
}

if (null != _ttl)
{
_sb.Append(Aeron.Context.TTL_PARAM_NAME).Append('=').Append(_ttl.Value).Append('|');
}

if (null != _reliable)
{
_sb.Append(Aeron.Context.RELIABLE_STREAM_PARAM_NAME).Append('=').Append(_reliable).Append('|');
}

if (null != _linger)
{
_sb.Append(Aeron.Context.LINGER_PARAM_NAME).Append('=').Append(_linger.Value).Append('|');
Expand All @@ -712,6 +712,11 @@ public string Build()
{
_sb.Append(Aeron.Context.ALIAS_PARAM_NAME).Append('=').Append(_alias).Append('|');
}

if (null != _sparse)
{
_sb.Append(Aeron.Context.SPARSE_PARAM_NAME).Append('=').Append(_sparse).Append('|');
}

char lastChar = _sb[_sb.Length - 1];
if (lastChar == '|' || lastChar == '?')
Expand Down
Loading

0 comments on commit 32c13be

Please sign in to comment.