diff --git a/RELEASE.md b/RELEASE.md index 9b30ae15..e6414337 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,4 @@ #### Port Aeron.NET has been ported against Java version: -- Agrona: 0.9.22 -- Aeron: 1.10.4 +- Agrona: 0.9.28 +- Aeron: 1.13.0 diff --git a/driver/Aeron.Driver.nuspec b/driver/Aeron.Driver.nuspec index 46653edc..435e47d4 100644 --- a/driver/Aeron.Driver.nuspec +++ b/driver/Aeron.Driver.nuspec @@ -2,7 +2,7 @@ Aeron.Driver - 1.12.0 + 1.13.0 Aeron Driver Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. diff --git a/driver/media-driver.jar b/driver/media-driver.jar index c553acc1..2318d660 100644 Binary files a/driver/media-driver.jar and b/driver/media-driver.jar differ diff --git a/driver/version.txt b/driver/version.txt index cbda190f..24e164ec 100644 --- a/driver/version.txt +++ b/driver/version.txt @@ -1,2 +1,2 @@ Driver source: -https://repo1.maven.org/maven2/io/aeron/aeron-all/1.12.0/aeron-all-1.12.0.jar +https://repo1.maven.org/maven2/io/aeron/aeron-all/1.13.0/aeron-all-1.13.0.jar diff --git a/src/Adaptive.Aeron/Adaptive.Aeron.csproj b/src/Adaptive.Aeron/Adaptive.Aeron.csproj index 201f87a1..8a6ff67f 100644 --- a/src/Adaptive.Aeron/Adaptive.Aeron.csproj +++ b/src/Adaptive.Aeron/Adaptive.Aeron.csproj @@ -2,7 +2,7 @@ netstandard2.0;net45 Aeron.Client - 1.12.0 + 1.13.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Aeron Client diff --git a/src/Adaptive.Aeron/Aeron.cs b/src/Adaptive.Aeron/Aeron.cs index 099ae34d..6c447c58 100644 --- a/src/Adaptive.Aeron/Aeron.cs +++ b/src/Adaptive.Aeron/Aeron.cs @@ -552,11 +552,21 @@ public class Context : IDisposable /// public const string TAGS_PARAM_NAME = "tags"; + /// + /// Qualifier for a value which is a tag for reference. This prefix is use in the param value. + /// + public const string TAG_PREFIX = "tag:"; + /// /// Parameter name for channel URI param to indicate if term buffers should be sparse. Value is boolean. /// public const string SPARSE_PARAM_NAME = "sparse"; - + + /// + /// Parameter name for channel URI param to indicate an alias for the given URI. Value not interpreted by Aeron. + /// + public const string ALIAS_PARAM_NAME = "alias"; + /// /// Get the default directory name to be used if is not set. This will take /// the if set and if not then . diff --git a/src/Adaptive.Aeron/ChannelUri.cs b/src/Adaptive.Aeron/ChannelUri.cs index aa5aaf50..2cc0b558 100644 --- a/src/Adaptive.Aeron/ChannelUri.cs +++ b/src/Adaptive.Aeron/ChannelUri.cs @@ -49,7 +49,7 @@ private enum State private string _prefix; private string _media; private readonly IDictionary _params; - private string[] _tags; + private readonly string[] _tags; /// /// Construct with the components provided to avoid parsing. @@ -62,7 +62,6 @@ public ChannelUri(string prefix, string media, IDictionary @para _prefix = prefix; _media = media; _params = @params; - _tags = SplitTags(_params.GetOrDefault(Aeron.Context.TAGS_PARAM_NAME)); } @@ -185,27 +184,31 @@ public string Remove(string key) /// /// to be lookup. /// true if the key has a value otherwise false. + /// + /// public bool ContainsKey(string key) { return _params.ContainsKey(key); } /// - /// Get the channel tag. + /// Get the channel tag, if it exists, that refers to an another channel. /// - /// channel tag. + /// channel tag if it exists or null if not in this URI. public string ChannelTag() { - return (_tags.Length > CHANNEL_TAG_INDEX) ? _tags[CHANNEL_TAG_INDEX] : null; + return (null != _tags && _tags.Length > CHANNEL_TAG_INDEX) ? _tags[CHANNEL_TAG_INDEX] : null; } /// - /// Get the entity tag. + /// Get the entity tag, if it exists, that refers to an entity such as subscription or publication. /// - /// entity tag. + /// entity tag if it exists or null if not in this URI. + /// + /// public string EntityTag() { - return (_tags.Length > ENTITY_TAG_INDEX) ? _tags[ENTITY_TAG_INDEX] : null; + return (null != _tags && _tags.Length > ENTITY_TAG_INDEX) ? _tags[ENTITY_TAG_INDEX] : null; } /// @@ -275,7 +278,7 @@ public static ChannelUri Parse(string cs) { int position = 0; string prefix; - if (StartsWith(cs, Aeron.Context.SPY_PREFIX)) + if (StartsWith(cs, 0, Aeron.Context.SPY_PREFIX)) { prefix = SPY_QUALIFIER; position = Aeron.Context.SPY_PREFIX.Length; @@ -326,33 +329,28 @@ public static ChannelUri Parse(string cs) break; case State.PARAMS_KEY: - switch (c) + if (c == '=') { - case '=': - key = builder.ToString(); - builder.Length = 0; - state = State.PARAMS_VALUE; - break; - - default: - builder.Append(c); - break; + key = builder.ToString(); + builder.Length = 0; + state = State.PARAMS_VALUE; + } + else + { + builder.Append(c); } - break; case State.PARAMS_VALUE: - switch (c) + if (c == '|') { - case '|': - @params[key] = builder.ToString(); - builder.Length = 0; - state = State.PARAMS_KEY; - break; - - default: - builder.Append(c); - break; + @params[key] = builder.ToString(); + builder.Length = 0; + state = State.PARAMS_KEY; + } + else + { + builder.Append(c); } break; @@ -394,20 +392,24 @@ public static string AddSessionId(string channel, int sessionId) } /// - /// Is the param tagged? (starts with the "tag:" prefix) + /// Is the param value tagged? (starts with the "tag:" prefix) /// /// to check if tagged. /// true if tagged or false if not. + /// + /// public static bool IsTagged(string paramValue) { - return StartsWith(paramValue, "tag:"); + return StartsWith(paramValue, 0, Aeron.Context.TAG_PREFIX); } /// - /// Get the value of the tag from a given parameter. + /// Get the value of the tag from a given parameter value. /// /// to extract the tag value from. /// the value of the tag or if not tagged. + /// + /// public static long GetTag(string paramValue) { return IsTagged(paramValue) ? long.Parse(paramValue.Substring(4, paramValue.Length - 4)) : INVALID_TAG; @@ -430,49 +432,56 @@ private static bool StartsWith(string input, int position, string prefix) return true; } - - private static bool StartsWith(string input, string prefix) - { - return StartsWith(input, 0, prefix); - } - private static string[] SplitTags(string tags) + private static string[] SplitTags(string tagsValue) { - string[] stringArray = new string[0]; + string[] tags = ArrayUtil.EMPTY_STRING_ARRAY; - if (null != tags) + if (null != tagsValue) { - int currentStartIndex = 0; - int tagIndex = 0; - stringArray = new string[2]; - int length = tags.Length; - - for (int i = 0; i < length; i++) + int tagCount = CountTags(tagsValue); + if (tagCount == 1) { - if (tags[i] == ',') - { - string tag = null; + tags = new[]{tagsValue}; + } + else + { + int tagStartPosition = 0; + int tagIndex = 0; + tags = new string[tagCount]; - if (i - currentStartIndex > 0) + for (int i = 0, length = tagsValue.Length; i < length; i++) + { + if (tagsValue[i] == ',') { - tag = tags.Substring(currentStartIndex, i - currentStartIndex); - currentStartIndex = i + 1; - } + tags[tagIndex++] = tagsValue.Substring(tagStartPosition, i - tagStartPosition); + tagStartPosition = i + 1; - stringArray = ArrayUtil.EnsureCapacity(stringArray, tagIndex + 1); - stringArray[tagIndex] = tag; - tagIndex++; + if (tagIndex >= (tagCount - 1)) + { + tags[tagIndex] = tagsValue.Substring(tagStartPosition, length - tagStartPosition); + } + } } } + } - if ((length - currentStartIndex) > 0) + return tags; + } + + private static int CountTags(string tags) + { + int count = 1; + + for (int i = 0, length = tags.Length; i < length; i++) + { + if (tags[i] == ',') { - stringArray = ArrayUtil.EnsureCapacity(stringArray, tagIndex + 1); - stringArray[tagIndex] = tags.Substring(currentStartIndex, length - currentStartIndex); + ++count; } } - return stringArray; + return count; } } } \ No newline at end of file diff --git a/src/Adaptive.Aeron/ChannelUriStringBuilder.cs b/src/Adaptive.Aeron/ChannelUriStringBuilder.cs index e0e528a7..33a7589d 100644 --- a/src/Adaptive.Aeron/ChannelUriStringBuilder.cs +++ b/src/Adaptive.Aeron/ChannelUriStringBuilder.cs @@ -23,6 +23,7 @@ public class ChannelUriStringBuilder private string _controlEndpoint; private string _controlMode; private string _tags; + private string _alias; private bool? _reliable; private bool? _sparse; private int? _ttl; @@ -32,7 +33,7 @@ public class ChannelUriStringBuilder private int? _termId; private int? _termOffset; private int? _sessionId; - private int? _linger; + private long? _linger; private bool _isSessionIdTagged; /// @@ -48,6 +49,7 @@ public ChannelUriStringBuilder Clear() _controlEndpoint = null; _controlMode = null; _tags = null; + _alias = null; _reliable = null; _ttl = null; _mtu = null; @@ -493,13 +495,13 @@ public ChannelUriStringBuilder SessionId(int? sessionId) } /// - /// Set the time a publication will linger in nanoseconds after being drained. This time is so that tail loss + /// Set the time a network publication will linger in nanoseconds after being drained. This time is so that tail loss /// can be recovered. /// /// time for the publication after it is drained. /// this for a fluent API. /// - public ChannelUriStringBuilder Linger(int? lingerNs) + public ChannelUriStringBuilder Linger(long? lingerNs) { if (null != lingerNs && lingerNs < 0) { @@ -511,22 +513,24 @@ public ChannelUriStringBuilder Linger(int? lingerNs) } /// - /// Get the time a publication will linger in nanoseconds after being drained. This time is so that tail loss + /// Get the time a network publication will linger in nanoseconds after being drained. This time is so that tail loss /// can be recovered. /// /// the linger time in nanoseconds a publication will wait around after being drained. /// - public int? Linger() + public long? Linger() { return _linger; } /// - /// Set the tags for a channel, and/or publication or subscription. + /// Set the tags for a channel used by a publication or subscription. Tags can be used to identify or tag a + /// channel so that a configuration can be referenced and reused. /// /// for the channel, publication or subscription. /// this for a fluent API. /// + /// public ChannelUriStringBuilder Tags(string tags) { _tags = tags; @@ -534,10 +538,12 @@ public ChannelUriStringBuilder Tags(string tags) } /// - /// Get the tags for a channel, and/or publication or subscription. + /// Get the tags for a channel used by a publication or subscription. Tags can be used to identify or tag a + /// channel so that a configuration can be referenced and reused. /// /// the tags for a channel, publication or subscription. /// + /// public string Tags() { return _tags; @@ -548,6 +554,8 @@ public string Tags() /// /// for session id /// this for a fluent API. + /// + /// public ChannelUriStringBuilder IsSessionIdTagged(bool isSessionIdTagged) { _isSessionIdTagged = isSessionIdTagged; @@ -558,11 +566,35 @@ public ChannelUriStringBuilder IsSessionIdTagged(bool isSessionIdTagged) /// Is the value for a tagged. /// /// whether the value for a tag reference or not. + /// + /// public bool IsSessionIdTagged() { return _isSessionIdTagged; } + /// + /// Set the alias for a URI. Alias's are not interpreted by Aeron and are to be used by the application + /// + /// for the URI. + /// this for a fluent API. + /// + public ChannelUriStringBuilder Alias(string alias) + { + _alias = alias; + return this; + } + + /// + /// Get the alias present in the URI. + /// + /// alias for the URI. + /// + public string Alias() + { + return _alias; + } + /// /// Initialise a channel for restarting a publication at a given position. /// @@ -674,6 +706,11 @@ public string Build() _sb.Append(Aeron.Context.LINGER_PARAM_NAME).Append('=').Append(_linger.Value).Append('|'); } + if (null != _alias) + { + _sb.Append(Aeron.Context.ALIAS_PARAM_NAME).Append('=').Append(_alias).Append('|'); + } + char lastChar = _sb[_sb.Length - 1]; if (lastChar == '|' || lastChar == '?') { diff --git a/src/Adaptive.Aeron/ConcurrentPublication.cs b/src/Adaptive.Aeron/ConcurrentPublication.cs index 582fc389..795b7c82 100644 --- a/src/Adaptive.Aeron/ConcurrentPublication.cs +++ b/src/Adaptive.Aeron/ConcurrentPublication.cs @@ -53,6 +53,20 @@ internal ConcurrentPublication( _termAppenders[i] = new TermAppender(buffers[i], _logMetaDataBuffer, i); } } + + /// + public override long AvailableWindow + { + get + { + if (_isClosed) + { + return CLOSED; + } + + return _positionLimit.GetVolatile() - Position; + } + } /// public override long Offer(IDirectBuffer buffer, int offset, int length, ReservedValueSupplier reservedValueSupplier = null) diff --git a/src/Adaptive.Aeron/EndOfStreamHandler.cs b/src/Adaptive.Aeron/EndOfStreamHandler.cs index 3b662c3e..cccea62a 100644 --- a/src/Adaptive.Aeron/EndOfStreamHandler.cs +++ b/src/Adaptive.Aeron/EndOfStreamHandler.cs @@ -1,8 +1,11 @@ +using System; + namespace Adaptive.Aeron { /// /// Delegeate for delivery of End of Stream image notification to a /// /// that has reached End Of Stream. + [Obsolete] public delegate void EndOfStreamHandler(Image image); } \ No newline at end of file diff --git a/src/Adaptive.Aeron/ExclusivePublication.cs b/src/Adaptive.Aeron/ExclusivePublication.cs index 3bf8ff84..9575d08d 100644 --- a/src/Adaptive.Aeron/ExclusivePublication.cs +++ b/src/Adaptive.Aeron/ExclusivePublication.cs @@ -44,7 +44,6 @@ namespace Adaptive.Aeron /// /// /// - /// public class ExclusivePublication : Publication { private long _termBeginPosition; @@ -97,7 +96,32 @@ internal ExclusivePublication( LogBufferDescriptor.ComputeTermBeginPosition(_termId, PositionBitsToShift, InitialTermId); } - + public override long Position + { + get + { + if (_isClosed) + { + return CLOSED; + } + + return _termBeginPosition + _termOffset; + } + } + + public override long AvailableWindow + { + get + { + if (_isClosed) + { + return CLOSED; + } + + return _positionLimit.GetVolatile() - (_termBeginPosition + _termOffset); + } + } + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] public override long Offer( @@ -140,8 +164,7 @@ public override long Offer( return newPosition; } - - + /// public override long Offer(IDirectBuffer bufferOne, int offsetOne, int lengthOne, IDirectBuffer bufferTwo, int offsetTwo, int lengthTwo, ReservedValueSupplier reservedValueSupplier = null) { @@ -324,8 +347,7 @@ private long NewPosition(int resultingOffset) _activePartitionIndex = nextIndex; _termOffset = 0; _termId = nextTermId; - _termBeginPosition = - LogBufferDescriptor.ComputeTermBeginPosition(nextTermId, PositionBitsToShift, InitialTermId); + _termBeginPosition += TermBufferLength; var termCount = nextTermId - InitialTermId; diff --git a/src/Adaptive.Aeron/LogBuffer/LogBufferDescriptor.cs b/src/Adaptive.Aeron/LogBuffer/LogBufferDescriptor.cs index 2dc97bcc..aea6ed65 100644 --- a/src/Adaptive.Aeron/LogBuffer/LogBufferDescriptor.cs +++ b/src/Adaptive.Aeron/LogBuffer/LogBufferDescriptor.cs @@ -24,183 +24,181 @@ namespace Adaptive.Aeron.LogBuffer { /// - /// Layout description for log buffers which contains partitions of terms with associated term meta data, - /// plus ending with overall log meta data. - /// - ///
-    ///  +----------------------------+
-    ///  |           Term 0           |
-    ///  +----------------------------+
-    ///  |           Term 1           |
-    ///  +----------------------------+
-    ///  |           Term 2           |
-    ///  +----------------------------+
-    ///  |        Log Meta Data       |
-    ///  +----------------------------+
-    /// 
+ /// Layout description for log buffers which contains partitions of terms with associated term metadata, + /// plus ending with overall log metadata. + ///
+    ///         +----------------------------+
+    ///         |           Term 0           |
+    ///         +----------------------------+
+    ///         |           Term 1           |
+    ///         +----------------------------+
+    ///         |           Term 2           |
+    ///         +----------------------------+
+    ///         |        Log metadata       |
+    ///         +----------------------------+
+    ///     
///
public class LogBufferDescriptor { /// - /// The number of partitions the log is divided into terms and a meta data buffer. + /// The number of partitions the log is divided into terms and a metadata buffer. /// public const int PARTITION_COUNT = 3; /// - /// Section index for which buffer contains the log meta data. - /// - public static readonly int LOG_META_DATA_SECTION_INDEX = PARTITION_COUNT; - - /// - /// Minimum buffer length for a log term + /// Minimum buffer length for a log term /// public const int TERM_MIN_LENGTH = 64 * 1024; /// - /// Maximum buffer length for a log term + /// Maximum buffer length for a log term /// public const int TERM_MAX_LENGTH = 1024 * 1024 * 1024; - + /// - /// Minimum page size + /// Minimum page size /// public const int PAGE_MIN_SIZE = 4 * 1024; /// - /// Maximum page size + /// Maximum page size /// public const int PAGE_MAX_SIZE = 1024 * 1024 * 1024; + /// + /// Section index for which buffer contains the log metadata. + /// + public static readonly int LOG_META_DATA_SECTION_INDEX = PARTITION_COUNT; + // ******************************* - // *** Log Meta Data Constants *** + // *** Log metadata Constants *** // ******************************* /// - /// Offset within the meta data where the tail values are stored. + /// Offset within the metadata where the tail values are stored. /// public static readonly int TERM_TAIL_COUNTERS_OFFSET; /// - /// Offset within the log meta data where the active partition index is stored. + /// Offset within the log metadata where the active partition index is stored. /// public static readonly int LOG_ACTIVE_TERM_COUNT_OFFSET; /// - /// Offset within the log meta data where the position of the End of Stream is stored. + /// Offset within the log metadata where the position of the End of Stream is stored. /// public static readonly int LOG_END_OF_STREAM_POSITION_OFFSET; - + /// - /// Offset within the log meta data where whether the log is connected or not is stored. + /// Offset within the log metadata where whether the log is connected or not is stored. /// public static readonly int LOG_IS_CONNECTED_OFFSET; /// - /// Offset within the log meta data where the active term id is stored. + /// Offset within the log metadata where the active term id is stored. /// public static readonly int LOG_INITIAL_TERM_ID_OFFSET; /// - /// Offset within the log meta data which the length field for the frame header is stored. + /// Offset within the log metadata which the length field for the frame header is stored. /// public static readonly int LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET; /// - /// Offset within the log meta data which the MTU length is stored. + /// Offset within the log metadata which the MTU length is stored. /// public static readonly int LOG_MTU_LENGTH_OFFSET; /// - /// Offset within the log meta data which the correlation id is stored. + /// Offset within the log metadata which the correlation id is stored. /// public static readonly int LOG_CORRELATION_ID_OFFSET; /// - /// Offset within the log meta data which the term length is stored. + /// Offset within the log metadata which the term length is stored. /// public static readonly int LOG_TERM_LENGTH_OFFSET; /// - /// Offset within the log meta data which the page size is stored. + /// Offset within the log metadata which the page size is stored. /// public static readonly int LOG_PAGE_SIZE_OFFSET; - + /// - /// Offset at which the default frame headers begin. + /// Offset at which the default frame headers begin. /// public static readonly int LOG_DEFAULT_FRAME_HEADER_OFFSET; /// - /// Maximum length of a frame header + /// Maximum length of a frame header /// public static readonly int LOG_DEFAULT_FRAME_HEADER_MAX_LENGTH = BitUtil.CACHE_LINE_LENGTH * 2; /// - /// Total length of the log meta data buffer in bytes. - /// - ///
-        ///   0                   1                   2                   3
-        ///   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-        ///  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-        ///  |                       Tail Counter 0                          |
-        ///  |                                                               |
-        ///  +---------------------------------------------------------------+
-        ///  |                       Tail Counter 1                          |
-        ///  |                                                               |
-        ///  +---------------------------------------------------------------+
-        ///  |                       Tail Counter 2                          |
-        ///  |                                                               |
-        ///  +---------------------------------------------------------------+
-        ///  |                      Active Term Count                        |
-        ///  +---------------------------------------------------------------+
-        ///  |                     Cache Line Padding                       ...
-        /// ...                                                              |
-        ///  +---------------------------------------------------------------+
-        ///  |                    End of Stream Position                     |
-        ///  |                                                               |
-        ///  +---------------------------------------------------------------+
-        ///  |                        Is Connected                           |
-        ///  +---------------------------------------------------------------+
-        ///  |                      Cache Line Padding                      ...
-        /// ...                                                              |
-        ///  +---------------------------------------------------------------+
-        ///  |                 Registration / Correlation ID                 |
-        ///  |                                                               |
-        ///  +---------------------------------------------------------------+
-        ///  |                        Initial Term Id                        |
-        ///  +---------------------------------------------------------------+
-        ///  |                  Default Frame Header Length                  |
-        ///  +---------------------------------------------------------------+
-        ///  |                          MTU Length                           |
-        ///  +---------------------------------------------------------------+
-        ///  |                         Term Length                           |
-        ///  +---------------------------------------------------------------+
-        ///  |                          Page Size                            |
-        ///  +---------------------------------------------------------------+
-        ///  |                      Cache Line Padding                      ...
-        /// ...                                                              |
-        ///  +---------------------------------------------------------------+
-        ///  |                     Default Frame Header                     ...
-        /// ...                                                              |
-        ///  +---------------------------------------------------------------+
-        /// 
+ /// Total length of the log metadata buffer in bytes. + ///
+        ///         0                   1                   2                   3
+        ///         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+        ///         +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+        ///         |                       Tail Counter 0                          |
+        ///         |                                                               |
+        ///         +---------------------------------------------------------------+
+        ///         |                       Tail Counter 1                          |
+        ///         |                                                               |
+        ///         +---------------------------------------------------------------+
+        ///         |                       Tail Counter 2                          |
+        ///         |                                                               |
+        ///         +---------------------------------------------------------------+
+        ///         |                      Active Term Count                        |
+        ///         +---------------------------------------------------------------+
+        ///         |                     Cache Line Padding                       ...
+        ///         ...                                                              |
+        ///         +---------------------------------------------------------------+
+        ///         |                    End of Stream Position                     |
+        ///         |                                                               |
+        ///         +---------------------------------------------------------------+
+        ///         |                        Is Connected                           |
+        ///         +---------------------------------------------------------------+
+        ///         |                      Cache Line Padding                      ...
+        ///         ...                                                              |
+        ///         +---------------------------------------------------------------+
+        ///         |                 Registration / Correlation ID                 |
+        ///         |                                                               |
+        ///         +---------------------------------------------------------------+
+        ///         |                        Initial Term Id                        |
+        ///         +---------------------------------------------------------------+
+        ///         |                  Default Frame Header Length                  |
+        ///         +---------------------------------------------------------------+
+        ///         |                          MTU Length                           |
+        ///         +---------------------------------------------------------------+
+        ///         |                         Term Length                           |
+        ///         +---------------------------------------------------------------+
+        ///         |                          Page Size                            |
+        ///         +---------------------------------------------------------------+
+        ///         |                      Cache Line Padding                      ...
+        ///         ...                                                              |
+        ///         +---------------------------------------------------------------+
+        ///         |                     Default Frame Header                     ...
+        ///         ...                                                              |
+        ///         +---------------------------------------------------------------+
+        ///     
///
public static readonly int LOG_META_DATA_LENGTH; - + static LogBufferDescriptor() { - int offset = 0; + var offset = 0; TERM_TAIL_COUNTERS_OFFSET = offset; - offset += (BitUtil.SIZE_OF_LONG * PARTITION_COUNT); + offset += BitUtil.SIZE_OF_LONG * PARTITION_COUNT; LOG_ACTIVE_TERM_COUNT_OFFSET = offset; - offset = (BitUtil.CACHE_LINE_LENGTH * 2); + offset = BitUtil.CACHE_LINE_LENGTH * 2; LOG_END_OF_STREAM_POSITION_OFFSET = offset; LOG_IS_CONNECTED_OFFSET = LOG_END_OF_STREAM_POSITION_OFFSET + BitUtil.SIZE_OF_LONG; - offset += (BitUtil.CACHE_LINE_LENGTH * 2); + offset += BitUtil.CACHE_LINE_LENGTH * 2; LOG_CORRELATION_ID_OFFSET = offset; LOG_INITIAL_TERM_ID_OFFSET = LOG_CORRELATION_ID_OFFSET + BitUtil.SIZE_OF_LONG; LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET = LOG_INITIAL_TERM_ID_OFFSET + BitUtil.SIZE_OF_INT; @@ -215,7 +213,7 @@ static LogBufferDescriptor() } /// - /// Check that term length is valid and alignment is valid. + /// Check that term length is valid and alignment is valid. /// /// to be checked. /// if the length is not as expected. @@ -223,238 +221,222 @@ static LogBufferDescriptor() public static void CheckTermLength(int termLength) { if (termLength < TERM_MIN_LENGTH) - { ThrowHelper.ThrowInvalidOperationException( $"Term length less than min length of {TERM_MIN_LENGTH:D}, length={termLength:D}"); - } if (termLength > TERM_MAX_LENGTH) - { ThrowHelper.ThrowInvalidOperationException( $"Term length more than max length of {TERM_MAX_LENGTH:D}: length = {termLength:D}"); - } - if (!BitUtil.IsPowerOfTwo(termLength)) - { - ThrowHelper.ThrowInvalidOperationException("Term length not a power of 2: length=" + termLength); - } + if (!BitUtil.IsPowerOfTwo(termLength)) ThrowHelper.ThrowInvalidOperationException("Term length not a power of 2: length=" + termLength); } - + /// - /// Check that page size is valid and alignment is valid. + /// Check that page size is valid and alignment is valid. /// /// to be checked. /// if the size is not as expected. public static void CheckPageSize(int pageSize) { - if (pageSize < PAGE_MIN_SIZE) - { - ThrowHelper.ThrowInvalidOperationException($"Page size less than min size of {PAGE_MIN_SIZE}: page size={pageSize}"); - } + if (pageSize < PAGE_MIN_SIZE) ThrowHelper.ThrowInvalidOperationException($"Page size less than min size of {PAGE_MIN_SIZE}: page size={pageSize}"); - if (pageSize > PAGE_MAX_SIZE) - { - ThrowHelper.ThrowInvalidOperationException($"Page size more than max size of {PAGE_MAX_SIZE}: page size={pageSize}"); - } + if (pageSize > PAGE_MAX_SIZE) ThrowHelper.ThrowInvalidOperationException($"Page size more than max size of {PAGE_MAX_SIZE}: page size={pageSize}"); - if (!BitUtil.IsPowerOfTwo(pageSize)) - { - ThrowHelper.ThrowInvalidOperationException($"Page size not a power of 2: page size={pageSize}"); - } + if (!BitUtil.IsPowerOfTwo(pageSize)) ThrowHelper.ThrowInvalidOperationException($"Page size not a power of 2: page size={pageSize}"); } /// - /// Get the value of the initial Term id used for this log. + /// Get the value of the initial Term id used for this log. /// - /// containing the meta data. + /// containing the metadata. /// the value of the initial Term id used for this log. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int InitialTermId(UnsafeBuffer logMetaDataBuffer) + public static int InitialTermId(UnsafeBuffer metaDataBuffer) { - return logMetaDataBuffer.GetInt(LOG_INITIAL_TERM_ID_OFFSET); + return metaDataBuffer.GetInt(LOG_INITIAL_TERM_ID_OFFSET); } /// - /// Set the initial term at which this log begins. Initial should be randomised so that stream does not get - /// reused accidentally. + /// Set the initial term at which this log begins. Initial should be randomised so that stream does not get + /// reused accidentally. /// - /// containing the meta data. + /// containing the metadata. /// value to be set. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void InitialTermId(UnsafeBuffer logMetaDataBuffer, int initialTermId) + public static void InitialTermId(UnsafeBuffer metaDataBuffer, int initialTermId) { - logMetaDataBuffer.PutInt(LOG_INITIAL_TERM_ID_OFFSET, initialTermId); + metaDataBuffer.PutInt(LOG_INITIAL_TERM_ID_OFFSET, initialTermId); } /// - /// Get the value of the MTU length used for this log. + /// Get the value of the MTU length used for this log. /// - /// containing the meta data. + /// containing the metadata. /// the value of the MTU length used for this log. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int MtuLength(UnsafeBuffer logMetaDataBuffer) + public static int MtuLength(UnsafeBuffer metaDataBuffer) { - return logMetaDataBuffer.GetInt(LOG_MTU_LENGTH_OFFSET); + return metaDataBuffer.GetInt(LOG_MTU_LENGTH_OFFSET); } /// - /// Set the MTU length used for this log. + /// Set the MTU length used for this log. /// - /// containing the meta data. + /// containing the metadata. /// value to be set. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void MtuLength(UnsafeBuffer logMetaDataBuffer, int mtuLength) + public static void MtuLength(UnsafeBuffer metaDataBuffer, int mtuLength) { - logMetaDataBuffer.PutInt(LOG_MTU_LENGTH_OFFSET, mtuLength); + metaDataBuffer.PutInt(LOG_MTU_LENGTH_OFFSET, mtuLength); } - + /// - /// Get the value of the Term Length used for this log. + /// Get the value of the Term Length used for this log. /// - /// containing the meta data. + /// containing the metadata. /// the value of the term length used for this log. - public static int TermLength(UnsafeBuffer logMetaDataBuffer) + public static int TermLength(UnsafeBuffer metaDataBuffer) { - return logMetaDataBuffer.GetInt(LOG_TERM_LENGTH_OFFSET); + return metaDataBuffer.GetInt(LOG_TERM_LENGTH_OFFSET); } /// - /// Set the term length used for this log. + /// Set the term length used for this log. /// - /// containing the meta data. + /// containing the metadata. /// value to be set. - public static void TermLength(UnsafeBuffer logMetaDataBuffer, int termLength) + public static void TermLength(UnsafeBuffer metaDataBuffer, int termLength) { - logMetaDataBuffer.PutInt(LOG_TERM_LENGTH_OFFSET, termLength); + metaDataBuffer.PutInt(LOG_TERM_LENGTH_OFFSET, termLength); } /// - /// Get the value of the page size used for this log. + /// Get the value of the page size used for this log. /// - /// containing the meta data. + /// containing the metadata. /// the value of the page size used for this log. - public static int PageSize(UnsafeBuffer logMetaDataBuffer) + public static int PageSize(UnsafeBuffer metaDataBuffer) { - return logMetaDataBuffer.GetInt(LOG_PAGE_SIZE_OFFSET); + return metaDataBuffer.GetInt(LOG_PAGE_SIZE_OFFSET); } /// - /// Set the page size used for this log. + /// Set the page size used for this log. /// - /// containing the meta data. + /// containing the metadata. /// value to be set. - public static void PageSize(UnsafeBuffer logMetaDataBuffer, int pageSize) + public static void PageSize(UnsafeBuffer metaDataBuffer, int pageSize) { - logMetaDataBuffer.PutInt(LOG_PAGE_SIZE_OFFSET, pageSize); + metaDataBuffer.PutInt(LOG_PAGE_SIZE_OFFSET, pageSize); } /// - /// Get the value of the correlation ID for this log relating to the command which created it. + /// Get the value of the correlation ID for this log relating to the command which created it. /// - /// containing the meta data. + /// containing the metadata. /// the value of the correlation ID used for this log. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static long CorrelationId(UnsafeBuffer logMetaDataBuffer) + public static long CorrelationId(UnsafeBuffer metaDataBuffer) { - return logMetaDataBuffer.GetLong(LOG_CORRELATION_ID_OFFSET); + return metaDataBuffer.GetLong(LOG_CORRELATION_ID_OFFSET); } /// - /// Set the correlation ID used for this log relating to the command which created it. + /// Set the correlation ID used for this log relating to the command which created it. /// - /// containing the meta data. + /// containing the metadata. /// value to be set. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void CorrelationId(UnsafeBuffer logMetaDataBuffer, long id) + public static void CorrelationId(UnsafeBuffer metaDataBuffer, long id) { - logMetaDataBuffer.PutLong(LOG_CORRELATION_ID_OFFSET, id); + metaDataBuffer.PutLong(LOG_CORRELATION_ID_OFFSET, id); } /// - /// Get whether the log is considered connected or not by the driver. + /// Get whether the log is considered connected or not by the driver. /// - /// containing the meta data. + /// containing the metadata. /// whether the log is considered connected or not by the driver. - public static bool IsConnected(UnsafeBuffer logMetaDataBuffer) + public static bool IsConnected(UnsafeBuffer metaDataBuffer) { - return logMetaDataBuffer.GetIntVolatile(LOG_IS_CONNECTED_OFFSET) == 1; + return metaDataBuffer.GetIntVolatile(LOG_IS_CONNECTED_OFFSET) == 1; } /// - /// Set whether the log is considered connected or not by the driver. + /// Set whether the log is considered connected or not by the driver. /// - /// containing the meta data. + /// containing the metadata. /// or not - public static void IsConnected(UnsafeBuffer logMetaDataBuffer, bool isConnected) + public static void IsConnected(UnsafeBuffer metaDataBuffer, bool isConnected) { - logMetaDataBuffer.PutIntOrdered(LOG_IS_CONNECTED_OFFSET, isConnected ? 1 : 0); + metaDataBuffer.PutIntOrdered(LOG_IS_CONNECTED_OFFSET, isConnected ? 1 : 0); } /// - /// Get the value of the end of stream position. + /// Get the value of the end of stream position. /// - /// containing the meta data. + /// containing the metadata. /// the value of end of stream position - public static long EndOfStreamPosition(UnsafeBuffer logMetaDataBuffer) + public static long EndOfStreamPosition(UnsafeBuffer metaDataBuffer) { - return logMetaDataBuffer.GetLongVolatile(LOG_END_OF_STREAM_POSITION_OFFSET); + return metaDataBuffer.GetLongVolatile(LOG_END_OF_STREAM_POSITION_OFFSET); } /// - /// Set the value of the end of stream position. + /// Set the value of the end of stream position. /// - /// containing the meta data. + /// containing the metadata. /// value of the end of stream position - public static void EndOfStreamPosition(UnsafeBuffer logMetaDataBuffer, long position) + public static void EndOfStreamPosition(UnsafeBuffer metaDataBuffer, long position) { - logMetaDataBuffer.PutLongOrdered(LOG_END_OF_STREAM_POSITION_OFFSET, position); + metaDataBuffer.PutLongOrdered(LOG_END_OF_STREAM_POSITION_OFFSET, position); } /// - /// Get the value of the active term count used by the producer of this log. Consumers may have a different - /// active term count if they are running behind. The read is done with volatile semantics. + /// Get the value of the active term count used by the producer of this log. Consumers may have a different + /// active term count if they are running behind. The read is done with volatile semantics. /// - /// containing the meta data. + /// containing the metadata. /// the value of the active term count used by the producer of this log. - public static int ActiveTermCount(UnsafeBuffer logMetaDataBuffer) + public static int ActiveTermCount(UnsafeBuffer metaDataBuffer) { - return logMetaDataBuffer.GetIntVolatile(LOG_ACTIVE_TERM_COUNT_OFFSET); + return metaDataBuffer.GetIntVolatile(LOG_ACTIVE_TERM_COUNT_OFFSET); } - + /// - /// Set the value of the current active term count for the producer using memory ordered semantics. + /// Set the value of the current active term count for the producer using memory ordered semantics. /// - /// containing the meta data. + /// containing the metadata. /// value of the active term count used by the producer of this log. - public static void ActiveTermCountOrdered(UnsafeBuffer logMetaDataBuffer, int termCount) + public static void ActiveTermCountOrdered(UnsafeBuffer metaDataBuffer, int termCount) { - logMetaDataBuffer.PutIntOrdered(LOG_ACTIVE_TERM_COUNT_OFFSET, termCount); + metaDataBuffer.PutIntOrdered(LOG_ACTIVE_TERM_COUNT_OFFSET, termCount); } - + /// - /// Compare and set the value of the current active term count. + /// Compare and set the value of the current active term count. /// - /// containing the meta data. + /// containing the metadata. /// value of the active term count expected in the log /// value of the active term count to be updated in the log /// true if successful otherwise false. - public static bool CasActiveTermCount(UnsafeBuffer logMetaDataBuffer, int expectedTermCount, int updateTermCount) + public static bool CasActiveTermCount(UnsafeBuffer metaDataBuffer, int expectedTermCount, int updateTermCount) { - return logMetaDataBuffer.CompareAndSetInt(LOG_ACTIVE_TERM_COUNT_OFFSET, expectedTermCount, updateTermCount); + return metaDataBuffer.CompareAndSetInt(LOG_ACTIVE_TERM_COUNT_OFFSET, expectedTermCount, updateTermCount); } - + /// - /// Set the value of the current active partition index for the producer. + /// Set the value of the current active partition index for the producer. /// - /// containing the meta data. + /// containing the metadata. /// value of the active term count used by the producer of this log. - public static void ActiveTermCount(UnsafeBuffer logMetaDataBuffer, int termCount) + public static void ActiveTermCount(UnsafeBuffer metaDataBuffer, int termCount) { - logMetaDataBuffer.PutInt(LOG_ACTIVE_TERM_COUNT_OFFSET, termCount); + metaDataBuffer.PutInt(LOG_ACTIVE_TERM_COUNT_OFFSET, termCount); } /// - /// Rotate to the next partition in sequence for the term id. + /// Rotate to the next partition in sequence for the term id. /// /// partition index /// the next partition index @@ -465,7 +447,7 @@ public static int NextPartitionIndex(int currentIndex) } /// - /// Determine the partition index to be used given the initial term and active term ids. + /// Determine the partition index to be used given the initial term and active term ids. /// /// at which the log buffer usage began /// that is in current usage @@ -477,7 +459,7 @@ public static int IndexByTerm(int initialTermId, int activeTermId) } /// - /// Determine the partition index based on number of terms that have passed. + /// Determine the partition index based on number of terms that have passed. /// /// for the number of terms that have passed. /// the partition index for the term count. @@ -488,7 +470,7 @@ public static int IndexByTermCount(long termCount) } /// - /// Determine the partition index given a stream position. + /// Determine the partition index given a stream position. /// /// in the stream in bytes. /// number of times to right shift the position for term count @@ -496,11 +478,11 @@ public static int IndexByTermCount(long termCount) [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int IndexByPosition(long position, int positionBitsToShift) { - return (int) (((long) ((ulong) position >> positionBitsToShift)) % PARTITION_COUNT); + return (int) ((long) ((ulong) position >> positionBitsToShift) % PARTITION_COUNT); } /// - /// Compute the current position in absolute number of bytes. + /// Compute the current position in absolute number of bytes. /// /// active term id. /// in the term. @@ -514,9 +496,9 @@ public static long ComputePosition(int activeTermId, int termOffset, int positio return (termCount << positionBitsToShift) + termOffset; } - + /// - /// Compute the current position in absolute number of bytes for the beginning of a term. + /// Compute the current position in absolute number of bytes for the beginning of a term. /// /// active term id. /// number of times to left shift the term count @@ -531,7 +513,7 @@ public static long ComputeTermBeginPosition(int activeTermId, int positionBitsTo } /// - /// Compute the term id from a position. + /// Compute the term id from a position. /// /// to calculate from /// number of times to right shift the position @@ -540,36 +522,35 @@ public static long ComputeTermBeginPosition(int activeTermId, int positionBitsTo [MethodImpl(MethodImplOptions.AggressiveInlining)] public static int ComputeTermIdFromPosition(long position, int positionBitsToShift, int initialTermId) { - return ((int) ((long) ((ulong) position >> positionBitsToShift)) + initialTermId); + return (int) (long) ((ulong) position >> positionBitsToShift) + initialTermId; } /// - /// Compute the total length of a log file given the term length. - /// - /// Assumes is 1GB and that filePageSize is 1GB or less and a power of 2. + /// Compute the total length of a log file given the term length. + /// Assumes is 1GB and that filePageSize is 1GB or less and a power of 2. /// /// on which to base the calculation. /// to use for log. /// the total length of the log file. public static long ComputeLogLength(int termLength, int filePageSize) { - if (termLength < (1024 * 1024 * 1024)) - { - return BitUtil.Align((termLength * PARTITION_COUNT) + LOG_META_DATA_LENGTH, filePageSize); - } + if (termLength < 1024 * 1024 * 1024) return BitUtil.Align(termLength * PARTITION_COUNT + LOG_META_DATA_LENGTH, filePageSize); - return (PARTITION_COUNT * (long)termLength) + BitUtil.Align(LOG_META_DATA_LENGTH, filePageSize); + return PARTITION_COUNT * (long) termLength + BitUtil.Align(LOG_META_DATA_LENGTH, filePageSize); } /// - /// Store the default frame header to the log meta data buffer. + /// Store the default frame header to the log metadata buffer. /// - /// into which the default headers should be stored. + /// into which the default headers should be stored. /// to be stored. - /// if the defaultHeader is larger than + /// + /// if the defaultHeader is larger than + /// + /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void StoreDefaultFrameHeader(UnsafeBuffer logMetaDataBuffer, IDirectBuffer defaultHeader) + public static void StoreDefaultFrameHeader(UnsafeBuffer metaDataBuffer, IDirectBuffer defaultHeader) { if (defaultHeader.Capacity != DataHeaderFlyweight.HEADER_LENGTH) { @@ -578,67 +559,63 @@ public static void StoreDefaultFrameHeader(UnsafeBuffer logMetaDataBuffer, IDire return; } - logMetaDataBuffer.PutInt(LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET, DataHeaderFlyweight.HEADER_LENGTH); - logMetaDataBuffer.PutBytes(LOG_DEFAULT_FRAME_HEADER_OFFSET, defaultHeader, 0, + metaDataBuffer.PutInt(LOG_DEFAULT_FRAME_HEADER_LENGTH_OFFSET, DataHeaderFlyweight.HEADER_LENGTH); + metaDataBuffer.PutBytes(LOG_DEFAULT_FRAME_HEADER_OFFSET, defaultHeader, 0, DataHeaderFlyweight.HEADER_LENGTH); } /// - /// Get a wrapper around the default frame header from the log meta data. + /// Get a wrapper around the default frame header from the log metadata. /// - /// containing the raw bytes for the default frame header. + /// containing the raw bytes for the default frame header. /// a buffer wrapping the raw bytes. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static UnsafeBuffer DefaultFrameHeader(UnsafeBuffer logMetaDataBuffer) + public static UnsafeBuffer DefaultFrameHeader(UnsafeBuffer metaDataBuffer) { - return new UnsafeBuffer(logMetaDataBuffer, LOG_DEFAULT_FRAME_HEADER_OFFSET, + return new UnsafeBuffer(metaDataBuffer, LOG_DEFAULT_FRAME_HEADER_OFFSET, DataHeaderFlyweight.HEADER_LENGTH); } /// - /// Apply the default header for a message in a term. + /// Apply the default header for a message in a term. /// - /// containing the default headers. + /// containing the default headers. /// to which the default header should be applied. /// at which the default should be applied. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void ApplyDefaultHeader(UnsafeBuffer logMetaDataBuffer, UnsafeBuffer termBuffer, int termOffset) + public static void ApplyDefaultHeader(UnsafeBuffer metaDataBuffer, UnsafeBuffer termBuffer, int termOffset) { - termBuffer.PutBytes(termOffset, logMetaDataBuffer, LOG_DEFAULT_FRAME_HEADER_OFFSET, + termBuffer.PutBytes(termOffset, metaDataBuffer, LOG_DEFAULT_FRAME_HEADER_OFFSET, DataHeaderFlyweight.HEADER_LENGTH); } /// - /// Rotate the log and update the tail counter for the new term. - /// - /// This method is safe for concurrent use. + /// Rotate the log and update the tail counter for the new term. + /// This method is safe for concurrent use. /// - /// for the meta data. + /// for the metadata. /// from which to rotate. /// to be used in the default headers. /// true if log was rotated. - public static bool RotateLog(UnsafeBuffer logMetaDataBuffer, int currentTermCount, int currentTermId) + public static bool RotateLog(UnsafeBuffer metaDataBuffer, int currentTermCount, int currentTermId) { - int nextTermId = currentTermId + 1; - int nextTermCount = currentTermCount + 1; - int nextIndex = IndexByTermCount(nextTermCount); - int expectedTermId = nextTermId - PARTITION_COUNT; + var nextTermId = currentTermId + 1; + var nextTermCount = currentTermCount + 1; + var nextIndex = IndexByTermCount(nextTermCount); + var expectedTermId = nextTermId - PARTITION_COUNT; long rawTail; do { - rawTail = RawTail(logMetaDataBuffer, nextIndex); - if (expectedTermId != TermId(rawTail)) - { - break; - } - } while (!CasRawTail(logMetaDataBuffer, nextIndex, rawTail, PackTail(nextTermId, 0))); + rawTail = RawTail(metaDataBuffer, nextIndex); + if (expectedTermId != TermId(rawTail)) break; + } while (!CasRawTail(metaDataBuffer, nextIndex, rawTail, PackTail(nextTermId, 0))); - return CasActiveTermCount(logMetaDataBuffer, currentTermCount, nextTermCount); + return CasActiveTermCount(metaDataBuffer, currentTermCount, nextTermCount); } /// - /// Set the initial value for the termId in the upper bits of the tail counter. + /// Set the initial value for the termId in the upper bits of the tail counter. /// /// contain the tail counter. /// to be intialized. @@ -650,7 +627,7 @@ public static void InitialiseTailWithTermId(UnsafeBuffer logMetaData, int partit } /// - /// Get the termId from a packed raw tail value. + /// Get the termId from a packed raw tail value. /// /// containing the termId /// the termId from a packed raw tail value. @@ -661,7 +638,7 @@ public static int TermId(long rawTail) } /// - /// Read the termOffset from a packed raw tail value. + /// Read the termOffset from a packed raw tail value. /// /// containing the termOffset. /// that the offset cannot exceed. @@ -675,7 +652,7 @@ public static int TermOffset(long rawTail, long termLength) } /// - /// The termOffset as a result of the append + /// The termOffset as a result of the append /// /// into which the termOffset value has been packed. /// the termOffset after the append @@ -686,7 +663,7 @@ public static int TermOffset(long result) } /// - /// Pack a termId and termOffset into a raw tail value. + /// Pack a termId and termOffset into a raw tail value. /// /// to be packed. /// to be packed. @@ -698,81 +675,80 @@ public static long PackTail(int termId, int termOffset) } /// - /// Set the raw value of the tail for the given partition. + /// Set the raw value of the tail for the given partition. /// - /// containing the tail counters. + /// containing the tail counters. /// for the tail counter. /// to be stored - public static void RawTail(UnsafeBuffer logMetaDataBuffer, int partitionIndex, long rawTail) + public static void RawTail(UnsafeBuffer metaDataBuffer, int partitionIndex, long rawTail) { - logMetaDataBuffer.PutLong(TERM_TAIL_COUNTERS_OFFSET + (BitUtil.SIZE_OF_LONG * partitionIndex), rawTail); + metaDataBuffer.PutLong(TERM_TAIL_COUNTERS_OFFSET + BitUtil.SIZE_OF_LONG * partitionIndex, rawTail); } /// - /// Get the raw value of the tail for the given partition. + /// Get the raw value of the tail for the given partition. /// - /// containing the tail counters. + /// containing the tail counters. /// for the tail counter. /// the raw value of the tail for the current active partition. - public static long RawTail(UnsafeBuffer logMetaDataBuffer, int partitionIndex) + public static long RawTail(UnsafeBuffer metaDataBuffer, int partitionIndex) { - return logMetaDataBuffer.GetLong(TERM_TAIL_COUNTERS_OFFSET + (BitUtil.SIZE_OF_LONG * partitionIndex)); + return metaDataBuffer.GetLong(TERM_TAIL_COUNTERS_OFFSET + BitUtil.SIZE_OF_LONG * partitionIndex); } /// - /// Set the raw value of the tail for the given partition. + /// Set the raw value of the tail for the given partition. /// - /// containing the tail counters. + /// containing the tail counters. /// for the tail counter. /// to be stored - public static void RawTailVolatile(UnsafeBuffer logMetaDataBuffer, int partitionIndex, long rawTail) + public static void RawTailVolatile(UnsafeBuffer metaDataBuffer, int partitionIndex, long rawTail) { - logMetaDataBuffer.PutLongVolatile(TERM_TAIL_COUNTERS_OFFSET + (BitUtil.SIZE_OF_LONG * partitionIndex), + metaDataBuffer.PutLongVolatile(TERM_TAIL_COUNTERS_OFFSET + BitUtil.SIZE_OF_LONG * partitionIndex, rawTail); } /// - /// Get the raw value of the tail for the given partition. + /// Get the raw value of the tail for the given partition. /// - /// containing the tail counters. + /// containing the tail counters. /// for the tail counter. /// the raw value of the tail for the current active partition. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static long RawTailVolatile(UnsafeBuffer logMetaDataBuffer, int partitionIndex) + public static long RawTailVolatile(UnsafeBuffer metaDataBuffer, int partitionIndex) { - return logMetaDataBuffer.GetLongVolatile(TERM_TAIL_COUNTERS_OFFSET + BitUtil.SIZE_OF_LONG * partitionIndex); + return metaDataBuffer.GetLongVolatile(TERM_TAIL_COUNTERS_OFFSET + BitUtil.SIZE_OF_LONG * partitionIndex); } /// - /// Get the raw value of the tail for the current active partition. + /// Get the raw value of the tail for the current active partition. /// - /// logMetaDataBuffer containing the tail counters. + /// metaDataBuffer containing the tail counters. /// the raw value of the tail for the current active partition. [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static long RawTailVolatile(UnsafeBuffer logMetaDataBuffer) + public static long RawTailVolatile(UnsafeBuffer metaDataBuffer) { - var partitionIndex = IndexByTermCount(ActiveTermCount(logMetaDataBuffer)); - return logMetaDataBuffer.GetLongVolatile(TERM_TAIL_COUNTERS_OFFSET + BitUtil.SIZE_OF_LONG * partitionIndex); + var partitionIndex = IndexByTermCount(ActiveTermCount(metaDataBuffer)); + return metaDataBuffer.GetLongVolatile(TERM_TAIL_COUNTERS_OFFSET + BitUtil.SIZE_OF_LONG * partitionIndex); } - + /// - /// Compare and set the raw value of the tail for the given partition. + /// Compare and set the raw value of the tail for the given partition. /// - /// containing the tail counters. + /// containing the tail counters. /// for the tail counter. /// expected current value. /// to be applied. /// true if the update was successful otherwise false. - /// [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static bool CasRawTail(UnsafeBuffer logMetaDataBuffer, int partitionIndex, long expectedRawTail, long updateRawTail) + public static bool CasRawTail(UnsafeBuffer metaDataBuffer, int partitionIndex, long expectedRawTail, long updateRawTail) { - var index = TERM_TAIL_COUNTERS_OFFSET + (BitUtil.SIZE_OF_LONG * partitionIndex); - return logMetaDataBuffer.CompareAndSetLong(index, expectedRawTail, updateRawTail); + var index = TERM_TAIL_COUNTERS_OFFSET + BitUtil.SIZE_OF_LONG * partitionIndex; + return metaDataBuffer.CompareAndSetLong(index, expectedRawTail, updateRawTail); } - + /// - /// Get the number of bits to shift when dividing or multiplying by the term buffer length. + /// Get the number of bits to shift when dividing or multiplying by the term buffer length. /// /// to compute the number of bits to shift for. /// the number of bits to shift to divide or multiply by the term buffer length. diff --git a/src/Adaptive.Aeron/Publication.cs b/src/Adaptive.Aeron/Publication.cs index 8f08b8b2..52b3e50a 100644 --- a/src/Adaptive.Aeron/Publication.cs +++ b/src/Adaptive.Aeron/Publication.cs @@ -261,7 +261,7 @@ public long ChannelStatus /// Get the current position to which the publication has advanced for this stream. /// /// the current position to which the publication has advanced for this stream or . - public long Position + public virtual long Position { get { @@ -301,6 +301,13 @@ public long PositionLimit /// /// the counter id for the position limit after which the publication will be back pressured. public int PositionLimitId => _positionLimit.Id; + + /// + /// Available window for offering into a publication before the is reached. + /// + /// window for offering into a publication before the is reached. If + /// the publication is closed then will be returned. + public abstract long AvailableWindow { get; } /// /// Non-blocking publish of a buffer containing a message. diff --git a/src/Adaptive.Aeron/Subscription.cs b/src/Adaptive.Aeron/Subscription.cs index caf68b50..9883fde4 100644 --- a/src/Adaptive.Aeron/Subscription.cs +++ b/src/Adaptive.Aeron/Subscription.cs @@ -137,20 +137,27 @@ internal Subscription( /// callback used to indicate when an goes unavailable under this . public UnavailableImageHandler UnavailableImageHandler => _fields.unavailableImageHandler; + /// + /// Poll the s under the subscription for having reached End of Stream (EOS). This method will miss + /// s that have gone unavailable between calls unless using the . + /// + /// callback for handling end of stream indication. + /// number of that have reached End of Stream. + [Obsolete] public int PollEndOfStreams(EndOfStreamHandler endOfStreamHandler) { - int numberEndOfStreams = 0; + int eosCount = 0; foreach (var image in Images) { if (image.IsEndOfStream) { - numberEndOfStreams++; + eosCount++; endOfStreamHandler(image); } } - return numberEndOfStreams; + return eosCount; } /// @@ -493,7 +500,7 @@ internal Image RemoveImage(long correlationId) if (null != removedImage) { _fields.images = ArrayUtil.Remove(oldArray, i); - _fields.conductor.ReleaseLogBuffers(removedImage.LogBuffers, removedImage.CorrelationId); + _fields.conductor.ReleaseLogBuffers(removedImage.LogBuffers, correlationId); } return removedImage; diff --git a/src/Adaptive.Agrona/Adaptive.Agrona.csproj b/src/Adaptive.Agrona/Adaptive.Agrona.csproj index 2eef961d..9efe6f65 100644 --- a/src/Adaptive.Agrona/Adaptive.Agrona.csproj +++ b/src/Adaptive.Agrona/Adaptive.Agrona.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Agrona - 1.12.0 + 1.13.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Agrona libraries initially included in Aeron Client diff --git a/src/Adaptive.Agrona/Collections/ArrayUtil.cs b/src/Adaptive.Agrona/Collections/ArrayUtil.cs index 52e4ab20..cb8062f0 100644 --- a/src/Adaptive.Agrona/Collections/ArrayUtil.cs +++ b/src/Adaptive.Agrona/Collections/ArrayUtil.cs @@ -30,6 +30,8 @@ namespace Adaptive.Agrona.Collections public sealed class ArrayUtil { public const int UNKNOWN_INDEX = -1; + + public static readonly string[] EMPTY_STRING_ARRAY = new string[0]; /// /// Add an element to an array resulting in a new array. diff --git a/src/Adaptive.Archiver/Adaptive.Archiver.csproj b/src/Adaptive.Archiver/Adaptive.Archiver.csproj index adf82bc2..b110ecfc 100644 --- a/src/Adaptive.Archiver/Adaptive.Archiver.csproj +++ b/src/Adaptive.Archiver/Adaptive.Archiver.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Aeron.Archiver - 1.12.0 + 1.13.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Archiving over the Aeron transport diff --git a/src/Adaptive.Archiver/AeronArchive.cs b/src/Adaptive.Archiver/AeronArchive.cs index 8187bfc7..255224e1 100644 --- a/src/Adaptive.Archiver/AeronArchive.cs +++ b/src/Adaptive.Archiver/AeronArchive.cs @@ -706,7 +706,7 @@ public int ListRecordings(long fromRecordingId, int recordCount, IRecordingDescr } /// - /// List recording descriptors from a recording id with a limit of record count for a given channel and stream id. + /// List recording descriptors from a recording id with a limit of record count for a given channelFragment and stream id. /// /// If the recording id is greater than the largest known id then nothing is returned. /// @@ -714,11 +714,15 @@ public int ListRecordings(long fromRecordingId, int recordCount, IRecordingDescr /// /// at which to begin the listing. /// to limit for each query. - /// for a contains match on the stripped channel stored with the archive descriptor. + /// for a contains match on the original channel stored with the archive descriptor. /// to match. /// to which the descriptors are dispatched. /// the number of descriptors found and consumed. - public int ListRecordingsForUri(long fromRecordingId, int recordCount, string channel, int streamId, + public int ListRecordingsForUri( + long fromRecordingId, + int recordCount, + string channelFragment, + int streamId, IRecordingDescriptorConsumer consumer) { _lock.Lock(); @@ -726,7 +730,7 @@ public int ListRecordingsForUri(long fromRecordingId, int recordCount, string ch { long correlationId = aeron.NextCorrelationId(); - if (!archiveProxy.ListRecordingsForUri(fromRecordingId, recordCount, channel, streamId, correlationId, + if (!archiveProxy.ListRecordingsForUri(fromRecordingId, recordCount, channelFragment, streamId, correlationId, controlSessionId)) { throw new ArchiveException("failed to send list recordings request"); @@ -823,12 +827,12 @@ public long GetStopPosition(long recordingId) /// /// Find the last recording that matches the given criteria. /// - /// to search back to. - /// for a contains match on the stripped channel stored with the archive descriptor - /// of the recording to match. - /// of the recording to match. + /// to search back to. + /// for a contains match on the stripped channel stored with the archive descriptor + /// of the recording to match. + /// of the recording to match. /// the recordingId if found otherwise if not found. - public long FindLastMatchingRecording(long minRecordingId, string channel, int streamId, int sessionId) + public long FindLastMatchingRecording(long minRecordingId, string channelFragment, int streamId, int sessionId) { _lock.Lock(); try @@ -836,7 +840,7 @@ public long FindLastMatchingRecording(long minRecordingId, string channel, int s long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.FindLastMatchingRecording( - minRecordingId, channel, streamId, sessionId, correlationId, controlSessionId)) + minRecordingId, channelFragment, streamId, sessionId, correlationId, controlSessionId)) { throw new ArchiveException("failed to send find last matching request"); } @@ -950,7 +954,7 @@ private long PollForResponse(long correlationId) if (code == ControlResponseCode.ERROR) { var ex = new ArchiveException("response for correlationId=" + correlationId + ", error: " + - poller.ErrorMessage(), (int) poller.RelevantId()); + poller.ErrorMessage(), (int) poller.RelevantId()); if (poller.CorrelationId() == correlationId) { @@ -962,14 +966,13 @@ private long PollForResponse(long correlationId) context.ErrorHandler().Invoke(ex); } } - - if (poller.CorrelationId() == correlationId) + else if (poller.CorrelationId() == correlationId) { if (ControlResponseCode.OK != code) { throw new ArchiveException("unexpected response code: " + code); } - + return poller.RelevantId(); } } @@ -1154,32 +1157,32 @@ public class Configuration /// /// Sparse term buffer indicator for control streams. /// - private const string CONTROL_TERM_BUFFER_SPARSE_PARAM_NAME = "aeron.archive.control.term.buffer.sparse"; + public const string CONTROL_TERM_BUFFER_SPARSE_PARAM_NAME = "aeron.archive.control.term.buffer.sparse"; /// /// Overrides driver's sparse term buffer indicator for control streams. /// - private const bool CONTROL_TERM_BUFFER_SPARSE_DEFAULT = true; + public const bool CONTROL_TERM_BUFFER_SPARSE_DEFAULT = true; /// /// Term length for control streams. /// - internal const string CONTROL_TERM_BUFFER_LENGTH_PARAM_NAME = "aeron.archive.control.term.buffer.length"; + public const string CONTROL_TERM_BUFFER_LENGTH_PARAM_NAME = "aeron.archive.control.term.buffer.length"; /// /// Low term length for control channel reflects expected low bandwidth usage. /// - internal const int CONTROL_TERM_BUFFER_LENGTH_DEFAULT = 64 * 1024; + public const int CONTROL_TERM_BUFFER_LENGTH_DEFAULT = 64 * 1024; /// /// MTU length for control streams. /// - internal const string CONTROL_MTU_LENGTH_PARAM_NAME = "aeron.archive.control.mtu.length"; + public const string CONTROL_MTU_LENGTH_PARAM_NAME = "aeron.archive.control.mtu.length"; /// /// MTU to reflect default for the control streams. /// - internal const int CONTROL_MTU_LENGTH_DEFAULT = 4 * 1024; + public const int CONTROL_MTU_LENGTH_DEFAULT = 4 * 1024; /// /// The timeout in nanoseconds to wait for a message. diff --git a/src/Adaptive.Archiver/ArchiveProxy.cs b/src/Adaptive.Archiver/ArchiveProxy.cs index d0fccb2c..09e0d643 100644 --- a/src/Adaptive.Archiver/ArchiveProxy.cs +++ b/src/Adaptive.Archiver/ArchiveProxy.cs @@ -283,12 +283,18 @@ public bool ListRecordings(long fromRecordingId, int recordCount, long correlati /// /// at which to begin listing. /// for the number of descriptors to be listed. - /// to match recordings on. + /// to match recordings on. /// to match recordings on. /// for this request. /// for this request. /// true if successfully offered otherwise false. - public bool ListRecordingsForUri(long fromRecordingId, int recordCount, string channel, int streamId, long correlationId, long controlSessionId) + public bool ListRecordingsForUri( + long fromRecordingId, + int recordCount, + string channelFragment, + int streamId, + long correlationId, + long controlSessionId) { listRecordingsForUriRequestEncoder .WrapAndApplyHeader(buffer, 0, messageHeaderEncoder) @@ -297,7 +303,7 @@ public bool ListRecordingsForUri(long fromRecordingId, int recordCount, string c .FromRecordingId(fromRecordingId) .RecordCount(recordCount) .StreamId(streamId) - .Channel(channel); + .Channel(channelFragment); return Offer(listRecordingsForUriRequestEncoder.EncodedLength()); } @@ -412,7 +418,7 @@ public bool GetStopPosition( /// /// /// to search back to. - /// for a contains match on the stripped channel stored with the archive descriptor. + /// for a contains match on the stripped channel stored with the archive descriptor. /// of the recording to match. /// of the recording to match. /// for this request. @@ -420,7 +426,7 @@ public bool GetStopPosition( /// true if successfully offered otherwise false. public bool FindLastMatchingRecording( long minRecordingId, - string channel, + string channelFragment, int streamId, int sessionId, long correlationId, @@ -433,7 +439,7 @@ public bool FindLastMatchingRecording( .MinRecordingId(minRecordingId) .SessionId(sessionId) .StreamId(streamId) - .Channel(channel); + .Channel(channelFragment); return Offer(findLastMatchingRecordingRequestEncoder.EncodedLength()); } diff --git a/src/Adaptive.Cluster/Adaptive.Cluster.csproj b/src/Adaptive.Cluster/Adaptive.Cluster.csproj index 26c7ac5e..b09b14d6 100644 --- a/src/Adaptive.Cluster/Adaptive.Cluster.csproj +++ b/src/Adaptive.Cluster/Adaptive.Cluster.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Aeron.Cluster - 1.12.0 + 1.13.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Clustering libraries over the Aeron transport diff --git a/src/Adaptive.Cluster/Codecs/ChangeType.cs b/src/Adaptive.Cluster/Codecs/ChangeType.cs index 955aa247..d7ad8d03 100644 --- a/src/Adaptive.Cluster/Codecs/ChangeType.cs +++ b/src/Adaptive.Cluster/Codecs/ChangeType.cs @@ -4,7 +4,7 @@ namespace Adaptive.Cluster.Codecs { public enum ChangeType : int { JOIN = 0, - LEAVE = 1, + QUIT = 1, NULL_VALUE = -2147483648 } } diff --git a/src/Adaptive.Cluster/Codecs/ClusterSessionDecoder.cs b/src/Adaptive.Cluster/Codecs/ClusterSessionDecoder.cs index fc6894f8..a74bb7d4 100644 --- a/src/Adaptive.Cluster/Codecs/ClusterSessionDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/ClusterSessionDecoder.cs @@ -142,27 +142,27 @@ public long ClusterSessionId() } - public static int OpenedLogPositionId() + public static int CorrelationIdId() { return 2; } - public static int OpenedLogPositionSinceVersion() + public static int CorrelationIdSinceVersion() { return 0; } - public static int OpenedLogPositionEncodingOffset() + public static int CorrelationIdEncodingOffset() { return 8; } - public static int OpenedLogPositionEncodingLength() + public static int CorrelationIdEncodingLength() { return 8; } - public static string OpenedLogPositionMetaAttribute(MetaAttribute metaAttribute) + public static string CorrelationIdMetaAttribute(MetaAttribute metaAttribute) { switch (metaAttribute) { @@ -175,48 +175,48 @@ public static string OpenedLogPositionMetaAttribute(MetaAttribute metaAttribute) return ""; } - public static long OpenedLogPositionNullValue() + public static long CorrelationIdNullValue() { return -9223372036854775808L; } - public static long OpenedLogPositionMinValue() + public static long CorrelationIdMinValue() { return -9223372036854775807L; } - public static long OpenedLogPositionMaxValue() + public static long CorrelationIdMaxValue() { return 9223372036854775807L; } - public long OpenedLogPosition() + public long CorrelationId() { return _buffer.GetLong(_offset + 8, ByteOrder.LittleEndian); } - public static int CorrelationIdId() + public static int OpenedLogPositionId() { return 3; } - public static int CorrelationIdSinceVersion() + public static int OpenedLogPositionSinceVersion() { return 0; } - public static int CorrelationIdEncodingOffset() + public static int OpenedLogPositionEncodingOffset() { return 16; } - public static int CorrelationIdEncodingLength() + public static int OpenedLogPositionEncodingLength() { return 8; } - public static string CorrelationIdMetaAttribute(MetaAttribute metaAttribute) + public static string OpenedLogPositionMetaAttribute(MetaAttribute metaAttribute) { switch (metaAttribute) { @@ -229,22 +229,22 @@ public static string CorrelationIdMetaAttribute(MetaAttribute metaAttribute) return ""; } - public static long CorrelationIdNullValue() + public static long OpenedLogPositionNullValue() { return -9223372036854775808L; } - public static long CorrelationIdMinValue() + public static long OpenedLogPositionMinValue() { return -9223372036854775807L; } - public static long CorrelationIdMaxValue() + public static long OpenedLogPositionMaxValue() { return 9223372036854775807L; } - public long CorrelationId() + public long OpenedLogPosition() { return _buffer.GetLong(_offset + 16, ByteOrder.LittleEndian); } @@ -506,16 +506,16 @@ public StringBuilder AppendTo(StringBuilder builder) builder.Append("ClusterSessionId="); builder.Append(ClusterSessionId()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='openedLogPosition', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='correlationId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=8, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - builder.Append("OpenedLogPosition="); - builder.Append(OpenedLogPosition()); - builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='correlationId', referencedName='null', description='null', id=3, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("CorrelationId="); builder.Append(CorrelationId()); builder.Append('|'); + //Token{signal=BEGIN_FIELD, name='openedLogPosition', referencedName='null', description='null', id=3, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("OpenedLogPosition="); + builder.Append(OpenedLogPosition()); + builder.Append('|'); //Token{signal=BEGIN_FIELD, name='timeOfLastActivity', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=24, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='time_t', referencedName='null', description='Epoch time in milliseconds since 1 Jan 1970 UTC', id=-1, version=0, deprecated=0, encodedLength=8, offset=24, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("TimeOfLastActivity="); diff --git a/src/Adaptive.Cluster/Codecs/ClusterSessionEncoder.cs b/src/Adaptive.Cluster/Codecs/ClusterSessionEncoder.cs index 32114fa3..55f0f87f 100644 --- a/src/Adaptive.Cluster/Codecs/ClusterSessionEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/ClusterSessionEncoder.cs @@ -128,64 +128,64 @@ public ClusterSessionEncoder ClusterSessionId(long value) } - public static int OpenedLogPositionEncodingOffset() + public static int CorrelationIdEncodingOffset() { return 8; } - public static int OpenedLogPositionEncodingLength() + public static int CorrelationIdEncodingLength() { return 8; } - public static long OpenedLogPositionNullValue() + public static long CorrelationIdNullValue() { return -9223372036854775808L; } - public static long OpenedLogPositionMinValue() + public static long CorrelationIdMinValue() { return -9223372036854775807L; } - public static long OpenedLogPositionMaxValue() + public static long CorrelationIdMaxValue() { return 9223372036854775807L; } - public ClusterSessionEncoder OpenedLogPosition(long value) + public ClusterSessionEncoder CorrelationId(long value) { _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); return this; } - public static int CorrelationIdEncodingOffset() + public static int OpenedLogPositionEncodingOffset() { return 16; } - public static int CorrelationIdEncodingLength() + public static int OpenedLogPositionEncodingLength() { return 8; } - public static long CorrelationIdNullValue() + public static long OpenedLogPositionNullValue() { return -9223372036854775808L; } - public static long CorrelationIdMinValue() + public static long OpenedLogPositionMinValue() { return -9223372036854775807L; } - public static long CorrelationIdMaxValue() + public static long OpenedLogPositionMaxValue() { return 9223372036854775807L; } - public ClusterSessionEncoder CorrelationId(long value) + public ClusterSessionEncoder OpenedLogPosition(long value) { _buffer.PutLong(_offset + 16, value, ByteOrder.LittleEndian); return this; diff --git a/src/Adaptive.Cluster/Codecs/ClusterChangeEventDecoder.cs b/src/Adaptive.Cluster/Codecs/MembershipChangeEventDecoder.cs similarity index 94% rename from src/Adaptive.Cluster/Codecs/ClusterChangeEventDecoder.cs rename to src/Adaptive.Cluster/Codecs/MembershipChangeEventDecoder.cs index 8eff585d..87d1ce9c 100644 --- a/src/Adaptive.Cluster/Codecs/ClusterChangeEventDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/MembershipChangeEventDecoder.cs @@ -7,21 +7,21 @@ namespace Adaptive.Cluster.Codecs { -public class ClusterChangeEventDecoder +public class MembershipChangeEventDecoder { public const ushort BLOCK_LENGTH = 40; public const ushort TEMPLATE_ID = 26; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; - private ClusterChangeEventDecoder _parentMessage; + private MembershipChangeEventDecoder _parentMessage; private IDirectBuffer _buffer; protected int _offset; protected int _limit; protected int _actingBlockLength; protected int _actingVersion; - public ClusterChangeEventDecoder() + public MembershipChangeEventDecoder() { _parentMessage = this; } @@ -61,7 +61,7 @@ public int Offset() return _offset; } - public ClusterChangeEventDecoder Wrap( + public MembershipChangeEventDecoder Wrap( IDirectBuffer buffer, int offset, int actingBlockLength, int actingVersion) { this._buffer = buffer; @@ -358,27 +358,27 @@ public int ClusterSize() } - public static int EventTypeId() + public static int ChangeTypeId() { return 6; } - public static int EventTypeSinceVersion() + public static int ChangeTypeSinceVersion() { return 0; } - public static int EventTypeEncodingOffset() + public static int ChangeTypeEncodingOffset() { return 32; } - public static int EventTypeEncodingLength() + public static int ChangeTypeEncodingLength() { return 4; } - public static string EventTypeMetaAttribute(MetaAttribute metaAttribute) + public static string ChangeTypeMetaAttribute(MetaAttribute metaAttribute) { switch (metaAttribute) { @@ -391,7 +391,7 @@ public static string EventTypeMetaAttribute(MetaAttribute metaAttribute) return ""; } - public ChangeType EventType() + public ChangeType ChangeType() { return (ChangeType)_buffer.GetInt(_offset + 32, ByteOrder.LittleEndian); } @@ -536,7 +536,7 @@ public StringBuilder AppendTo(StringBuilder builder) { int originalLimit = Limit(); Limit(_offset + _actingBlockLength); - builder.Append("[ClusterChangeEvent](sbeTemplateId="); + builder.Append("[MembershipChangeEvent](sbeTemplateId="); builder.Append(TEMPLATE_ID); builder.Append("|sbeSchemaId="); builder.Append(SCHEMA_ID); @@ -580,10 +580,10 @@ public StringBuilder AppendTo(StringBuilder builder) builder.Append("ClusterSize="); builder.Append(ClusterSize()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='eventType', referencedName='null', description='null', id=6, version=0, deprecated=0, encodedLength=0, offset=32, componentTokenCount=6, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='changeType', referencedName='null', description='null', id=6, version=0, deprecated=0, encodedLength=0, offset=32, componentTokenCount=6, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=BEGIN_ENUM, name='ChangeType', referencedName='null', description='Type of Cluster Change Event', id=-1, version=0, deprecated=0, encodedLength=4, offset=32, componentTokenCount=4, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='null', timeUnit=null, semanticType='null'}} - builder.Append("EventType="); - builder.Append(EventType()); + builder.Append("ChangeType="); + builder.Append(ChangeType()); builder.Append('|'); //Token{signal=BEGIN_FIELD, name='memberId', referencedName='null', description='null', id=7, version=0, deprecated=0, encodedLength=0, offset=36, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=36, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} diff --git a/src/Adaptive.Cluster/Codecs/ClusterChangeEventEncoder.cs b/src/Adaptive.Cluster/Codecs/MembershipChangeEventEncoder.cs similarity index 86% rename from src/Adaptive.Cluster/Codecs/ClusterChangeEventEncoder.cs rename to src/Adaptive.Cluster/Codecs/MembershipChangeEventEncoder.cs index 93a9abc6..92b3303b 100644 --- a/src/Adaptive.Cluster/Codecs/ClusterChangeEventEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/MembershipChangeEventEncoder.cs @@ -7,19 +7,19 @@ namespace Adaptive.Cluster.Codecs { -public class ClusterChangeEventEncoder +public class MembershipChangeEventEncoder { public const ushort BLOCK_LENGTH = 40; public const ushort TEMPLATE_ID = 26; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; - private ClusterChangeEventEncoder _parentMessage; + private MembershipChangeEventEncoder _parentMessage; private IMutableDirectBuffer _buffer; protected int _offset; protected int _limit; - public ClusterChangeEventEncoder() + public MembershipChangeEventEncoder() { _parentMessage = this; } @@ -59,7 +59,7 @@ public int Offset() return _offset; } - public ClusterChangeEventEncoder Wrap(IMutableDirectBuffer buffer, int offset) + public MembershipChangeEventEncoder Wrap(IMutableDirectBuffer buffer, int offset) { this._buffer = buffer; this._offset = offset; @@ -68,7 +68,7 @@ public ClusterChangeEventEncoder Wrap(IMutableDirectBuffer buffer, int offset) return this; } - public ClusterChangeEventEncoder WrapAndApplyHeader( + public MembershipChangeEventEncoder WrapAndApplyHeader( IMutableDirectBuffer buffer, int offset, MessageHeaderEncoder headerEncoder) { headerEncoder @@ -121,7 +121,7 @@ public static long LeadershipTermIdMaxValue() return 9223372036854775807L; } - public ClusterChangeEventEncoder LeadershipTermId(long value) + public MembershipChangeEventEncoder LeadershipTermId(long value) { _buffer.PutLong(_offset + 0, value, ByteOrder.LittleEndian); return this; @@ -153,7 +153,7 @@ public static long LogPositionMaxValue() return 9223372036854775807L; } - public ClusterChangeEventEncoder LogPosition(long value) + public MembershipChangeEventEncoder LogPosition(long value) { _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); return this; @@ -185,7 +185,7 @@ public static long TimestampMaxValue() return 9223372036854775807L; } - public ClusterChangeEventEncoder Timestamp(long value) + public MembershipChangeEventEncoder Timestamp(long value) { _buffer.PutLong(_offset + 16, value, ByteOrder.LittleEndian); return this; @@ -217,7 +217,7 @@ public static int LeaderMemberIdMaxValue() return 2147483647; } - public ClusterChangeEventEncoder LeaderMemberId(int value) + public MembershipChangeEventEncoder LeaderMemberId(int value) { _buffer.PutInt(_offset + 24, value, ByteOrder.LittleEndian); return this; @@ -249,24 +249,24 @@ public static int ClusterSizeMaxValue() return 2147483647; } - public ClusterChangeEventEncoder ClusterSize(int value) + public MembershipChangeEventEncoder ClusterSize(int value) { _buffer.PutInt(_offset + 28, value, ByteOrder.LittleEndian); return this; } - public static int EventTypeEncodingOffset() + public static int ChangeTypeEncodingOffset() { return 32; } - public static int EventTypeEncodingLength() + public static int ChangeTypeEncodingLength() { return 4; } - public ClusterChangeEventEncoder EventType(ChangeType value) + public MembershipChangeEventEncoder ChangeType(ChangeType value) { _buffer.PutInt(_offset + 32, (int)value, ByteOrder.LittleEndian); return this; @@ -297,7 +297,7 @@ public static int MemberIdMaxValue() return 2147483647; } - public ClusterChangeEventEncoder MemberId(int value) + public MembershipChangeEventEncoder MemberId(int value) { _buffer.PutInt(_offset + 36, value, ByteOrder.LittleEndian); return this; @@ -332,7 +332,7 @@ public static int ClusterMembersHeaderLength() return 4; } - public ClusterChangeEventEncoder PutClusterMembers(IDirectBuffer src, int srcOffset, int length) + public MembershipChangeEventEncoder PutClusterMembers(IDirectBuffer src, int srcOffset, int length) { if (length > 1073741824) { @@ -348,7 +348,7 @@ public ClusterChangeEventEncoder PutClusterMembers(IDirectBuffer src, int srcOff return this; } - public ClusterChangeEventEncoder PutClusterMembers(byte[] src, int srcOffset, int length) + public MembershipChangeEventEncoder PutClusterMembers(byte[] src, int srcOffset, int length) { if (length > 1073741824) { @@ -364,7 +364,7 @@ public ClusterChangeEventEncoder PutClusterMembers(byte[] src, int srcOffset, in return this; } - public ClusterChangeEventEncoder ClusterMembers(string value) + public MembershipChangeEventEncoder ClusterMembers(string value) { int length = value.Length; if (length > 1073741824) @@ -389,7 +389,7 @@ public override string ToString() public StringBuilder AppendTo(StringBuilder builder) { - ClusterChangeEventDecoder writer = new ClusterChangeEventDecoder(); + MembershipChangeEventDecoder writer = new MembershipChangeEventDecoder(); writer.Wrap(_buffer, _offset, BLOCK_LENGTH, SCHEMA_VERSION); return writer.AppendTo(builder); diff --git a/src/Adaptive.Cluster/Codecs/NewLeadershipTermDecoder.cs b/src/Adaptive.Cluster/Codecs/NewLeadershipTermDecoder.cs index cef883d0..5aa577e1 100644 --- a/src/Adaptive.Cluster/Codecs/NewLeadershipTermDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/NewLeadershipTermDecoder.cs @@ -9,7 +9,7 @@ namespace Adaptive.Cluster.Codecs { public class NewLeadershipTermDecoder { - public const ushort BLOCK_LENGTH = 32; + public const ushort BLOCK_LENGTH = 40; public const ushort TEMPLATE_ID = 53; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -250,11 +250,65 @@ public long LeadershipTermId() } - public static int LeaderMemberIdId() + public static int MaxLogPositionId() { return 4; } + public static int MaxLogPositionSinceVersion() + { + return 0; + } + + public static int MaxLogPositionEncodingOffset() + { + return 24; + } + + public static int MaxLogPositionEncodingLength() + { + return 8; + } + + public static string MaxLogPositionMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static long MaxLogPositionNullValue() + { + return -9223372036854775808L; + } + + public static long MaxLogPositionMinValue() + { + return -9223372036854775807L; + } + + public static long MaxLogPositionMaxValue() + { + return 9223372036854775807L; + } + + public long MaxLogPosition() + { + return _buffer.GetLong(_offset + 24, ByteOrder.LittleEndian); + } + + + public static int LeaderMemberIdId() + { + return 5; + } + public static int LeaderMemberIdSinceVersion() { return 0; @@ -262,7 +316,7 @@ public static int LeaderMemberIdSinceVersion() public static int LeaderMemberIdEncodingOffset() { - return 24; + return 32; } public static int LeaderMemberIdEncodingLength() @@ -300,13 +354,13 @@ public static int LeaderMemberIdMaxValue() public int LeaderMemberId() { - return _buffer.GetInt(_offset + 24, ByteOrder.LittleEndian); + return _buffer.GetInt(_offset + 32, ByteOrder.LittleEndian); } public static int LogSessionIdId() { - return 5; + return 6; } public static int LogSessionIdSinceVersion() @@ -316,7 +370,7 @@ public static int LogSessionIdSinceVersion() public static int LogSessionIdEncodingOffset() { - return 28; + return 36; } public static int LogSessionIdEncodingLength() @@ -354,7 +408,7 @@ public static int LogSessionIdMaxValue() public int LogSessionId() { - return _buffer.GetInt(_offset + 28, ByteOrder.LittleEndian); + return _buffer.GetInt(_offset + 36, ByteOrder.LittleEndian); } @@ -402,13 +456,18 @@ public StringBuilder AppendTo(StringBuilder builder) builder.Append("LeadershipTermId="); builder.Append(LeadershipTermId()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='leaderMemberId', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=24, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=24, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='maxLogPosition', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=24, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=24, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("MaxLogPosition="); + builder.Append(MaxLogPosition()); + builder.Append('|'); + //Token{signal=BEGIN_FIELD, name='leaderMemberId', referencedName='null', description='null', id=5, version=0, deprecated=0, encodedLength=0, offset=32, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=32, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("LeaderMemberId="); builder.Append(LeaderMemberId()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='logSessionId', referencedName='null', description='null', id=5, version=0, deprecated=0, encodedLength=0, offset=28, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=28, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='logSessionId', referencedName='null', description='null', id=6, version=0, deprecated=0, encodedLength=0, offset=36, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=36, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("LogSessionId="); builder.Append(LogSessionId()); diff --git a/src/Adaptive.Cluster/Codecs/NewLeadershipTermEncoder.cs b/src/Adaptive.Cluster/Codecs/NewLeadershipTermEncoder.cs index 33e64918..eca4579b 100644 --- a/src/Adaptive.Cluster/Codecs/NewLeadershipTermEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/NewLeadershipTermEncoder.cs @@ -9,7 +9,7 @@ namespace Adaptive.Cluster.Codecs { public class NewLeadershipTermEncoder { - public const ushort BLOCK_LENGTH = 32; + public const ushort BLOCK_LENGTH = 40; public const ushort TEMPLATE_ID = 53; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -192,11 +192,43 @@ public NewLeadershipTermEncoder LeadershipTermId(long value) } - public static int LeaderMemberIdEncodingOffset() + public static int MaxLogPositionEncodingOffset() { return 24; } + public static int MaxLogPositionEncodingLength() + { + return 8; + } + + public static long MaxLogPositionNullValue() + { + return -9223372036854775808L; + } + + public static long MaxLogPositionMinValue() + { + return -9223372036854775807L; + } + + public static long MaxLogPositionMaxValue() + { + return 9223372036854775807L; + } + + public NewLeadershipTermEncoder MaxLogPosition(long value) + { + _buffer.PutLong(_offset + 24, value, ByteOrder.LittleEndian); + return this; + } + + + public static int LeaderMemberIdEncodingOffset() + { + return 32; + } + public static int LeaderMemberIdEncodingLength() { return 4; @@ -219,14 +251,14 @@ public static int LeaderMemberIdMaxValue() public NewLeadershipTermEncoder LeaderMemberId(int value) { - _buffer.PutInt(_offset + 24, value, ByteOrder.LittleEndian); + _buffer.PutInt(_offset + 32, value, ByteOrder.LittleEndian); return this; } public static int LogSessionIdEncodingOffset() { - return 28; + return 36; } public static int LogSessionIdEncodingLength() @@ -251,7 +283,7 @@ public static int LogSessionIdMaxValue() public NewLeadershipTermEncoder LogSessionId(int value) { - _buffer.PutInt(_offset + 28, value, ByteOrder.LittleEndian); + _buffer.PutInt(_offset + 36, value, ByteOrder.LittleEndian); return this; } diff --git a/src/Adaptive.Cluster/Codecs/StopCatchupDecoder.cs b/src/Adaptive.Cluster/Codecs/StopCatchupDecoder.cs index bf9c27e4..7c7a8a64 100644 --- a/src/Adaptive.Cluster/Codecs/StopCatchupDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/StopCatchupDecoder.cs @@ -9,7 +9,7 @@ namespace Adaptive.Cluster.Codecs { public class StopCatchupDecoder { - public const ushort BLOCK_LENGTH = 8; + public const ushort BLOCK_LENGTH = 20; public const ushort TEMPLATE_ID = 57; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -88,27 +88,27 @@ public void Limit(int limit) this._limit = limit; } - public static int ReplaySessionIdId() + public static int LeadershipTermIdId() { return 1; } - public static int ReplaySessionIdSinceVersion() + public static int LeadershipTermIdSinceVersion() { return 0; } - public static int ReplaySessionIdEncodingOffset() + public static int LeadershipTermIdEncodingOffset() { return 0; } - public static int ReplaySessionIdEncodingLength() + public static int LeadershipTermIdEncodingLength() { - return 4; + return 8; } - public static string ReplaySessionIdMetaAttribute(MetaAttribute metaAttribute) + public static string LeadershipTermIdMetaAttribute(MetaAttribute metaAttribute) { switch (metaAttribute) { @@ -121,32 +121,86 @@ public static string ReplaySessionIdMetaAttribute(MetaAttribute metaAttribute) return ""; } - public static int ReplaySessionIdNullValue() + public static long LeadershipTermIdNullValue() { - return -2147483648; + return -9223372036854775808L; } - public static int ReplaySessionIdMinValue() + public static long LeadershipTermIdMinValue() { - return -2147483647; + return -9223372036854775807L; } - public static int ReplaySessionIdMaxValue() + public static long LeadershipTermIdMaxValue() { - return 2147483647; + return 9223372036854775807L; } - public int ReplaySessionId() + public long LeadershipTermId() { - return _buffer.GetInt(_offset + 0, ByteOrder.LittleEndian); + return _buffer.GetLong(_offset + 0, ByteOrder.LittleEndian); } - public static int FollowerMemberIdId() + public static int LogPositionId() { return 2; } + public static int LogPositionSinceVersion() + { + return 0; + } + + public static int LogPositionEncodingOffset() + { + return 8; + } + + public static int LogPositionEncodingLength() + { + return 8; + } + + public static string LogPositionMetaAttribute(MetaAttribute metaAttribute) + { + switch (metaAttribute) + { + case MetaAttribute.EPOCH: return "unix"; + case MetaAttribute.TIME_UNIT: return "nanosecond"; + case MetaAttribute.SEMANTIC_TYPE: return ""; + case MetaAttribute.PRESENCE: return "required"; + } + + return ""; + } + + public static long LogPositionNullValue() + { + return -9223372036854775808L; + } + + public static long LogPositionMinValue() + { + return -9223372036854775807L; + } + + public static long LogPositionMaxValue() + { + return 9223372036854775807L; + } + + public long LogPosition() + { + return _buffer.GetLong(_offset + 8, ByteOrder.LittleEndian); + } + + + public static int FollowerMemberIdId() + { + return 3; + } + public static int FollowerMemberIdSinceVersion() { return 0; @@ -154,7 +208,7 @@ public static int FollowerMemberIdSinceVersion() public static int FollowerMemberIdEncodingOffset() { - return 4; + return 16; } public static int FollowerMemberIdEncodingLength() @@ -192,7 +246,7 @@ public static int FollowerMemberIdMaxValue() public int FollowerMemberId() { - return _buffer.GetInt(_offset + 4, ByteOrder.LittleEndian); + return _buffer.GetInt(_offset + 16, ByteOrder.LittleEndian); } @@ -225,13 +279,18 @@ public StringBuilder AppendTo(StringBuilder builder) } builder.Append(BLOCK_LENGTH); builder.Append("):"); - //Token{signal=BEGIN_FIELD, name='replaySessionId', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - builder.Append("ReplaySessionId="); - builder.Append(ReplaySessionId()); + //Token{signal=BEGIN_FIELD, name='leadershipTermId', referencedName='null', description='null', id=1, version=0, deprecated=0, encodedLength=0, offset=0, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=0, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("LeadershipTermId="); + builder.Append(LeadershipTermId()); + builder.Append('|'); + //Token{signal=BEGIN_FIELD, name='logPosition', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=8, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int64', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=8, offset=8, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT64, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + builder.Append("LogPosition="); + builder.Append(LogPosition()); builder.Append('|'); - //Token{signal=BEGIN_FIELD, name='followerMemberId', referencedName='null', description='null', id=2, version=0, deprecated=0, encodedLength=0, offset=4, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=4, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=BEGIN_FIELD, name='followerMemberId', referencedName='null', description='null', id=3, version=0, deprecated=0, encodedLength=0, offset=16, componentTokenCount=3, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} + //Token{signal=ENCODING, name='int32', referencedName='null', description='null', id=-1, version=0, deprecated=0, encodedLength=4, offset=16, componentTokenCount=1, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} builder.Append("FollowerMemberId="); builder.Append(FollowerMemberId()); diff --git a/src/Adaptive.Cluster/Codecs/StopCatchupEncoder.cs b/src/Adaptive.Cluster/Codecs/StopCatchupEncoder.cs index e3876b4b..285b737c 100644 --- a/src/Adaptive.Cluster/Codecs/StopCatchupEncoder.cs +++ b/src/Adaptive.Cluster/Codecs/StopCatchupEncoder.cs @@ -9,7 +9,7 @@ namespace Adaptive.Cluster.Codecs { public class StopCatchupEncoder { - public const ushort BLOCK_LENGTH = 8; + public const ushort BLOCK_LENGTH = 20; public const ushort TEMPLATE_ID = 57; public const ushort SCHEMA_ID = 1; public const ushort SCHEMA_VERSION = 1; @@ -96,41 +96,73 @@ public void Limit(int limit) this._limit = limit; } - public static int ReplaySessionIdEncodingOffset() + public static int LeadershipTermIdEncodingOffset() { return 0; } - public static int ReplaySessionIdEncodingLength() + public static int LeadershipTermIdEncodingLength() { - return 4; + return 8; } - public static int ReplaySessionIdNullValue() + public static long LeadershipTermIdNullValue() { - return -2147483648; + return -9223372036854775808L; } - public static int ReplaySessionIdMinValue() + public static long LeadershipTermIdMinValue() { - return -2147483647; + return -9223372036854775807L; } - public static int ReplaySessionIdMaxValue() + public static long LeadershipTermIdMaxValue() { - return 2147483647; + return 9223372036854775807L; } - public StopCatchupEncoder ReplaySessionId(int value) + public StopCatchupEncoder LeadershipTermId(long value) { - _buffer.PutInt(_offset + 0, value, ByteOrder.LittleEndian); + _buffer.PutLong(_offset + 0, value, ByteOrder.LittleEndian); + return this; + } + + + public static int LogPositionEncodingOffset() + { + return 8; + } + + public static int LogPositionEncodingLength() + { + return 8; + } + + public static long LogPositionNullValue() + { + return -9223372036854775808L; + } + + public static long LogPositionMinValue() + { + return -9223372036854775807L; + } + + public static long LogPositionMaxValue() + { + return 9223372036854775807L; + } + + public StopCatchupEncoder LogPosition(long value) + { + _buffer.PutLong(_offset + 8, value, ByteOrder.LittleEndian); return this; } public static int FollowerMemberIdEncodingOffset() { - return 4; + return 16; } public static int FollowerMemberIdEncodingLength() @@ -155,7 +187,7 @@ public static int FollowerMemberIdMaxValue() public StopCatchupEncoder FollowerMemberId(int value) { - _buffer.PutInt(_offset + 4, value, ByteOrder.LittleEndian); + _buffer.PutInt(_offset + 16, value, ByteOrder.LittleEndian); return this; } diff --git a/src/Adaptive.Cluster/Service/BoundedLogAdapter.cs b/src/Adaptive.Cluster/Service/BoundedLogAdapter.cs index 5d4200b9..fa28ea88 100644 --- a/src/Adaptive.Cluster/Service/BoundedLogAdapter.cs +++ b/src/Adaptive.Cluster/Service/BoundedLogAdapter.cs @@ -26,7 +26,7 @@ internal sealed class BoundedLogAdapter : IControlledFragmentHandler, IDisposabl private readonly TimerEventDecoder timerEventDecoder = new TimerEventDecoder(); private readonly ClusterActionRequestDecoder actionRequestDecoder = new ClusterActionRequestDecoder(); private readonly NewLeadershipTermEventDecoder newLeadershipTermEventDecoder = new NewLeadershipTermEventDecoder(); - private readonly ClusterChangeEventDecoder clusterChangeEventDecoder = new ClusterChangeEventDecoder(); + private readonly MembershipChangeEventDecoder membershipChangeEventDecoder = new MembershipChangeEventDecoder(); private readonly Image image; private readonly ReadableCounter upperBound; @@ -158,23 +158,23 @@ public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offs newLeadershipTermEventDecoder.LogSessionId()); break; - case ClusterChangeEventDecoder.TEMPLATE_ID: - clusterChangeEventDecoder.Wrap( + case MembershipChangeEventDecoder.TEMPLATE_ID: + membershipChangeEventDecoder.Wrap( buffer, offset + MessageHeaderDecoder.ENCODED_LENGTH, messageHeaderDecoder.BlockLength(), messageHeaderDecoder.Version() ); - agent.OnClusterChange( - clusterChangeEventDecoder.LeadershipTermId(), - clusterChangeEventDecoder.LogPosition(), - clusterChangeEventDecoder.Timestamp(), - clusterChangeEventDecoder.LeaderMemberId(), - clusterChangeEventDecoder.ClusterSize(), - clusterChangeEventDecoder.EventType(), - clusterChangeEventDecoder.MemberId(), - clusterChangeEventDecoder.ClusterMembers()); + agent.OnMembershipChange( + membershipChangeEventDecoder.LeadershipTermId(), + membershipChangeEventDecoder.LogPosition(), + membershipChangeEventDecoder.Timestamp(), + membershipChangeEventDecoder.LeaderMemberId(), + membershipChangeEventDecoder.ClusterSize(), + membershipChangeEventDecoder.ChangeType(), + membershipChangeEventDecoder.MemberId(), + membershipChangeEventDecoder.ClusterMembers()); break; } diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs index 924c6202..cbcd1f87 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs @@ -316,19 +316,19 @@ internal void OnNewLeadershipTermEvent( clusterTimeMs = timestampMs; } - internal void OnClusterChange( + internal void OnMembershipChange( long leadershipTermId, long logPosition, long timestampMs, int leaderMemberId, int clusterSize, - ChangeType eventType, + ChangeType changeType, int memberId, string clusterMembers) { clusterTimeMs = timestampMs; - if (memberId == this.memberId && eventType == ChangeType.LEAVE) + if (memberId == this.memberId && changeType == ChangeType.QUIT) { _consensusModuleProxy.Ack(logPosition, ackId++, serviceId); ctx.TerminationHook().Invoke(); diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs b/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs index 321ea8e6..dbd3a1a5 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs @@ -42,7 +42,7 @@ public static void Main(string[] args) private ClusteredServiceContainer(Context ctx) { this.ctx = ctx; - + try { ctx.Conclude(); @@ -181,7 +181,7 @@ public class Configuration /// Default stream id within a channel for communications from the services to the consensus module. /// public const int CONSENSUS_MODULE_STREAM_ID_DEFAULT = 105; - + /// /// Default channel to be used for archiving snapshots. /// @@ -231,7 +231,7 @@ public class Configuration /// Default to true that this a responding service to client requests. /// public const bool RESPONDER_SERVICE_DEFAULT = true; - + /// /// The value or system property if set. /// @@ -280,7 +280,7 @@ public static string ServiceControlChannel() { return Config.GetProperty(SERVICE_CONTROL_CHANNEL_PROP_NAME, SERVICE_CONTROL_CHANNEL_DEFAULT); } - + /// /// The value or system property /// if set. @@ -302,7 +302,7 @@ public static int ServiceStreamId() { return Config.GetInteger(SERVICE_STREAM_ID_PROP_NAME, SERVICE_CONTROL_STREAM_ID_DEFAULT); } - + /// /// The value or system property if set. /// @@ -357,7 +357,7 @@ public static int ErrorBufferLength() { return Config.GetSizeAsInt(ERROR_BUFFER_LENGTH_PROP_NAME, ERROR_BUFFER_LENGTH_DEFAULT); } - + /// /// The value or system property if set. /// @@ -387,7 +387,7 @@ public class Context : IDisposable private int snapshotStreamId = Configuration.SnapshotStreamId(); private int errorBufferLength = Configuration.ErrorBufferLength(); private bool isRespondingService = Configuration.IsRespondingService(); - + private IThreadFactory threadFactory; private Func idleStrategySupplier; private IEpochClock epochClock; @@ -422,7 +422,7 @@ public void Conclude() { throw new ConfigurationException("service id must be not be negative: " + serviceId); } - + if (null == threadFactory) { threadFactory = new DefaultThreadFactory(); @@ -452,10 +452,7 @@ public void Conclude() { markFile = new ClusterMarkFile( new FileInfo(Path.Combine(clusterDir.FullName, ClusterMarkFile.MarkFilenameForService(serviceId))), - ClusterComponentType.CONTAINER, - errorBufferLength, - epochClock, - 0); + ClusterComponentType.CONTAINER, errorBufferLength, epochClock, 0); } if (null == errorLog) @@ -685,7 +682,7 @@ public int ConsensusModuleStreamId() { return consensusModuleStreamId; } - + /// /// Set the channel parameter for snapshot recordings. /// @@ -729,7 +726,7 @@ public int SnapshotStreamId() { return snapshotStreamId; } - + /// /// Set if this a service that responds to client requests. /// diff --git a/src/Adaptive.Cluster/aeron-cluster-codecs.xml b/src/Adaptive.Cluster/aeron-cluster-codecs.xml index e34192a0..e7a7b561 100644 --- a/src/Adaptive.Cluster/aeron-cluster-codecs.xml +++ b/src/Adaptive.Cluster/aeron-cluster-codecs.xml @@ -54,7 +54,7 @@ 0 - 1 + 1 @@ -63,7 +63,7 @@ Cluster Session Protocol ======================== - Protocol is: + Session Protocol: -> session-connect, [*ingress-message | *session-keep-alive], session-close \ <- *session-event, [*egress-message | *session-event | *new-leader-event] @@ -252,17 +252,17 @@ - + description="Event for the change of the cluster membership that affects the cluster size"> - + - + @@ -383,8 +383,9 @@ - - + + + @@ -415,9 +416,10 @@ - - + description="The leader informs the follower it can stop the catchup process"> + + + - - + +