Skip to content

Commit

Permalink
Port to 1.9.2
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed May 8, 2018
1 parent 8354553 commit 290c0ad
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 140 deletions.
2 changes: 1 addition & 1 deletion driver/Aeron.Driver.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package >
<metadata>
<id>Aeron.Driver</id>
<version>1.9.1</version>
<version>1.9.2</version>
<title>Aeron Driver</title>
<authors>Adaptive Financial Consulting Ltd.</authors>
<owners>Adaptive Financial Consulting Ltd.</owners>
Expand Down
Binary file modified driver/media-driver.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion driver/version.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
Driver source:
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.1/aeron-all-1.9.1.jar
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.2/aeron-all-1.9.2.jar
3 changes: 3 additions & 0 deletions scripts/build-release-nuget-packages.bat
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
@echo off
pushd %~dp0..
SET nuget_source=https://api.nuget.org/v3/index.json
SET myget_source=https://www.myget.org/F/aeron/api/v2/package

del nupkgs\*.nupkg

call dotnet pack src\Adaptive.Aeron\Adaptive.Aeron.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Cluster\Adaptive.Cluster.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Archiver\Adaptive.Archiver.csproj -c Release --output ..\..\nupkgs
call .\scripts\nuget pack .\driver\Aeron.Driver.nuspec -OutputDirectory nupkgs

call dotnet nuget push nupkgs\Agrona.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Client.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Archiver.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Driver.*.nupkg -s %nuget_source%
call dotnet nuget push nupkgs\Aeron.Cluster.*.nupkg -s %myget_source%

popd
51 changes: 38 additions & 13 deletions src/Adaptive.Aeron/Aeron.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,29 +1147,32 @@ public void Dispose()
private void ConnectToDriver()
{
long startTimeMs = _epochClock.Time();
long deadLineMs = startTimeMs + DriverTimeoutMs();
FileInfo cncFile = CncFile();

while (true)
while (null == _toDriverBuffer)
{
cncFile.Refresh();

while (!cncFile.Exists || cncFile.Length <= 0)
{
if (_epochClock.Time() > (startTimeMs + _driverTimeoutMs))
if (_epochClock.Time() > deadLineMs)
{
throw new DriverTimeoutException("CnC file not created: " + cncFile.FullName);
}

Sleep(16);
Sleep(IdleSleepMs);

cncFile.Refresh();
}

_cncByteBuffer = IoUtil.MapExistingFile(CncFile(), CncFileDescriptor.CNC_FILE);
_cncByteBuffer = WaitForFileMapping(cncFile, deadLineMs, _epochClock);
_cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(_cncByteBuffer);

int cncVersion;
while (0 == (cncVersion = _cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0))))
{
if (_epochClock.Time() > (startTimeMs + DriverTimeoutMs()))
if (_epochClock.Time() > deadLineMs)
{
throw new DriverTimeoutException("CnC file is created but not initialised.");
}
Expand All @@ -1188,7 +1191,7 @@ private void ConnectToDriver()

while (0 == ringBuffer.ConsumerHeartbeatTime())
{
if (_epochClock.Time() > (startTimeMs + DriverTimeoutMs()))
if (_epochClock.Time() > deadLineMs)
{
throw new DriverTimeoutException("No driver heartbeat detected.");
}
Expand All @@ -1199,7 +1202,7 @@ private void ConnectToDriver()
long timeMs = _epochClock.Time();
if (ringBuffer.ConsumerHeartbeatTime() < (timeMs - DriverTimeoutMs()))
{
if (timeMs > (startTimeMs + DriverTimeoutMs()))
if (timeMs > deadLineMs)
{
throw new DriverTimeoutException("No driver heartbeat detected.");
}
Expand All @@ -1212,12 +1215,34 @@ private void ConnectToDriver()
continue;
}

if (null == _toDriverBuffer)
_toDriverBuffer = ringBuffer;
}
}

private static MappedByteBuffer WaitForFileMapping(FileInfo cncFile, long deadLineMs, IEpochClock epochClock)
{
try
{
var fileAccess = FileAccess.ReadWrite;
var fileShare = FileShare.ReadWrite | FileShare.Delete;

var fileStream = cncFile.Open(FileMode.Open, fileAccess, fileShare);

while (fileStream.Length < CncFileDescriptor.CNC_VERSION_FIELD_OFFSET + BitUtil.SIZE_OF_INT)
{
_toDriverBuffer = ringBuffer;
if (epochClock.Time() > deadLineMs)
{
throw new InvalidOperationException("CnC file is created but not populated.");
}

Sleep(IdleSleepMs);
}

break;
return IoUtil.MapExistingFile(fileStream);
}
catch (Exception ex)
{
throw new InvalidOperationException("cannot open CnC file", ex);
}
}

Expand All @@ -1235,7 +1260,7 @@ public MappedByteBuffer MapExistingCncFile(Action<string> logProgress)
{
FileInfo cncFile = new FileInfo(Path.Combine(_aeronDirectory.FullName, CncFileDescriptor.CNC_FILE));

if (cncFile.Exists)
if (cncFile.Exists && cncFile.Length > 0)
{
if (null != logProgress)
{
Expand All @@ -1259,7 +1284,7 @@ public static bool IsDriverActive(DirectoryInfo directory, long driverTimeoutMs,
{
FileInfo cncFile = new FileInfo(Path.Combine(directory.FullName, CncFileDescriptor.CNC_FILE));

if (cncFile.Exists)
if (cncFile.Exists && cncFile.Length > 0)
{
logger("INFO: Aeron CnC file " + cncFile + " exists");

Expand Down Expand Up @@ -1424,4 +1449,4 @@ internal static void Sleep(int durationMs)
}
}
}
}
}
10 changes: 10 additions & 0 deletions src/Adaptive.Aeron/Command/ControlProtocolEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ public class ControlProtocolEvents
/// Close indication from Client.
/// </summary>
public const int CLIENT_CLOSE = 0x0B;

/// <summary>
/// Add Destination for existing Subscription.
/// </summary>
public const int ADD_RCV_DESTINATION = 0x0C;

/// <summary>
/// Remove Destination for existing Subscription.
/// </summary>
public const int REMOVE_RCV_DESTINATION = 0x0D;

// Media Driver to Clients

Expand Down
26 changes: 22 additions & 4 deletions src/Adaptive.Agrona/IoUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/

using System;
using System.ComponentModel;
using System.IO;
using System.IO.MemoryMappedFiles;
using Adaptive.Agrona.Util;
Expand Down Expand Up @@ -78,7 +77,7 @@ public static MappedByteBuffer MapNewFile(FileInfo cncFile, long length, bool fi
}


public static MappedByteBuffer MapNewOrExixtingFile(FileInfo cncFile, long length)
public static MappedByteBuffer MapNewOrExistingFile(FileInfo cncFile, long length)
{
var fileAccess = FileAccess.ReadWrite;
var fileShare = FileShare.ReadWrite | FileShare.Delete;
Expand Down Expand Up @@ -111,11 +110,15 @@ public static MemoryMappedFile OpenMemoryMappedFile(string path)

var fileAccess = FileAccess.ReadWrite;
var fileShare = FileShare.ReadWrite | FileShare.Delete;
var memoryMappedFileAccess = MemoryMappedFileAccess.ReadWrite;


var f = new FileStream(path, FileMode.Open, fileAccess, fileShare);

return OpenMemoryMappedFile(f);
}

private static MemoryMappedFile OpenMemoryMappedFile(FileStream f)
{
var memoryMappedFileAccess = MemoryMappedFileAccess.ReadWrite;
#if NETFULL
return MemoryMappedFile.CreateFromFile(f, null, 0, memoryMappedFileAccess, new MemoryMappedFileSecurity(), HandleInheritability.None, false);
#else
Expand All @@ -124,6 +127,21 @@ public static MemoryMappedFile OpenMemoryMappedFile(string path)
#endif
}

/// <summary>
/// Return MappedByteBuffer for entire file
/// <para>
/// The file itself will be closed, but the mapping will persist.
///
/// </para>
/// </summary>
/// <param name="fileStream"> of the file to map </param>
/// <returns> <seealso cref="MappedByteBuffer"/> for the file </returns>
public static MappedByteBuffer MapExistingFile(FileStream fileStream)
{
return new MappedByteBuffer(OpenMemoryMappedFile(fileStream));
}


/// <summary>
/// Check that file exists, open file, and return MappedByteBuffer for entire file
/// <para>
Expand Down
2 changes: 1 addition & 1 deletion src/Adaptive.Agrona/MarkFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public static MappedByteBuffer MapNewOrExistingCncFile(FileInfo cncFile, bool sh

try
{
cncByteBuffer = IoUtil.MapNewOrExixtingFile(cncFile, totalFileLength);
cncByteBuffer = IoUtil.MapNewOrExistingFile(cncFile, totalFileLength);

UnsafeBuffer cncBuffer = new UnsafeBuffer(cncByteBuffer);

Expand Down
7 changes: 6 additions & 1 deletion src/Adaptive.Archiver/AeronArchive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public class AeronArchive : IDisposable
/// Represents a position that has not been set. Can be used when the position is not known.
/// </summary>
public const long NULL_POSITION = -1L;

/// <summary>
/// Represents a length that has not been set. If null length is provided then replay the whole recorded stream.
/// </summary>
public const long NULL_LENGTH = -1L;

private const int FRAGMENT_LIMIT = 10;

Expand Down Expand Up @@ -489,7 +494,7 @@ public void StopRecording(Publication publication)
/// </summary>
/// <param name="recordingId"> to be replayed. </param>
/// <param name="position"> from which the replay should begin or <seealso cref="#NULL_POSITION"/> if from the start. </param>
/// <param name="length"> of the stream to be replayed. Use <seealso cref="Long#MAX_VALUE"/> to follow a live recording. </param>
/// <param name="length"> of the stream to be replayed. Use <seealso cref="Long#MAX_VALUE"/> to follow a live recording or <see cref="NULL_LENGTH"/> to replay the whole stream of unknown length. </param>
/// <param name="replayChannel"> to which the replay should be sent. </param>
/// <param name="replayStreamId"> to which the replay should be sent. </param>
/// <returns> the id of the replay session which will be the same as the <seealso cref="Image#sessionId()"/> of the received
Expand Down
5 changes: 0 additions & 5 deletions src/Adaptive.Cluster/Service/BoundedLogAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ public Image Image()
return image;
}

public int UpperBoundCounterId()
{
return upperBound.CounterId();
}

public bool IsCaughtUp()
{
return image.Position() >= upperBound.Get();
Expand Down
6 changes: 2 additions & 4 deletions src/Adaptive.Cluster/Service/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ public long Offer(long correlationId, IDirectBuffer buffer, int offset, int leng

internal void Connect(Aeron.Aeron aeron)
{
if (null != _responsePublication)
if (null == _responsePublication)
{
throw new InvalidOperationException("response publication already present");
_responsePublication = aeron.AddPublication(_responseChannel, _responseStreamId);
}

_responsePublication = aeron.AddPublication(_responseChannel, _responseStreamId);
}

internal void MarkClosing()
Expand Down
Loading

0 comments on commit 290c0ad

Please sign in to comment.