Skip to content

Commit

Permalink
Port to 1.10.4
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Aug 1, 2018
1 parent 6f3ed0a commit 7a87b75
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 27 deletions.
4 changes: 2 additions & 2 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### Port
Aeron.NET has been ported against Java version:
- Agrona: 0.9.18-13-g0378ffa
- Aeron: 1.10.2
- Agrona: 0.9.22
- Aeron: 1.10.4
2 changes: 1 addition & 1 deletion driver/Aeron.Driver.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package >
<metadata>
<id>Aeron.Driver</id>
<version>1.10.3</version>
<version>1.10.4</version>
<title>Aeron Driver</title>
<authors>Adaptive Financial Consulting Ltd.</authors>
<owners>Adaptive Financial Consulting Ltd.</owners>
Expand Down
Binary file modified driver/media-driver.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion driver/version.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Driver source:
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.10.3/aeron-all-1.10.3.jar
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.10.4/aeron-all-1.10.4.jar
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Adaptive.Aeron.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<PackageId>Aeron.Client</PackageId>
<VersionPrefix>1.10.3</VersionPrefix>
<VersionPrefix>1.10.4</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Aeron Client</Product>
Expand Down
15 changes: 12 additions & 3 deletions src/Adaptive.Aeron/Aeron.cs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ public class Context : IDisposable
/// Key for the tags for a channel
/// </summary>
public const string TAGS_PARAM_NAME = "tags";

/// <summary>
/// Parameter name for channel URI param to indicate if term buffers should be sparse. Value is boolean.
/// </summary>
public const string SPARSE_PARAM_NAME = "sparse";

/// <summary>
/// Get the default directory name to be used if <seealso cref="AeronDirectoryName(String)"/> is not set. This will take
Expand Down Expand Up @@ -1418,12 +1423,16 @@ public int SaveErrorLog(StreamWriter writer, MappedByteBuffer cncByteBuffer)
cncVersion);
}

int distinctErrorCount = 0;
UnsafeBuffer buffer = CncFileDescriptor.CreateErrorLogBuffer(cncByteBuffer, cncMetaDataBuffer);

void ErrorConsumer(int count, long firstTimestamp, long lastTimestamp, string ex)
=> FormatError(writer, count, firstTimestamp, lastTimestamp, ex);
if (ErrorLogReader.HasErrors(buffer))
{
void ErrorConsumer(int count, long firstTimestamp, long lastTimestamp, string ex)
=> FormatError(writer, count, firstTimestamp, lastTimestamp, ex);

var distinctErrorCount = ErrorLogReader.Read(buffer, ErrorConsumer);
distinctErrorCount = ErrorLogReader.Read(buffer, ErrorConsumer);
}

writer.WriteLine();
writer.WriteLine("{0} distinct errors observed.", distinctErrorCount);
Expand Down
30 changes: 30 additions & 0 deletions src/Adaptive.Aeron/ChannelUriStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class ChannelUriStringBuilder
private string _controlMode;
private string _tags;
private bool? _reliable;
private bool? _sparse;
private int? _ttl;
private int? _mtu;
private int? _termLength;
Expand Down Expand Up @@ -272,6 +273,30 @@ public ChannelUriStringBuilder Reliable(bool? isReliable)
return _reliable;
}

/// <summary>
/// Set to indicate if a term log buffer should be sparse on disk or not. Sparse saves space at the potential
/// expense of latency.
/// </summary>
/// <param name="isSparse"> true if the term buffer log is sparse on disk. </param>
/// <returns> this for a fluent API. </returns>
/// <see cref="Aeron.Context.SPARSE_PARAM_NAME"/>
public ChannelUriStringBuilder Sparse(bool? isSparse)
{
_sparse = isSparse;
return this;
}

/// <summary>
/// Get if a term log buffer should be sparse on disk or not. Sparse saves space at the potential expense of latency.
/// </summary>
/// <returns> true if the term buffer log is sparse on disk. </returns>
/// <see cref="Aeron.Context.SPARSE_PARAM_NAME"/>
public bool? Sparse()
{
return _sparse;
}


/// <summary>
/// Set the Time To Live (TTL) for a multicast datagram. Valid values are 0-255 for the number of hops the datagram
/// can progress along.
Expand Down Expand Up @@ -603,6 +628,11 @@ public string Build()
_sb.Append(Aeron.Context.RELIABLE_STREAM_PARAM_NAME).Append('=').Append(_reliable).Append('|');
}

if (null != _sparse)
{
_sb.Append(Aeron.Context.SPARSE_PARAM_NAME).Append('=').Append(_sparse).Append('|');
}

if (null != _ttl)
{
_sb.Append(Aeron.Context.TTL_PARAM_NAME).Append('=').Append(_ttl.Value).Append('|');
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Publication.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public abstract class Publication : IDisposable
/// The offer failed due to reaching the maximum position of the stream given term buffer length times the total
/// possible number of terms.
/// <para>
/// If this happen then the publication should be closed and a new one added. To make it less likely to happen then
/// If this happens then the publication should be closed and a new one added. To make it less likely to happen then
/// increase the term buffer length.
/// </para>
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Agrona/Adaptive.Agrona.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Agrona</PackageId>
<VersionPrefix>1.10.3</VersionPrefix>
<VersionPrefix>1.10.4</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Agrona libraries initially included in Aeron Client</Product>
Expand Down
10 changes: 10 additions & 0 deletions src/Adaptive.Agrona/Concurrent/Errors/ErrorReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ namespace Adaptive.Agrona.Concurrent.Errors
/// </summary>
public class ErrorLogReader
{
/// <summary>
/// Has the error buffer any recorded errors?
/// </summary>
/// <param name="buffer"> containing the <seealso cref="DistinctErrorLog"/>. </param>
/// <returns> true if there is at least one error. </returns>
public static bool HasErrors(IAtomicBuffer buffer)
{
return 0 != buffer.GetIntVolatile(DistinctErrorLog.LengthOffset);
}

/// <summary>
/// Read all the errors in a log since the creation of the log.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Archiver/Adaptive.Archiver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Aeron.Archiver</PackageId>
<VersionPrefix>1.10.3</VersionPrefix>
<VersionPrefix>1.10.4</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Archiving over the Aeron transport</Product>
Expand Down
82 changes: 67 additions & 15 deletions src/Adaptive.Archiver/AeronArchive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ public virtual void CheckForErrorResponse()
if (controlResponsePoller.TemplateId() == ControlResponseDecoder.TEMPLATE_ID &&
controlResponsePoller.Code() == ControlResponseCode.ERROR)
{
throw new ArchiveException(controlResponsePoller.ErrorMessage(), (int) controlResponsePoller.RelevantId());
throw new ArchiveException(controlResponsePoller.ErrorMessage(),
(int) controlResponsePoller.RelevantId());
}
}
}
Expand Down Expand Up @@ -1066,6 +1067,16 @@ public class Configuration
/// </summary>
public const int RECORDING_EVENTS_STREAM_ID_DEFAULT = 30;

/// <summary>
/// Sparse term buffer indicator for control streams.
/// </summary>
private const string CONTROL_TERM_BUFFER_SPARSE_PARAM_NAME = "aeron.archive.control.term.buffer.sparse";

/// <summary>
/// Overrides driver's sparse term buffer indicator for control streams.
/// </summary>
private const bool CONTROL_TERM_BUFFER_SPARSE_DEFAULT = true;

/// <summary>
/// Term length for control streams.
/// </summary>
Expand All @@ -1077,12 +1088,12 @@ public class Configuration
internal const int CONTROL_TERM_BUFFER_LENGTH_DEFAULT = 64 * 1024;

/// <summary>
/// Term length for control streams.
/// MTU length for control streams.
/// </summary>
internal const string CONTROL_MTU_LENGTH_PARAM_NAME = "aeron.archive.control.mtu.length";

/// <summary>
/// MTU to reflect default control term length.
/// MTU to reflect default for the control streams.
/// </summary>
internal const int CONTROL_MTU_LENGTH_DEFAULT = 4 * 1024;

Expand All @@ -1096,11 +1107,22 @@ public static long MessageTimeoutNs()
return Config.GetDurationInNanos(MESSAGE_TIMEOUT_PROP_NAME, MESSAGE_TIMEOUT_DEFAULT_NS);
}

/// <summary>
/// Should term buffer files be sparse for control request and response streams.
/// </summary>
/// <returns> true if term buffer files should be sparse for control request and response streams. </returns>
/// <seealso cref="CONTROL_TERM_BUFFER_SPARSE_PARAM_NAME"/>
public static bool ControlTermBufferSparse()
{
string propValue = Config.GetProperty(CONTROL_TERM_BUFFER_SPARSE_PARAM_NAME);
return null != propValue ? "true".Equals(propValue) : CONTROL_TERM_BUFFER_SPARSE_DEFAULT;
}

/// <summary>
/// Term buffer length to be used for control request and response streams.
/// </summary>
/// <returns> term buffer length to be used for control request and response streams. </returns>
/// <seealso cref= #CONTROL_TERM_BUFFER_LENGTH_PARAM_NAME </seealso>
/// <seealso cref="CONTROL_TERM_BUFFER_LENGTH_PARAM_NAME"></seealso>
public static int ControlTermBufferLength()
{
return Config.GetSizeAsInt(CONTROL_TERM_BUFFER_LENGTH_PARAM_NAME, CONTROL_TERM_BUFFER_LENGTH_DEFAULT);
Expand Down Expand Up @@ -1217,6 +1239,7 @@ public class Context : IDisposable
internal int controlRequestStreamId = Configuration.ControlStreamId();
internal string controlResponseChannel = Configuration.ControlResponseChannel();
internal int controlResponseStreamId = Configuration.ControlResponseStreamId();
internal bool controlTermBufferSparse = Configuration.ControlTermBufferSparse();
internal int controlTermBufferLength = Configuration.ControlTermBufferLength();
internal int controlMtuLength = Configuration.ControlMtuLength();

Expand Down Expand Up @@ -1260,6 +1283,7 @@ public void Conclude()
ChannelUri uri = ChannelUri.Parse(controlRequestChannel);
uri.Put(Aeron.Aeron.Context.TERM_LENGTH_PARAM_NAME, Convert.ToString(controlTermBufferLength));
uri.Put(Aeron.Aeron.Context.MTU_LENGTH_PARAM_NAME, Convert.ToString(controlMtuLength));
uri.Put(Aeron.Aeron.Context.SPARSE_PARAM_NAME, Convert.ToString(controlTermBufferSparse));
controlRequestChannel = uri.ToString();
}

Expand Down Expand Up @@ -1418,9 +1442,31 @@ public int ControlResponseStreamId()
{
return controlResponseStreamId;
}

/// <summary>
/// Should the control streams use sparse file term buffers.
/// </summary>
/// <param name="controlTermBufferSparse"> for the control stream. </param>
/// <returns> this for a fluent API. </returns>
/// <seealso cref="Configuration.CONTROL_TERM_BUFFER_SPARSE_PARAM_NAME"></seealso>
public Context ControlTermBufferSparse(bool controlTermBufferSparse)
{
this.controlTermBufferSparse = controlTermBufferSparse;
return this;
}

/// <summary>
/// Should the control streams use sparse file term buffers.
/// </summary>
/// <returns> true if the control stream should use sparse file term buffers. </returns>
/// <seealso cref="Configuration.CONTROL_TERM_BUFFER_SPARSE_PARAM_NAME"></seealso>
public bool ControlTermBufferSparse()
{
return controlTermBufferSparse;
}

/// <summary>
/// Set the term buffer length for the control stream.
/// Set the term buffer length for the control streams.
/// </summary>
/// <param name="controlTermBufferLength"> for the control stream. </param>
/// <returns> this for a fluent API. </returns>
Expand All @@ -1432,19 +1478,19 @@ public Context ControlTermBufferLength(int controlTermBufferLength)
}

/// <summary>
/// Get the term buffer length for the control steam.
/// Get the term buffer length for the control streams.
/// </summary>
/// <returns> the term buffer length for the control steam. </returns>
/// <returns> the term buffer length for the control streams. </returns>
/// <seealso cref= Configuration#CONTROL_TERM_BUFFER_LENGTH_PARAM_NAME </seealso>
public int ControlTermBufferLength()
{
return controlTermBufferLength;
}

/// <summary>
/// Set the MTU length for the control stream.
/// Set the MTU length for the control streams.
/// </summary>
/// <param name="controlMtuLength"> for the control stream. </param>
/// <param name="controlMtuLength"> for the control streams. </param>
/// <returns> this for a fluent API. </returns>
/// <seealso cref= Configuration#CONTROL_MTU_LENGTH_PARAM_NAME </seealso>
public Context ControlMtuLength(int controlMtuLength)
Expand All @@ -1454,9 +1500,9 @@ public Context ControlMtuLength(int controlMtuLength)
}

/// <summary>
/// Get the MTU length for the control steam.
/// Get the MTU length for the control steams.
/// </summary>
/// <returns> the MTU length for the control steam. </returns>
/// <returns> the MTU length for the control steams. </returns>
/// <seealso cref= Configuration#CONTROL_MTU_LENGTH_PARAM_NAME </seealso>
public int ControlMtuLength()
{
Expand Down Expand Up @@ -1648,7 +1694,8 @@ public AeronArchive Poll()

if (2 == step)
{
if (!archiveProxy.TryConnect(ctx.ControlResponseChannel(), ctx.ControlResponseStreamId(), connectCorrelationId))
if (!archiveProxy.TryConnect(ctx.ControlResponseChannel(), ctx.ControlResponseStreamId(),
connectCorrelationId))
{
return null;
}
Expand All @@ -1667,22 +1714,27 @@ public AeronArchive Poll()
}

controlResponsePoller.Poll();
if (controlResponsePoller.IsPollComplete() && controlResponsePoller.CorrelationId() == connectCorrelationId && controlResponsePoller.TemplateId() == ControlResponseDecoder.TEMPLATE_ID)
if (controlResponsePoller.IsPollComplete() &&
controlResponsePoller.CorrelationId() == connectCorrelationId &&
controlResponsePoller.TemplateId() == ControlResponseDecoder.TEMPLATE_ID)
{
ControlResponseCode code = controlResponsePoller.Code();
if (code != ControlResponseCode.OK)
{
if (code == ControlResponseCode.ERROR)
{
throw new ArchiveException("error: " + controlResponsePoller.ErrorMessage(), (int) controlResponsePoller.RelevantId());
throw new ArchiveException("error: " + controlResponsePoller.ErrorMessage(),
(int) controlResponsePoller.RelevantId());
}

throw new ArchiveException("unexpected response: code=" + code);
}

long controlSessionId = controlResponsePoller.ControlSessionId();
Subscription subscription = controlResponsePoller.Subscription();
return new AeronArchive(ctx, controlResponsePoller, archiveProxy, new RecordingDescriptorPoller(subscription, FRAGMENT_LIMIT, controlSessionId), controlSessionId);
return new AeronArchive(ctx, controlResponsePoller, archiveProxy,
new RecordingDescriptorPoller(subscription, FRAGMENT_LIMIT, controlSessionId),
controlSessionId);
}

return null;
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Cluster/Adaptive.Cluster.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Aeron.Cluster</PackageId>
<VersionPrefix>1.10.3</VersionPrefix>
<VersionPrefix>1.10.4</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Clustering libraries over the Aeron transport</Product>
Expand Down

0 comments on commit 7a87b75

Please sign in to comment.