diff --git a/RELEASE.md b/RELEASE.md index d638b4b6..3a61c97d 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,4 +1,4 @@ #### Port Aeron.NET has been ported against Java version: -- Agrona: 0.9.29 -- Aeron: 1.14.0 +- Agrona: 0.9.30 +- Aeron: 1.15.0 diff --git a/driver/Aeron.Driver.nuspec b/driver/Aeron.Driver.nuspec index 281d4dea..37080274 100644 --- a/driver/Aeron.Driver.nuspec +++ b/driver/Aeron.Driver.nuspec @@ -2,7 +2,7 @@ Aeron.Driver - 1.14.0 + 1.15.0 Aeron Driver Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. diff --git a/driver/media-driver.jar b/driver/media-driver.jar index 38a72b95..2ea04179 100644 Binary files a/driver/media-driver.jar and b/driver/media-driver.jar differ diff --git a/src/Adaptive.Aeron.Tests/ClientConductorTest.cs b/src/Adaptive.Aeron.Tests/ClientConductorTest.cs index c7f57492..0fabf691 100644 --- a/src/Adaptive.Aeron.Tests/ClientConductorTest.cs +++ b/src/Adaptive.Aeron.Tests/ClientConductorTest.cs @@ -59,11 +59,13 @@ public class ClientConductorTest private SubscriptionReadyFlyweight SubscriptionReady; private OperationSucceededFlyweight OperationSuccess; private ErrorResponseFlyweight ErrorResponse; + private ClientTimeoutFlyweight ClientTimeout; private UnsafeBuffer PublicationReadyBuffer; private UnsafeBuffer SubscriptionReadyBuffer; private UnsafeBuffer OperationSuccessBuffer; private UnsafeBuffer ErrorMessageBuffer; + private UnsafeBuffer ClientTimeoutBuffer; private CopyBroadcastReceiver MockToClientReceiver; @@ -98,11 +100,13 @@ public void SetUp() SubscriptionReady = new SubscriptionReadyFlyweight(); OperationSuccess = new OperationSucceededFlyweight(); ErrorResponse = new ErrorResponseFlyweight(); + ClientTimeout = new ClientTimeoutFlyweight(); PublicationReadyBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]); SubscriptionReadyBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]); OperationSuccessBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]); ErrorMessageBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]); + ClientTimeoutBuffer = new UnsafeBuffer(new byte[SEND_BUFFER_CAPACITY]); CounterValuesBuffer = new UnsafeBuffer(new byte[COUNTER_BUFFER_LENGTH]); MockToClientReceiver = A.Fake(); @@ -144,6 +148,7 @@ public void SetUp() SubscriptionReady.Wrap(SubscriptionReadyBuffer, 0); OperationSuccess.Wrap(OperationSuccessBuffer, 0); ErrorResponse.Wrap(ErrorMessageBuffer, 0); + ClientTimeout.Wrap(ClientTimeoutBuffer, 0); PublicationReady.CorrelationId(CORRELATION_ID); PublicationReady.RegistrationId(CORRELATION_ID); @@ -497,9 +502,51 @@ public void ShouldTimeoutInterServiceIfTooLongBetweenDoWorkCalls() A.CallTo(() => MockClientErrorHandler(A._)).MustHaveHappened(); - Assert.True(Conductor.IsClosed()); + Assert.True(Conductor.IsTerminating()); } + [Test] + public void ShouldTerminateAndErrorOnClientTimeoutFromDriver() + { + SuppressPrintError = true; + + Conductor.OnClientTimeout(); + + A.CallTo(() => MockClientErrorHandler.Invoke(A._)).MustHaveHappened(); + + bool threwException = false; + try + { + Conductor.DoWork(); + } + catch (AgentTerminationException) + { + threwException = true; + } + + Assert.True(threwException); + Assert.True(Conductor.IsTerminating()); + } + + [Test] + public void ShouldNotCloseAndErrorOnClientTimeoutForAnotherClientIdFromDriver() + { + WhenReceiveBroadcastOnMessage( + ControlProtocolEvents.ON_CLIENT_TIMEOUT, + ClientTimeoutBuffer, + buffer => + { + ClientTimeout.ClientId(Conductor.DriverListenerAdapter().ClientId + 1); + return ClientTimeoutFlyweight.LENGTH; + }); + + Conductor.DoWork(); + + A.CallTo(() => MockClientErrorHandler.Invoke(A._)).MustNotHaveHappened(); + + Assert.False(Conductor.IsClosed()); + } + private void WhenReceiveBroadcastOnMessage(int msgTypeId, IMutableDirectBuffer buffer, Func filler) { A.CallTo(() => MockToClientReceiver.Receive(A._)).Invokes(() => diff --git a/src/Adaptive.Aeron/Adaptive.Aeron.csproj b/src/Adaptive.Aeron/Adaptive.Aeron.csproj index 9fae7e52..f1c4ec11 100644 --- a/src/Adaptive.Aeron/Adaptive.Aeron.csproj +++ b/src/Adaptive.Aeron/Adaptive.Aeron.csproj @@ -2,7 +2,7 @@ netstandard2.0;net45 Aeron.Client - 1.14.0 + 1.15.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Aeron Client diff --git a/src/Adaptive.Aeron/Aeron.cs b/src/Adaptive.Aeron/Aeron.cs index a58d01cd..be6ab779 100644 --- a/src/Adaptive.Aeron/Aeron.cs +++ b/src/Adaptive.Aeron/Aeron.cs @@ -173,9 +173,6 @@ public void Dispose() { _conductorInvoker.Dispose(); } - - _conductor.ClientClose(); - _ctx.Dispose(); } } @@ -380,8 +377,11 @@ public static long ResourceLingerDurationNs() /// A number of the properties are for testing and should not be set by end users. /// /// Note: Do not reuse instances of the context across different clients. + /// + /// The context will be owned be after a successful + /// and closed via /// - public class Context : IDisposable + public class Context { private long _clientId; private bool _useConductorAgentInvoker = false; @@ -536,16 +536,6 @@ static Context() /// public const string MDC_CONTROL_MODE = "control-mode"; - /// - /// Key for the session id for a publication or restricted subscription. - /// - public const string SESSION_ID_PARAM_NAME = "session-id"; - - /// - /// Key for the linger timeout for a publication to wait around after draining in nanoseconds. - /// - public const string LINGER_PARAM_NAME = "linger"; - /// /// Valid value for when manual control is desired. /// @@ -555,7 +545,17 @@ static Context() /// Valid value for when dynamic control is desired. Default value. /// public const string MDC_CONTROL_MODE_DYNAMIC = "dynamic"; + + /// + /// Key for the session id for a publication or restricted subscription. + /// + public const string SESSION_ID_PARAM_NAME = "session-id"; + /// + /// Key for the linger timeout for a publication to wait around after draining in nanoseconds. + /// + public const string LINGER_PARAM_NAME = "linger"; + /// /// Parameter name for channel URI param to indicate if a subscribed must be reliable or not. Value is boolean. /// @@ -1223,8 +1223,10 @@ public IThreadFactory ThreadFactory() /// public void Dispose() { - IoUtil.Unmap(_cncByteBuffer); + var cncByteBuffer = _cncByteBuffer; _cncByteBuffer = null; + IoUtil.Unmap(cncByteBuffer); + _cncMetaDataBuffer?.Dispose(); _countersMetaDataBuffer?.Dispose(); diff --git a/src/Adaptive.Aeron/ChannelUri.cs b/src/Adaptive.Aeron/ChannelUri.cs index 2cc0b558..aa80ccc1 100644 --- a/src/Adaptive.Aeron/ChannelUri.cs +++ b/src/Adaptive.Aeron/ChannelUri.cs @@ -258,6 +258,11 @@ public override string ToString() /// for the stream. public void InitialPosition(long position, int initialTermId, int termLength) { + if (position < 0 || 0 != (position & (FrameDescriptor.FRAME_ALIGNMENT - 1))) + { + throw new ArgumentException("invalid position: " + position); + } + int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength); int termId = LogBufferDescriptor.ComputeTermIdFromPosition(position, bitsToShift, initialTermId); int termOffset = (int)(position & (termLength - 1)); diff --git a/src/Adaptive.Aeron/ChannelUriStringBuilder.cs b/src/Adaptive.Aeron/ChannelUriStringBuilder.cs index 4207e7a1..1f5e7881 100644 --- a/src/Adaptive.Aeron/ChannelUriStringBuilder.cs +++ b/src/Adaptive.Aeron/ChannelUriStringBuilder.cs @@ -1,6 +1,7 @@ using System; using System.Text; using Adaptive.Aeron.LogBuffer; +using static Adaptive.Aeron.LogBuffer.FrameDescriptor; namespace Adaptive.Aeron { @@ -25,7 +26,6 @@ public class ChannelUriStringBuilder private string _tags; private string _alias; private bool? _reliable; - private bool? _sparse; private int? _ttl; private int? _mtu; private int? _termLength; @@ -34,6 +34,7 @@ public class ChannelUriStringBuilder private int? _termOffset; private int? _sessionId; private long? _linger; + private bool? _sparse; private bool _isSessionIdTagged; /// @@ -277,30 +278,6 @@ public ChannelUriStringBuilder Reliable(bool? isReliable) return _reliable; } - /// - /// Set to indicate if a term log buffer should be sparse on disk or not. Sparse saves space at the potential - /// expense of latency. - /// - /// true if the term buffer log is sparse on disk. - /// this for a fluent API. - /// - public ChannelUriStringBuilder Sparse(bool? isSparse) - { - _sparse = isSparse; - return this; - } - - /// - /// Get if a term log buffer should be sparse on disk or not. Sparse saves space at the potential expense of latency. - /// - /// true if the term buffer log is sparse on disk. - /// - public bool? Sparse() - { - return _sparse; - } - - /// /// Set the Time To Live (TTL) for a multicast datagram. Valid values are 0-255 for the number of hops the datagram /// can progress along. @@ -345,7 +322,7 @@ public ChannelUriStringBuilder Mtu(int? mtu) throw new ArgumentException("MTU not in range 32-65504: " + mtu); } - if ((mtu & (FrameDescriptor.FRAME_ALIGNMENT - 1)) != 0) + if ((mtu & (FRAME_ALIGNMENT - 1)) != 0) { throw new ArgumentException("MTU not a multiple of FRAME_ALIGNMENT: mtu=" + mtu); } @@ -454,7 +431,7 @@ public ChannelUriStringBuilder TermOffset(int? termOffset) throw new ArgumentException("term offset not in range 0-1g: " + termOffset); } - if (0 != (termOffset & (FrameDescriptor.FRAME_ALIGNMENT - 1))) + if (0 != (termOffset & (FRAME_ALIGNMENT - 1))) { throw new ArgumentException("term offset not multiple of FRAME_ALIGNMENT: " + termOffset); } @@ -525,6 +502,29 @@ public ChannelUriStringBuilder Linger(long? lingerNs) return _linger; } + /// + /// Set to indicate if a term log buffer should be sparse on disk or not. Sparse saves space at the potential + /// expense of latency. + /// + /// true if the term buffer log is sparse on disk. + /// this for a fluent API. + /// + public ChannelUriStringBuilder Sparse(bool? isSparse) + { + _sparse = isSparse; + return this; + } + + /// + /// Get if a term log buffer should be sparse on disk or not. Sparse saves space at the potential expense of latency. + /// + /// true if the term buffer log is sparse on disk. + /// + public bool? Sparse() + { + return _sparse; + } + /// /// Set the tags for a channel used by a publication or subscription. Tags can be used to identify or tag a /// channel so that a configuration can be referenced and reused. @@ -606,6 +606,11 @@ public string Alias() /// this for a fluent API. public ChannelUriStringBuilder InitialPosition(long position, int initialTermId, int termLength) { + if (position < 0 || 0 != (position & (FRAME_ALIGNMENT - 1))) + { + throw new ArgumentException("invalid position: " + position); + } + int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength); _initialTermId = initialTermId; @@ -657,21 +662,6 @@ public string Build() _sb.Append(Aeron.Context.MDC_CONTROL_MODE_PARAM_NAME).Append('=').Append(_controlMode).Append('|'); } - if (null != _reliable) - { - _sb.Append(Aeron.Context.RELIABLE_STREAM_PARAM_NAME).Append('=').Append(_reliable).Append('|'); - } - - if (null != _sparse) - { - _sb.Append(Aeron.Context.SPARSE_PARAM_NAME).Append('=').Append(_sparse).Append('|'); - } - - if (null != _ttl) - { - _sb.Append(Aeron.Context.TTL_PARAM_NAME).Append('=').Append(_ttl.Value).Append('|'); - } - if (null != _mtu) { _sb.Append(Aeron.Context.MTU_LENGTH_PARAM_NAME).Append('=').Append(_mtu.Value).Append('|'); @@ -703,6 +693,16 @@ public string Build() _sb.Append(Aeron.Context.SESSION_ID_PARAM_NAME).Append('=').Append(PrefixTag(_isSessionIdTagged, _sessionId.Value)).Append('|'); } + if (null != _ttl) + { + _sb.Append(Aeron.Context.TTL_PARAM_NAME).Append('=').Append(_ttl.Value).Append('|'); + } + + if (null != _reliable) + { + _sb.Append(Aeron.Context.RELIABLE_STREAM_PARAM_NAME).Append('=').Append(_reliable).Append('|'); + } + if (null != _linger) { _sb.Append(Aeron.Context.LINGER_PARAM_NAME).Append('=').Append(_linger.Value).Append('|'); @@ -712,6 +712,11 @@ public string Build() { _sb.Append(Aeron.Context.ALIAS_PARAM_NAME).Append('=').Append(_alias).Append('|'); } + + if (null != _sparse) + { + _sb.Append(Aeron.Context.SPARSE_PARAM_NAME).Append('=').Append(_sparse).Append('|'); + } char lastChar = _sb[_sb.Length - 1]; if (lastChar == '|' || lastChar == '?') diff --git a/src/Adaptive.Aeron/ClientConductor.cs b/src/Adaptive.Aeron/ClientConductor.cs index 98b6810c..03019c9e 100644 --- a/src/Adaptive.Aeron/ClientConductor.cs +++ b/src/Adaptive.Aeron/ClientConductor.cs @@ -48,6 +48,7 @@ internal class ClientConductor : IAgent, IDriverEventsListener private long _timeOfLastServiceNs; private bool _isClosed; private bool _isInCallback; + private bool _isTerminating; private string _stashedChannel; private RegistrationException _driverException; @@ -91,7 +92,7 @@ internal ClientConductor(Aeron.Context ctx) _defaultUnavailableImageHandler = ctx.UnavailableImageHandler(); _availableCounterHandler = ctx.AvailableCounterHandler(); _unavailableCounterHandler = ctx.UnavailableCounterHandler(); - _driverEventsAdapter = new DriverEventsAdapter(ctx.ToClientBuffer(), this); + _driverEventsAdapter = new DriverEventsAdapter(ctx.ToClientBuffer(), ctx.ClientId(), this); _counterValuesBuffer = ctx.CountersValuesBuffer(); _countersReader = new CountersReader(ctx.CountersMetaDataBuffer(), ctx.CountersValuesBuffer(), Encoding.ASCII); @@ -114,7 +115,6 @@ public void OnClose() if (!_isClosed) { _isClosed = true; - ForceCloseResources(); Thread.Yield(); @@ -124,6 +124,7 @@ public void OnClose() } _driverProxy.ClientClose(); + _ctx.Dispose(); } } finally @@ -132,11 +133,6 @@ public void OnClose() } } - internal void ClientClose() - { - _driverProxy.ClientClose(); - } - public int DoWork() { int workCount = 0; @@ -145,7 +141,7 @@ public int DoWork() { try { - if (_isClosed) + if (_isTerminating) { throw new AgentTerminationException(); } @@ -166,11 +162,16 @@ public string RoleName() return "aeron-client-conductor"; } - public bool IsClosed() + internal bool IsClosed() { return _isClosed; } + internal bool IsTerminating() + { + return _isTerminating; + } + public void OnError(long correlationId, int codeValue, ErrorCode errorCode, string message) { _driverException = new RegistrationException(codeValue, errorCode, message); @@ -388,6 +389,16 @@ public void OnUnavailableCounter(long registrationId, int counterId) } } } + + public void OnClientTimeout() + { + if (!_isClosed) + { + _isTerminating = true; + ForceCloseResources(); + HandleError(new ClientTimeoutException("client timeout from driver")); + } + } internal CountersReader CountersReader() { @@ -404,7 +415,7 @@ internal ConcurrentPublication AddPublication(string channel, int streamId) _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); _stashedChannel = channel; @@ -424,7 +435,7 @@ internal ExclusivePublication AddExclusivePublication(string channel, int stream _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); _stashedChannel = channel; @@ -446,7 +457,7 @@ internal void ReleasePublication(Publication publication) { if (!publication.IsClosed) { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); publication.InternalClose(); @@ -476,7 +487,7 @@ internal Subscription AddSubscription(string channel, int streamId, AvailableIma _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); long correlationId = _driverProxy.AddSubscription(channel, streamId); @@ -501,7 +512,7 @@ internal void ReleaseSubscription(Subscription subscription) { if (!subscription.IsClosed) { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); subscription.InternalClose(); @@ -522,7 +533,7 @@ internal void AddDestination(long registrationId, string endpointChannel) _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); AwaitResponse(_driverProxy.AddDestination(registrationId, endpointChannel)); @@ -538,7 +549,7 @@ internal void RemoveDestination(long registrationId, string endpointChannel) _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); AwaitResponse(_driverProxy.RemoveDestination(registrationId, endpointChannel)); @@ -554,7 +565,7 @@ internal void AddRcvDestination(long registrationId, string endpointChannel) _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); AwaitResponse(_driverProxy.AddRcvDestination(registrationId, endpointChannel)); @@ -570,7 +581,7 @@ internal void RemoveRcvDestination(long registrationId, string endpointChannel) _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); AwaitResponse(_driverProxy.RemoveRcvDestination(registrationId, endpointChannel)); @@ -587,7 +598,7 @@ internal Counter AddCounter(int typeId, IDirectBuffer keyBuffer, int keyOffset, _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); if (keyLength < 0 || keyLength > CountersManager.MAX_KEY_LENGTH) @@ -616,7 +627,7 @@ internal Counter AddCounter(int typeId, string label) _clientLock.Lock(); try { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); if (label.Length > CountersManager.MAX_LABEL_LENGTH) @@ -643,7 +654,7 @@ internal void ReleaseCounter(Counter counter) { if (!counter.IsClosed) { - EnsureOpen(); + EnsureActive(); EnsureNotReentrant(); counter.InternalClose(); @@ -691,11 +702,11 @@ internal long ChannelStatus(int channelStatusId) } } - private void EnsureOpen() + private void EnsureActive() { - if (_isClosed) + if (_isClosed || _isTerminating) { - throw new AeronException("Aeron client conductor is closed"); + throw new AeronException("Aeron client conductor is closed or terminating"); } } @@ -735,6 +746,11 @@ private int Service(long correlationId) { HandleError(throwable); + if (_driverEventsAdapter.IsInvalid) + { + OnClose(); + } + if (IsClientApiCall(correlationId)) { throw; @@ -756,11 +772,19 @@ private void AwaitResponse(long correlationId) do { - Thread.Sleep(1); + try + { + Thread.Sleep(1); + } + catch (ThreadInterruptedException) + { + _isTerminating = true; + throw; + } Service(correlationId); - if (_driverEventsAdapter.ReceivedCorrelationId() == correlationId) + if (_driverEventsAdapter.ReceivedCorrelationId == correlationId) { if (null != _driverException) { @@ -770,7 +794,15 @@ private void AwaitResponse(long correlationId) return; } - Thread.Sleep(0); // check interrupt + try + { + Thread.Sleep(1); + } + catch (ThreadInterruptedException) + { + _isTerminating = true; + throw; + } } while (deadlineNs - _nanoClock.NanoTime() > 0); @@ -798,16 +830,10 @@ private void CheckServiceInterval(long nowNs) { if ((_timeOfLastServiceNs + _interServiceTimeoutNs) - nowNs < 0) { - int lingeringResourcesSize = _lingeringResources.Count; - + _isTerminating = true; + ForceCloseResources(); - - if (_lingeringResources.Count > lingeringResourcesSize) - { - Aeron.Sleep(NanoUtil.ToMillis(_ctx.ResourceLingerDurationNs())); - } - - OnClose(); + Thread.Yield(); throw new ConductorServiceTimeoutException("service interval exceeded (ns): " + _interServiceTimeoutNs); } @@ -819,7 +845,11 @@ private int CheckLiveness(long nowNs) { if (_epochClock.Time() > (_driverProxy.TimeOfLastDriverKeepaliveMs() + _driverTimeoutMs)) { - OnClose(); + _isTerminating = true; + + ForceCloseResources(); + Thread.Yield(); + throw new DriverTimeoutException("MediaDriver keepalive older than (ms): " + _driverTimeoutMs); } diff --git a/src/Adaptive.Aeron/CncFileDescriptor.cs b/src/Adaptive.Aeron/CncFileDescriptor.cs index 247c4981..c4dd26e2 100644 --- a/src/Adaptive.Aeron/CncFileDescriptor.cs +++ b/src/Adaptive.Aeron/CncFileDescriptor.cs @@ -42,7 +42,7 @@ namespace Adaptive.Aeron /// +----------------------------+ /// /// - /// Meta Data Layout (CnC Version 13) + /// Meta Data Layout ///
     ///  0                   1                   2                   3
     ///  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
@@ -73,8 +73,7 @@ namespace Adaptive.Aeron
     public class CncFileDescriptor
     {
         public const string CNC_FILE = "cnc.dat";
-
-        public const int CNC_VERSION = 14;
+        public const int CNC_VERSION = 15;
 
         public static readonly int CNC_VERSION_FIELD_OFFSET;
         public static readonly int TO_DRIVER_BUFFER_LENGTH_FIELD_OFFSET;
diff --git a/src/Adaptive.Aeron/Command/ClientTimeoutFlyweight.cs b/src/Adaptive.Aeron/Command/ClientTimeoutFlyweight.cs
new file mode 100644
index 00000000..4be7d82a
--- /dev/null
+++ b/src/Adaptive.Aeron/Command/ClientTimeoutFlyweight.cs
@@ -0,0 +1,61 @@
+using Adaptive.Agrona;
+
+namespace Adaptive.Aeron.Command
+{
+    /// 
+    /// Indicate a client has timed out by the driver.
+    /// 
+    /// 
+    /// 
+    /// 0                   1                   2                   3
+    /// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+    /// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    /// |                         Client Id                             |
+    /// |                                                               |
+    /// +---------------------------------------------------------------+
+    /// 
+ public class ClientTimeoutFlyweight + { + private const int CLIENT_ID_FIELD_OFFSET = 0; + + public static readonly int LENGTH = BitUtil.SIZE_OF_LONG; + + private IMutableDirectBuffer _buffer; + private int _offset; + + /// + /// Wrap the buffer at a given offset for updates. + /// + /// to wrap + /// at which the message begins. + /// for fluent API + public ClientTimeoutFlyweight Wrap(IMutableDirectBuffer buffer, int offset) + { + _buffer = buffer; + _offset = offset; + + return this; + } + + /// + /// return client id field + /// + /// client id field + public long ClientId() + { + return _buffer.GetLong(_offset + CLIENT_ID_FIELD_OFFSET); + } + + /// + /// set client id field + /// + /// field value + /// for fluent API + public ClientTimeoutFlyweight ClientId(long clientId) + { + _buffer.PutLong(_offset + CLIENT_ID_FIELD_OFFSET, clientId); + + return this; + } + } +} \ No newline at end of file diff --git a/src/Adaptive.Aeron/Command/ControlProtocolEvents.cs b/src/Adaptive.Aeron/Command/ControlProtocolEvents.cs index abf5c617..2e9410c6 100644 --- a/src/Adaptive.Aeron/Command/ControlProtocolEvents.cs +++ b/src/Adaptive.Aeron/Command/ControlProtocolEvents.cs @@ -134,6 +134,11 @@ public class ControlProtocolEvents /// Inform clients of removal of counter. ///
public const int ON_UNAVAILABLE_COUNTER = 0x0F09; + + /// + /// Inform clients of client timeout. + /// + public const int ON_CLIENT_TIMEOUT = 0x0F0A; } } \ No newline at end of file diff --git a/src/Adaptive.Aeron/DriverEventsAdapter.cs b/src/Adaptive.Aeron/DriverEventsAdapter.cs index b3daa96d..b9fb5411 100644 --- a/src/Adaptive.Aeron/DriverEventsAdapter.cs +++ b/src/Adaptive.Aeron/DriverEventsAdapter.cs @@ -15,10 +15,12 @@ */ using System; +using System.Runtime.CompilerServices; using Adaptive.Aeron.Command; using Adaptive.Agrona; using Adaptive.Agrona.Concurrent; using Adaptive.Agrona.Concurrent.Broadcast; +using static Adaptive.Aeron.Command.ControlProtocolEvents; namespace Adaptive.Aeron { @@ -36,15 +38,19 @@ internal class DriverEventsAdapter private readonly OperationSucceededFlyweight _operationSucceeded = new OperationSucceededFlyweight(); private readonly ImageMessageFlyweight _imageMessage = new ImageMessageFlyweight(); private readonly CounterUpdateFlyweight _counterUpdate = new CounterUpdateFlyweight(); + private readonly ClientTimeoutFlyweight _clientTimeout = new ClientTimeoutFlyweight(); private readonly IDriverEventsListener _listener; private readonly MessageHandler _messageHandler; private long _activeCorrelationId; private long _receivedCorrelationId; + private readonly long _clientId; + private bool _isInvalid; - internal DriverEventsAdapter(CopyBroadcastReceiver broadcastReceiver, IDriverEventsListener listener) + internal DriverEventsAdapter(CopyBroadcastReceiver broadcastReceiver, long clientId, IDriverEventsListener listener) { _broadcastReceiver = broadcastReceiver; + _clientId = clientId; _listener = listener; _messageHandler = OnMessage; } @@ -54,19 +60,29 @@ public int Receive(long activeCorrelationId) _activeCorrelationId = activeCorrelationId; _receivedCorrelationId = Aeron.NULL_VALUE; - return _broadcastReceiver.Receive(_messageHandler); + try + { + return _broadcastReceiver.Receive(_messageHandler); + } + catch (InvalidOperationException) + { + _isInvalid = true; + throw; + } } - public long ReceivedCorrelationId() - { - return _receivedCorrelationId; - } + public long ReceivedCorrelationId => _receivedCorrelationId; + + public bool IsInvalid => _isInvalid; + + public long ClientId => _clientId; + public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int length) { switch (msgTypeId) { - case ControlProtocolEvents.ON_ERROR: + case ON_ERROR: { _errorResponse.Wrap(buffer, index); @@ -88,7 +104,7 @@ public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int break; } - case ControlProtocolEvents.ON_AVAILABLE_IMAGE: + case ON_AVAILABLE_IMAGE: { _imageReady.Wrap(buffer, index); @@ -104,7 +120,7 @@ public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int } - case ControlProtocolEvents.ON_PUBLICATION_READY: + case ON_PUBLICATION_READY: { _publicationReady.Wrap(buffer, index); @@ -125,7 +141,7 @@ public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int break; } - case ControlProtocolEvents.ON_SUBSCRIPTION_READY: + case ON_SUBSCRIPTION_READY: { _subscriptionReady.Wrap(buffer, index); @@ -139,7 +155,7 @@ public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int break; } - case ControlProtocolEvents.ON_OPERATION_SUCCESS: + case ON_OPERATION_SUCCESS: { _operationSucceeded.Wrap(buffer, index); @@ -152,7 +168,7 @@ public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int break; } - case ControlProtocolEvents.ON_UNAVAILABLE_IMAGE: + case ON_UNAVAILABLE_IMAGE: { _imageMessage.Wrap(buffer, index); @@ -161,7 +177,7 @@ public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int break; } - case ControlProtocolEvents.ON_EXCLUSIVE_PUBLICATION_READY: + case ON_EXCLUSIVE_PUBLICATION_READY: { _publicationReady.Wrap(buffer, index); @@ -182,7 +198,7 @@ public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int break; } - case ControlProtocolEvents.ON_COUNTER_READY: + case ON_COUNTER_READY: { _counterUpdate.Wrap(buffer, index); @@ -201,13 +217,26 @@ public void OnMessage(int msgTypeId, IMutableDirectBuffer buffer, int index, int break; } - case ControlProtocolEvents.ON_UNAVAILABLE_COUNTER: + case ON_UNAVAILABLE_COUNTER: { _counterUpdate.Wrap(buffer, index); _listener.OnUnavailableCounter(_counterUpdate.CorrelationId(), _counterUpdate.CounterId()); break; } + + case ON_CLIENT_TIMEOUT: + { + _clientTimeout.Wrap(buffer, index); + + if (_clientTimeout.ClientId() == _clientId) + { + _listener.OnClientTimeout(); + } + + break; + } + } } diff --git a/src/Adaptive.Aeron/DriverProxy.cs b/src/Adaptive.Aeron/DriverProxy.cs index 11d0f48d..f4181131 100644 --- a/src/Adaptive.Aeron/DriverProxy.cs +++ b/src/Adaptive.Aeron/DriverProxy.cs @@ -28,8 +28,7 @@ namespace Adaptive.Aeron /// /// Writes commands into the client conductor buffer. /// - /// Note: this class is not thread safe and is expecting to be called within - /// with the exception of which is thread safe. + /// Note: this class is not thread safe and is expecting to be called within . ///
public class DriverProxy { @@ -69,6 +68,11 @@ public long TimeOfLastDriverKeepaliveMs() return _toDriverCommandBuffer.ConsumerHeartbeatTime(); } + public long ClientId() + { + return _correlatedMessage.ClientId(); + } + public long AddPublication(string channel, int streamId) { long correlationId = _toDriverCommandBuffer.NextCorrelationId(); @@ -278,15 +282,8 @@ public long RemoveCounter(long registrationId) public void ClientClose() { - var buffer = new UnsafeBuffer(new byte[CorrelatedMessageFlyweight.LENGTH]); - - new CorrelatedMessageFlyweight() - .Wrap(buffer, 0) - .ClientId(_correlatedMessage.ClientId()) - .CorrelationId(Aeron.NULL_VALUE); - - //_correlatedMessage.CorrelationId(_toDriverCommandBuffer.NextCorrelationId()); - _toDriverCommandBuffer.Write(ControlProtocolEvents.CLIENT_CLOSE, buffer, 0, CorrelatedMessageFlyweight.LENGTH); + _correlatedMessage.CorrelationId(Aeron.NULL_VALUE); + _toDriverCommandBuffer.Write(ControlProtocolEvents.CLIENT_CLOSE, _buffer, 0, CorrelatedMessageFlyweight.LENGTH); } } } \ No newline at end of file diff --git a/src/Adaptive.Aeron/Exceptions/ClientTimeoutException.cs b/src/Adaptive.Aeron/Exceptions/ClientTimeoutException.cs new file mode 100644 index 00000000..f71aaeef --- /dev/null +++ b/src/Adaptive.Aeron/Exceptions/ClientTimeoutException.cs @@ -0,0 +1,14 @@ +using System; + +namespace Adaptive.Aeron.Exceptions +{ + /// + /// Client timeout event received from the driver for this client. + /// + public class ClientTimeoutException : TimeoutException + { + public ClientTimeoutException(string message) : base(message) + { + } + } +} \ No newline at end of file diff --git a/src/Adaptive.Aeron/IDriverEventsListener.cs b/src/Adaptive.Aeron/IDriverEventsListener.cs index 191548f7..11458b11 100644 --- a/src/Adaptive.Aeron/IDriverEventsListener.cs +++ b/src/Adaptive.Aeron/IDriverEventsListener.cs @@ -74,6 +74,8 @@ void OnAvailableCounter( void OnUnavailableCounter( long correlationId, int counterId); + + void OnClientTimeout(); } } \ No newline at end of file diff --git a/src/Adaptive.Aeron/Image.cs b/src/Adaptive.Aeron/Image.cs index 7feaff91..fbfc9168 100644 --- a/src/Adaptive.Aeron/Image.cs +++ b/src/Adaptive.Aeron/Image.cs @@ -653,5 +653,19 @@ internal void Close() _isEos = _finalPosition >= LogBufferDescriptor.EndOfStreamPosition(_logBuffers.MetaDataBuffer()); _isClosed = true; } + + public override string ToString() + { + return "Image{" + + $"correlationId={CorrelationId}, " + + $"joinPosition={JoinPosition}, " + + $"sessionId={SessionId}, " + + $"initialTermId={InitialTermId}, " + + $"isEos={IsEndOfStream}, " + + $"sourceIdentity='{SourceIdentity}', " + + $"subscription={Subscription}, " + + $"position={Position}" + + '}'; + } } } \ No newline at end of file diff --git a/src/Adaptive.Aeron/Publication.cs b/src/Adaptive.Aeron/Publication.cs index 52b3e50a..8cbe7696 100644 --- a/src/Adaptive.Aeron/Publication.cs +++ b/src/Adaptive.Aeron/Publication.cs @@ -513,5 +513,18 @@ internal static int ValidateAndComputeLength(int lengthOne, int lengthTwo) return totalLength; } + public override string ToString() + { + return "Publication{" + + "originalRegistrationId=" + OriginalRegistrationId + + ", registrationId=" + RegistrationId + + ", initialTermId=" + InitialTermId + + ", termBufferLength=" + TermBufferLength + + ", sessionId=" + SessionId + + ", streamId=" + StreamId + + ", channel='" + Channel + '\'' + + ", position=" + Position + + '}'; + } } } \ No newline at end of file diff --git a/src/Adaptive.Aeron/Subscription.cs b/src/Adaptive.Aeron/Subscription.cs index 32bc3e83..b7077dac 100644 --- a/src/Adaptive.Aeron/Subscription.cs +++ b/src/Adaptive.Aeron/Subscription.cs @@ -533,5 +533,15 @@ private void CloseImages() } } } + + public override string ToString() + { + return "Subscription{" + + "registrationId=" + RegistrationId + + ", streamId=" + StreamId + + ", channel='" + Channel + '\'' + + ", imageCount=" + ImageCount + + '}'; + } } } \ No newline at end of file diff --git a/src/Adaptive.Agrona/Adaptive.Agrona.csproj b/src/Adaptive.Agrona/Adaptive.Agrona.csproj index ad8b192b..cf1a3b7b 100644 --- a/src/Adaptive.Agrona/Adaptive.Agrona.csproj +++ b/src/Adaptive.Agrona/Adaptive.Agrona.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Agrona - 1.14.0 + 1.15.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Agrona libraries initially included in Aeron Client diff --git a/src/Adaptive.Agrona/Concurrent/Broadcast/BroadcastReceiver.cs b/src/Adaptive.Agrona/Concurrent/Broadcast/BroadcastReceiver.cs index b6f44fc1..9d5a8d47 100644 --- a/src/Adaptive.Agrona/Concurrent/Broadcast/BroadcastReceiver.cs +++ b/src/Adaptive.Agrona/Concurrent/Broadcast/BroadcastReceiver.cs @@ -63,6 +63,9 @@ public BroadcastReceiver(IAtomicBuffer buffer) _tailIntentCounterIndex = _capacity + BroadcastBufferDescriptor.TailIntentCounterOffset; _tailCounterIndex = _capacity + BroadcastBufferDescriptor.TailCounterOffset; _latestCounterIndex = _capacity + BroadcastBufferDescriptor.LatestCounterOffset; + + _cursor = _nextRecord = buffer.GetLongVolatile(_latestCounterIndex); + _recordOffset = (int) _cursor & (_capacity - 1); } /// diff --git a/src/Adaptive.Agrona/Concurrent/Broadcast/CopyBroadcastReceiver.cs b/src/Adaptive.Agrona/Concurrent/Broadcast/CopyBroadcastReceiver.cs index e44ca257..4635cb78 100644 --- a/src/Adaptive.Agrona/Concurrent/Broadcast/CopyBroadcastReceiver.cs +++ b/src/Adaptive.Agrona/Concurrent/Broadcast/CopyBroadcastReceiver.cs @@ -19,7 +19,7 @@ namespace Adaptive.Agrona.Concurrent.Broadcast { /// - /// Receiver that copies messages that have been broadcast to enable a simpler API for the client. + /// Receiver that copies messages which have been broadcast to enable a simpler API for the client. /// public class CopyBroadcastReceiver { @@ -45,13 +45,6 @@ public CopyBroadcastReceiver(BroadcastReceiver receiver, int scratchBufferLength { _receiver = receiver; _scratchBuffer = new UnsafeBuffer(BufferUtil.AllocateDirect(scratchBufferLength)); - - while (receiver.ReceiveNext()) - { - // If we're reconnecting to a broadcast buffer then we need to - // scan ourselves up to date, otherwise we risk "falling behind" - // the buffer due to the time taken to catchup. - } } /// @@ -62,13 +55,6 @@ public CopyBroadcastReceiver(BroadcastReceiver receiver) { _receiver = receiver; _scratchBuffer = new UnsafeBuffer(BufferUtil.AllocateDirect(ScratchBufferSize)); - - while (receiver.ReceiveNext()) - { - // If we're reconnecting to a broadcast buffer then we need to - // scan ourselves up to date, otherwise we risk "falling behind" - // the buffer due to the time taken to catchup. - } } /// @@ -86,7 +72,7 @@ public int Receive(MessageHandler handler) { if (lastSeenLappedCount != receiver.LappedCount()) { - throw new InvalidOperationException("Unable to keep up with broadcast buffer"); + throw new InvalidOperationException("Unable to keep up with broadcast"); } var length = receiver.Length(); @@ -101,7 +87,7 @@ public int Receive(MessageHandler handler) if (!receiver.Validate()) { - throw new InvalidOperationException("Unable to keep up with broadcast buffer"); + throw new InvalidOperationException("Unable to keep up with broadcast"); } handler(msgTypeId, _scratchBuffer, 0, length); diff --git a/src/Adaptive.Archiver/Adaptive.Archiver.csproj b/src/Adaptive.Archiver/Adaptive.Archiver.csproj index a6bd61e0..b4ccbff2 100644 --- a/src/Adaptive.Archiver/Adaptive.Archiver.csproj +++ b/src/Adaptive.Archiver/Adaptive.Archiver.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Aeron.Archiver - 1.14.0 + 1.15.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Archiving over the Aeron transport diff --git a/src/Adaptive.Archiver/AeronArchive.cs b/src/Adaptive.Archiver/AeronArchive.cs index 45f1ae40..c9669569 100644 --- a/src/Adaptive.Archiver/AeronArchive.cs +++ b/src/Adaptive.Archiver/AeronArchive.cs @@ -38,6 +38,7 @@ public class AeronArchive : IDisposable private const int FRAGMENT_LIMIT = 10; + private bool isClosed = false; private readonly long controlSessionId; private readonly long messageTimeoutNs; private readonly Context context; @@ -93,7 +94,7 @@ internal AeronArchive(Context ctx) CloseHelper.QuietDispose(publication); } - CloseHelper.QuietDispose(ctx); + ctx.Dispose(); throw; } @@ -122,15 +123,19 @@ public void Dispose() _lock.Lock(); try { - archiveProxy.CloseSession(controlSessionId); - - if (!context.OwnsAeronClient()) + if (!isClosed) { - controlResponsePoller.Subscription()?.Dispose(); - archiveProxy.Pub()?.Dispose(); - } + isClosed = true; + archiveProxy.CloseSession(controlSessionId); + + if (!context.OwnsAeronClient()) + { + controlResponsePoller.Subscription()?.Dispose(); + archiveProxy.Pub()?.Dispose(); + } - context.Dispose(); + context.Dispose(); + } } finally { @@ -207,7 +212,7 @@ public static AsyncConnect ConnectAsync(Context ctx) CloseHelper.QuietDispose(publication); } - CloseHelper.QuietDispose(ctx); + ctx.Dispose(); throw; } @@ -269,6 +274,8 @@ public string PollForErrorResponse() _lock.Lock(); try { + EnsureOpen(); + if (controlResponsePoller.Poll() != 0 && controlResponsePoller.IsPollComplete()) { if (controlResponsePoller.ControlSessionId() == controlSessionId && @@ -299,6 +306,8 @@ public void CheckForErrorResponse() _lock.Lock(); try { + EnsureOpen(); + if (controlResponsePoller.Poll() != 0 && controlResponsePoller.IsPollComplete()) { if (controlResponsePoller.ControlSessionId() == controlSessionId && @@ -343,6 +352,8 @@ public Publication AddRecordedPublication(string channel, int streamId) _lock.Lock(); try { + EnsureOpen(); + publication = aeron.AddPublication(channel, streamId); if (!publication.IsOriginal) { @@ -383,8 +394,9 @@ public ExclusivePublication AddRecordedExclusivePublication(string channel, int _lock.Lock(); try { + EnsureOpen(); + publication = aeron.AddExclusivePublication(channel, streamId); - StartRecording(ChannelUri.AddSessionId(channel, publication.SessionId), streamId, SourceLocation.LOCAL); } catch (Exception) @@ -418,6 +430,8 @@ public long StartRecording(string channel, int streamId, SourceLocation sourceLo _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.StartRecording(channel, streamId, sourceLocation, correlationId, controlSessionId)) @@ -451,6 +465,8 @@ public long ExtendRecording(long recordingId, string channel, int streamId, _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.ExtendRecording(channel, streamId, sourceLocation, recordingId, correlationId, @@ -483,6 +499,8 @@ public void StopRecording(string channel, int streamId) _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.StopRecording(channel, streamId, correlationId, controlSessionId)) @@ -514,12 +532,14 @@ public void StopRecording(Publication publication) /// or /// . /// - /// the subscription was registered with for the recording. + /// is the was registered with for the recording. public void StopRecording(long subscriptionId) { _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.StopRecording(subscriptionId, correlationId, controlSessionId)) @@ -558,6 +578,8 @@ public long StartReplay(long recordingId, long position, long length, string rep _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.Replay(recordingId, position, length, replayChannel, replayStreamId, correlationId, @@ -583,6 +605,8 @@ public void StopReplay(long replaySessionId) _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.StopReplay(replaySessionId, correlationId, controlSessionId)) @@ -614,6 +638,8 @@ public Subscription Replay(long recordingId, long position, long length, string _lock.Lock(); try { + EnsureOpen(); + ChannelUri replayChannelUri = ChannelUri.Parse(replayChannel); long correlationId = aeron.NextCorrelationId(); @@ -653,6 +679,8 @@ public Subscription Replay(long recordingId, long position, long length, string _lock.Lock(); try { + EnsureOpen(); + ChannelUri replayChannelUri = ChannelUri.Parse(replayChannel); long correlationId = aeron.NextCorrelationId(); @@ -690,6 +718,8 @@ public int ListRecordings(long fromRecordingId, int recordCount, IRecordingDescr _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.ListRecordings(fromRecordingId, recordCount, correlationId, controlSessionId)) @@ -728,6 +758,8 @@ public int ListRecordingsForUri( _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.ListRecordingsForUri(fromRecordingId, recordCount, channelFragment, streamId, correlationId, @@ -759,6 +791,8 @@ public int ListRecording(long recordingId, IRecordingDescriptorConsumer consumer _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.ListRecording(recordingId, correlationId, controlSessionId)) @@ -785,6 +819,8 @@ public long GetRecordingPosition(long recordingId) _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.GetRecordingPosition(recordingId, correlationId, controlSessionId)) @@ -811,6 +847,8 @@ public long GetStopPosition(long recordingId) _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.GetStopPosition(recordingId, correlationId, controlSessionId)) @@ -839,6 +877,8 @@ public long FindLastMatchingRecording(long minRecordingId, string channelFragmen _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.FindLastMatchingRecording( @@ -866,6 +906,8 @@ public void TruncateRecording(long recordingId, long position) _lock.Lock(); try { + EnsureOpen(); + long correlationId = aeron.NextCorrelationId(); if (!archiveProxy.TruncateRecording(recordingId, position, correlationId, controlSessionId)) @@ -1059,6 +1101,14 @@ private void InvokeAeronClient() } } + private void EnsureOpen() + { + if (isClosed) + { + throw new ArchiveException("client is closed"); + } + } + /// /// Common configuration properties for communicating with an Aeron archive. /// @@ -1318,8 +1368,11 @@ public static int RecordingEventsStreamId() /// /// Specialised configuration options for communicating with an Aeron Archive. + /// + /// The context will be owned by after a successful + /// and closed via /// - public class Context : IDisposable + public class Context { internal long messageTimeoutNs = Configuration.MessageTimeoutNs(); internal string recordingEventsChannel = Configuration.RecordingEventsChannel(); @@ -1769,13 +1822,13 @@ internal AsyncConnect(Context ctx, ControlResponsePoller controlResponsePoller, } /// - /// Close any allocated resources if it fails to connect. + /// Close any allocated resources. /// public void Dispose() { controlResponsePoller.Subscription()?.Dispose(); archiveProxy.Pub()?.Dispose(); - ctx?.Dispose(); + ctx.Dispose(); } /// diff --git a/src/Adaptive.Archiver/ArchiveException.cs b/src/Adaptive.Archiver/ArchiveException.cs index 111f5fdf..6190d5b9 100644 --- a/src/Adaptive.Archiver/ArchiveException.cs +++ b/src/Adaptive.Archiver/ArchiveException.cs @@ -14,6 +14,7 @@ public class ArchiveException : AeronException public const int UNKNOWN_REPLAY = 6; public const int MAX_REPLAYS = 7; public const int MAX_RECORDINGS = 8; + public const int INVALID_EXTENSION = 9; /// /// Error code providing more detail into what went wrong. diff --git a/src/Adaptive.Cluster/Adaptive.Cluster.csproj b/src/Adaptive.Cluster/Adaptive.Cluster.csproj index 0af079c1..11b63d1b 100644 --- a/src/Adaptive.Cluster/Adaptive.Cluster.csproj +++ b/src/Adaptive.Cluster/Adaptive.Cluster.csproj @@ -3,7 +3,7 @@ netstandard2.0;net45 true Aeron.Cluster - 1.14.0 + 1.15.0 Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. Clustering libraries over the Aeron transport diff --git a/src/Adaptive.Cluster/Client/AeronCluster.cs b/src/Adaptive.Cluster/Client/AeronCluster.cs index 6419e17d..3410ade9 100644 --- a/src/Adaptive.Cluster/Client/AeronCluster.cs +++ b/src/Adaptive.Cluster/Client/AeronCluster.cs @@ -46,7 +46,8 @@ public sealed class AeronCluster : IDisposable private IDictionary _endpointByMemberIdMap = new DefaultDictionary(); private readonly BufferClaim _bufferClaim = new BufferClaim(); - private readonly UnsafeBuffer _msgHeaderBuffer = new UnsafeBuffer(new byte[INGRESS_HEADER_LENGTH]); + private readonly UnsafeBuffer _headerBuffer = new UnsafeBuffer(new byte[INGRESS_HEADER_LENGTH]); + private readonly DirectBufferVector _headerVector; private readonly UnsafeBuffer _keepaliveMsgBuffer; private readonly MessageHeaderEncoder _messageHeaderEncoder = new MessageHeaderEncoder(); private readonly IngressMessageHeaderEncoder _ingressMessageHeaderEncoder = new IngressMessageHeaderEncoder(); @@ -54,6 +55,9 @@ public sealed class AeronCluster : IDisposable private readonly FragmentAssembler _fragmentAssembler; private readonly Poller _poller; private readonly IEgressListener _egressListener; + private readonly ControlledFragmentAssembler _controlledFragmentAssembler; + private readonly IControlledEgressListener _controlledEgressListener; + private readonly ControlledPoller _controlledPoller; private class Poller : IFragmentHandler { @@ -139,6 +143,96 @@ public void OnFragment(IDirectBuffer buffer, int offset, int length, Header head } } + private class ControlledPoller : IControlledFragmentHandler + { + private readonly MessageHeaderDecoder _messageHeaderDecoder = new MessageHeaderDecoder(); + private readonly EgressMessageHeaderDecoder _egressMessageHeaderDecoder = new EgressMessageHeaderDecoder(); + private readonly NewLeaderEventDecoder _newLeaderEventDecoder = new NewLeaderEventDecoder(); + private readonly SessionEventDecoder _sessionEventDecoder = new SessionEventDecoder(); + + private readonly IControlledEgressListener _egressListener; + private readonly long _clusterSessionId; + private readonly AeronCluster _cluster; + + public ControlledPoller(IControlledEgressListener egressListener, long clusterSessionId, AeronCluster cluster) + { + _egressListener = egressListener; + _clusterSessionId = clusterSessionId; + _cluster = cluster; + } + + public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offset, int length, Header header) + { + _messageHeaderDecoder.Wrap(buffer, offset); + + int templateId = _messageHeaderDecoder.TemplateId(); + if (EgressMessageHeaderDecoder.TEMPLATE_ID == templateId) + { + _egressMessageHeaderDecoder.Wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + _messageHeaderDecoder.BlockLength(), + _messageHeaderDecoder.Version()); + + long sessionId = _egressMessageHeaderDecoder.ClusterSessionId(); + if (sessionId == _clusterSessionId) + { + return _egressListener.OnMessage( + sessionId, + _egressMessageHeaderDecoder.Timestamp(), + buffer, + offset + EGRESS_HEADER_LENGTH, + length - EGRESS_HEADER_LENGTH, + header); + } + } + else if (NewLeaderEventDecoder.TEMPLATE_ID == templateId) + { + _newLeaderEventDecoder.Wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + _messageHeaderDecoder.BlockLength(), + _messageHeaderDecoder.Version()); + + long sessionId = _newLeaderEventDecoder.ClusterSessionId(); + if (sessionId == _clusterSessionId) + { + _cluster.OnNewLeader( + sessionId, + _newLeaderEventDecoder.LeadershipTermId(), + _newLeaderEventDecoder.LeaderMemberId(), + _newLeaderEventDecoder.MemberEndpoints()); + + return ControlledFragmentHandlerAction.COMMIT; + } + } + else if (SessionEventDecoder.TEMPLATE_ID == templateId) + { + _sessionEventDecoder.Wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + _messageHeaderDecoder.BlockLength(), + _messageHeaderDecoder.Version()); + + long sessionId = _sessionEventDecoder.ClusterSessionId(); + if (sessionId == _clusterSessionId) + { + _egressListener.SessionEvent( + _sessionEventDecoder.CorrelationId(), + sessionId, + _sessionEventDecoder.LeadershipTermId(), + _sessionEventDecoder.LeaderMemberId(), + _sessionEventDecoder.Code(), + _sessionEventDecoder.Detail()); + + return ControlledFragmentHandlerAction.COMMIT; + } + } + + return ControlledFragmentHandlerAction.CONTINUE; + } + } + /// /// Connect to the cluster using default configuration. /// @@ -160,6 +254,8 @@ public static AeronCluster Connect(Context ctx) private AeronCluster(Context ctx) { + _headerVector = new DirectBufferVector(_headerBuffer, 0, _headerBuffer.Capacity); + Subscription subscription = null; try @@ -171,6 +267,13 @@ private AeronCluster(Context ctx) _idleStrategy = ctx.IdleStrategy(); _nanoClock = _aeron.Ctx.NanoClock(); _isUnicast = ctx.ClusterMemberEndpoints() != null; + _egressListener = ctx.EgressListener(); + _poller = new Poller(ctx.EgressListener(), _clusterSessionId, this); + _fragmentAssembler = new FragmentAssembler(_poller); + + _controlledEgressListener = ctx.ControlledEgressListener(); + _controlledPoller = new ControlledPoller(ctx.ControlledEgressListener(), _clusterSessionId, this); + _controlledFragmentAssembler = new ControlledFragmentAssembler(_controlledPoller, 0); // IsDirect subscription = _aeron.AddSubscription(ctx.EgressChannel(), ctx.EgressStreamId()); _subscription = subscription; @@ -178,7 +281,7 @@ private AeronCluster(Context ctx) _clusterSessionId = ConnectToCluster(); _ingressMessageHeaderEncoder - .WrapAndApplyHeader(_msgHeaderBuffer, 0, _messageHeaderEncoder) + .WrapAndApplyHeader(_headerBuffer, 0, _messageHeaderEncoder) .ClusterSessionId(_clusterSessionId) .LeadershipTermId(_leadershipTermId); @@ -189,10 +292,6 @@ private AeronCluster(Context ctx) .WrapAndApplyHeader(_keepaliveMsgBuffer, 0, _messageHeaderEncoder) .LeadershipTermId(_leadershipTermId) .ClusterSessionId(_clusterSessionId); - - _poller = new Poller(ctx.EgressListener(), _clusterSessionId, this); - _egressListener = ctx.EgressListener(); - _fragmentAssembler = new FragmentAssembler(_poller); } catch (Exception) { @@ -207,7 +306,7 @@ private AeronCluster(Context ctx) CloseHelper.QuietDispose(subscription); } - CloseHelper.QuietDispose(ctx); + ctx.Dispose(); throw; } } @@ -290,13 +389,30 @@ public void Dispose() /// the same as . public long Offer(IDirectBuffer buffer, int offset, int length) { - return _publication.Offer(_msgHeaderBuffer, 0, INGRESS_HEADER_LENGTH, buffer, offset, length); + return _publication.Offer(_headerBuffer, 0, INGRESS_HEADER_LENGTH, buffer, offset, length); + } + + /// + /// Non-blocking publish by gathering buffer vectors into a message. The first vector will be replaced cluster + /// ingress header so must be left unused. + /// + /// which make up the message. + /// the same as . + /// + public long Offer(DirectBufferVector[] vectors) + { + if (_headerVector != vectors[0]) + { + vectors[0] = _headerVector; + } + + return _publication.Offer(vectors); } /// /// Send a keep alive message to the cluster to keep this session open. /// - /// Note: keep alives can fail during a leadership transition. The consumer should continue to call + /// Note: keepalives can fail during a leadership transition. The consumer should continue to call /// to ensure a connection to the new leader is established. /// /// @@ -324,7 +440,7 @@ public bool SendKeepAlive() { throw new ClusterException("unexpected publication state: " + result); } - + if (--attempts <= 0) { break; @@ -350,6 +466,21 @@ public int PollEgress() return _subscription.Poll(_fragmentAssembler, SESSION_FRAGMENT_LIMIT); } + /// + /// Poll the for session messages which are dispatched to + /// . + /// + /// Note: if is not set then a + /// could result. + /// + /// + /// + /// the number of fragments processed. + public int ControlledPollEgress() + { + return _subscription.ControlledPoll(_controlledFragmentAssembler, SESSION_FRAGMENT_LIMIT); + } + /// /// To be called when a new leader event is delivered. This method needs to be called when using the /// or rather than method. @@ -381,7 +512,10 @@ public void OnNewLeader(long clusterSessionId, long leadershipTermId, int leader UpdateMemberEndpoints(memberEndpoints, leaderMemberId); } + _fragmentAssembler.Clear(); + _controlledFragmentAssembler.Clear(); _egressListener.NewLeader(clusterSessionId, leadershipTermId, leaderMemberId, memberEndpoints); + _controlledEgressListener.NewLeader(clusterSessionId, leadershipTermId, leaderMemberId, memberEndpoints); } private void UpdateMemberEndpoints(string memberEndpoints, int leaderMemberId) @@ -837,9 +971,9 @@ public static int EgressStreamId() /// /// Context for cluster session and connection. /// - public class Context : IDisposable + public class Context { - private class MissingEgressMessageListener : IEgressListener + private class MissingEgressMessageListener : IEgressListener, IControlledEgressListener { public void OnMessage( long clusterSessionId, @@ -852,6 +986,17 @@ public void OnMessage( throw new ConfigurationException("egressMessageListener must be specified on AeronCluster.Context"); } + ControlledFragmentHandlerAction IControlledEgressListener.OnMessage( + long clusterSessionId, + long timestampMs, + IDirectBuffer buffer, + int offset, + int length, + Header header) + { + throw new ConfigurationException("controlledEgressListened must be specified on AeronCluster.Context"); + } + public void SessionEvent( long correlationId, long clusterSessionId, @@ -860,7 +1005,6 @@ public void SessionEvent( EventCode code, string detail) { - throw new ConfigurationException("egressMessageListener must be specified on AeronCluster.Context"); } public void NewLeader( @@ -869,7 +1013,6 @@ public void NewLeader( int leaderMemberId, string memberEndpoints) { - throw new ConfigurationException("egressMessageListener must be specified on AeronCluster.Context"); } } @@ -886,7 +1029,9 @@ public void NewLeader( private bool _ownsAeronClient = false; private bool _isIngressExclusive = false; private ErrorHandler _errorHandler = Adaptive.Aeron.Aeron.Configuration.DEFAULT_ERROR_HANDLER; + private bool _isDirectAssemblers = false; private IEgressListener _egressListener; + private IControlledEgressListener _controlledEgressListener; /// /// Perform a shallow copy of the object. @@ -922,6 +1067,11 @@ public void Conclude() { _egressListener = new MissingEgressMessageListener(); } + + if (null == _controlledEgressListener) + { + _controlledEgressListener = new MissingEgressMessageListener(); + } } /// @@ -1222,6 +1372,26 @@ public Context ErrorHandler(ErrorHandler errorHandler) return this; } + /// + /// Is direct buffers used for fragment assembly on egress. + /// + /// true if direct buffers used for fragment assembly on egress. + public bool IsDirectAssemblers() + { + return _isDirectAssemblers; + } + + /// + /// Is direct buffers used for fragment assembly on egress. + /// + /// true if direct buffers used for fragment assembly on egress. + /// this for a fluent API. + public Context IsDirectAssemblers(bool isDirectAssemblers) + { + _isDirectAssemblers = isDirectAssemblers; + return this; + } + /// /// Get the function that will be called when polling for egress via /// . @@ -1236,6 +1406,10 @@ public IEgressListener EgressListener() /// /// Get the function that will be called when polling for egress via /// . + /// + /// Only will be dispatched + /// when using + /// /// /// function that will be called when polling for egress via . /// this for a fluent API. @@ -1245,6 +1419,32 @@ public Context EgressListener(IEgressListener listener) return this; } + /// + /// Get the function that will be called when polling for egress via + /// . + /// + /// the function that will be called when polling for egress via + /// . + public IControlledEgressListener ControlledEgressListener() + { + return _controlledEgressListener; + } + + /// + /// Get the function that will be called when polling for egress via + /// . + /// + /// Only will be + /// dispatched when using . + /// + /// function that will be called when polling for egress via + /// . + /// this for a fluent API. + public Context ControlledEgressListener(IControlledEgressListener listener) + { + _controlledEgressListener = listener; + return this; + } /// /// Close the context and free applicable resources. diff --git a/src/Adaptive.Cluster/Client/IControlledEgressListener.cs b/src/Adaptive.Cluster/Client/IControlledEgressListener.cs new file mode 100644 index 00000000..041354a0 --- /dev/null +++ b/src/Adaptive.Cluster/Client/IControlledEgressListener.cs @@ -0,0 +1,42 @@ +using Adaptive.Aeron.LogBuffer; +using Adaptive.Agrona; +using Adaptive.Cluster.Codecs; + +namespace Adaptive.Cluster.Client +{ + /// + /// Interface for consuming messages coming from the cluster that also include administrative events in a controlled + /// fashion like . Only session messages my be controlled in + /// consumption, other are consumed via . + /// + public interface IControlledEgressListener + { + /// + /// Message event returned from the clustered service. + /// + /// to which the message belongs. + /// at which the correlated ingress was sequenced in the cluster. + /// containing the message. + /// at which the message begins. + /// of the message in bytes. + /// Aeron header associated with the message fragment. + /// what action should be taken regarding advancement of the stream. + ControlledFragmentHandlerAction OnMessage( + long clusterSessionId, + long timestampMs, + IDirectBuffer buffer, + int offset, + int length, + Header header); + + void SessionEvent( + long correlationId, + long clusterSessionId, + long leadershipTermId, + int leaderMemberId, + EventCode code, + string detail); + + void NewLeader(long clusterSessionId, long leadershipTermId, int leaderMemberId, string memberEndpoints); + } +} \ No newline at end of file diff --git a/src/Adaptive.Cluster/Codecs/ClusterActionRequestDecoder.cs b/src/Adaptive.Cluster/Codecs/ClusterActionRequestDecoder.cs index 8e20d230..ad5566a2 100644 --- a/src/Adaptive.Cluster/Codecs/ClusterActionRequestDecoder.cs +++ b/src/Adaptive.Cluster/Codecs/ClusterActionRequestDecoder.cs @@ -334,7 +334,7 @@ public StringBuilder AppendTo(StringBuilder builder) builder.Append(Timestamp()); builder.Append('|'); //Token{signal=BEGIN_FIELD, name='action', referencedName='null', description='null', id=4, version=0, deprecated=0, encodedLength=0, offset=24, componentTokenCount=7, encoding=Encoding{presence=REQUIRED, primitiveType=null, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='unix', timeUnit=nanosecond, semanticType='null'}} - //Token{signal=BEGIN_ENUM, name='ClusterAction', referencedName='null', description='Action to be taken by a cluster nodes', id=-1, version=0, deprecated=0, encodedLength=4, offset=24, componentTokenCount=5, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='null', timeUnit=null, semanticType='null'}} + //Token{signal=BEGIN_ENUM, name='ClusterAction', referencedName='null', description='Action to be taken by cluster nodes', id=-1, version=0, deprecated=0, encodedLength=4, offset=24, componentTokenCount=5, encoding=Encoding{presence=REQUIRED, primitiveType=INT32, byteOrder=LITTLE_ENDIAN, minValue=null, maxValue=null, nullValue=null, constValue=null, characterEncoding='null', epoch='null', timeUnit=null, semanticType='null'}} builder.Append("Action="); builder.Append(Action()); diff --git a/src/Adaptive.Cluster/Service/ClientSession.cs b/src/Adaptive.Cluster/Service/ClientSession.cs index 20451375..7f600ced 100644 --- a/src/Adaptive.Cluster/Service/ClientSession.cs +++ b/src/Adaptive.Cluster/Service/ClientSession.cs @@ -89,7 +89,20 @@ public long Offer(IDirectBuffer buffer, int offset, int length) { return _cluster.Offer(Id, _responsePublication, buffer, offset, length); } - + + /// + /// Non-blocking publish by gathering buffer vectors into a message. The first vector will be replaced cluster + /// egress header so must be left unused. + /// + /// which make up the message. + /// the same as . + /// when in + /// otherwise . + public long Offer(DirectBufferVector[] vectors) + { + return _cluster.Offer(Id, _responsePublication, vectors); + } + internal void Connect(Aeron.Aeron aeron) { if (null == _responsePublication) diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs index 6ecde209..41734830 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs @@ -34,6 +34,7 @@ internal sealed class ClusteredServiceAgent : IAgent, ICluster private readonly IEpochClock epochClock; private readonly ClusterMarkFile markFile; private readonly UnsafeBuffer headerBuffer = new UnsafeBuffer(new byte[SESSION_HEADER_LENGTH]); + private readonly DirectBufferVector headerVector; private readonly EgressMessageHeaderEncoder _egressMessageHeaderEncoder = new EgressMessageHeaderEncoder(); private long ackId = 0; @@ -41,6 +42,7 @@ internal sealed class ClusteredServiceAgent : IAgent, ICluster private long cachedTimeMs; private long terminationPosition = NULL_POSITION; private int memberId = NULL_VALUE; + private bool isServiceActive; private BoundedLogAdapter logAdapter; private AtomicCounter heartbeatCounter; private ReadableCounter roleCounter; @@ -52,6 +54,8 @@ internal sealed class ClusteredServiceAgent : IAgent, ICluster internal ClusteredServiceAgent(ClusteredServiceContainer.Context ctx) { this.ctx = ctx; + + headerVector = new DirectBufferVector(headerBuffer, 0, headerBuffer.Capacity); archiveCtx = ctx.ArchiveContext(); aeron = ctx.Aeron(); @@ -86,6 +90,19 @@ public void OnStart() public void OnClose() { + if (isServiceActive) + { + isServiceActive = false; + try + { + service.OnTerminate(this); + } + catch (Exception ex) + { + ctx.CountedErrorHandler().OnError(ex); + } + } + if (!ctx.OwnsAeronClient()) { foreach (ClientSession session in sessionByIdMap.Values) @@ -97,6 +114,8 @@ public void OnClose() _consensusModuleProxy?.Dispose(); _serviceAdapter?.Dispose(); } + + ctx.Dispose(); } public int DoWork() @@ -147,6 +166,8 @@ private set public Aeron.Aeron Aeron => aeron; + public ClusteredServiceContainer.Context Context => ctx; + public ClientSession GetClientSession(long clusterSessionId) { return sessionByIdMap[clusterSessionId]; @@ -226,6 +247,30 @@ public long Offer( return publication.Offer(headerBuffer, 0, headerBuffer.Capacity, buffer, offset, length, null); } + + public long Offer(long clusterSessionId, Publication publication, DirectBufferVector[] vectors) + { + if (role != ClusterRole.Leader) + { + return ClientSession.MOCKED_OFFER; + } + + if (null == publication) + { + return Publication.NOT_CONNECTED; + } + + _egressMessageHeaderEncoder + .ClusterSessionId(clusterSessionId) + .Timestamp(clusterTimeMs); + + if (vectors[0] != headerVector) + { + vectors[0] = headerVector; + } + + return publication.Offer(vectors); + } public void OnJoinLog( long leadershipTermId, @@ -333,8 +378,7 @@ internal void OnMembershipChange( if (memberId == this.memberId && changeType == ChangeType.QUIT) { - _consensusModuleProxy.Ack(logPosition, ackId++, serviceId); - ctx.TerminationHook().Invoke(); + Terminate(logPosition); } } @@ -717,10 +761,27 @@ private void CheckForTermination() { if (null != logAdapter && logAdapter.Position() >= terminationPosition) { - _consensusModuleProxy.Ack(terminationPosition, ackId++, serviceId); + var logPosition = terminationPosition; terminationPosition = NULL_POSITION; - ctx.TerminationHook().Invoke(); + Terminate(logPosition); + } + } + + private void Terminate(long logPosition) + { + isServiceActive = false; + + try + { + service.OnTerminate(this); + } + catch (Exception ex) + { + ctx.CountedErrorHandler().OnError(ex); } + + _consensusModuleProxy.Ack(logPosition, ackId++, serviceId); + ctx.TerminationHook().Invoke(); } } } \ No newline at end of file diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs b/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs index 04cf8904..34310ccf 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs @@ -54,6 +54,7 @@ private ClusteredServiceContainer(Context ctx) ctx.MarkFile().SignalFailedStart(); } + ctx.Dispose(); throw; } @@ -98,7 +99,6 @@ public Context Ctx() public void Dispose() { serviceAgentRunner?.Dispose(); - ctx?.Dispose(); } /// @@ -374,7 +374,11 @@ public static bool IsRespondingService() } } - public class Context : IDisposable + /// + /// The context will be owned by after a successful + /// and closed via . + /// + public class Context { private int serviceId = Configuration.ServiceId(); private string serviceName = Configuration.ServiceName(); @@ -1140,12 +1144,12 @@ public void DeleteDirectory() /// public void Dispose() { - CloseHelper.QuietDispose(markFile); - if (ownsAeronClient) { aeron?.Dispose(); } + + CloseHelper.QuietDispose(markFile); } private void ConcludeMarkFile() diff --git a/src/Adaptive.Cluster/Service/ICluster.cs b/src/Adaptive.Cluster/Service/ICluster.cs index 62f6fa30..2d27a618 100644 --- a/src/Adaptive.Cluster/Service/ICluster.cs +++ b/src/Adaptive.Cluster/Service/ICluster.cs @@ -26,6 +26,12 @@ public interface ICluster /// the client used by the cluster. Aeron.Aeron Aeron { get; } + /// + /// Get the under which the container is running. + /// + /// the under which the container is running. + ClusteredServiceContainer.Context Context { get; } + /// /// Get the for a given cluster session id. /// diff --git a/src/Adaptive.Cluster/Service/IClusteredService.cs b/src/Adaptive.Cluster/Service/IClusteredService.cs index 1bbc7701..a8cef160 100644 --- a/src/Adaptive.Cluster/Service/IClusteredService.cs +++ b/src/Adaptive.Cluster/Service/IClusteredService.cs @@ -87,5 +87,11 @@ void OnSessionMessage( /// /// that the node has assumed. void OnRoleChange(ClusterRole newRole); + + /// + /// Called when the container is going to terminate. + /// + /// with which the service can interact. + void OnTerminate(ICluster cluster); } } \ No newline at end of file diff --git a/src/Adaptive.Cluster/aeron-cluster-codecs.xml b/src/Adaptive.Cluster/aeron-cluster-codecs.xml index b621ce97..67c02bf2 100644 --- a/src/Adaptive.Cluster/aeron-cluster-codecs.xml +++ b/src/Adaptive.Cluster/aeron-cluster-codecs.xml @@ -40,7 +40,7 @@ 1 2 - + 0 1 2 @@ -260,7 +260,7 @@ - + diff --git a/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs b/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs index f35cc0e9..b8a58a51 100644 --- a/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs +++ b/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs @@ -57,5 +57,10 @@ public void OnRoleChange(ClusterRole newRole) { Console.WriteLine($"OnRoleChange: newRole={newRole}"); } + + public void OnTerminate(ICluster cluster) + { + Console.WriteLine("OnTerminate"); + } } } \ No newline at end of file