Skip to content

Commit

Permalink
update to 1.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Aug 23, 2017
1 parent 6475b35 commit e834ded
Show file tree
Hide file tree
Showing 63 changed files with 2,606 additions and 945 deletions.
4 changes: 2 additions & 2 deletions RELEASE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#### Port
Aeron.NET has been ported against Java version:
- Agrona: v0.9.6 (b6904e5)
- Aeron: v1.3.0 (baa52e0)
- Agrona: v0.9.7 (e3408d6)
- Aeron: v1.4.0 (cc48906)
Binary file modified driver/media-driver.jar
Binary file not shown.
4 changes: 2 additions & 2 deletions driver/start-media-driver.bat
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@echo off
echo Starting Media Driver...
echo Media Driver Started...
java -cp media-driver.jar ^
io.aeron.driver.MediaDriver -Daeron.threading.mode=SHARED
io.aeron.driver.MediaDriver ipc.properties
echo Media Driver Stopped.
pause
20 changes: 10 additions & 10 deletions src/Adaptive.Aeron.Tests/BufferBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public void Setup()
[Test]
public void ShouldInitialiseToDefaultValues()
{
Assert.That(_bufferBuilder.Capacity(), Is.EqualTo(BufferBuilder.INITIAL_CAPACITY));
Assert.That(_bufferBuilder.Buffer().Capacity, Is.EqualTo(BufferBuilder.INITIAL_CAPACITY));
Assert.That(_bufferBuilder.Capacity(), Is.EqualTo(BufferBuilder.MIN_ALLOCATED_CAPACITY));
Assert.That(_bufferBuilder.Buffer().Capacity, Is.EqualTo(BufferBuilder.MIN_ALLOCATED_CAPACITY));
Assert.That(_bufferBuilder.Limit(), Is.EqualTo(0));
}

[Test]
public void ShouldAppendNothingForZeroLength()
{
UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[BufferBuilder.INITIAL_CAPACITY]);
UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[BufferBuilder.MIN_ALLOCATED_CAPACITY]);

_bufferBuilder.Append(srcBuffer, 0, 0);

Expand All @@ -52,7 +52,7 @@ public void ShouldAppendNothingForZeroLength()
[Test]
public void ShouldGrowToMultipleOfInitialCapaity()
{
int srcCapacity = BufferBuilder.INITIAL_CAPACITY * 5;
int srcCapacity = BufferBuilder.MIN_ALLOCATED_CAPACITY * 5;
UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[srcCapacity]);

_bufferBuilder.Append(srcBuffer, 0, srcBuffer.Capacity);
Expand All @@ -64,7 +64,7 @@ public void ShouldGrowToMultipleOfInitialCapaity()
[Test]
public void ShouldAppendThenReset()
{
UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[BufferBuilder.INITIAL_CAPACITY]);
UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[BufferBuilder.MIN_ALLOCATED_CAPACITY]);

_bufferBuilder.Append(srcBuffer, 0, srcBuffer.Capacity);

Expand All @@ -78,7 +78,7 @@ public void ShouldAppendThenReset()
[Test]
public void ShouldAppendOneBufferWithoutResizing()
{
var srcBuffer = new UnsafeBuffer(new byte[BufferBuilder.INITIAL_CAPACITY]);
var srcBuffer = new UnsafeBuffer(new byte[BufferBuilder.MIN_ALLOCATED_CAPACITY]);
var bytes = Encoding.UTF8.GetBytes("Hello World");
srcBuffer.PutBytes(0, bytes, 0, bytes.Length);

Expand All @@ -88,14 +88,14 @@ public void ShouldAppendOneBufferWithoutResizing()
_bufferBuilder.Buffer().GetBytes(0, temp, 0, bytes.Length);

Assert.That(_bufferBuilder.Limit(), Is.EqualTo(bytes.Length));
Assert.That(_bufferBuilder.Capacity(), Is.EqualTo(BufferBuilder.INITIAL_CAPACITY));
Assert.That(_bufferBuilder.Capacity(), Is.EqualTo(BufferBuilder.MIN_ALLOCATED_CAPACITY));
Assert.That(temp, Is.EqualTo(bytes));
}

[Test]
public void ShouldAppendTwoBuffersWithoutResizing()
{
UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[BufferBuilder.INITIAL_CAPACITY]);
UnsafeBuffer srcBuffer = new UnsafeBuffer(new byte[BufferBuilder.MIN_ALLOCATED_CAPACITY]);
byte[] bytes = Encoding.UTF8.GetBytes("1111111122222222");
srcBuffer.PutBytes(0, bytes, 0, bytes.Length);

Expand All @@ -106,7 +106,7 @@ public void ShouldAppendTwoBuffersWithoutResizing()
_bufferBuilder.Buffer().GetBytes(0, temp, 0, bytes.Length);

Assert.That(_bufferBuilder.Limit(), Is.EqualTo(bytes.Length));
Assert.That(_bufferBuilder.Capacity(), Is.EqualTo(BufferBuilder.INITIAL_CAPACITY));
Assert.That(_bufferBuilder.Capacity(), Is.EqualTo(BufferBuilder.MIN_ALLOCATED_CAPACITY));
Assert.That(temp, Is.EqualTo(bytes));
}

Expand Down Expand Up @@ -176,7 +176,7 @@ public void ShouldAppendTwoBuffersAndResize()
[Test]
public void ShouldCompactBufferToLowerLimit()
{
int bufferLength = BufferBuilder.INITIAL_CAPACITY / 2;
int bufferLength = BufferBuilder.MIN_ALLOCATED_CAPACITY / 2;
byte[] buffer = new byte[bufferLength];
UnsafeBuffer srcBuffer = new UnsafeBuffer(buffer);

Expand Down
50 changes: 37 additions & 13 deletions src/Adaptive.Aeron.Tests/ClientConductorTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class ClientConductorTest
private const long INTER_SERVICE_TIMEOUT_MS = 1000;
private const long PUBLICATION_CONNECTION_TIMEOUT_MS = 5000;

private const int SUBSCRIBER_POSITION_ID = 2;
private const int SUBSCRIBER_POSITION_REGISTRATION_ID = 4001;

private const string SOURCE_INFO = "127.0.0.1:40789";

private PublicationBuffersReadyFlyweight PublicationReady;
Expand All @@ -77,7 +80,6 @@ public class ClientConductorTest
private UnavailableImageHandler MockUnavailableImageHandler;
private ILogBuffersFactory LogBuffersFactory;
private ILock mockClientLock = A.Fake<ILock>();
private Dictionary<long, long> SubscriberPositionMap;
private bool SuppressPrintError = false;


Expand Down Expand Up @@ -107,9 +109,7 @@ public void SetUp()
MockUnavailableImageHandler = A.Fake<UnavailableImageHandler>();

LogBuffersFactory = A.Fake<ILogBuffersFactory>();

SubscriberPositionMap = new Dictionary<long, long>(); // should return -1 when element does not exist


DriverProxy = A.Fake<DriverProxy>();

A.CallTo(() => mockClientLock.TryLock()).Returns(true);
Expand Down Expand Up @@ -145,12 +145,11 @@ public void SetUp()
ErrorResponse.Wrap(ErrorMessageBuffer, 0);

PublicationReady.CorrelationId(CORRELATION_ID);
PublicationReady.RegistrationId(CORRELATION_ID);
PublicationReady.SessionId(SESSION_ID_1);
PublicationReady.StreamId(STREAM_ID_1);
PublicationReady.LogFileName(SESSION_ID_1 + "-log");

SubscriberPositionMap.Add(CORRELATION_ID, 0);


CorrelatedMessage.CorrelationId(CLOSE_CORRELATION_ID);

var termBuffersSession1 = new UnsafeBuffer[LogBufferDescriptor.PARTITION_COUNT];
Expand Down Expand Up @@ -325,6 +324,7 @@ public void ClosingPublicationDoesNotRemoveOtherPublications()
PublicationReady.SessionId(SESSION_ID_2);
PublicationReady.LogFileName(SESSION_ID_2 + "-log");
PublicationReady.CorrelationId(CORRELATION_ID_2);
PublicationReady.RegistrationId(CORRELATION_ID_2);
return PublicationReady.Length();
});

Expand All @@ -344,6 +344,7 @@ public void ShouldNotMapBuffersForUnknownCorrelationId()
WhenReceiveBroadcastOnMessage(ControlProtocolEvents.ON_PUBLICATION_READY, PublicationReadyBuffer, buffer =>
{
PublicationReady.CorrelationId(UNKNOWN_CORRELATION_ID);
PublicationReady.RegistrationId(UNKNOWN_CORRELATION_ID);
return PublicationReady.Length();
});

Expand Down Expand Up @@ -432,9 +433,16 @@ public void ClientNotifiedOfNewImageShouldMapLogFile()
return CorrelatedMessageFlyweight.LENGTH;
});

Conductor.AddSubscription(CHANNEL, STREAM_ID_1);
Subscription subscription = Conductor.AddSubscription(CHANNEL, STREAM_ID_1);

Conductor.OnAvailableImage(STREAM_ID_1, SESSION_ID_1, SubscriberPositionMap, SESSION_ID_1 + "-log", SOURCE_INFO, CORRELATION_ID);
Conductor.OnAvailableImage(
CORRELATION_ID,
STREAM_ID_1,
SESSION_ID_1,
subscription.RegistrationId,
SUBSCRIBER_POSITION_ID,
SESSION_ID_1 + "-log",
SOURCE_INFO);

A.CallTo(() => LogBuffersFactory.Map(SESSION_ID_1 + "-log", A<MapMode>._)).MustHaveHappened();
}
Expand All @@ -450,13 +458,20 @@ public void ClientNotifiedOfNewAndInactiveImages()

var subscription = Conductor.AddSubscription(CHANNEL, STREAM_ID_1);

Conductor.OnAvailableImage(STREAM_ID_1, SESSION_ID_1, SubscriberPositionMap, SESSION_ID_1 + "-log", SOURCE_INFO, CORRELATION_ID);
Conductor.OnAvailableImage(
CORRELATION_ID,
STREAM_ID_1,
SESSION_ID_1,
subscription.RegistrationId,
SUBSCRIBER_POSITION_ID,
SESSION_ID_1 + "-log",
SOURCE_INFO);

Assert.False(subscription.HasNoImages());

A.CallTo(() => MockAvailableImageHandler(A<Image>._)).MustHaveHappened();

Conductor.OnUnavailableImage(STREAM_ID_1, CORRELATION_ID);
Conductor.OnUnavailableImage(CORRELATION_ID, STREAM_ID_1);

A.CallTo(() => MockUnavailableImageHandler(A<Image>._)).MustHaveHappened();

Expand All @@ -466,7 +481,14 @@ public void ClientNotifiedOfNewAndInactiveImages()
[Test]
public void ShouldIgnoreUnknownNewImage()
{
Conductor.OnAvailableImage(STREAM_ID_2, SESSION_ID_2, SubscriberPositionMap, SESSION_ID_2 + "-log", SOURCE_INFO, CORRELATION_ID_2);
Conductor.OnAvailableImage(
CORRELATION_ID_2,
STREAM_ID_2,
SESSION_ID_2,
SUBSCRIBER_POSITION_REGISTRATION_ID,
SUBSCRIBER_POSITION_ID,
SESSION_ID_2 + "-log",
SOURCE_INFO);

A.CallTo(() => LogBuffersFactory.Map(A<string>._, A<MapMode>._)).MustNotHaveHappened();
A.CallTo(() => MockAvailableImageHandler(A<Image>._)).MustNotHaveHappened();
Expand All @@ -475,7 +497,7 @@ public void ShouldIgnoreUnknownNewImage()
[Test]
public void ShouldIgnoreUnknownInactiveImage()
{
Conductor.OnUnavailableImage(STREAM_ID_2, CORRELATION_ID_2);
Conductor.OnUnavailableImage(CORRELATION_ID_2, STREAM_ID_2);

A.CallTo(() => LogBuffersFactory.Map(A<string>._, A<MapMode>._)).MustNotHaveHappened();
A.CallTo(() => MockAvailableImageHandler(A<Image>._)).MustNotHaveHappened();
Expand All @@ -493,6 +515,8 @@ public void ShouldTimeoutInterServiceIfTooLongBetweenDoWorkCalls()
Conductor.DoWork();

A.CallTo(() => MockClientErrorHandler(A<ConductorServiceTimeoutException>._)).MustHaveHappened();

Assert.True(Conductor.IsClosed());
}

private void WhenReceiveBroadcastOnMessage(int msgTypeId, IMutableDirectBuffer buffer, Func<IMutableDirectBuffer, int> filler)
Expand Down
3 changes: 2 additions & 1 deletion src/Adaptive.Aeron.Tests/DriverProxyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ public class DriverProxyTest
public void Setup()
{
conductorBuffer = new ManyToOneRingBuffer(new UnsafeBuffer(new byte[RingBufferDescriptor.TrailerLength + 1024]));
conductor = new DriverProxy(conductorBuffer);
conductor = new DriverProxy(conductorBuffer, CLIENT_ID);
}

public const string CHANNEL = "aeron:udp?interface=localhost:40123|endpoint=localhost:40124";

private const int STREAM_ID = 1;
private const long CORRELATION_ID = 3;
private const long CLIENT_ID = 7;
private IRingBuffer conductorBuffer;
private DriverProxy conductor;

Expand Down
Loading

0 comments on commit e834ded

Please sign in to comment.