diff --git a/RELEASE.md b/RELEASE.md index b15f5cf3..10f91775 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,4 @@ #### Port Aeron.NET has been ported against Java version: - Agrona: 0.9.18-13-g0378ffa -- Aeron: 1.10.1 +- Aeron: 1.10.2 diff --git a/driver/Aeron.Driver.nuspec b/driver/Aeron.Driver.nuspec index 266cab04..6f81462e 100644 --- a/driver/Aeron.Driver.nuspec +++ b/driver/Aeron.Driver.nuspec @@ -2,7 +2,7 @@ Aeron.Driver - 1.10.1 + 1.10.2 Aeron Driver Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. diff --git a/driver/media-driver.jar b/driver/media-driver.jar index 1b688470..b890a97b 100644 Binary files a/driver/media-driver.jar and b/driver/media-driver.jar differ diff --git a/driver/version.txt b/driver/version.txt index 737e11f9..4f98ce74 100644 --- a/driver/version.txt +++ b/driver/version.txt @@ -1,2 +1,2 @@ Driver source: -http://repo1.maven.org/maven2/io/aeron/aeron-all/1.10.1/aeron-all-1.10.1.jar +http://repo1.maven.org/maven2/io/aeron/aeron-all/1.10.2/aeron-all-1.10.2.jar diff --git a/scripts/build-prerelease-nuget-packages - Copy.bat b/scripts/build-prerelease-nuget-packages - Copy.bat new file mode 100644 index 00000000..d6f17ae9 --- /dev/null +++ b/scripts/build-prerelease-nuget-packages - Copy.bat @@ -0,0 +1,19 @@ +@echo off +pushd %~dp0.. +SET nuget_source=https://www.myget.org/F/aeron/api/v2/package + +del nupkgs\*.nupkg + +call dotnet pack src\Adaptive.Aeron\Adaptive.Aeron.csproj -c Release --output ..\..\nupkgs +call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs +call dotnet pack src\Adaptive.Cluster\Adaptive.Cluster.csproj -c Release --output ..\..\nupkgs +call dotnet pack src\Adaptive.Archiver\Adaptive.Archiver.csproj -c Release --output ..\..\nupkgs +call .\scripts\nuget pack .\driver\Aeron.Driver.nuspec -OutputDirectory nupkgs + +call dotnet nuget push nupkgs\Agrona.*.nupkg -s %nuget_source% +call dotnet nuget push nupkgs\Aeron.Client.*.nupkg -s %nuget_source% +call dotnet nuget push nupkgs\Aeron.Driver.*.nupkg -s %nuget_source% +call dotnet nuget push nupkgs\Aeron.Cluster.*.nupkg -s %nuget_source% +call dotnet nuget push nupkgs\Aeron.Archiver.*.nupkg -s %nuget_source% + +popd \ No newline at end of file diff --git a/src/Adaptive.Aeron/Adaptive.Aeron.csproj b/src/Adaptive.Aeron/Adaptive.Aeron.csproj index c7e75b84..6ef5f470 100644 --- a/src/Adaptive.Aeron/Adaptive.Aeron.csproj +++ b/src/Adaptive.Aeron/Adaptive.Aeron.csproj @@ -2,7 +2,7 @@ netstandard2.0;net45 Aeron.Client - 1.10.1 + 1.10.2 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Aeron Client diff --git a/src/Adaptive.Aeron/Aeron.cs b/src/Adaptive.Aeron/Aeron.cs index 8e750315..cc19bf0f 100644 --- a/src/Adaptive.Aeron/Aeron.cs +++ b/src/Adaptive.Aeron/Aeron.cs @@ -120,7 +120,6 @@ internal Aeron(Context ctx) /// /// Threads required for interacting with the media driver are created and managed within the Aeron instance. /// - /// /// /// the new instance connected to the Media Driver. public static Aeron Connect() diff --git a/src/Adaptive.Aeron/ClientConductor.cs b/src/Adaptive.Aeron/ClientConductor.cs index 904a0b52..aef03376 100644 --- a/src/Adaptive.Aeron/ClientConductor.cs +++ b/src/Adaptive.Aeron/ClientConductor.cs @@ -47,6 +47,7 @@ internal class ClientConductor : IAgent, IDriverEventsListener private long _timeOfLastResourcesCheckNs; private long _timeOfLastServiceNs; private bool _isClosed; + private bool _isInCallback; private string _stashedChannel; private RegistrationException _driverException; @@ -270,6 +271,8 @@ public void OnAvailableImage( AvailableImageHandler handler = subscription.AvailableImageHandler(); if (null != handler) { + _isInCallback = true; + try { handler(image); @@ -278,6 +281,10 @@ public void OnAvailableImage( { HandleError(ex); } + finally + { + _isInCallback = false; + } } subscription.AddImage(image); @@ -295,6 +302,8 @@ public void OnUnavailableImage(long correlationId, long subscriptionRegistration UnavailableImageHandler handler = subscription.UnavailableImageHandler(); if (null != handler) { + _isInCallback = true; + try { handler(image); @@ -303,6 +312,10 @@ public void OnUnavailableImage(long correlationId, long subscriptionRegistration { HandleError(ex); } + finally + { + _isInCallback = false; + } } } } @@ -318,6 +331,8 @@ public void OnAvailableCounter(long registrationId, int counterId) { if (null != _availableCounterHandler) { + _isInCallback = true; + try { _availableCounterHandler(_countersReader, registrationId, counterId); @@ -326,6 +341,10 @@ public void OnAvailableCounter(long registrationId, int counterId) { HandleError(ex); } + finally + { + _isInCallback = false; + } } } @@ -333,6 +352,8 @@ public void OnUnavailableCounter(long registrationId, int counterId) { if (null != _unavailableCounterHandler) { + _isInCallback = true; + try { _unavailableCounterHandler(_countersReader, registrationId, counterId); @@ -341,6 +362,10 @@ public void OnUnavailableCounter(long registrationId, int counterId) { HandleError(ex); } + finally + { + _isInCallback = false; + } } } @@ -360,6 +385,7 @@ internal ConcurrentPublication AddPublication(string channel, int streamId) try { EnsureOpen(); + EnsureNotReentrant(); _stashedChannel = channel; long registrationId = _driverProxy.AddPublication(channel, streamId); @@ -379,6 +405,7 @@ internal virtual ExclusivePublication AddExclusivePublication(string channel, in try { EnsureOpen(); + EnsureNotReentrant(); _stashedChannel = channel; long registrationId = _driverProxy.AddExclusivePublication(channel, streamId); @@ -402,6 +429,7 @@ internal virtual void ReleasePublication(Publication publication) publication.InternalClose(); EnsureOpen(); + EnsureNotReentrant(); var removedPublication = _resourceByRegIdMap[publication.RegistrationId]; @@ -429,6 +457,7 @@ internal virtual Subscription AddSubscription(string channel, int streamId, Avai try { EnsureOpen(); + EnsureNotReentrant(); long correlationId = _driverProxy.AddSubscription(channel, streamId); Subscription subscription = new Subscription(this, channel, streamId, correlationId, availableImageHandler, unavailableImageHandler); @@ -455,6 +484,7 @@ internal virtual void ReleaseSubscription(Subscription subscription) subscription.InternalClose(); EnsureOpen(); + EnsureNotReentrant(); long registrationId = subscription.RegistrationId; AwaitResponse(_driverProxy.RemoveSubscription(registrationId)); @@ -473,6 +503,7 @@ internal virtual void AddDestination(long registrationId, string endpointChannel try { EnsureOpen(); + EnsureNotReentrant(); AwaitResponse(_driverProxy.AddDestination(registrationId, endpointChannel)); } @@ -488,6 +519,7 @@ internal virtual void RemoveDestination(long registrationId, string endpointChan try { EnsureOpen(); + EnsureNotReentrant(); AwaitResponse(_driverProxy.RemoveDestination(registrationId, endpointChannel)); } @@ -503,6 +535,7 @@ internal void AddRcvDestination(long registrationId, string endpointChannel) try { EnsureOpen(); + EnsureNotReentrant(); AwaitResponse(_driverProxy.AddRcvDestination(registrationId, endpointChannel)); } @@ -518,7 +551,8 @@ internal void RemoveRcvDestination(long registrationId, string endpointChannel) try { EnsureOpen(); - + EnsureNotReentrant(); + AwaitResponse(_driverProxy.RemoveRcvDestination(registrationId, endpointChannel)); } finally @@ -534,6 +568,7 @@ internal virtual Counter AddCounter(int typeId, IDirectBuffer keyBuffer, int key try { EnsureOpen(); + EnsureNotReentrant(); if (keyLength < 0 || keyLength > CountersManager.MAX_KEY_LENGTH) { @@ -563,6 +598,7 @@ internal Counter AddCounter(int typeId, string label) try { EnsureOpen(); + EnsureNotReentrant(); if (label.Length > CountersManager.MAX_LABEL_LENGTH) { @@ -591,6 +627,7 @@ internal virtual void ReleaseCounter(Counter counter) counter.InternalClose(); EnsureOpen(); + EnsureNotReentrant(); long registrationId = counter.RegistrationId(); AwaitResponse(_driverProxy.RemoveCounter(registrationId)); @@ -646,6 +683,14 @@ private void EnsureOpen() } } + private void EnsureNotReentrant() + { + if (_isInCallback) + { + throw new AeronException("Reentrant calls not permitted during callbacks"); + } + } + private LogBuffers LogBuffers(long registrationId, string logFileName) { LogBuffers logBuffers = _logBuffersByIdMap[registrationId]; diff --git a/src/Adaptive.Agrona/Adaptive.Agrona.csproj b/src/Adaptive.Agrona/Adaptive.Agrona.csproj index 0f6fcf75..c800e5bf 100644 --- a/src/Adaptive.Agrona/Adaptive.Agrona.csproj +++ b/src/Adaptive.Agrona/Adaptive.Agrona.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Agrona - 1.10.1 + 1.10.2 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Agrona libraries initially included in Aeron Client diff --git a/src/Adaptive.Archiver/Adaptive.Archiver.csproj b/src/Adaptive.Archiver/Adaptive.Archiver.csproj index f3567a07..ca6e2aa1 100644 --- a/src/Adaptive.Archiver/Adaptive.Archiver.csproj +++ b/src/Adaptive.Archiver/Adaptive.Archiver.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Aeron.Archiver - 1.10.1 + 1.10.2 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Archiving over the Aeron transport diff --git a/src/Adaptive.Archiver/AeronArchive.cs b/src/Adaptive.Archiver/AeronArchive.cs index 544eed31..aa0c3b87 100644 --- a/src/Adaptive.Archiver/AeronArchive.cs +++ b/src/Adaptive.Archiver/AeronArchive.cs @@ -398,7 +398,7 @@ public ExclusivePublication AddRecordedExclusivePublication(string channel, int /// to be recorded. /// to be recorded. /// of the publication to be recorded. - /// the correlationId used to identify the request. + /// the subscriptionId of the recording. public long StartRecording(string channel, int streamId, SourceLocation sourceLocation) { _lock.Lock(); @@ -411,9 +411,7 @@ public long StartRecording(string channel, int streamId, SourceLocation sourceLo throw new ArchiveException("failed to send start recording request"); } - PollForResponse(correlationId); - - return correlationId; + return PollForResponse(correlationId); } finally { @@ -432,7 +430,7 @@ public long StartRecording(string channel, int streamId, SourceLocation sourceLo /// to be recorded. /// to be recorded. /// of the publication to be recorded. - /// the correlationId used to identify the request. + /// the subscriptionId of the recording. public long ExtendRecording(long recordingId, string channel, int streamId, SourceLocation sourceLocation) { @@ -447,9 +445,7 @@ public long ExtendRecording(long recordingId, string channel, int streamId, throw new ArchiveException("failed to send extend recording request"); } - PollForResponse(correlationId); - - return correlationId; + return PollForResponse(correlationId); } finally { @@ -500,16 +496,48 @@ public void StopRecording(Publication publication) } /// - /// Start a replay for a length in bytes of a recording from a position. If the position is + /// Stop recording for a subscriptionId that has been returned from + /// or + /// . + /// + /// the subscription was registered with for the recording. + public void StopRecording(long subscriptionId) + { + _lock.Lock(); + try + { + long correlationId = aeron.NextCorrelationId(); + + if (!archiveProxy.StopRecording(subscriptionId, correlationId, controlSessionId)) + { + throw new ArchiveException("failed to send stop recording request"); + } + + PollForResponse(correlationId); + } + finally + { + _lock.Unlock(); + } + } + + + /// + /// Start a replay for a length in bytes of a recording from a position. If the position is /// then the stream will be replayed from the start. + /// + /// The lower 32-bits of the returned value contain the of the received replay. All + /// 64-bits are required to uniquely identify the replay when calling . The lower 32-bits + /// can be obtained by casting the value to an . + /// /// /// to be replayed. - /// from which the replay should begin or if from the start. - /// of the stream to be replayed. Use to follow a live recording or to replay the whole stream of unknown length. + /// from which the replay should begin or if from the start. + /// of the stream to be replayed. Use to follow a live recording or to replay the whole stream of unknown length. /// to which the replay should be sent. /// to which the replay should be sent. - /// the id of the replay session which will be the same as the of the received - /// replay for correlation with the matching channel and stream id. + /// the id of the replay session which will be the same as the of the received + /// replay for correlation with the matching channel and stream id in the lower 32 bits. public long StartReplay(long recordingId, long position, long length, string replayChannel, int replayStreamId) { diff --git a/src/Adaptive.Archiver/ArchiveProxy.cs b/src/Adaptive.Archiver/ArchiveProxy.cs index f7c909db..8808b7a6 100644 --- a/src/Adaptive.Archiver/ArchiveProxy.cs +++ b/src/Adaptive.Archiver/ArchiveProxy.cs @@ -29,6 +29,10 @@ public class ArchiveProxy private readonly ReplayRequestEncoder replayRequestEncoder = new ReplayRequestEncoder(); private readonly StopReplayRequestEncoder stopReplayRequestEncoder = new StopReplayRequestEncoder(); private readonly StopRecordingRequestEncoder stopRecordingRequestEncoder = new StopRecordingRequestEncoder(); + + private readonly StopRecordingSubscriptionRequestEncoder stopRecordingSubscriptionRequestEncoder = + new StopRecordingSubscriptionRequestEncoder(); + private readonly ListRecordingsRequestEncoder listRecordingsRequestEncoder = new ListRecordingsRequestEncoder(); private readonly ListRecordingsForUriRequestEncoder listRecordingsForUriRequestEncoder = new ListRecordingsForUriRequestEncoder(); private readonly ListRecordingRequestEncoder listRecordingRequestEncoder = new ListRecordingRequestEncoder(); @@ -190,6 +194,24 @@ public virtual bool StopRecording(string channel, int streamId, long correlation return Offer(stopRecordingRequestEncoder.EncodedLength()); } + /// + /// Stop an active recording by the it was registered with. + /// + /// that identifies the subscription in the archive doing the recording. + /// for this request. + /// for this request. + /// true if successfully offered otherwise false. + public bool StopRecording(long subscriptionId, long correlationId, long controlSessionId) + { + stopRecordingSubscriptionRequestEncoder + .WrapAndApplyHeader(buffer, 0, messageHeaderEncoder) + .ControlSessionId(controlSessionId) + .CorrelationId(correlationId) + .SubscriptionId(subscriptionId); + + return Offer(stopRecordingSubscriptionRequestEncoder.EncodedLength()); + } + /// /// Replay a recording from a given position. /// diff --git a/src/Adaptive.Archiver/Codecs/StopRecordingSubscriptionRequestDecoder.cs b/src/Adaptive.Archiver/Codecs/StopRecordingSubscriptionRequestDecoder.cs new file mode 100644 index 00000000..39d846a4 --- /dev/null +++ b/src/Adaptive.Archiver/Codecs/StopRecordingSubscriptionRequestDecoder.cs @@ -0,0 +1,302 @@ +/* Generated SBE (Simple Binary Encoding) message codec */ +using System; +using System.Text; +using System.Collections.Generic; +using Adaptive.Agrona; + + +namespace Adaptive.Archiver.Codecs { + +public class StopRecordingSubscriptionRequestDecoder +{ + public const ushort BLOCK_LENGTH = 24; + public const ushort TEMPLATE_ID = 14; + public const ushort SCHEMA_ID = 1; + public const ushort SCHEMA_VERSION = 0; + + private StopRecordingSubscriptionRequestDecoder _parentMessage; + private IDirectBuffer _buffer; + protected int _offset; + protected int _limit; + protected int _actingBlockLength; + protected int _actingVersion; + + public StopRecordingSubscriptionRequestDecoder() + { + _parentMessage = this; + } + + public ushort SbeBlockLength() + { + return BLOCK_LENGTH; + } + + public ushort SbeTemplateId() + { + return TEMPLATE_ID; + } + + public ushort SbeSchemaId() + { + return SCHEMA_ID; + } + + public ushort SbeSchemaVersion() + { + return SCHEMA_VERSION; + } + + public string SbeSemanticType() + { + return ""; + } + + public IDirectBuffer Buffer() + { + return _buffer; + } + + public int Offset() + { + return _offset; + } + + public StopRecordingSubscriptionRequestDecoder Wrap( + IDirectBuffer buffer, int offset, int actingBlockLength, int actingVersion) + { + this._buffer = buffer; + this._offset = offset; + this._actingBlockLength = actingBlockLength; + this._actingVersion = actingVersion; + Limit(offset + actingBlockLength); + + return this; + } + + public int EncodedLength() + { + return _limit - _offset; + } + + public int Limit() + { + return _limit; + } + + public void Limit(int limit) + { + this._limit = limit; + } + + public static int ControlSessionIdId() + { + return 1; + } + + public static int ControlSessionIdSinceVersion() + { + return 0; + } + + public static int ControlSessionIdEncodingOffset() + { + return 0; + } + + public static int ControlSessionIdEncodingLength() + { + return 8; + } + + public static string ControlSessionIdMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static long ControlSessionIdNullValue() + { + return -9223372036854775808L; + } + + public static long ControlSessionIdMinValue() + { + return -9223372036854775807L; + } + + public static long ControlSessionIdMaxValue() + { + return 9223372036854775807L; + } + + public long ControlSessionId() + { + return _buffer.GetLong(_offset + 0, ByteOrder.LittleEndian); + } + + + public static int CorrelationIdId() + { + return 2; + } + + public static int CorrelationIdSinceVersion() + { + return 0; + } + + public static int CorrelationIdEncodingOffset() + { + return 8; + } + + public static int CorrelationIdEncodingLength() + { + return 8; + } + + public static string CorrelationIdMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static long CorrelationIdNullValue() + { + return -9223372036854775808L; + } + + public static long CorrelationIdMinValue() + { + return -9223372036854775807L; + } + + public static long CorrelationIdMaxValue() + { + return 9223372036854775807L; + } + + public long CorrelationId() + { + return _buffer.GetLong(_offset + 8, ByteOrder.LittleEndian); + } + + + public static int SubscriptionIdId() + { + return 2; + } + + public static int SubscriptionIdSinceVersion() + { + return 0; + } + + public static int SubscriptionIdEncodingOffset() + { + return 16; + } + + public static int SubscriptionIdEncodingLength() + { + return 8; + } + + public static string SubscriptionIdMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static long SubscriptionIdNullValue() + { + return -9223372036854775808L; + } + + public static long SubscriptionIdMinValue() + { + return -9223372036854775807L; + } + + public static long SubscriptionIdMaxValue() + { + return 9223372036854775807L; + } + + public long SubscriptionId() + { + return _buffer.GetLong(_offset + 16, ByteOrder.LittleEndian); + } + + + + public override string ToString() + { + return AppendTo(new StringBuilder(100)).ToString(); + } + + public StringBuilder AppendTo(StringBuilder builder) + { + int originalLimit = Limit(); + Limit(_offset + _actingBlockLength); + builder.Append("[StopRecordingSubscriptionRequest](sbeTemplateId="); + builder.Append(TEMPLATE_ID); + builder.Append("|sbeSchemaId="); + builder.Append(SCHEMA_ID); + builder.Append("|sbeSchemaVersion="); + if (_parentMessage._actingVersion != SCHEMA_VERSION) + { + builder.Append(_parentMessage._actingVersion); + builder.Append('/'); + } + builder.Append(SCHEMA_VERSION); + builder.Append("|sbeBlockLength="); + if (_actingBlockLength != BLOCK_LENGTH) + { + builder.Append(_actingBlockLength); + builder.Append('/'); + } + builder.Append(BLOCK_LENGTH); + builder.Append("):"); + //Token{signal=BEGIN_FIELD, name='controlSessionId', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("ControlSessionId="); + builder.Append(ControlSessionId()); + builder.Append('|'); + //Token{signal=BEGIN_FIELD, name='correlationId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=8, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("CorrelationId="); + builder.Append(CorrelationId()); + builder.Append('|'); + //Token{signal=BEGIN_FIELD, name='subscriptionId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("SubscriptionId="); + builder.Append(SubscriptionId()); + + Limit(originalLimit); + + return builder; + } +} +} diff --git a/src/Adaptive.Archiver/Codecs/StopRecordingSubscriptionRequestEncoder.cs b/src/Adaptive.Archiver/Codecs/StopRecordingSubscriptionRequestEncoder.cs new file mode 100644 index 00000000..03364721 --- /dev/null +++ b/src/Adaptive.Archiver/Codecs/StopRecordingSubscriptionRequestEncoder.cs @@ -0,0 +1,209 @@ +/* Generated SBE (Simple Binary Encoding) message codec */ +using System; +using System.Text; +using System.Collections.Generic; +using Adaptive.Agrona; + + +namespace Adaptive.Archiver.Codecs { + +public class StopRecordingSubscriptionRequestEncoder +{ + public const ushort BLOCK_LENGTH = 24; + public const ushort TEMPLATE_ID = 14; + public const ushort SCHEMA_ID = 1; + public const ushort SCHEMA_VERSION = 0; + + private StopRecordingSubscriptionRequestEncoder _parentMessage; + private IMutableDirectBuffer _buffer; + protected int _offset; + protected int _limit; + + public StopRecordingSubscriptionRequestEncoder() + { + _parentMessage = this; + } + + public ushort SbeBlockLength() + { + return BLOCK_LENGTH; + } + + public ushort SbeTemplateId() + { + return TEMPLATE_ID; + } + + public ushort SbeSchemaId() + { + return SCHEMA_ID; + } + + public ushort SbeSchemaVersion() + { + return SCHEMA_VERSION; + } + + public string SbeSemanticType() + { + return ""; + } + + public IMutableDirectBuffer Buffer() + { + return _buffer; + } + + public int Offset() + { + return _offset; + } + + public StopRecordingSubscriptionRequestEncoder Wrap(IMutableDirectBuffer buffer, int offset) + { + this._buffer = buffer; + this._offset = offset; + Limit(offset + BLOCK_LENGTH); + + return this; + } + + public StopRecordingSubscriptionRequestEncoder WrapAndApplyHeader( + IMutableDirectBuffer buffer, int offset, MessageHeaderEncoder headerEncoder) + { + headerEncoder + .Wrap(buffer, offset) + .BlockLength(BLOCK_LENGTH) + .TemplateId(TEMPLATE_ID) + .SchemaId(SCHEMA_ID) + .Version(SCHEMA_VERSION); + + return Wrap(buffer, offset + MessageHeaderEncoder.ENCODED_LENGTH); + } + + public int EncodedLength() + { + return _limit - _offset; + } + + public int Limit() + { + return _limit; + } + + public void Limit(int limit) + { + this._limit = limit; + } + + public static int ControlSessionIdEncodingOffset() + { + return 0; + } + + public static int ControlSessionIdEncodingLength() + { + return 8; + } + + public static long ControlSessionIdNullValue() + { + return -9223372036854775808L; + } + + public static long ControlSessionIdMinValue() + { + return -9223372036854775807L; + } + + public static long ControlSessionIdMaxValue() + { + return 9223372036854775807L; + } + + public StopRecordingSubscriptionRequestEncoder ControlSessionId(long value) + { + _buffer.PutLong(_offset + 0, value, ByteOrder.LittleEndian); + return this; + } + + + public static int CorrelationIdEncodingOffset() + { + return 8; + } + + public static int CorrelationIdEncodingLength() + { + return 8; + } + + public static long CorrelationIdNullValue() + { + return -9223372036854775808L; + } + + public static long CorrelationIdMinValue() + { + return -9223372036854775807L; + } + + public static long CorrelationIdMaxValue() + { + return 9223372036854775807L; + } + + public StopRecordingSubscriptionRequestEncoder CorrelationId(long value) + { + _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); + return this; + } + + + public static int SubscriptionIdEncodingOffset() + { + return 16; + } + + public static int SubscriptionIdEncodingLength() + { + return 8; + } + + public static long SubscriptionIdNullValue() + { + return -9223372036854775808L; + } + + public static long SubscriptionIdMinValue() + { + return -9223372036854775807L; + } + + public static long SubscriptionIdMaxValue() + { + return 9223372036854775807L; + } + + public StopRecordingSubscriptionRequestEncoder SubscriptionId(long value) + { + _buffer.PutLong(_offset + 16, value, ByteOrder.LittleEndian); + return this; + } + + + + public override string ToString() + { + return AppendTo(new StringBuilder(100)).ToString(); + } + + public StringBuilder AppendTo(StringBuilder builder) + { + StopRecordingSubscriptionRequestDecoder writer = new StopRecordingSubscriptionRequestDecoder(); + writer.Wrap(_buffer, _offset, BLOCK_LENGTH, SCHEMA_VERSION); + + return writer.AppendTo(builder); + } +} +} diff --git a/src/Adaptive.Archiver/aeron-archive-codecs.xml b/src/Adaptive.Archiver/aeron-archive-codecs.xml index d477323c..9416653a 100644 --- a/src/Adaptive.Archiver/aeron-archive-codecs.xml +++ b/src/Adaptive.Archiver/aeron-archive-codecs.xml @@ -159,6 +159,14 @@ + + + + + + netstandard2.0;net45 true Aeron.Cluster - 1.10.1 + 1.10.2 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Clustering libraries over the Aeron transport diff --git a/src/Adaptive.Cluster/Codecs/StopCatchupDecoder.cs b/src/Adaptive.Cluster/Codecs/StopCatchupDecoder.cs new file mode 100644 index 00000000..bf9c27e4 --- /dev/null +++ b/src/Adaptive.Cluster/Codecs/StopCatchupDecoder.cs @@ -0,0 +1,243 @@ +/* Generated SBE (Simple Binary Encoding) message codec */ +using System; +using System.Text; +using System.Collections.Generic; +using Adaptive.Agrona; + + +namespace Adaptive.Cluster.Codecs { + +public class StopCatchupDecoder +{ + public const ushort BLOCK_LENGTH = 8; + public const ushort TEMPLATE_ID = 57; + public const ushort SCHEMA_ID = 1; + public const ushort SCHEMA_VERSION = 1; + + private StopCatchupDecoder _parentMessage; + private IDirectBuffer _buffer; + protected int _offset; + protected int _limit; + protected int _actingBlockLength; + protected int _actingVersion; + + public StopCatchupDecoder() + { + _parentMessage = this; + } + + public ushort SbeBlockLength() + { + return BLOCK_LENGTH; + } + + public ushort SbeTemplateId() + { + return TEMPLATE_ID; + } + + public ushort SbeSchemaId() + { + return SCHEMA_ID; + } + + public ushort SbeSchemaVersion() + { + return SCHEMA_VERSION; + } + + public string SbeSemanticType() + { + return ""; + } + + public IDirectBuffer Buffer() + { + return _buffer; + } + + public int Offset() + { + return _offset; + } + + public StopCatchupDecoder Wrap( + IDirectBuffer buffer, int offset, int actingBlockLength, int actingVersion) + { + this._buffer = buffer; + this._offset = offset; + this._actingBlockLength = actingBlockLength; + this._actingVersion = actingVersion; + Limit(offset + actingBlockLength); + + return this; + } + + public int EncodedLength() + { + return _limit - _offset; + } + + public int Limit() + { + return _limit; + } + + public void Limit(int limit) + { + this._limit = limit; + } + + public static int ReplaySessionIdId() + { + return 1; + } + + public static int ReplaySessionIdSinceVersion() + { + return 0; + } + + public static int ReplaySessionIdEncodingOffset() + { + return 0; + } + + public static int ReplaySessionIdEncodingLength() + { + return 4; + } + + public static string ReplaySessionIdMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static int ReplaySessionIdNullValue() + { + return -2147483648; + } + + public static int ReplaySessionIdMinValue() + { + return -2147483647; + } + + public static int ReplaySessionIdMaxValue() + { + return 2147483647; + } + + public int ReplaySessionId() + { + return _buffer.GetInt(_offset + 0, ByteOrder.LittleEndian); + } + + + public static int FollowerMemberIdId() + { + return 2; + } + + public static int FollowerMemberIdSinceVersion() + { + return 0; + } + + public static int FollowerMemberIdEncodingOffset() + { + return 4; + } + + public static int FollowerMemberIdEncodingLength() + { + return 4; + } + + public static string FollowerMemberIdMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static int FollowerMemberIdNullValue() + { + return -2147483648; + } + + public static int FollowerMemberIdMinValue() + { + return -2147483647; + } + + public static int FollowerMemberIdMaxValue() + { + return 2147483647; + } + + public int FollowerMemberId() + { + return _buffer.GetInt(_offset + 4, ByteOrder.LittleEndian); + } + + + + public override string ToString() + { + return AppendTo(new StringBuilder(100)).ToString(); + } + + public StringBuilder AppendTo(StringBuilder builder) + { + int originalLimit = Limit(); + Limit(_offset + _actingBlockLength); + builder.Append("[StopCatchup](sbeTemplateId="); + builder.Append(TEMPLATE_ID); + builder.Append("|sbeSchemaId="); + builder.Append(SCHEMA_ID); + builder.Append("|sbeSchemaVersion="); + if (_parentMessage._actingVersion != SCHEMA_VERSION) + { + builder.Append(_parentMessage._actingVersion); + builder.Append('/'); + } + builder.Append(SCHEMA_VERSION); + builder.Append("|sbeBlockLength="); + if (_actingBlockLength != BLOCK_LENGTH) + { + builder.Append(_actingBlockLength); + builder.Append('/'); + } + builder.Append(BLOCK_LENGTH); + builder.Append("):"); + //Token{signal=BEGIN_FIELD, name='replaySessionId', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("ReplaySessionId="); + builder.Append(ReplaySessionId()); + builder.Append('|'); + //Token{signal=BEGIN_FIELD, name='followerMemberId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=4, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=4, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("FollowerMemberId="); + builder.Append(FollowerMemberId()); + + Limit(originalLimit); + + return builder; + } +} +} diff --git a/src/Adaptive.Cluster/Codecs/StopCatchupEncoder.cs b/src/Adaptive.Cluster/Codecs/StopCatchupEncoder.cs new file mode 100644 index 00000000..e3876b4b --- /dev/null +++ b/src/Adaptive.Cluster/Codecs/StopCatchupEncoder.cs @@ -0,0 +1,177 @@ +/* Generated SBE (Simple Binary Encoding) message codec */ +using System; +using System.Text; +using System.Collections.Generic; +using Adaptive.Agrona; + + +namespace Adaptive.Cluster.Codecs { + +public class StopCatchupEncoder +{ + public const ushort BLOCK_LENGTH = 8; + public const ushort TEMPLATE_ID = 57; + public const ushort SCHEMA_ID = 1; + public const ushort SCHEMA_VERSION = 1; + + private StopCatchupEncoder _parentMessage; + private IMutableDirectBuffer _buffer; + protected int _offset; + protected int _limit; + + public StopCatchupEncoder() + { + _parentMessage = this; + } + + public ushort SbeBlockLength() + { + return BLOCK_LENGTH; + } + + public ushort SbeTemplateId() + { + return TEMPLATE_ID; + } + + public ushort SbeSchemaId() + { + return SCHEMA_ID; + } + + public ushort SbeSchemaVersion() + { + return SCHEMA_VERSION; + } + + public string SbeSemanticType() + { + return ""; + } + + public IMutableDirectBuffer Buffer() + { + return _buffer; + } + + public int Offset() + { + return _offset; + } + + public StopCatchupEncoder Wrap(IMutableDirectBuffer buffer, int offset) + { + this._buffer = buffer; + this._offset = offset; + Limit(offset + BLOCK_LENGTH); + + return this; + } + + public StopCatchupEncoder WrapAndApplyHeader( + IMutableDirectBuffer buffer, int offset, MessageHeaderEncoder headerEncoder) + { + headerEncoder + .Wrap(buffer, offset) + .BlockLength(BLOCK_LENGTH) + .TemplateId(TEMPLATE_ID) + .SchemaId(SCHEMA_ID) + .Version(SCHEMA_VERSION); + + return Wrap(buffer, offset + MessageHeaderEncoder.ENCODED_LENGTH); + } + + public int EncodedLength() + { + return _limit - _offset; + } + + public int Limit() + { + return _limit; + } + + public void Limit(int limit) + { + this._limit = limit; + } + + public static int ReplaySessionIdEncodingOffset() + { + return 0; + } + + public static int ReplaySessionIdEncodingLength() + { + return 4; + } + + public static int ReplaySessionIdNullValue() + { + return -2147483648; + } + + public static int ReplaySessionIdMinValue() + { + return -2147483647; + } + + public static int ReplaySessionIdMaxValue() + { + return 2147483647; + } + + public StopCatchupEncoder ReplaySessionId(int value) + { + _buffer.PutInt(_offset + 0, value, ByteOrder.LittleEndian); + return this; + } + + + public static int FollowerMemberIdEncodingOffset() + { + return 4; + } + + public static int FollowerMemberIdEncodingLength() + { + return 4; + } + + public static int FollowerMemberIdNullValue() + { + return -2147483648; + } + + public static int FollowerMemberIdMinValue() + { + return -2147483647; + } + + public static int FollowerMemberIdMaxValue() + { + return 2147483647; + } + + public StopCatchupEncoder FollowerMemberId(int value) + { + _buffer.PutInt(_offset + 4, value, ByteOrder.LittleEndian); + return this; + } + + + + public override string ToString() + { + return AppendTo(new StringBuilder(100)).ToString(); + } + + public StringBuilder AppendTo(StringBuilder builder) + { + StopCatchupDecoder writer = new StopCatchupDecoder(); + writer.Wrap(_buffer, _offset, BLOCK_LENGTH, SCHEMA_VERSION); + + return writer.AppendTo(builder); + } +} +} diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs index 9a41629a..d50b7c0e 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs @@ -8,6 +8,7 @@ using Adaptive.Agrona.Concurrent; using Adaptive.Agrona.Concurrent.Status; using Adaptive.Archiver; +using Adaptive.Archiver.Codecs; using Adaptive.Cluster.Client; using Adaptive.Cluster.Codecs; @@ -513,8 +514,11 @@ private long OnTakeSnapshot(long logPosition, long leadershipTermId) long recordingId; using (AeronArchive archive = AeronArchive.Connect(archiveCtx)) - using(Publication publication = archive.AddRecordedExclusivePublication(ctx.SnapshotChannel(), ctx.SnapshotStreamId())) + using(Publication publication = aeron.AddExclusivePublication(ctx.SnapshotChannel(), ctx.SnapshotStreamId())) { + var channel = ChannelUri.AddSessionId(ctx.SnapshotChannel(), publication.SessionId); + long subscriptionId = archive.StartRecording(channel, ctx.SnapshotStreamId(), SourceLocation.LOCAL); + try { CountersReader counters = aeron.CountersReader(); @@ -528,7 +532,7 @@ private long OnTakeSnapshot(long logPosition, long leadershipTermId) } finally { - archive.StopRecording(publication); + archive.StopRecording(subscriptionId); } } diff --git a/src/Adaptive.Cluster/Service/ServiceHeartbeat.cs b/src/Adaptive.Cluster/Service/ServiceHeartbeat.cs index 547c01a4..d43b8f3e 100644 --- a/src/Adaptive.Cluster/Service/ServiceHeartbeat.cs +++ b/src/Adaptive.Cluster/Service/ServiceHeartbeat.cs @@ -24,12 +24,14 @@ public class ServiceHeartbeat /// public const int SERVICE_HEARTBEAT_TYPE_ID = 206; + public const int SERVICE_ID_OFFSET = 0; + /// /// Human readable name for the counter. /// public const string NAME = "service-heartbeat: serviceId="; - public static readonly int KEY_LENGTH = BitUtil.SIZE_OF_INT; + public static readonly int KEY_LENGTH = SERVICE_ID_OFFSET + BitUtil.SIZE_OF_INT; /// /// Allocate a counter to represent the heartbeat of a clustered service. @@ -40,6 +42,8 @@ public class ServiceHeartbeat /// the for the commit position. public static Counter Allocate(Aeron.Aeron aeron, IMutableDirectBuffer tempBuffer, int serviceId) { + tempBuffer.PutInt(SERVICE_ID_OFFSET, serviceId); + int labelOffset = 0; labelOffset += tempBuffer.PutStringWithoutLengthAscii(KEY_LENGTH + labelOffset, NAME); labelOffset += tempBuffer.PutIntAscii(KEY_LENGTH + labelOffset, serviceId); @@ -64,7 +68,7 @@ public static int FindCounterId(CountersReader counters, int serviceId) int recordOffset = CountersReader.MetaDataOffset(i); if (buffer.GetInt(recordOffset + CountersReader.TYPE_ID_OFFSET) == SERVICE_HEARTBEAT_TYPE_ID && - buffer.GetInt(recordOffset + CountersReader.KEY_OFFSET) == serviceId) + buffer.GetInt(recordOffset + CountersReader.KEY_OFFSET + SERVICE_ID_OFFSET) == serviceId) { return i; } diff --git a/src/Adaptive.Cluster/aeron-cluster-codecs.xml b/src/Adaptive.Cluster/aeron-cluster-codecs.xml index 7644dd91..9d3f6f6a 100644 --- a/src/Adaptive.Cluster/aeron-cluster-codecs.xml +++ b/src/Adaptive.Cluster/aeron-cluster-codecs.xml @@ -351,6 +351,13 @@ + + + + +