diff --git a/README.md b/README.md index 0e61c1be..f8cf6b35 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ using(Publication publisher = aeron.AddPublication(channel, streamId)) { #### Fragment Handler A fragment handler is a delegate used for processing data that is has been received. The buffer will either contain a whole message or a fragment of a message to be reassembled. ```csharp -static void PrintMessage(UnsafeBuffer buffer, int offset, int length, Header header) +static void PrintMessage(IDirectBuffer buffer, int offset, int length, Header header) { var message = buffer.GetStringWithoutLengthUtf8(offset, length); Console.WriteLine($"Message Received: '{message}'"); diff --git a/RELEASE.md b/RELEASE.md index 501736aa..f3d4325e 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,4 @@ #### Port Aeron.NET has been ported against Java version: - Agrona: 0.9.12-30-gec52107 -- Aeron: 1.8.2-196-gd0e9417c2 +- Aeron: 1.8.2-257-g9abef0124 diff --git a/driver/Aeron.Driver.nuspec b/driver/Aeron.Driver.nuspec index 1fab7415..e72345e7 100644 --- a/driver/Aeron.Driver.nuspec +++ b/driver/Aeron.Driver.nuspec @@ -2,7 +2,7 @@ Aeron.Driver - 1.8.3 + 1.9.0 Aeron Driver Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. diff --git a/driver/aeron.patch b/driver/aeron.patch deleted file mode 100644 index f12c07fd..00000000 --- a/driver/aeron.patch +++ /dev/null @@ -1,12 +0,0 @@ -diff --git a/build.gradle b/build.gradle -index f5d80f486..9d80c60dd 100644 ---- a/build.gradle -+++ b/build.gradle -@@ -699,6 +699,7 @@ project(':aeron-all') { - dependencies { - compile project(':aeron-client') - compile project(':aeron-driver') -+ compile project(':aeron-cluster') - compile project(':aeron-archive') - compile project(':aeron-samples') - } diff --git a/driver/build-driver.sh b/driver/build-driver.sh index 6ed23977..fb08ef19 100644 --- a/driver/build-driver.sh +++ b/driver/build-driver.sh @@ -40,7 +40,6 @@ git fetch -q [ -z "$AERON_VERSION" ] && AERON_VERSION=`git describe --tags origin/master` echo Building Aeron $AERON_VERSION git checkout -qf $AERON_VERSION -git apply $AERON_PATCH ./gradlew -x test cd $WD @@ -48,3 +47,5 @@ echo "Driver built from source" > $VERSION_FILE echo "Agrona: $AGRONA_VERSION" >> $VERSION_FILE echo "SBE: $SBE_VERSION" >> $VERSION_FILE echo "Aeron: $AERON_VERSION" >> $VERSION_FILE + +cp $AERON_BUILD_DIR/aeron-all/build/libs/aeron-all-*-SNAPSHOT.jar $WD/media-driver.jar diff --git a/driver/media-driver.jar b/driver/media-driver.jar index 3d2dd6ab..fdc87200 100644 Binary files a/driver/media-driver.jar and b/driver/media-driver.jar differ diff --git a/driver/version.txt b/driver/version.txt index 14945de9..0870618d 100644 --- a/driver/version.txt +++ b/driver/version.txt @@ -1,4 +1,2 @@ -Driver built from source -Agrona: 0.9.15-35-g9e6c532 -SBE: 1.7.9-33-gc3884639 -Aeron: 1.8.2-196-gd0e9417c2 +Driver source: +http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.0/aeron-all-1.9.0.jar diff --git a/scripts/build-prerelease-nuget-packages.bat b/scripts/build-prerelease-nuget-packages.bat index 6ec23453..82ac2bdb 100644 --- a/scripts/build-prerelease-nuget-packages.bat +++ b/scripts/build-prerelease-nuget-packages.bat @@ -1,6 +1,6 @@ @echo off pushd %~dp0.. -SET suffix=pre8 +SET suffix=alpha SET nuget_source=https://www.myget.org/F/aeron/api/v2/package del nupkgs\*.nupkg diff --git a/scripts/build-release-nuget-packages.bat b/scripts/build-release-nuget-packages.bat index ae75aac1..bcd08768 100644 --- a/scripts/build-release-nuget-packages.bat +++ b/scripts/build-release-nuget-packages.bat @@ -5,11 +5,13 @@ SET nuget_source=https://api.nuget.org/v3/index.json del nupkgs\*.nupkg call dotnet pack src\Adaptive.Aeron\Adaptive.Aeron.csproj -c Release --output ..\..\nupkgs -call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs +call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs +call dotnet pack src\Adaptive.Archiver\Adaptive.Archiver.csproj -c Release --output ..\..\nupkgs call .\scripts\nuget pack .\driver\Aeron.Driver.nuspec -OutputDirectory nupkgs call dotnet nuget push nupkgs\Agrona.*.nupkg -s %nuget_source% call dotnet nuget push nupkgs\Aeron.Client.*.nupkg -s %nuget_source% +call dotnet nuget push nupkgs\Aeron.Archiver.*.nupkg -s %nuget_source% call dotnet nuget push nupkgs\Aeron.Driver.*.nupkg -s %nuget_source% popd \ No newline at end of file diff --git a/src/Adaptive.Aeron/Adaptive.Aeron.csproj b/src/Adaptive.Aeron/Adaptive.Aeron.csproj index 3be218d0..9d2d21e0 100644 --- a/src/Adaptive.Aeron/Adaptive.Aeron.csproj +++ b/src/Adaptive.Aeron/Adaptive.Aeron.csproj @@ -2,7 +2,7 @@ netstandard2.0;net45 Aeron.Client - 1.8.3 + 1.9.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Aeron Client diff --git a/src/Adaptive.Aeron/Status/HeartbeatStatus.cs b/src/Adaptive.Aeron/Status/HeartbeatStatus.cs new file mode 100644 index 00000000..082660da --- /dev/null +++ b/src/Adaptive.Aeron/Status/HeartbeatStatus.cs @@ -0,0 +1,54 @@ +using Adaptive.Agrona; +using Adaptive.Agrona.Concurrent.Status; + +namespace Adaptive.Aeron.Status +{ + /// + /// Allocate a counter for tracking the last heartbeat of an entity. + /// + public class HeartbeatStatus + { + /// + /// Offset in the key meta data for the registration id of the counter. + /// + public const int REGISTRATION_ID_OFFSET = 0; + + /// + /// Allocate a counter for tracking the last heartbeat of an entity. + /// + /// to be used for labels and key. + /// of the counter for the label. + /// of the counter for classification. + /// from which to allocated the underlying storage. + /// to be associated with the counter. + /// a new for tracking the last heartbeat. + public static AtomicCounter Allocate( + IMutableDirectBuffer tempBuffer, + string name, + int typeId, + CountersManager countersManager, + long registrationId) + { + return new AtomicCounter(countersManager.ValuesBuffer, + AllocateCounterId(tempBuffer, name, typeId, countersManager, registrationId), countersManager); + } + + public static int AllocateCounterId( + IMutableDirectBuffer tempBuffer, + string name, + int typeId, + CountersManager countersManager, + long registrationId) + { + tempBuffer.PutLong(REGISTRATION_ID_OFFSET, registrationId); + int keyLength = REGISTRATION_ID_OFFSET + BitUtil.SIZE_OF_LONG; + + int labelLength = 0; + labelLength += tempBuffer.PutStringWithoutLengthAscii(keyLength + labelLength, name); + labelLength += tempBuffer.PutStringWithoutLengthAscii(keyLength + labelLength, ": "); + labelLength += tempBuffer.PutLongAscii(keyLength + labelLength, registrationId); + + return countersManager.Allocate(typeId, tempBuffer, 0, keyLength, tempBuffer, keyLength, labelLength); + } + } +} \ No newline at end of file diff --git a/src/Adaptive.Agrona/Adaptive.Agrona.csproj b/src/Adaptive.Agrona/Adaptive.Agrona.csproj index 8dbbb263..f6d7e15a 100644 --- a/src/Adaptive.Agrona/Adaptive.Agrona.csproj +++ b/src/Adaptive.Agrona/Adaptive.Agrona.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Agrona - 1.8.3 + 1.9.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Agrona libraries initially included in Aeron Client diff --git a/src/Adaptive.Archiver/Adaptive.Archiver.csproj b/src/Adaptive.Archiver/Adaptive.Archiver.csproj index 351c9c18..086aa6e6 100644 --- a/src/Adaptive.Archiver/Adaptive.Archiver.csproj +++ b/src/Adaptive.Archiver/Adaptive.Archiver.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Aeron.Archiver - 1.8.3 + 1.9.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Clustering libraries over the Aeron transport diff --git a/src/Adaptive.Archiver/ArchiveProxy.cs b/src/Adaptive.Archiver/ArchiveProxy.cs index 68efc17e..29182e32 100644 --- a/src/Adaptive.Archiver/ArchiveProxy.cs +++ b/src/Adaptive.Archiver/ArchiveProxy.cs @@ -373,17 +373,17 @@ private bool Offer(int length) if (result == Publication.CLOSED) { - throw new System.InvalidOperationException("Connection to the archive has been closed"); + throw new System.InvalidOperationException("connection to the archive has been closed"); } if (result == Publication.NOT_CONNECTED) { - throw new System.InvalidOperationException("Connection to the archive is no longer available"); + throw new System.InvalidOperationException("connection to the archive is no longer available"); } if (result == Publication.MAX_POSITION_EXCEEDED) { - throw new System.InvalidOperationException("Publication failed due to max position being reached"); + throw new System.InvalidOperationException("publication failed due to max position being reached"); } if (--attempts <= 0) @@ -415,12 +415,12 @@ private bool OfferWithTimeout(int length, AgentInvoker aeronClientInvoker) if (result == Publication.CLOSED) { - throw new System.InvalidOperationException("Connection to the archive has been closed"); + throw new System.InvalidOperationException("connection to the archive has been closed"); } if (result == Publication.MAX_POSITION_EXCEEDED) { - throw new System.InvalidOperationException("Publication failed due to max position being reached"); + throw new System.InvalidOperationException("publication failed due to max position being reached"); } if (nanoClock.NanoTime() > deadlineNs) diff --git a/src/Adaptive.Archiver/ControlResponseAdaptor.cs b/src/Adaptive.Archiver/ControlResponseAdaptor.cs index 65a8a9e7..55c11b93 100644 --- a/src/Adaptive.Archiver/ControlResponseAdaptor.cs +++ b/src/Adaptive.Archiver/ControlResponseAdaptor.cs @@ -82,7 +82,7 @@ public virtual void OnFragment(IDirectBuffer buffer, int offset, int length, Hea break; default: - throw new InvalidOperationException("Unknown templateId: " + templateId); + throw new InvalidOperationException("unknown templateId: " + templateId); } } diff --git a/src/Adaptive.Archiver/ControlResponsePoller.cs b/src/Adaptive.Archiver/ControlResponsePoller.cs index 6795e440..754f4cce 100644 --- a/src/Adaptive.Archiver/ControlResponsePoller.cs +++ b/src/Adaptive.Archiver/ControlResponsePoller.cs @@ -163,7 +163,7 @@ public virtual ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, break; default: - throw new System.InvalidOperationException("Unknown templateId: " + templateId); + throw new System.InvalidOperationException("unknown templateId: " + templateId); } pollComplete = true; diff --git a/src/Adaptive.Archiver/RecordingDescriptorPoller.cs b/src/Adaptive.Archiver/RecordingDescriptorPoller.cs index 3f87d3db..63d43db0 100644 --- a/src/Adaptive.Archiver/RecordingDescriptorPoller.cs +++ b/src/Adaptive.Archiver/RecordingDescriptorPoller.cs @@ -161,7 +161,7 @@ public virtual ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, break; default: - throw new System.InvalidOperationException("Unknown templateId: " + templateId); + throw new System.InvalidOperationException("unknown templateId: " + templateId); } return ControlledFragmentHandlerAction.CONTINUE; diff --git a/src/Adaptive.Archiver/RecordingEventsAdapter.cs b/src/Adaptive.Archiver/RecordingEventsAdapter.cs index a0adc011..fd8a435c 100644 --- a/src/Adaptive.Archiver/RecordingEventsAdapter.cs +++ b/src/Adaptive.Archiver/RecordingEventsAdapter.cs @@ -1,7 +1,7 @@ -using Adaptive.Aeron; +using System; +using Adaptive.Aeron; using Adaptive.Aeron.LogBuffer; using Adaptive.Agrona; -using Adaptive.Agrona.Concurrent; using Adaptive.Archiver.Codecs; namespace Adaptive.Archiver @@ -11,14 +11,14 @@ namespace Adaptive.Archiver /// public class RecordingEventsAdapter : IFragmentHandler { - private readonly MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder(); - private readonly RecordingStartedDecoder recordingStartedDecoder = new RecordingStartedDecoder(); - private readonly RecordingProgressDecoder recordingProgressDecoder = new RecordingProgressDecoder(); - private readonly RecordingStoppedDecoder recordingStoppedDecoder = new RecordingStoppedDecoder(); + private readonly MessageHeaderDecoder _messageHeaderDecoder = new MessageHeaderDecoder(); + private readonly RecordingStartedDecoder _recordingStartedDecoder = new RecordingStartedDecoder(); + private readonly RecordingProgressDecoder _recordingProgressDecoder = new RecordingProgressDecoder(); + private readonly RecordingStoppedDecoder _recordingStoppedDecoder = new RecordingStoppedDecoder(); - private readonly int fragmentLimit; - private readonly IRecordingEventsListener listener; - private readonly Subscription subscription; + private readonly int _fragmentLimit; + private readonly IRecordingEventsListener _listener; + private readonly Subscription _subscription; /// /// Create a poller for a given subscription to an archive for recording events. @@ -28,47 +28,71 @@ public class RecordingEventsAdapter : IFragmentHandler /// to apply for each polling operation. public RecordingEventsAdapter(IRecordingEventsListener listener, Subscription subscription, int fragmentLimit) { - this.fragmentLimit = fragmentLimit; - this.listener = listener; - this.subscription = subscription; + _fragmentLimit = fragmentLimit; + _listener = listener; + _subscription = subscription; } /// - /// Poll for recording events and dispatch them to the for this instance. + /// Poll for recording events and dispatch them to the for this instance. /// /// the number of fragments read during the operation. Zero if no events are available. public virtual int Poll() { - return subscription.Poll(this, fragmentLimit); + return _subscription.Poll(this, _fragmentLimit); } public virtual void OnFragment(IDirectBuffer buffer, int offset, int length, Header header) { - messageHeaderDecoder.Wrap(buffer, offset); + _messageHeaderDecoder.Wrap(buffer, offset); - int templateId = messageHeaderDecoder.TemplateId(); + int templateId = _messageHeaderDecoder.TemplateId(); switch (templateId) { case RecordingStartedDecoder.TEMPLATE_ID: - recordingStartedDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, messageHeaderDecoder.BlockLength(), messageHeaderDecoder.Version()); + _recordingStartedDecoder.Wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + _messageHeaderDecoder.BlockLength(), + _messageHeaderDecoder.Version()); - listener.OnStart(recordingStartedDecoder.RecordingId(), recordingStartedDecoder.StartPosition(), recordingStartedDecoder.SessionId(), recordingStartedDecoder.StreamId(), recordingStartedDecoder.Channel(), recordingStartedDecoder.SourceIdentity()); + _listener.OnStart( + _recordingStartedDecoder.RecordingId(), + _recordingStartedDecoder.StartPosition(), + _recordingStartedDecoder.SessionId(), + _recordingStartedDecoder.StreamId(), + _recordingStartedDecoder.Channel(), + _recordingStartedDecoder.SourceIdentity()); break; case RecordingProgressDecoder.TEMPLATE_ID: - recordingProgressDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, messageHeaderDecoder.BlockLength(), messageHeaderDecoder.Version()); + _recordingProgressDecoder.Wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + _messageHeaderDecoder.BlockLength(), + _messageHeaderDecoder.Version()); - listener.OnProgress(recordingProgressDecoder.RecordingId(), recordingProgressDecoder.StartPosition(), recordingProgressDecoder.Position()); + _listener.OnProgress( + _recordingProgressDecoder.RecordingId(), + _recordingProgressDecoder.StartPosition(), + _recordingProgressDecoder.Position()); break; case RecordingStoppedDecoder.TEMPLATE_ID: - recordingStoppedDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, messageHeaderDecoder.BlockLength(), messageHeaderDecoder.Version()); + _recordingStoppedDecoder.Wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + _messageHeaderDecoder.BlockLength(), + _messageHeaderDecoder.Version()); - listener.OnStop(recordingStoppedDecoder.RecordingId(), recordingStoppedDecoder.StartPosition(), recordingStoppedDecoder.StopPosition()); + _listener.OnStop( + _recordingStoppedDecoder.RecordingId(), + _recordingStoppedDecoder.StartPosition(), + _recordingStoppedDecoder.StopPosition()); break; default: - throw new System.InvalidOperationException("Unknown templateId: " + templateId); + throw new InvalidOperationException("unknown templateId: " + templateId); } } } diff --git a/src/Adaptive.Archiver/RecordingEventsPoller.cs b/src/Adaptive.Archiver/RecordingEventsPoller.cs index c4bd21c2..5a00734e 100644 --- a/src/Adaptive.Archiver/RecordingEventsPoller.cs +++ b/src/Adaptive.Archiver/RecordingEventsPoller.cs @@ -135,7 +135,7 @@ public virtual void OnFragment(IDirectBuffer buffer, int offset, int length, Hea break; default: - throw new System.InvalidOperationException("Unknown templateId: " + templateId); + throw new System.InvalidOperationException("unknown templateId: " + templateId); } pollComplete = true; diff --git a/src/Adaptive.Cluster/Adaptive.Cluster.csproj b/src/Adaptive.Cluster/Adaptive.Cluster.csproj index b3367b97..2e1afe40 100644 --- a/src/Adaptive.Cluster/Adaptive.Cluster.csproj +++ b/src/Adaptive.Cluster/Adaptive.Cluster.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Aeron.Cluster - 1.8.3 + 1.9.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Clustering libraries over the Aeron transport diff --git a/src/Adaptive.Cluster/Client/AeronCluster.cs b/src/Adaptive.Cluster/Client/AeronCluster.cs index 2cf8eb49..df201945 100644 --- a/src/Adaptive.Cluster/Client/AeronCluster.cs +++ b/src/Adaptive.Cluster/Client/AeronCluster.cs @@ -288,7 +288,12 @@ private Publication ConnectToCluster() if (_nanoClock.NanoTime() > deadlineNs) { - throw new TimeoutException("Awaiting connection to cluster"); + for (int i = 0; i < memberCount; i++) + { + publications[i].Dispose(); + } + + throw new TimeoutException("awaiting connection to cluster"); } _idleStrategy.Idle(); @@ -303,7 +308,7 @@ private Publication ConnectToCluster() { if (_nanoClock.NanoTime() > deadlineNs) { - throw new TimeoutException("Awaiting connection to cluster"); + throw new TimeoutException("awaiting connection to cluster"); } _idleStrategy.Idle(); @@ -367,7 +372,7 @@ private void PollNextResponse(long deadlineNs, long correlationId, EgressPoller { if (_nanoClock.NanoTime() > deadlineNs) { - throw new TimeoutException("Awaiting response for correlationId=" + correlationId); + throw new TimeoutException("awaiting response for correlationId=" + correlationId); } _idleStrategy.Idle(); @@ -405,12 +410,12 @@ private long SendConnectRequest(byte[] encodedCredentials, long deadlineNs) if (Publication.CLOSED == result) { - throw new InvalidOperationException("Unexpected close from cluster"); + throw new InvalidOperationException("unexpected close from cluster"); } if (_nanoClock.NanoTime() > deadlineNs) { - throw new TimeoutException("Failed to connect to cluster"); + throw new TimeoutException("failed to connect to cluster"); } _idleStrategy.Idle(); @@ -451,7 +456,7 @@ private long SendChallengeResponse(long sessionId, byte[] encodedCredentials, lo if (_nanoClock.NanoTime() > deadlineNs) { - throw new TimeoutException("Failed to connect to cluster"); + throw new TimeoutException("failed to connect to cluster"); } _idleStrategy.Idle(); @@ -464,7 +469,7 @@ private static void CheckResult(long result) { if (result == Publication.NOT_CONNECTED || result == Publication.CLOSED || result == Publication.MAX_POSITION_EXCEEDED) { - throw new InvalidOperationException("Unexpected publication state: " + result); + throw new InvalidOperationException("unexpected publication state: " + result); } } diff --git a/src/Adaptive.Cluster/Client/EgressAdapter.cs b/src/Adaptive.Cluster/Client/EgressAdapter.cs index 35dcded6..8b9baf8b 100644 --- a/src/Adaptive.Cluster/Client/EgressAdapter.cs +++ b/src/Adaptive.Cluster/Client/EgressAdapter.cs @@ -2,7 +2,6 @@ using Adaptive.Aeron; using Adaptive.Aeron.LogBuffer; using Adaptive.Agrona; -using Adaptive.Agrona.Concurrent; using Adaptive.Cluster.Codecs; namespace Adaptive.Cluster.Client @@ -44,9 +43,17 @@ public void OnFragment(IDirectBuffer buffer, int offset, int length, Header head switch (templateId) { case SessionEventDecoder.TEMPLATE_ID: - _sessionEventDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, _messageHeaderDecoder.BlockLength(), _messageHeaderDecoder.Version()); + _sessionEventDecoder.Wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + _messageHeaderDecoder.BlockLength(), + _messageHeaderDecoder.Version()); - _listener.SessionEvent(_sessionEventDecoder.CorrelationId(), _sessionEventDecoder.ClusterSessionId(), _sessionEventDecoder.Code(), _sessionEventDecoder.Detail()); + _listener.SessionEvent( + _sessionEventDecoder.CorrelationId(), + _sessionEventDecoder.ClusterSessionId(), + _sessionEventDecoder.Code(), + _sessionEventDecoder.Detail()); break; case NewLeaderEventDecoder.TEMPLATE_ID: @@ -56,13 +63,27 @@ public void OnFragment(IDirectBuffer buffer, int offset, int length, Header head break; case SessionHeaderDecoder.TEMPLATE_ID: - _sessionHeaderDecoder.Wrap(buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, _messageHeaderDecoder.BlockLength(), _messageHeaderDecoder.Version()); + _sessionHeaderDecoder.Wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + _messageHeaderDecoder.BlockLength(), + _messageHeaderDecoder.Version()); - _listener.OnMessage(_sessionHeaderDecoder.CorrelationId(), _sessionHeaderDecoder.ClusterSessionId(), _sessionHeaderDecoder.Timestamp(), buffer, offset + SESSION_HEADER_LENGTH, length - SESSION_HEADER_LENGTH, header); + _listener.OnMessage( + _sessionHeaderDecoder.CorrelationId(), + _sessionHeaderDecoder.ClusterSessionId(), + _sessionHeaderDecoder.Timestamp(), + buffer, + offset + SESSION_HEADER_LENGTH, + length - SESSION_HEADER_LENGTH, + header); + break; + + case ChallengeDecoder.TEMPLATE_ID: break; default: - throw new InvalidOperationException("Unknown templateId: " + templateId); + throw new InvalidOperationException("unknown templateId: " + templateId); } } } diff --git a/src/Adaptive.Cluster/Client/EgressPoller.cs b/src/Adaptive.Cluster/Client/EgressPoller.cs index 97b92fc5..3c289ec6 100644 --- a/src/Adaptive.Cluster/Client/EgressPoller.cs +++ b/src/Adaptive.Cluster/Client/EgressPoller.cs @@ -170,7 +170,7 @@ public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offs break; default: - throw new InvalidOperationException("Unknown templateId: " + templateId); + throw new InvalidOperationException("unknown templateId: " + templateId); } pollComplete = true; diff --git a/src/Adaptive.Cluster/Codecs/AppendedPositionDecoder.cs b/src/Adaptive.Cluster/Codecs/AppendedPositionDecoder.cs index 37d022c0..88f86048 100644 --- a/src/Adaptive.Cluster/Codecs/AppendedPositionDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/AppendedPositionDecoder.cs @@ -10,7 +10,7 @@ namespace Adaptive.Cluster.Codecs { public class AppendedPositionDecoder { public const ushort BLOCK_LENGTH = 20; - public const ushort TEMPLATE_ID = 53; + public const ushort TEMPLATE_ID = 54; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -88,27 +88,27 @@ public void Limit(int limit) this._limit = limit; } - public static int TermPositionId() + public static int LogPositionId() { return 1; } - public static int TermPositionSinceVersion() + public static int LogPositionSinceVersion() { return 0; } - public static int TermPositionEncodingOffset() + public static int LogPositionEncodingOffset() { return 0; } - public static int TermPositionEncodingLength() + public static int LogPositionEncodingLength() { return 8; } - public static string TermPositionMetaAttribute(MetaAttribute metaAttribute) + public static string LogPositionMetaAttribute(MetaAttribute metaAttribute) { switch (metaAttribute) { @@ -121,22 +121,22 @@ public static string TermPositionMetaAttribute(MetaAttribute metaAttribute) return ""; } - public static long TermPositionNullValue() + public static long LogPositionNullValue() { return -9223372036854775808L; } - public static long TermPositionMinValue() + public static long LogPositionMinValue() { return -9223372036854775807L; } - public static long TermPositionMaxValue() + public static long LogPositionMaxValue() { return 9223372036854775807L; } - public long TermPosition() + public long LogPosition() { return _buffer.GetLong(_offset + 0, ByteOrder.LittleEndian); } @@ -279,10 +279,10 @@ public StringBuilder AppendTo(StringBuilder builder) } builder.Append(BLOCK_LENGTH); builder.Append("):"); - //Token{signal=BEGIN_FIELD, name='termPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='logPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - builder.Append("TermPosition="); - builder.Append(TermPosition()); + builder.Append("LogPosition="); + builder.Append(LogPosition()); builder.Append('|'); //Token{signal=BEGIN_FIELD, name='leadershipTermId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=8, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} diff --git a/src/Adaptive.Cluster/Codecs/AppendedPositionEncoder.cs b/src/Adaptive.Cluster/Codecs/AppendedPositionEncoder.cs index cf2faad5..9254c853 100644 --- a/src/Adaptive.Cluster/Codecs/AppendedPositionEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/AppendedPositionEncoder.cs @@ -10,7 +10,7 @@ namespace Adaptive.Cluster.Codecs { public class AppendedPositionEncoder { public const ushort BLOCK_LENGTH = 20; - public const ushort TEMPLATE_ID = 53; + public const ushort TEMPLATE_ID = 54; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -96,32 +96,32 @@ public void Limit(int limit) this._limit = limit; } - public static int TermPositionEncodingOffset() + public static int LogPositionEncodingOffset() { return 0; } - public static int TermPositionEncodingLength() + public static int LogPositionEncodingLength() { return 8; } - public static long TermPositionNullValue() + public static long LogPositionNullValue() { return -9223372036854775808L; } - public static long TermPositionMinValue() + public static long LogPositionMinValue() { return -9223372036854775807L; } - public static long TermPositionMaxValue() + public static long LogPositionMaxValue() { return 9223372036854775807L; } - public AppendedPositionEncoder TermPosition(long value) + public AppendedPositionEncoder LogPosition(long value) { _buffer.PutLong(_offset + 0, value, ByteOrder.LittleEndian); return this; diff --git a/src/Adaptive.Cluster/Codecs/CanvassPositionDecoder.cs b/src/Adaptive.Cluster/Codecs/CanvassPositionDecoder.cs new file mode 100644 index 00000000..07dec85b --- /dev/null +++ b/src/Adaptive.Cluster/Codecs/CanvassPositionDecoder.cs @@ -0,0 +1,302 @@ +/* Generated SBE (Simple Binary Encoding) message codec */ +using System; +using System.Text; +using System.Collections.Generic; +using Adaptive.Agrona; + + +namespace Adaptive.Cluster.Codecs { + +public class CanvassPositionDecoder +{ + public const ushort BLOCK_LENGTH = 20; + public const ushort TEMPLATE_ID = 50; + public const ushort SCHEMA_ID = 1; + public const ushort SCHEMA_VERSION = 1; + + private CanvassPositionDecoder _parentMessage; + private IDirectBuffer _buffer; + protected int _offset; + protected int _limit; + protected int _actingBlockLength; + protected int _actingVersion; + + public CanvassPositionDecoder() + { + _parentMessage = this; + } + + public ushort SbeBlockLength() + { + return BLOCK_LENGTH; + } + + public ushort SbeTemplateId() + { + return TEMPLATE_ID; + } + + public ushort SbeSchemaId() + { + return SCHEMA_ID; + } + + public ushort SbeSchemaVersion() + { + return SCHEMA_VERSION; + } + + public string SbeSemanticType() + { + return ""; + } + + public IDirectBuffer Buffer() + { + return _buffer; + } + + public int Offset() + { + return _offset; + } + + public CanvassPositionDecoder Wrap( + IDirectBuffer buffer, int offset, int actingBlockLength, int actingVersion) + { + this._buffer = buffer; + this._offset = offset; + this._actingBlockLength = actingBlockLength; + this._actingVersion = actingVersion; + Limit(offset + actingBlockLength); + + return this; + } + + public int EncodedLength() + { + return _limit - _offset; + } + + public int Limit() + { + return _limit; + } + + public void Limit(int limit) + { + this._limit = limit; + } + + public static int LogPositionId() + { + return 1; + } + + public static int LogPositionSinceVersion() + { + return 0; + } + + public static int LogPositionEncodingOffset() + { + return 0; + } + + public static int LogPositionEncodingLength() + { + return 8; + } + + public static string LogPositionMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static long LogPositionNullValue() + { + return -9223372036854775808L; + } + + public static long LogPositionMinValue() + { + return -9223372036854775807L; + } + + public static long LogPositionMaxValue() + { + return 9223372036854775807L; + } + + public long LogPosition() + { + return _buffer.GetLong(_offset + 0, ByteOrder.LittleEndian); + } + + + public static int LeadershipTermIdId() + { + return 2; + } + + public static int LeadershipTermIdSinceVersion() + { + return 0; + } + + public static int LeadershipTermIdEncodingOffset() + { + return 8; + } + + public static int LeadershipTermIdEncodingLength() + { + return 8; + } + + public static string LeadershipTermIdMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static long LeadershipTermIdNullValue() + { + return -9223372036854775808L; + } + + public static long LeadershipTermIdMinValue() + { + return -9223372036854775807L; + } + + public static long LeadershipTermIdMaxValue() + { + return 9223372036854775807L; + } + + public long LeadershipTermId() + { + return _buffer.GetLong(_offset + 8, ByteOrder.LittleEndian); + } + + + public static int FollowerMemberIdId() + { + return 3; + } + + public static int FollowerMemberIdSinceVersion() + { + return 0; + } + + public static int FollowerMemberIdEncodingOffset() + { + return 16; + } + + public static int FollowerMemberIdEncodingLength() + { + return 4; + } + + public static string FollowerMemberIdMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static int FollowerMemberIdNullValue() + { + return -2147483648; + } + + public static int FollowerMemberIdMinValue() + { + return -2147483647; + } + + public static int FollowerMemberIdMaxValue() + { + return 2147483647; + } + + public int FollowerMemberId() + { + return _buffer.GetInt(_offset + 16, ByteOrder.LittleEndian); + } + + + + public override string ToString() + { + return AppendTo(new StringBuilder(100)).ToString(); + } + + public StringBuilder AppendTo(StringBuilder builder) + { + int originalLimit = Limit(); + Limit(_offset + _actingBlockLength); + builder.Append("[CanvassPosition](sbeTemplateId="); + builder.Append(TEMPLATE_ID); + builder.Append("|sbeSchemaId="); + builder.Append(SCHEMA_ID); + builder.Append("|sbeSchemaVersion="); + if (_parentMessage._actingVersion != SCHEMA_VERSION) + { + builder.Append(_parentMessage._actingVersion); + builder.Append('/'); + } + builder.Append(SCHEMA_VERSION); + builder.Append("|sbeBlockLength="); + if (_actingBlockLength != BLOCK_LENGTH) + { + builder.Append(_actingBlockLength); + builder.Append('/'); + } + builder.Append(BLOCK_LENGTH); + builder.Append("):"); + //Token{signal=BEGIN_FIELD, name='logPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("LogPosition="); + builder.Append(LogPosition()); + builder.Append('|'); + //Token{signal=BEGIN_FIELD, name='leadershipTermId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=8, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("LeadershipTermId="); + builder.Append(LeadershipTermId()); + builder.Append('|'); + //Token{signal=BEGIN_FIELD, name='followerMemberId', referencedName='null', description='null', id=3, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("FollowerMemberId="); + builder.Append(FollowerMemberId()); + + Limit(originalLimit); + + return builder; + } +} +} diff --git a/src/Adaptive.Cluster/Codecs/CanvassPositionEncoder.cs b/src/Adaptive.Cluster/Codecs/CanvassPositionEncoder.cs new file mode 100644 index 00000000..ad6651e0 --- /dev/null +++ b/src/Adaptive.Cluster/Codecs/CanvassPositionEncoder.cs @@ -0,0 +1,209 @@ +/* Generated SBE (Simple Binary Encoding) message codec */ +using System; +using System.Text; +using System.Collections.Generic; +using Adaptive.Agrona; + + +namespace Adaptive.Cluster.Codecs { + +public class CanvassPositionEncoder +{ + public const ushort BLOCK_LENGTH = 20; + public const ushort TEMPLATE_ID = 50; + public const ushort SCHEMA_ID = 1; + public const ushort SCHEMA_VERSION = 1; + + private CanvassPositionEncoder _parentMessage; + private IMutableDirectBuffer _buffer; + protected int _offset; + protected int _limit; + + public CanvassPositionEncoder() + { + _parentMessage = this; + } + + public ushort SbeBlockLength() + { + return BLOCK_LENGTH; + } + + public ushort SbeTemplateId() + { + return TEMPLATE_ID; + } + + public ushort SbeSchemaId() + { + return SCHEMA_ID; + } + + public ushort SbeSchemaVersion() + { + return SCHEMA_VERSION; + } + + public string SbeSemanticType() + { + return ""; + } + + public IMutableDirectBuffer Buffer() + { + return _buffer; + } + + public int Offset() + { + return _offset; + } + + public CanvassPositionEncoder Wrap(IMutableDirectBuffer buffer, int offset) + { + this._buffer = buffer; + this._offset = offset; + Limit(offset + BLOCK_LENGTH); + + return this; + } + + public CanvassPositionEncoder WrapAndApplyHeader( + IMutableDirectBuffer buffer, int offset, MessageHeaderEncoder headerEncoder) + { + headerEncoder + .Wrap(buffer, offset) + .BlockLength(BLOCK_LENGTH) + .TemplateId(TEMPLATE_ID) + .SchemaId(SCHEMA_ID) + .Version(SCHEMA_VERSION); + + return Wrap(buffer, offset + MessageHeaderEncoder.ENCODED_LENGTH); + } + + public int EncodedLength() + { + return _limit - _offset; + } + + public int Limit() + { + return _limit; + } + + public void Limit(int limit) + { + this._limit = limit; + } + + public static int LogPositionEncodingOffset() + { + return 0; + } + + public static int LogPositionEncodingLength() + { + return 8; + } + + public static long LogPositionNullValue() + { + return -9223372036854775808L; + } + + public static long LogPositionMinValue() + { + return -9223372036854775807L; + } + + public static long LogPositionMaxValue() + { + return 9223372036854775807L; + } + + public CanvassPositionEncoder LogPosition(long value) + { + _buffer.PutLong(_offset + 0, value, ByteOrder.LittleEndian); + return this; + } + + + public static int LeadershipTermIdEncodingOffset() + { + return 8; + } + + public static int LeadershipTermIdEncodingLength() + { + return 8; + } + + public static long LeadershipTermIdNullValue() + { + return -9223372036854775808L; + } + + public static long LeadershipTermIdMinValue() + { + return -9223372036854775807L; + } + + public static long LeadershipTermIdMaxValue() + { + return 9223372036854775807L; + } + + public CanvassPositionEncoder LeadershipTermId(long value) + { + _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); + return this; + } + + + public static int FollowerMemberIdEncodingOffset() + { + return 16; + } + + public static int FollowerMemberIdEncodingLength() + { + return 4; + } + + public static int FollowerMemberIdNullValue() + { + return -2147483648; + } + + public static int FollowerMemberIdMinValue() + { + return -2147483647; + } + + public static int FollowerMemberIdMaxValue() + { + return 2147483647; + } + + public CanvassPositionEncoder FollowerMemberId(int value) + { + _buffer.PutInt(_offset + 16, value, ByteOrder.LittleEndian); + return this; + } + + + + public override string ToString() + { + return AppendTo(new StringBuilder(100)).ToString(); + } + + public StringBuilder AppendTo(StringBuilder builder) + { + CanvassPositionDecoder writer = new CanvassPositionDecoder(); + writer.Wrap(_buffer, _offset, BLOCK_LENGTH, SCHEMA_VERSION); + + return writer.AppendTo(builder); + } +} +} diff --git a/src/Adaptive.Cluster/Codecs/CommitPositionDecoder.cs b/src/Adaptive.Cluster/Codecs/CommitPositionDecoder.cs index 38794b5a..0df91272 100644 --- a/src/Adaptive.Cluster/Codecs/CommitPositionDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/CommitPositionDecoder.cs @@ -10,7 +10,7 @@ namespace Adaptive.Cluster.Codecs { public class CommitPositionDecoder { public const ushort BLOCK_LENGTH = 20; - public const ushort TEMPLATE_ID = 54; + public const ushort TEMPLATE_ID = 55; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -88,27 +88,27 @@ public void Limit(int limit) this._limit = limit; } - public static int TermPositionId() + public static int LogPositionId() { return 1; } - public static int TermPositionSinceVersion() + public static int LogPositionSinceVersion() { return 0; } - public static int TermPositionEncodingOffset() + public static int LogPositionEncodingOffset() { return 0; } - public static int TermPositionEncodingLength() + public static int LogPositionEncodingLength() { return 8; } - public static string TermPositionMetaAttribute(MetaAttribute metaAttribute) + public static string LogPositionMetaAttribute(MetaAttribute metaAttribute) { switch (metaAttribute) { @@ -121,22 +121,22 @@ public static string TermPositionMetaAttribute(MetaAttribute metaAttribute) return ""; } - public static long TermPositionNullValue() + public static long LogPositionNullValue() { return -9223372036854775808L; } - public static long TermPositionMinValue() + public static long LogPositionMinValue() { return -9223372036854775807L; } - public static long TermPositionMaxValue() + public static long LogPositionMaxValue() { return 9223372036854775807L; } - public long TermPosition() + public long LogPosition() { return _buffer.GetLong(_offset + 0, ByteOrder.LittleEndian); } @@ -279,10 +279,10 @@ public StringBuilder AppendTo(StringBuilder builder) } builder.Append(BLOCK_LENGTH); builder.Append("):"); - //Token{signal=BEGIN_FIELD, name='termPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='logPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - builder.Append("TermPosition="); - builder.Append(TermPosition()); + builder.Append("LogPosition="); + builder.Append(LogPosition()); builder.Append('|'); //Token{signal=BEGIN_FIELD, name='leadershipTermId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=8, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} diff --git a/src/Adaptive.Cluster/Codecs/CommitPositionEncoder.cs b/src/Adaptive.Cluster/Codecs/CommitPositionEncoder.cs index 2dac8294..c2d5531d 100644 --- a/src/Adaptive.Cluster/Codecs/CommitPositionEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/CommitPositionEncoder.cs @@ -10,7 +10,7 @@ namespace Adaptive.Cluster.Codecs { public class CommitPositionEncoder { public const ushort BLOCK_LENGTH = 20; - public const ushort TEMPLATE_ID = 54; + public const ushort TEMPLATE_ID = 55; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -96,32 +96,32 @@ public void Limit(int limit) this._limit = limit; } - public static int TermPositionEncodingOffset() + public static int LogPositionEncodingOffset() { return 0; } - public static int TermPositionEncodingLength() + public static int LogPositionEncodingLength() { return 8; } - public static long TermPositionNullValue() + public static long LogPositionNullValue() { return -9223372036854775808L; } - public static long TermPositionMinValue() + public static long LogPositionMinValue() { return -9223372036854775807L; } - public static long TermPositionMaxValue() + public static long LogPositionMaxValue() { return 9223372036854775807L; } - public CommitPositionEncoder TermPosition(long value) + public CommitPositionEncoder LogPosition(long value) { _buffer.PutLong(_offset + 0, value, ByteOrder.LittleEndian); return this; diff --git a/src/Adaptive.Cluster/Codecs/NewLeadershipTermDecoder.cs b/src/Adaptive.Cluster/Codecs/NewLeadershipTermDecoder.cs index f115d990..6285cb1f 100644 --- a/src/Adaptive.Cluster/Codecs/NewLeadershipTermDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/NewLeadershipTermDecoder.cs @@ -9,8 +9,8 @@ namespace Adaptive.Cluster.Codecs { public class NewLeadershipTermDecoder { - public const ushort BLOCK_LENGTH = 32; - public const ushort TEMPLATE_ID = 52; + public const ushort BLOCK_LENGTH = 24; + public const ushort TEMPLATE_ID = 53; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -88,27 +88,27 @@ public void Limit(int limit) this._limit = limit; } - public static int LastBaseLogPositionId() + public static int LogPositionId() { return 1; } - public static int LastBaseLogPositionSinceVersion() + public static int LogPositionSinceVersion() { return 0; } - public static int LastBaseLogPositionEncodingOffset() + public static int LogPositionEncodingOffset() { return 0; } - public static int LastBaseLogPositionEncodingLength() + public static int LogPositionEncodingLength() { return 8; } - public static string LastBaseLogPositionMetaAttribute(MetaAttribute metaAttribute) + public static string LogPositionMetaAttribute(MetaAttribute metaAttribute) { switch (metaAttribute) { @@ -121,84 +121,30 @@ public static string LastBaseLogPositionMetaAttribute(MetaAttribute metaAttribut return ""; } - public static long LastBaseLogPositionNullValue() + public static long LogPositionNullValue() { return -9223372036854775808L; } - public static long LastBaseLogPositionMinValue() + public static long LogPositionMinValue() { return -9223372036854775807L; } - public static long LastBaseLogPositionMaxValue() + public static long LogPositionMaxValue() { return 9223372036854775807L; } - public long LastBaseLogPosition() + public long LogPosition() { return _buffer.GetLong(_offset + 0, ByteOrder.LittleEndian); } - public static int LastTermPositionId() - { - return 2; - } - - public static int LastTermPositionSinceVersion() - { - return 0; - } - - public static int LastTermPositionEncodingOffset() - { - return 8; - } - - public static int LastTermPositionEncodingLength() - { - return 8; - } - - public static string LastTermPositionMetaAttribute(MetaAttribute metaAttribute) - { - switch (metaAttribute) - { - case MetaAttribute.EPOCH: return "unix"; - case MetaAttribute.TIME_UNIT: return "nanosecond"; - case MetaAttribute.SEMANTIC_TYPE: return ""; - case MetaAttribute.PRESENCE: return "required"; - } - - return ""; - } - - public static long LastTermPositionNullValue() - { - return -9223372036854775808L; - } - - public static long LastTermPositionMinValue() - { - return -9223372036854775807L; - } - - public static long LastTermPositionMaxValue() - { - return 9223372036854775807L; - } - - public long LastTermPosition() - { - return _buffer.GetLong(_offset + 8, ByteOrder.LittleEndian); - } - - public static int LeadershipTermIdId() { - return 3; + return 2; } public static int LeadershipTermIdSinceVersion() @@ -208,7 +154,7 @@ public static int LeadershipTermIdSinceVersion() public static int LeadershipTermIdEncodingOffset() { - return 16; + return 8; } public static int LeadershipTermIdEncodingLength() @@ -246,13 +192,13 @@ public static long LeadershipTermIdMaxValue() public long LeadershipTermId() { - return _buffer.GetLong(_offset + 16, ByteOrder.LittleEndian); + return _buffer.GetLong(_offset + 8, ByteOrder.LittleEndian); } public static int LeaderMemberIdId() { - return 4; + return 3; } public static int LeaderMemberIdSinceVersion() @@ -262,7 +208,7 @@ public static int LeaderMemberIdSinceVersion() public static int LeaderMemberIdEncodingOffset() { - return 24; + return 16; } public static int LeaderMemberIdEncodingLength() @@ -300,13 +246,13 @@ public static int LeaderMemberIdMaxValue() public int LeaderMemberId() { - return _buffer.GetInt(_offset + 24, ByteOrder.LittleEndian); + return _buffer.GetInt(_offset + 16, ByteOrder.LittleEndian); } public static int LogSessionIdId() { - return 5; + return 4; } public static int LogSessionIdSinceVersion() @@ -316,7 +262,7 @@ public static int LogSessionIdSinceVersion() public static int LogSessionIdEncodingOffset() { - return 28; + return 20; } public static int LogSessionIdEncodingLength() @@ -354,7 +300,7 @@ public static int LogSessionIdMaxValue() public int LogSessionId() { - return _buffer.GetInt(_offset + 28, ByteOrder.LittleEndian); + return _buffer.GetInt(_offset + 20, ByteOrder.LittleEndian); } @@ -387,28 +333,23 @@ public StringBuilder AppendTo(StringBuilder builder) } builder.Append(BLOCK_LENGTH); builder.Append("):"); - //Token{signal=BEGIN_FIELD, name='lastBaseLogPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='logPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - builder.Append("LastBaseLogPosition="); - builder.Append(LastBaseLogPosition()); + builder.Append("LogPosition="); + builder.Append(LogPosition()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='lastTermPosition', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='leadershipTermId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=8, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - builder.Append("LastTermPosition="); - builder.Append(LastTermPosition()); - builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='leadershipTermId', referencedName='null', description='null', id=3, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("LeadershipTermId="); builder.Append(LeadershipTermId()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='leaderMemberId', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=24, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=24, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='leaderMemberId', referencedName='null', description='null', id=3, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("LeaderMemberId="); builder.Append(LeaderMemberId()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='logSessionId', referencedName='null', description='null', id=5, version=0, deprecated=0, encodedLength=0, offset=28, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=28, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='logSessionId', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=20, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=20, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("LogSessionId="); builder.Append(LogSessionId()); diff --git a/src/Adaptive.Cluster/Codecs/NewLeadershipTermEncoder.cs b/src/Adaptive.Cluster/Codecs/NewLeadershipTermEncoder.cs index 85694d1f..f8dece5a 100644 --- a/src/Adaptive.Cluster/Codecs/NewLeadershipTermEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/NewLeadershipTermEncoder.cs @@ -9,8 +9,8 @@ namespace Adaptive.Cluster.Codecs { public class NewLeadershipTermEncoder { - public const ushort BLOCK_LENGTH = 32; - public const ushort TEMPLATE_ID = 52; + public const ushort BLOCK_LENGTH = 24; + public const ushort TEMPLATE_ID = 53; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -96,73 +96,41 @@ public void Limit(int limit) this._limit = limit; } - public static int LastBaseLogPositionEncodingOffset() + public static int LogPositionEncodingOffset() { return 0; } - public static int LastBaseLogPositionEncodingLength() + public static int LogPositionEncodingLength() { return 8; } - public static long LastBaseLogPositionNullValue() + public static long LogPositionNullValue() { return -9223372036854775808L; } - public static long LastBaseLogPositionMinValue() + public static long LogPositionMinValue() { return -9223372036854775807L; } - public static long LastBaseLogPositionMaxValue() + public static long LogPositionMaxValue() { return 9223372036854775807L; } - public NewLeadershipTermEncoder LastBaseLogPosition(long value) + public NewLeadershipTermEncoder LogPosition(long value) { _buffer.PutLong(_offset + 0, value, ByteOrder.LittleEndian); return this; } - public static int LastTermPositionEncodingOffset() - { - return 8; - } - - public static int LastTermPositionEncodingLength() - { - return 8; - } - - public static long LastTermPositionNullValue() - { - return -9223372036854775808L; - } - - public static long LastTermPositionMinValue() - { - return -9223372036854775807L; - } - - public static long LastTermPositionMaxValue() - { - return 9223372036854775807L; - } - - public NewLeadershipTermEncoder LastTermPosition(long value) - { - _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); - return this; - } - - public static int LeadershipTermIdEncodingOffset() { - return 16; + return 8; } public static int LeadershipTermIdEncodingLength() @@ -187,14 +155,14 @@ public static long LeadershipTermIdMaxValue() public NewLeadershipTermEncoder LeadershipTermId(long value) { - _buffer.PutLong(_offset + 16, value, ByteOrder.LittleEndian); + _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); return this; } public static int LeaderMemberIdEncodingOffset() { - return 24; + return 16; } public static int LeaderMemberIdEncodingLength() @@ -219,14 +187,14 @@ public static int LeaderMemberIdMaxValue() public NewLeadershipTermEncoder LeaderMemberId(int value) { - _buffer.PutInt(_offset + 24, value, ByteOrder.LittleEndian); + _buffer.PutInt(_offset + 16, value, ByteOrder.LittleEndian); return this; } public static int LogSessionIdEncodingOffset() { - return 28; + return 20; } public static int LogSessionIdEncodingLength() @@ -251,7 +219,7 @@ public static int LogSessionIdMaxValue() public NewLeadershipTermEncoder LogSessionId(int value) { - _buffer.PutInt(_offset + 28, value, ByteOrder.LittleEndian); + _buffer.PutInt(_offset + 20, value, ByteOrder.LittleEndian); return this; } diff --git a/src/Adaptive.Cluster/Codecs/RequestVoteDecoder.cs b/src/Adaptive.Cluster/Codecs/RequestVoteDecoder.cs index 4e449553..dcba4c59 100644 --- a/src/Adaptive.Cluster/Codecs/RequestVoteDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/RequestVoteDecoder.cs @@ -9,8 +9,8 @@ namespace Adaptive.Cluster.Codecs { public class RequestVoteDecoder { - public const ushort BLOCK_LENGTH = 28; - public const ushort TEMPLATE_ID = 50; + public const ushort BLOCK_LENGTH = 20; + public const ushort TEMPLATE_ID = 51; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -88,27 +88,27 @@ public void Limit(int limit) this._limit = limit; } - public static int LastBaseLogPositionId() + public static int LogPositionId() { return 1; } - public static int LastBaseLogPositionSinceVersion() + public static int LogPositionSinceVersion() { return 0; } - public static int LastBaseLogPositionEncodingOffset() + public static int LogPositionEncodingOffset() { return 0; } - public static int LastBaseLogPositionEncodingLength() + public static int LogPositionEncodingLength() { return 8; } - public static string LastBaseLogPositionMetaAttribute(MetaAttribute metaAttribute) + public static string LogPositionMetaAttribute(MetaAttribute metaAttribute) { switch (metaAttribute) { @@ -121,84 +121,30 @@ public static string LastBaseLogPositionMetaAttribute(MetaAttribute metaAttribut return ""; } - public static long LastBaseLogPositionNullValue() + public static long LogPositionNullValue() { return -9223372036854775808L; } - public static long LastBaseLogPositionMinValue() + public static long LogPositionMinValue() { return -9223372036854775807L; } - public static long LastBaseLogPositionMaxValue() + public static long LogPositionMaxValue() { return 9223372036854775807L; } - public long LastBaseLogPosition() + public long LogPosition() { return _buffer.GetLong(_offset + 0, ByteOrder.LittleEndian); } - public static int LastTermPositionId() - { - return 2; - } - - public static int LastTermPositionSinceVersion() - { - return 0; - } - - public static int LastTermPositionEncodingOffset() - { - return 8; - } - - public static int LastTermPositionEncodingLength() - { - return 8; - } - - public static string LastTermPositionMetaAttribute(MetaAttribute metaAttribute) - { - switch (metaAttribute) - { - case MetaAttribute.EPOCH: return "unix"; - case MetaAttribute.TIME_UNIT: return "nanosecond"; - case MetaAttribute.SEMANTIC_TYPE: return ""; - case MetaAttribute.PRESENCE: return "required"; - } - - return ""; - } - - public static long LastTermPositionNullValue() - { - return -9223372036854775808L; - } - - public static long LastTermPositionMinValue() - { - return -9223372036854775807L; - } - - public static long LastTermPositionMaxValue() - { - return 9223372036854775807L; - } - - public long LastTermPosition() - { - return _buffer.GetLong(_offset + 8, ByteOrder.LittleEndian); - } - - public static int CandidateTermIdId() { - return 3; + return 2; } public static int CandidateTermIdSinceVersion() @@ -208,7 +154,7 @@ public static int CandidateTermIdSinceVersion() public static int CandidateTermIdEncodingOffset() { - return 16; + return 8; } public static int CandidateTermIdEncodingLength() @@ -246,13 +192,13 @@ public static long CandidateTermIdMaxValue() public long CandidateTermId() { - return _buffer.GetLong(_offset + 16, ByteOrder.LittleEndian); + return _buffer.GetLong(_offset + 8, ByteOrder.LittleEndian); } public static int CandidateMemberIdId() { - return 4; + return 3; } public static int CandidateMemberIdSinceVersion() @@ -262,7 +208,7 @@ public static int CandidateMemberIdSinceVersion() public static int CandidateMemberIdEncodingOffset() { - return 24; + return 16; } public static int CandidateMemberIdEncodingLength() @@ -300,7 +246,7 @@ public static int CandidateMemberIdMaxValue() public int CandidateMemberId() { - return _buffer.GetInt(_offset + 24, ByteOrder.LittleEndian); + return _buffer.GetInt(_offset + 16, ByteOrder.LittleEndian); } @@ -333,23 +279,18 @@ public StringBuilder AppendTo(StringBuilder builder) } builder.Append(BLOCK_LENGTH); builder.Append("):"); - //Token{signal=BEGIN_FIELD, name='lastBaseLogPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='logPosition', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - builder.Append("LastBaseLogPosition="); - builder.Append(LastBaseLogPosition()); + builder.Append("LogPosition="); + builder.Append(LogPosition()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='lastTermPosition', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='candidateTermId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=8, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - builder.Append("LastTermPosition="); - builder.Append(LastTermPosition()); - builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='candidateTermId', referencedName='null', description='null', id=3, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("CandidateTermId="); builder.Append(CandidateTermId()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='candidateMemberId', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=24, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=24, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='candidateMemberId', referencedName='null', description='null', id=3, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("CandidateMemberId="); builder.Append(CandidateMemberId()); diff --git a/src/Adaptive.Cluster/Codecs/RequestVoteEncoder.cs b/src/Adaptive.Cluster/Codecs/RequestVoteEncoder.cs index 9c053005..e9e584f6 100644 --- a/src/Adaptive.Cluster/Codecs/RequestVoteEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/RequestVoteEncoder.cs @@ -9,8 +9,8 @@ namespace Adaptive.Cluster.Codecs { public class RequestVoteEncoder { - public const ushort BLOCK_LENGTH = 28; - public const ushort TEMPLATE_ID = 50; + public const ushort BLOCK_LENGTH = 20; + public const ushort TEMPLATE_ID = 51; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -96,73 +96,41 @@ public void Limit(int limit) this._limit = limit; } - public static int LastBaseLogPositionEncodingOffset() + public static int LogPositionEncodingOffset() { return 0; } - public static int LastBaseLogPositionEncodingLength() + public static int LogPositionEncodingLength() { return 8; } - public static long LastBaseLogPositionNullValue() + public static long LogPositionNullValue() { return -9223372036854775808L; } - public static long LastBaseLogPositionMinValue() + public static long LogPositionMinValue() { return -9223372036854775807L; } - public static long LastBaseLogPositionMaxValue() + public static long LogPositionMaxValue() { return 9223372036854775807L; } - public RequestVoteEncoder LastBaseLogPosition(long value) + public RequestVoteEncoder LogPosition(long value) { _buffer.PutLong(_offset + 0, value, ByteOrder.LittleEndian); return this; } - public static int LastTermPositionEncodingOffset() - { - return 8; - } - - public static int LastTermPositionEncodingLength() - { - return 8; - } - - public static long LastTermPositionNullValue() - { - return -9223372036854775808L; - } - - public static long LastTermPositionMinValue() - { - return -9223372036854775807L; - } - - public static long LastTermPositionMaxValue() - { - return 9223372036854775807L; - } - - public RequestVoteEncoder LastTermPosition(long value) - { - _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); - return this; - } - - public static int CandidateTermIdEncodingOffset() { - return 16; + return 8; } public static int CandidateTermIdEncodingLength() @@ -187,14 +155,14 @@ public static long CandidateTermIdMaxValue() public RequestVoteEncoder CandidateTermId(long value) { - _buffer.PutLong(_offset + 16, value, ByteOrder.LittleEndian); + _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); return this; } public static int CandidateMemberIdEncodingOffset() { - return 24; + return 16; } public static int CandidateMemberIdEncodingLength() @@ -219,7 +187,7 @@ public static int CandidateMemberIdMaxValue() public RequestVoteEncoder CandidateMemberId(int value) { - _buffer.PutInt(_offset + 24, value, ByteOrder.LittleEndian); + _buffer.PutInt(_offset + 16, value, ByteOrder.LittleEndian); return this; } diff --git a/src/Adaptive.Cluster/Codecs/VoteDecoder.cs b/src/Adaptive.Cluster/Codecs/VoteDecoder.cs index 3c211a4a..372dbe9a 100644 --- a/src/Adaptive.Cluster/Codecs/VoteDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/VoteDecoder.cs @@ -10,7 +10,7 @@ namespace Adaptive.Cluster.Codecs { public class VoteDecoder { public const ushort BLOCK_LENGTH = 20; - public const ushort TEMPLATE_ID = 51; + public const ushort TEMPLATE_ID = 52; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; diff --git a/src/Adaptive.Cluster/Codecs/VoteEncoder.cs b/src/Adaptive.Cluster/Codecs/VoteEncoder.cs index ba98abb9..c0429138 100644 --- a/src/Adaptive.Cluster/Codecs/VoteEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/VoteEncoder.cs @@ -10,7 +10,7 @@ namespace Adaptive.Cluster.Codecs { public class VoteEncoder { public const ushort BLOCK_LENGTH = 20; - public const ushort TEMPLATE_ID = 51; + public const ushort TEMPLATE_ID = 52; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; diff --git a/src/Adaptive.Cluster/Service/ClientSession.cs b/src/Adaptive.Cluster/Service/ClientSession.cs index 9da2f0ed..d6539f6e 100644 --- a/src/Adaptive.Cluster/Service/ClientSession.cs +++ b/src/Adaptive.Cluster/Service/ClientSession.cs @@ -16,6 +16,11 @@ public class ClientSession /// public static readonly int SESSION_HEADER_LENGTH = MessageHeaderEncoder.ENCODED_LENGTH + SessionHeaderEncoder.BLOCK_LENGTH; + /// + /// Return value to indicate egress to a session is mocked out by the cluster when in follower mode. + /// + public const long MOCKED_OFFER = 1; + private readonly long _id; private readonly int _responseStreamId; private readonly string _responseChannel; @@ -25,6 +30,7 @@ public class ClientSession private readonly DirectBufferVector _messageBuffer = new DirectBufferVector(); private readonly SessionHeaderEncoder _sessionHeaderEncoder = new SessionHeaderEncoder(); private readonly ICluster _cluster; + private bool _isClosing; internal ClientSession(long sessionId, int responseStreamId, string responseChannel, byte[] encodedPrincipal, ICluster cluster) { @@ -77,6 +83,12 @@ public byte[] EncodedPrincipal() return _encodedPrincipal; } + /// + /// Indicates that a request to close this session has been made. + /// + /// whether a request to close this session has been made. + public bool IsClosing => _isClosing; + /// /// Non-blocking publish of a partial buffer containing a message to a cluster. /// @@ -85,12 +97,12 @@ public byte[] EncodedPrincipal() /// offset in the buffer at which the encoded message begins. /// in bytes of the encoded message. /// the same as when in - /// otherwise 1. + /// otherwise . public long Offer(long correlationId, IDirectBuffer buffer, int offset, int length) { if (_cluster.Role() != ClusterRole.Leader) { - return 1; + return MOCKED_OFFER; } _sessionHeaderEncoder.CorrelationId(correlationId); @@ -104,12 +116,17 @@ internal void Connect(Aeron.Aeron aeron) { if (null != _responsePublication) { - throw new InvalidOperationException("Response publication already present"); + throw new InvalidOperationException("response publication already present"); } _responsePublication = aeron.AddPublication(_responseChannel, _responseStreamId); } + internal void MarkClosing() + { + _isClosing = true; + } + internal void Disconnect() { _responsePublication?.Dispose(); diff --git a/src/Adaptive.Cluster/Service/ClusterMarkFile.cs b/src/Adaptive.Cluster/Service/ClusterMarkFile.cs index b9bc45a3..78b2fa8b 100644 --- a/src/Adaptive.Cluster/Service/ClusterMarkFile.cs +++ b/src/Adaptive.Cluster/Service/ClusterMarkFile.cs @@ -45,7 +45,7 @@ public ClusterMarkFile( { if (version != MarkFileHeaderDecoder.SCHEMA_VERSION) { - throw new ArgumentException("Mark file version " + version + " does not match software:" + MarkFileHeaderDecoder.SCHEMA_VERSION); + throw new ArgumentException("mark file version " + version + " does not match software:" + MarkFileHeaderDecoder.SCHEMA_VERSION); } }, null); @@ -93,7 +93,7 @@ public ClusterMarkFile(DirectoryInfo directory, string filename, IEpochClock epo { if (version != MarkFileHeaderDecoder.SCHEMA_VERSION) { - throw new ArgumentException("Mark file version " + version + " does not match software:" + MarkFileHeaderDecoder.SCHEMA_VERSION); + throw new ArgumentException("mark file version " + version + " does not match software:" + MarkFileHeaderDecoder.SCHEMA_VERSION); } }, logger); diff --git a/src/Adaptive.Cluster/Service/ClusterRole.cs b/src/Adaptive.Cluster/Service/ClusterRole.cs index f840d67b..9d7f3ebf 100644 --- a/src/Adaptive.Cluster/Service/ClusterRole.cs +++ b/src/Adaptive.Cluster/Service/ClusterRole.cs @@ -3,7 +3,7 @@ public enum ClusterRole : long { /// - /// The cluster node is a follower of the current leader. + /// The cluster node is a follower in the current leadership term. /// Follower = 0, @@ -13,7 +13,7 @@ public enum ClusterRole : long Candidate = 1, /// - /// The cluster node is the leader of the current leadership term. + /// The cluster node is the leader for the current leadership term. /// Leader = 2 } diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs index 20be93ab..db48ada8 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs @@ -112,7 +112,7 @@ public int DoWork() { markFile.UpdateActivityTimestamp(nowMs); cachedEpochClock.Update(nowMs); - heartbeatCounter.SetOrdered(nowMs); + CheckHealthAndUpdateHeartbeat(nowMs); } int workCount = logAdapter.Poll(); @@ -153,9 +153,21 @@ public ICollection GetClientSessions() public bool CloseSession(long clusterSessionId) { - if (sessionByIdMap.ContainsKey(clusterSessionId)) + if (!sessionByIdMap.ContainsKey(clusterSessionId)) + { + throw new ArgumentException("unknown clusterSessionId: " + clusterSessionId); + } + + ClientSession clientSession = sessionByIdMap[clusterSessionId]; + + if (clientSession.IsClosing) + { + return true; + } + + if (serviceControlPublisher.CloseSession(clusterSessionId)) { - serviceControlPublisher.CloseSession(clusterSessionId); + clientSession.MarkClosing(); return true; } @@ -167,14 +179,14 @@ public long TimeMs() return timestampMs; } - public void ScheduleTimer(long correlationId, long deadlineMs) + public bool ScheduleTimer(long correlationId, long deadlineMs) { - serviceControlPublisher.ScheduleTimer(correlationId, deadlineMs); + return serviceControlPublisher.ScheduleTimer(correlationId, deadlineMs); } - public void CancelTimer(long correlationId) + public bool CancelTimer(long correlationId) { - serviceControlPublisher.CancelTimer(correlationId); + return serviceControlPublisher.CancelTimer(correlationId); } public void OnScheduleTimer(long correlationId, long deadline) @@ -267,6 +279,14 @@ internal void AddSession(long clusterSessionId, int responseStreamId, string res sessionByIdMap[clusterSessionId] = session; } + private void CheckHealthAndUpdateHeartbeat(long nowMs) + { + if (null != logAdapter && !logAdapter.Image().Closed) + { + heartbeatCounter.SetOrdered(nowMs); + } + } + private void Role(ClusterRole newRole) { if (newRole != role) @@ -287,8 +307,8 @@ private void CheckForSnapshot(CountersReader counters, int recoveryCounterId) RecordingLog.Entry snapshotEntry = recordingLog.GetSnapshot(leadershipTermId, termPosition); if (null == snapshotEntry) { - throw new System.InvalidOperationException( - "No snapshot available for term position: " + termPosition); + throw new InvalidOperationException( + "no snapshot available for term position: " + termPosition); } termBaseLogPosition = snapshotEntry.termBaseLogPosition + snapshotEntry.termPosition; @@ -312,7 +332,6 @@ private void CheckForReplay(CountersReader counters, int recoveryCounterId) for (int i = 0; i < replayTermCount; i++) { AwaitActiveLog(); - int counterId = activeLog.commitPositionId; leadershipTermId = CommitPos.GetLeadershipTermId(counters, counterId); termBaseLogPosition = CommitPos.GetTermBaseLogPosition(counters, counterId); // TODO MARK @@ -367,7 +386,7 @@ private void ConsumeImage(Image image, BoundedLogAdapter adapter) { if (!image.IsEndOfStream()) { - throw new System.InvalidOperationException("Unexpected close of replay"); + throw new InvalidOperationException("unexpected close of replay"); } break; @@ -424,7 +443,7 @@ private void JoinActiveLog(CountersReader counters) int commitPositionId = activeLog.commitPositionId; if (!CommitPos.IsActive(counters, commitPositionId)) { - throw new System.InvalidOperationException("CommitPos counter not active: " + commitPositionId); + throw new InvalidOperationException("CommitPos counter not active: " + commitPositionId); } int logSessionId = activeLog.sessionId; @@ -484,7 +503,7 @@ private void LoadSnapshot(long recordingId) RecordingExtent recordingExtent = new RecordingExtent(); if (0 == archive.ListRecording(recordingId, recordingExtent)) { - throw new System.InvalidOperationException("Could not find recordingId: " + recordingId); + throw new InvalidOperationException("could not find recordingId: " + recordingId); } string channel = ctx.ReplayChannel(); @@ -520,7 +539,7 @@ private void LoadState(Image image) if (image.Closed) { - throw new System.InvalidOperationException("Snapshot ended unexpectedly"); + throw new InvalidOperationException("snapshot ended unexpectedly"); } idleStrategy.Idle(fragments); @@ -572,7 +591,7 @@ private void AwaitRecordingComplete( if (!RecordingPos.IsActive(counters, counterId, recordingId)) { - throw new InvalidOperationException("Recording has stopped unexpectedly: " + recordingId); + throw new InvalidOperationException("recording has stopped unexpectedly: " + recordingId); } archive.CheckForErrorResponse(); @@ -643,7 +662,7 @@ private void FindHeartbeatCounter(CountersReader counters) if (CountersReader.NULL_COUNTER_ID == heartbeatCounterId) { - throw new InvalidOperationException("Failed to find heartbeat counter"); + throw new InvalidOperationException("failed to find heartbeat counter"); } heartbeatCounter = new AtomicCounter(counters.ValuesBuffer, heartbeatCounterId); @@ -657,7 +676,7 @@ private static void CheckInterruptedStatus() } catch (ThreadInterruptedException) { - throw new AgentTerminationException("Unexpected interrupt during operation"); + throw new AgentTerminationException("unexpected interrupt during operation"); } } diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs b/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs index 71c21d2d..179f2a75 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs @@ -437,7 +437,7 @@ public void Conclude() if (null == errorCounter) { - throw new InvalidOperationException("Error counter must be supplied"); + throw new InvalidOperationException("error counter must be supplied"); } if (null == countedErrorHandler) diff --git a/src/Adaptive.Cluster/Service/ICluster.cs b/src/Adaptive.Cluster/Service/ICluster.cs index 065beb78..5f70921c 100644 --- a/src/Adaptive.Cluster/Service/ICluster.cs +++ b/src/Adaptive.Cluster/Service/ICluster.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Collections.ObjectModel; namespace Adaptive.Cluster.Service @@ -34,10 +35,11 @@ public interface ICluster ICollection GetClientSessions(); /// - /// Close a cluster session. + /// Request the close of a by sending the request to the consensus module. /// /// to be closed. - /// true if the instruction is successfully or false if the session id does not exist. + /// true if the event to close a session was sent or false if back pressure was applied. + /// if the clusterSessionId is not recognised. bool CloseSession(long clusterSessionId); /// @@ -47,20 +49,25 @@ public interface ICluster long TimeMs(); /// - /// Schedule a timer for a given deadline and provide a correlation id to identify the timer when it expires. - /// - /// If the correlationId is for an existing scheduled timer then it will be reschedule to the new deadline. + /// Schedule a timer for a given deadline and provide a correlation id to identify the timer when it expires or + /// for cancellation. /// - /// + /// If the correlationId is for an existing scheduled timer then it will be reschedule to the new deadline. + /// /// /// to identify the timer when it expires. /// after which the timer will fire. - void ScheduleTimer(long correlationId, long deadlineMs); + /// true if the event to schedule a timer has been sent or false if back pressure is applied. + /// + bool ScheduleTimer(long correlationId, long deadlineMs); /// /// Cancel a previous scheduled timer. /// - /// for the scheduled timer. - void CancelTimer(long correlationId); + /// for the timer provided when it was scheduled. + /// true if the event to cancel a scheduled timer has been sent or false if back pressure is applied. + /// + bool CancelTimer(long correlationId); + } } \ No newline at end of file diff --git a/src/Adaptive.Cluster/Service/RecordingLog.cs b/src/Adaptive.Cluster/Service/RecordingLog.cs index f392d550..9c0e8b4b 100644 --- a/src/Adaptive.Cluster/Service/RecordingLog.cs +++ b/src/Adaptive.Cluster/Service/RecordingLog.cs @@ -654,7 +654,7 @@ public void TombstoneEntry(long leadershipTermId, int entryIndex) if (-1 == index) { - throw new System.ArgumentException("Unknown entry index: " + entryIndex); + throw new ArgumentException("unknown entry index: " + entryIndex); } buffer.PutInt(0, NULL_VALUE, ByteOrder.LittleEndian); @@ -733,7 +733,7 @@ private static void GetRecordingExtent(AeronArchive archive, RecordingExtent rec { if (archive.ListRecording(entry.recordingId, recordingExtent) == 0) { - throw new InvalidOperationException("Unknown recording id: " + entry.recordingId); + throw new InvalidOperationException("unknown recording id: " + entry.recordingId); } } @@ -748,7 +748,7 @@ private int GetLeadershipTermEntryIndex(long leadershipTermId) } } - throw new ArgumentException("Unknown leadershipTermId: " + leadershipTermId); + throw new ArgumentException("unknown leadershipTermId: " + leadershipTermId); } private static ReplayStep PlanRecovery(List steps, List entries, AeronArchive archive) diff --git a/src/Adaptive.Cluster/Service/ServiceControlAdapter.cs b/src/Adaptive.Cluster/Service/ServiceControlAdapter.cs index c9dce58f..93d95e0a 100644 --- a/src/Adaptive.Cluster/Service/ServiceControlAdapter.cs +++ b/src/Adaptive.Cluster/Service/ServiceControlAdapter.cs @@ -89,7 +89,7 @@ public void OnFragment(IDirectBuffer buffer, int offset, int length, Header head break; default: - throw new ArgumentException("Unknown template id: " + templateId); + throw new ArgumentException("unknown template id: " + templateId); } } } diff --git a/src/Adaptive.Cluster/Service/ServiceControlPublisher.cs b/src/Adaptive.Cluster/Service/ServiceControlPublisher.cs index 22d866c4..77759188 100644 --- a/src/Adaptive.Cluster/Service/ServiceControlPublisher.cs +++ b/src/Adaptive.Cluster/Service/ServiceControlPublisher.cs @@ -28,7 +28,7 @@ public void Dispose() _publication?.Dispose(); } - public void ScheduleTimer(long correlationId, long deadlineMs) + public bool ScheduleTimer(long correlationId, long deadlineMs) { int length = MessageHeaderEncoder.ENCODED_LENGTH + ScheduleTimerEncoder.BLOCK_LENGTH; @@ -45,16 +45,16 @@ public void ScheduleTimer(long correlationId, long deadlineMs) _bufferClaim.Commit(); - return; + return true; } CheckResult(result); } while (--attempts > 0); - throw new InvalidOperationException("Failed to schedule timer"); + return false; } - public void CancelTimer(long correlationId) + public bool CancelTimer(long correlationId) { int length = MessageHeaderEncoder.ENCODED_LENGTH + CancelTimerEncoder.BLOCK_LENGTH; @@ -70,13 +70,13 @@ public void CancelTimer(long correlationId) _bufferClaim.Commit(); - return; + return true; } CheckResult(result); } while (--attempts > 0); - throw new InvalidOperationException("Failed to schedule timer"); + return false; } public void AckAction(long logPosition, long leadershipTermId, int serviceId, ClusterAction action) @@ -104,7 +104,7 @@ public void AckAction(long logPosition, long leadershipTermId, int serviceId, Cl CheckResult(result); } while (--attempts > 0); - throw new InvalidOperationException("Failed to send ACK"); + throw new InvalidOperationException("failed to send ACK"); } public void JoinLog( @@ -140,10 +140,10 @@ public void JoinLog( CheckResult(result); } while (--attempts > 0); - throw new InvalidOperationException("Failed to send log connect request"); + throw new InvalidOperationException("failed to send log connect request"); } - public void CloseSession(long clusterSessionId) + public bool CloseSession(long clusterSessionId) { int length = MessageHeaderEncoder.ENCODED_LENGTH + CloseSessionEncoder.BLOCK_LENGTH; @@ -159,20 +159,20 @@ public void CloseSession(long clusterSessionId) _bufferClaim.Commit(); - return; + return true; } CheckResult(result); } while (--attempts > 0); - throw new InvalidOperationException("Failed to schedule timer"); + return false; } private static void CheckResult(long result) { if (result == Publication.NOT_CONNECTED || result == Publication.CLOSED || result == Publication.MAX_POSITION_EXCEEDED) { - throw new InvalidOperationException("Unexpected publication state: " + result); + throw new InvalidOperationException("unexpected publication state: " + result); } } } diff --git a/src/Adaptive.Cluster/Service/ServiceSnapshotLoader.cs b/src/Adaptive.Cluster/Service/ServiceSnapshotLoader.cs index e277c036..49e096ef 100644 --- a/src/Adaptive.Cluster/Service/ServiceSnapshotLoader.cs +++ b/src/Adaptive.Cluster/Service/ServiceSnapshotLoader.cs @@ -47,7 +47,7 @@ public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offs long typeId = snapshotMarkerDecoder.TypeId(); if (typeId != ClusteredServiceContainer.SNAPSHOT_TYPE_ID) { - throw new InvalidOperationException("Unexpected snapshot type: " + typeId); + throw new InvalidOperationException("unexpected snapshot type: " + typeId); } switch (snapshotMarkerDecoder.Mark()) @@ -55,7 +55,7 @@ public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offs case SnapshotMark.BEGIN: if (inSnapshot) { - throw new InvalidOperationException("Already in snapshot"); + throw new InvalidOperationException("already in snapshot"); } inSnapshot = true; @@ -64,7 +64,7 @@ public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offs case SnapshotMark.END: if (!inSnapshot) { - throw new InvalidOperationException("Missing begin snapshot"); + throw new InvalidOperationException("missing begin snapshot"); } isDone = true; @@ -92,7 +92,7 @@ public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offs break; default: - throw new InvalidOperationException("Unknown template id: " + templateId); + throw new InvalidOperationException("unknown template id: " + templateId); } return ControlledFragmentHandlerAction.CONTINUE; diff --git a/src/Adaptive.Cluster/Service/SnapshotTaker.cs b/src/Adaptive.Cluster/Service/SnapshotTaker.cs index f4cc01fc..d1883c2b 100644 --- a/src/Adaptive.Cluster/Service/SnapshotTaker.cs +++ b/src/Adaptive.Cluster/Service/SnapshotTaker.cs @@ -59,7 +59,7 @@ protected static void CheckInterruptedStatus() } catch (ThreadInterruptedException) { - throw new AgentTerminationException("Unexpected interrupt during operation"); + throw new AgentTerminationException("unexpected interrupt during operation"); } } @@ -67,7 +67,7 @@ protected static void CheckResult(long result) { if (result == Publication.NOT_CONNECTED || result == Publication.CLOSED || result == Publication.MAX_POSITION_EXCEEDED) { - throw new System.InvalidOperationException("Unexpected publication state: " + result); + throw new System.InvalidOperationException("unexpected publication state: " + result); } } diff --git a/src/Adaptive.Cluster/aeron-cluster-codecs.xml b/src/Adaptive.Cluster/aeron-cluster-codecs.xml index fbf777f8..1bf3ec31 100644 --- a/src/Adaptive.Cluster/aeron-cluster-codecs.xml +++ b/src/Adaptive.Cluster/aeron-cluster-codecs.xml @@ -270,9 +270,8 @@ The leader replicates a log stream to all followers for its term as leader. Leadership term ids are monotonic. The followers persist the replicated log locally and send updates to the leader with the highest position they - have persisted in the current term. Each term has a position starting at zero with the total log position - reached being an accumulation of the term positions across all leadership terms. Each leadership term starts - with base log position for the accumulated term positions. + have persisted. The log position is an accumulation of the term positions over the leadership terms. + Each leadership term starts with base log position for the accumulated term positions. The leader gathers the positions reached by the followers, plus its own locally persisted log, and publishes the highest position for a quorum of the member nodes including itself which can be committed to the state @@ -283,17 +282,24 @@ increasing state. --> - + + + + + + - - - - + + + @@ -302,27 +308,26 @@ - - - - - + + + + - + - +