Skip to content

Commit

Permalink
Port to 1.14.0
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Jan 7, 2019
1 parent 03b0ea8 commit 7153e58
Show file tree
Hide file tree
Showing 26 changed files with 1,201 additions and 98 deletions.
4 changes: 2 additions & 2 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### Port
Aeron.NET has been ported against Java version:
- Agrona: 0.9.28
- Aeron: 1.13.0
- Agrona: 0.9.29
- Aeron: 1.14.0
2 changes: 1 addition & 1 deletion driver/Aeron.Driver.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package >
<metadata>
<id>Aeron.Driver</id>
<version>1.13.0</version>
<version>1.14.0</version>
<title>Aeron Driver</title>
<authors>Adaptive Financial Consulting Ltd.</authors>
<owners>Adaptive Financial Consulting Ltd.</owners>
Expand Down
Binary file modified driver/media-driver.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion driver/version.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Driver source:
https://repo1.maven.org/maven2/io/aeron/aeron-all/1.13.0/aeron-all-1.13.0.jar
https://repo1.maven.org/maven2/io/aeron/aeron-all/1.14.0/aeron-all-1.14.0.jar
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Adaptive.Aeron.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<PackageId>Aeron.Client</PackageId>
<VersionPrefix>1.13.0</VersionPrefix>
<VersionPrefix>1.14.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Aeron Client</Product>
Expand Down
18 changes: 6 additions & 12 deletions src/Adaptive.Aeron/Aeron.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,20 +168,13 @@ public void Dispose()
if (null != _conductorRunner)
{
_conductorRunner.Dispose();
if (!_conductorRunner.IsClosed)
{
throw new AeronException("failed to close Aeron client");
}
}
else
{
_conductorInvoker.Dispose();
if (!_conductorInvoker.IsClosed)
{
throw new AeronException("failed to close Aeron client");
}
}

_conductor.ClientClose();
_ctx.Dispose();
}
}
Expand Down Expand Up @@ -556,17 +549,17 @@ public class Context : IDisposable
/// Qualifier for a value which is a tag for reference. This prefix is use in the param value.
/// </summary>
public const string TAG_PREFIX = "tag:";

/// <summary>
/// Parameter name for channel URI param to indicate if term buffers should be sparse. Value is boolean.
/// </summary>
public const string SPARSE_PARAM_NAME = "sparse";

/// <summary>
/// Parameter name for channel URI param to indicate an alias for the given URI. Value not interpreted by Aeron.
/// </summary>
public const string ALIAS_PARAM_NAME = "alias";

/// <summary>
/// Get the default directory name to be used if <seealso cref="AeronDirectoryName(String)"/> is not set. This will take
/// the <seealso cref="AERON_DIR_PROP_NAME"/> if set and if not then <seealso cref="AERON_DIR_PROP_DEFAULT"/>.
Expand Down Expand Up @@ -1293,7 +1286,8 @@ private void ConnectToDriver()
}
}

private static MappedByteBuffer WaitForFileMapping(FileInfo cncFile, long deadLineMs, IEpochClock epochClock)
private static MappedByteBuffer WaitForFileMapping(FileInfo cncFile, long deadLineMs,
IEpochClock epochClock)
{
try
{
Expand Down
12 changes: 2 additions & 10 deletions src/Adaptive.Aeron/ChannelUriStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,15 @@ public ChannelUriStringBuilder Clear()
_tags = null;
_alias = null;
_reliable = null;
_sparse = null;
_ttl = null;
_mtu = null;
_termLength = null;
_initialTermId = null;
_termId = null;
_termOffset = null;
_sessionId = null;
_linger = null;
_isSessionIdTagged = false;

return this;
Expand Down Expand Up @@ -720,16 +722,6 @@ public string Build()
return _sb.ToString();
}

/// <summary>
/// Call <seealso cref="Convert.ToInt32(String)"/> only if the value param is not null. Else pass null on.
/// </summary>
/// <param name="value"> to check for null and convert if not null. </param>
/// <returns> null if value param is null or result of <seealso cref="Convert.ToInt32(String)"/>. </returns>
public static int? IntegerValueOf(string value)
{
return null == value ? (int?) null : Convert.ToInt32(value);
}

private static string PrefixTag(bool isTagged, int value)
{
return isTagged ? TAG_PREFIX + value : value.ToString();
Expand Down
27 changes: 9 additions & 18 deletions src/Adaptive.Aeron/ClientConductor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,14 @@ public void OnStart()
public void OnClose()
{
_clientLock.Lock();

try
{
if (!_isClosed)
{
_isClosed = true;

int lingeringResourcesSize = _lingeringResources.Count;
ForceCloseResources();

if (_lingeringResources.Count > lingeringResourcesSize)
{
Aeron.Sleep(15);
}
Thread.Yield();

for (int i = 0, size = _lingeringResources.Count; i < size; i++)
{
Expand All @@ -138,6 +132,11 @@ public void OnClose()
}
}

internal void ClientClose()
{
_driverProxy.ClientClose();
}

public int DoWork()
{
int workCount = 0;
Expand Down Expand Up @@ -180,7 +179,7 @@ public void OnError(long correlationId, int codeValue, ErrorCode errorCode, stri
public void OnChannelEndpointError(int statusIndicatorId, string message)
{
var resourcesToRemove = new List<long>();

try
{
foreach (var item in _resourceByRegIdMap)
Expand Down Expand Up @@ -757,15 +756,7 @@ private void AwaitResponse(long correlationId)

do
{
try
{
Thread.Sleep(1);
}
catch (ThreadInterruptedException)
{
Thread.CurrentThread.Interrupt();
throw;
}
Thread.Sleep(1);

Service(correlationId);

Expand All @@ -780,6 +771,7 @@ private void AwaitResponse(long correlationId)
}

Thread.Sleep(0); // check interrupt

} while (deadlineNs - _nanoClock.NanoTime() > 0);

throw new DriverTimeoutException("no response from MediaDriver within (ms):" + _driverTimeoutMs);
Expand Down Expand Up @@ -828,7 +820,6 @@ private int CheckLiveness(long nowNs)
if (_epochClock.Time() > (_driverProxy.TimeOfLastDriverKeepaliveMs() + _driverTimeoutMs))
{
OnClose();

throw new DriverTimeoutException("MediaDriver keepalive older than (ms): " + _driverTimeoutMs);
}

Expand Down
14 changes: 11 additions & 3 deletions src/Adaptive.Aeron/DriverProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ namespace Adaptive.Aeron
///
/// Writes commands into the client conductor buffer.
///
/// Note: this class is not thread safe and is expecting to be called under the {@link ClientConductor} main lock.
/// Note: this class is not thread safe and is expecting to be called within <see cref="Aeron.Context.ClientLock(Adaptive.Agrona.Concurrent.ILock)"/>
/// with the exception of <see cref="ClientClose"/> which is thread safe.
/// </summary>
public class DriverProxy
{
Expand Down Expand Up @@ -277,8 +278,15 @@ public long RemoveCounter(long registrationId)

public void ClientClose()
{
_correlatedMessage.CorrelationId(_toDriverCommandBuffer.NextCorrelationId());
_toDriverCommandBuffer.Write(ControlProtocolEvents.CLIENT_CLOSE, _buffer, 0, CorrelatedMessageFlyweight.LENGTH);
var buffer = new UnsafeBuffer(new byte[CorrelatedMessageFlyweight.LENGTH]);

new CorrelatedMessageFlyweight()
.Wrap(buffer, 0)
.ClientId(_correlatedMessage.ClientId())
.CorrelationId(Aeron.NULL_VALUE);

//_correlatedMessage.CorrelationId(_toDriverCommandBuffer.NextCorrelationId());
_toDriverCommandBuffer.Write(ControlProtocolEvents.CLIENT_CLOSE, buffer, 0, CorrelatedMessageFlyweight.LENGTH);
}
}
}
2 changes: 1 addition & 1 deletion src/Adaptive.Agrona/Adaptive.Agrona.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Agrona</PackageId>
<VersionPrefix>1.13.0</VersionPrefix>
<VersionPrefix>1.14.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Agrona libraries initially included in Aeron Client</Product>
Expand Down
17 changes: 15 additions & 2 deletions src/Adaptive.Agrona/Concurrent/AgentRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class AgentRunner : IDisposable
/// <summary>
/// Indicates that the runner is being closed.
/// </summary>
private static readonly Thread TOMBSTONE = null;
private static readonly Thread TOMBSTONE = new Thread(() => { });

private static readonly int RETRY_CLOSE_TIMEOUT_MS = 3000;

Expand Down Expand Up @@ -174,7 +174,20 @@ public void Dispose()
_isRunning = false;

var thread = _thread.GetAndSet(TOMBSTONE);
if (TOMBSTONE != thread && null != thread)

if (null == thread)
{
try
{
IsClosed = true;
_agent.OnClose();
}
catch (Exception ex)
{
_errorHandler(ex);
}
}
if (TOMBSTONE != thread)
{
while (true)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Archiver/Adaptive.Archiver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Aeron.Archiver</PackageId>
<VersionPrefix>1.13.0</VersionPrefix>
<VersionPrefix>1.14.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Archiving over the Aeron transport</Product>
Expand Down
4 changes: 3 additions & 1 deletion src/Adaptive.Archiver/AeronArchive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -775,10 +775,11 @@ public int ListRecording(long recordingId, IRecordingDescriptorConsumer consumer
}

/// <summary>
/// Get the position recorded for an active recording.
/// Get the position recorded for an active recording. If no active recording the return <see cref="NULL_POSITION"/>
/// </summary>
/// <param name="recordingId"> of the active recording for which the position is required. </param>
/// <returns> the recorded position for the active recording or <seealso cref="NULL_POSITION"/> if recording not active. </returns>
/// <seealso cref="GetStopPosition"/>
public long GetRecordingPosition(long recordingId)
{
_lock.Lock();
Expand All @@ -804,6 +805,7 @@ public long GetRecordingPosition(long recordingId)
/// </summary>
/// <param name="recordingId"> of the active recording for which the position is required. </param>
/// <returns> the stop position, or <seealso cref="AeronArchive.NULL_POSITION"/> if still active. </returns>
/// <seealso cref="GetRecordingPosition"/>
public long GetStopPosition(long recordingId)
{
_lock.Lock();
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Cluster/Adaptive.Cluster.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Aeron.Cluster</PackageId>
<VersionPrefix>1.13.0</VersionPrefix>
<VersionPrefix>1.14.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Clustering libraries over the Aeron transport</Product>
Expand Down
2 changes: 0 additions & 2 deletions src/Adaptive.Cluster/Codecs/ClusterAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ public enum ClusterAction : int
SUSPEND = 0,
RESUME = 1,
SNAPSHOT = 2,
SHUTDOWN = 3,
ABORT = 4,
NULL_VALUE = -2147483648
}
}
4 changes: 2 additions & 2 deletions src/Adaptive.Cluster/Codecs/ClusterActionRequestDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ public StringBuilder AppendTo(StringBuilder builder)
builder.Append("Timestamp=");
builder.Append(Timestamp());
builder.Append('|');
//Token{signal=BEGIN_FIELD, name='action', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=24, componentTokenCount=9, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}}
//Token{signal=BEGIN_ENUM, name='ClusterAction', referencedName='null', description='Action to be taken by a cluster nodes', id=-1, version=0, deprecated=0, encodedLength=4, offset=24, componentTokenCount=7, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='null', timeUnit=null, semanticType='null'}}
//Token{signal=BEGIN_FIELD, name='action', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=24, componentTokenCount=7, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}}
//Token{signal=BEGIN_ENUM, name='ClusterAction', referencedName='null', description='Action to be taken by a cluster nodes', id=-1, version=0, deprecated=0, encodedLength=4, offset=24, componentTokenCount=5, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='null', timeUnit=null, semanticType='null'}}
builder.Append("Action=");
builder.Append(Action());

Expand Down
Loading

0 comments on commit 7153e58

Please sign in to comment.