Skip to content

Commit

Permalink
Port to 1.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Apr 23, 2018
1 parent 8accb02 commit a98f8e1
Show file tree
Hide file tree
Showing 47 changed files with 919 additions and 449 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ using(Publication publisher = aeron.AddPublication(channel, streamId)) {
#### Fragment Handler
A fragment handler is a delegate used for processing data that is has been received. The buffer will either contain a whole message or a fragment of a message to be reassembled.
```csharp
static void PrintMessage(UnsafeBuffer buffer, int offset, int length, Header header)
static void PrintMessage(IDirectBuffer buffer, int offset, int length, Header header)
{
var message = buffer.GetStringWithoutLengthUtf8(offset, length);
Console.WriteLine($"Message Received: '{message}'");
Expand Down
2 changes: 1 addition & 1 deletion RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### Port
Aeron.NET has been ported against Java version:
- Agrona: 0.9.12-30-gec52107
- Aeron: 1.8.2-196-gd0e9417c2
- Aeron: 1.8.2-257-g9abef0124
2 changes: 1 addition & 1 deletion driver/Aeron.Driver.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package >
<metadata>
<id>Aeron.Driver</id>
<version>1.8.3</version>
<version>1.9.0</version>
<title>Aeron Driver</title>
<authors>Adaptive Financial Consulting Ltd.</authors>
<owners>Adaptive Financial Consulting Ltd.</owners>
Expand Down
12 changes: 0 additions & 12 deletions driver/aeron.patch

This file was deleted.

3 changes: 2 additions & 1 deletion driver/build-driver.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ git fetch -q
[ -z "$AERON_VERSION" ] && AERON_VERSION=`git describe --tags origin/master`
echo Building Aeron $AERON_VERSION
git checkout -qf $AERON_VERSION
git apply $AERON_PATCH
./gradlew -x test
cd $WD

echo "Driver built from source" > $VERSION_FILE
echo "Agrona: $AGRONA_VERSION" >> $VERSION_FILE
echo "SBE: $SBE_VERSION" >> $VERSION_FILE
echo "Aeron: $AERON_VERSION" >> $VERSION_FILE

cp $AERON_BUILD_DIR/aeron-all/build/libs/aeron-all-*-SNAPSHOT.jar $WD/media-driver.jar
Binary file modified driver/media-driver.jar
Binary file not shown.
6 changes: 2 additions & 4 deletions driver/version.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
Driver built from source
Agrona: 0.9.15-35-g9e6c532
SBE: 1.7.9-33-gc3884639
Aeron: 1.8.2-196-gd0e9417c2
Driver source:
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.0/aeron-all-1.9.0.jar
2 changes: 1 addition & 1 deletion scripts/build-prerelease-nuget-packages.bat
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@echo off
pushd %~dp0..
SET suffix=pre8
SET suffix=alpha
SET nuget_source=https://www.myget.org/F/aeron/api/v2/package

del nupkgs\*.nupkg
Expand Down
4 changes: 3 additions & 1 deletion scripts/build-release-nuget-packages.bat
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ SET nuget_source=https://api.nuget.org/v3/index.json
del nupkgs\*.nupkg

call dotnet pack src\Adaptive.Aeron\Adaptive.Aeron.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Archiver\Adaptive.Archiver.csproj -c Release --output ..\..\nupkgs
call .\scripts\nuget pack .\driver\Aeron.Driver.nuspec -OutputDirectory nupkgs

call dotnet nuget push nupkgs\Agrona.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Client.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Archiver.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Driver.*.nupkg -s %nuget_source%

popd
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Adaptive.Aeron.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<PackageId>Aeron.Client</PackageId>
<VersionPrefix>1.8.3</VersionPrefix>
<VersionPrefix>1.9.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Aeron Client</Product>
Expand Down
54 changes: 54 additions & 0 deletions src/Adaptive.Aeron/Status/HeartbeatStatus.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using Adaptive.Agrona;
using Adaptive.Agrona.Concurrent.Status;

namespace Adaptive.Aeron.Status
{
/// <summary>
/// Allocate a counter for tracking the last heartbeat of an entity.
/// </summary>
public class HeartbeatStatus
{
/// <summary>
/// Offset in the key meta data for the registration id of the counter.
/// </summary>
public const int REGISTRATION_ID_OFFSET = 0;

/// <summary>
/// Allocate a counter for tracking the last heartbeat of an entity.
/// </summary>
/// <param name="tempBuffer"> to be used for labels and key. </param>
/// <param name="name"> of the counter for the label. </param>
/// <param name="typeId"> of the counter for classification. </param>
/// <param name="countersManager"> from which to allocated the underlying storage. </param>
/// <param name="registrationId"> to be associated with the counter. </param>
/// <returns> a new <seealso cref="AtomicCounter"/> for tracking the last heartbeat. </returns>
public static AtomicCounter Allocate(
IMutableDirectBuffer tempBuffer,
string name,
int typeId,
CountersManager countersManager,
long registrationId)
{
return new AtomicCounter(countersManager.ValuesBuffer,
AllocateCounterId(tempBuffer, name, typeId, countersManager, registrationId), countersManager);
}

public static int AllocateCounterId(
IMutableDirectBuffer tempBuffer,
string name,
int typeId,
CountersManager countersManager,
long registrationId)
{
tempBuffer.PutLong(REGISTRATION_ID_OFFSET, registrationId);
int keyLength = REGISTRATION_ID_OFFSET + BitUtil.SIZE_OF_LONG;

int labelLength = 0;
labelLength += tempBuffer.PutStringWithoutLengthAscii(keyLength + labelLength, name);
labelLength += tempBuffer.PutStringWithoutLengthAscii(keyLength + labelLength, ": ");
labelLength += tempBuffer.PutLongAscii(keyLength + labelLength, registrationId);

return countersManager.Allocate(typeId, tempBuffer, 0, keyLength, tempBuffer, keyLength, labelLength);
}
}
}
2 changes: 1 addition & 1 deletion src/Adaptive.Agrona/Adaptive.Agrona.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Agrona</PackageId>
<VersionPrefix>1.8.3</VersionPrefix>
<VersionPrefix>1.9.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Agrona libraries initially included in Aeron Client</Product>
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Archiver/Adaptive.Archiver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Aeron.Archiver</PackageId>
<VersionPrefix>1.8.3</VersionPrefix>
<VersionPrefix>1.9.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Clustering libraries over the Aeron transport</Product>
Expand Down
10 changes: 5 additions & 5 deletions src/Adaptive.Archiver/ArchiveProxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,17 +373,17 @@ private bool Offer(int length)

if (result == Publication.CLOSED)
{
throw new System.InvalidOperationException("Connection to the archive has been closed");
throw new System.InvalidOperationException("connection to the archive has been closed");
}

if (result == Publication.NOT_CONNECTED)
{
throw new System.InvalidOperationException("Connection to the archive is no longer available");
throw new System.InvalidOperationException("connection to the archive is no longer available");
}

if (result == Publication.MAX_POSITION_EXCEEDED)
{
throw new System.InvalidOperationException("Publication failed due to max position being reached");
throw new System.InvalidOperationException("publication failed due to max position being reached");
}

if (--attempts <= 0)
Expand Down Expand Up @@ -415,12 +415,12 @@ private bool OfferWithTimeout(int length, AgentInvoker aeronClientInvoker)

if (result == Publication.CLOSED)
{
throw new System.InvalidOperationException("Connection to the archive has been closed");
throw new System.InvalidOperationException("connection to the archive has been closed");
}

if (result == Publication.MAX_POSITION_EXCEEDED)
{
throw new System.InvalidOperationException("Publication failed due to max position being reached");
throw new System.InvalidOperationException("publication failed due to max position being reached");
}

if (nanoClock.NanoTime() > deadlineNs)
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Archiver/ControlResponseAdaptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public virtual void OnFragment(IDirectBuffer buffer, int offset, int length, Hea
break;

default:
throw new InvalidOperationException("Unknown templateId: " + templateId);
throw new InvalidOperationException("unknown templateId: " + templateId);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Archiver/ControlResponsePoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public virtual ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer,
break;

default:
throw new System.InvalidOperationException("Unknown templateId: " + templateId);
throw new System.InvalidOperationException("unknown templateId: " + templateId);
}

pollComplete = true;
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Archiver/RecordingDescriptorPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public virtual ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer,
break;

default:
throw new System.InvalidOperationException("Unknown templateId: " + templateId);
throw new System.InvalidOperationException("unknown templateId: " + templateId);
}

return ControlledFragmentHandlerAction.CONTINUE;
Expand Down
70 changes: 47 additions & 23 deletions src/Adaptive.Archiver/RecordingEventsAdapter.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using Adaptive.Aeron;
using System;
using Adaptive.Aeron;
using Adaptive.Aeron.LogBuffer;
using Adaptive.Agrona;
using Adaptive.Agrona.Concurrent;
using Adaptive.Archiver.Codecs;

namespace Adaptive.Archiver
Expand All @@ -11,14 +11,14 @@ namespace Adaptive.Archiver
/// </summary>
public class RecordingEventsAdapter : IFragmentHandler
{
private readonly MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
private readonly RecordingStartedDecoder recordingStartedDecoder = new RecordingStartedDecoder();
private readonly RecordingProgressDecoder recordingProgressDecoder = new RecordingProgressDecoder();
private readonly RecordingStoppedDecoder recordingStoppedDecoder = new RecordingStoppedDecoder();
private readonly MessageHeaderDecoder _messageHeaderDecoder = new MessageHeaderDecoder();
private readonly RecordingStartedDecoder _recordingStartedDecoder = new RecordingStartedDecoder();
private readonly RecordingProgressDecoder _recordingProgressDecoder = new RecordingProgressDecoder();
private readonly RecordingStoppedDecoder _recordingStoppedDecoder = new RecordingStoppedDecoder();

private readonly int fragmentLimit;
private readonly IRecordingEventsListener listener;
private readonly Subscription subscription;
private readonly int _fragmentLimit;
private readonly IRecordingEventsListener _listener;
private readonly Subscription _subscription;

/// <summary>
/// Create a poller for a given subscription to an archive for recording events.
Expand All @@ -28,47 +28,71 @@ public class RecordingEventsAdapter : IFragmentHandler
/// <param name="fragmentLimit"> to apply for each polling operation. </param>
public RecordingEventsAdapter(IRecordingEventsListener listener, Subscription subscription, int fragmentLimit)
{
this.fragmentLimit = fragmentLimit;
this.listener = listener;
this.subscription = subscription;
_fragmentLimit = fragmentLimit;
_listener = listener;
_subscription = subscription;
}

/// <summary>
/// Poll for recording events and dispatch them to the <seealso cref="RecordingEventsListener"/> for this instance.
/// Poll for recording events and dispatch them to the <seealso cref="IRecordingEventsListener"/> for this instance.
/// </summary>
/// <returns> the number of fragments read during the operation. Zero if no events are available. </returns>
public virtual int Poll()
{
return subscription.Poll(this, fragmentLimit);
return _subscription.Poll(this, _fragmentLimit);
}

public virtual void OnFragment(IDirectBuffer buffer, int offset, int length, Header header)
{
messageHeaderDecoder.Wrap(buffer, offset);
_messageHeaderDecoder.Wrap(buffer, offset);

int templateId = messageHeaderDecoder.TemplateId();
int templateId = _messageHeaderDecoder.TemplateId();
switch (templateId)
{
case RecordingStartedDecoder.TEMPLATE_ID:
recordingStartedDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, messageHeaderDecoder.BlockLength(), messageHeaderDecoder.Version());
_recordingStartedDecoder.Wrap(
buffer,
offset + MessageHeaderDecoder.ENCODED_LENGTH,
_messageHeaderDecoder.BlockLength(),
_messageHeaderDecoder.Version());

listener.OnStart(recordingStartedDecoder.RecordingId(), recordingStartedDecoder.StartPosition(), recordingStartedDecoder.SessionId(), recordingStartedDecoder.StreamId(), recordingStartedDecoder.Channel(), recordingStartedDecoder.SourceIdentity());
_listener.OnStart(
_recordingStartedDecoder.RecordingId(),
_recordingStartedDecoder.StartPosition(),
_recordingStartedDecoder.SessionId(),
_recordingStartedDecoder.StreamId(),
_recordingStartedDecoder.Channel(),
_recordingStartedDecoder.SourceIdentity());
break;

case RecordingProgressDecoder.TEMPLATE_ID:
recordingProgressDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, messageHeaderDecoder.BlockLength(), messageHeaderDecoder.Version());
_recordingProgressDecoder.Wrap(
buffer,
offset + MessageHeaderDecoder.ENCODED_LENGTH,
_messageHeaderDecoder.BlockLength(),
_messageHeaderDecoder.Version());

listener.OnProgress(recordingProgressDecoder.RecordingId(), recordingProgressDecoder.StartPosition(), recordingProgressDecoder.Position());
_listener.OnProgress(
_recordingProgressDecoder.RecordingId(),
_recordingProgressDecoder.StartPosition(),
_recordingProgressDecoder.Position());
break;

case RecordingStoppedDecoder.TEMPLATE_ID:
recordingStoppedDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, messageHeaderDecoder.BlockLength(), messageHeaderDecoder.Version());
_recordingStoppedDecoder.Wrap(
buffer,
offset + MessageHeaderDecoder.ENCODED_LENGTH,
_messageHeaderDecoder.BlockLength(),
_messageHeaderDecoder.Version());

listener.OnStop(recordingStoppedDecoder.RecordingId(), recordingStoppedDecoder.StartPosition(), recordingStoppedDecoder.StopPosition());
_listener.OnStop(
_recordingStoppedDecoder.RecordingId(),
_recordingStoppedDecoder.StartPosition(),
_recordingStoppedDecoder.StopPosition());
break;

default:
throw new System.InvalidOperationException("Unknown templateId: " + templateId);
throw new InvalidOperationException("unknown templateId: " + templateId);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Archiver/RecordingEventsPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public virtual void OnFragment(IDirectBuffer buffer, int offset, int length, Hea
break;

default:
throw new System.InvalidOperationException("Unknown templateId: " + templateId);
throw new System.InvalidOperationException("unknown templateId: " + templateId);
}

pollComplete = true;
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Cluster/Adaptive.Cluster.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<PackageId>Aeron.Cluster</PackageId>
<VersionPrefix>1.8.3</VersionPrefix>
<VersionPrefix>1.9.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Clustering libraries over the Aeron transport</Product>
Expand Down
Loading

0 comments on commit a98f8e1

Please sign in to comment.