diff --git a/driver/Aeron.Driver.nuspec b/driver/Aeron.Driver.nuspec index 6a11e316..3245d2ac 100644 --- a/driver/Aeron.Driver.nuspec +++ b/driver/Aeron.Driver.nuspec @@ -2,7 +2,7 @@ Aeron.Driver - 1.9.1 + 1.9.2 Aeron Driver Adaptive Financial Consulting Ltd. Adaptive Financial Consulting Ltd. diff --git a/driver/media-driver.jar b/driver/media-driver.jar index 7939f0b5..76d88595 100644 Binary files a/driver/media-driver.jar and b/driver/media-driver.jar differ diff --git a/driver/version.txt b/driver/version.txt index c6039c1d..5d1580a5 100644 --- a/driver/version.txt +++ b/driver/version.txt @@ -1,2 +1,2 @@ Driver source: -http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.1/aeron-all-1.9.1.jar +http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.2/aeron-all-1.9.2.jar diff --git a/scripts/build-release-nuget-packages.bat b/scripts/build-release-nuget-packages.bat index bcd08768..98acf799 100644 --- a/scripts/build-release-nuget-packages.bat +++ b/scripts/build-release-nuget-packages.bat @@ -1,11 +1,13 @@ @echo off pushd %~dp0.. SET nuget_source=https://api.nuget.org/v3/index.json +SET myget_source=https://www.myget.org/F/aeron/api/v2/package del nupkgs\*.nupkg call dotnet pack src\Adaptive.Aeron\Adaptive.Aeron.csproj -c Release --output ..\..\nupkgs call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs +call dotnet pack src\Adaptive.Cluster\Adaptive.Cluster.csproj -c Release --output ..\..\nupkgs call dotnet pack src\Adaptive.Archiver\Adaptive.Archiver.csproj -c Release --output ..\..\nupkgs call .\scripts\nuget pack .\driver\Aeron.Driver.nuspec -OutputDirectory nupkgs @@ -13,5 +15,6 @@ call dotnet nuget push nupkgs\Agrona.*.nupkg -s %nuget_source% call dotnet nuget push nupkgs\Aeron.Client.*.nupkg -s %nuget_source% call dotnet nuget push nupkgs\Aeron.Archiver.*.nupkg -s %nuget_source% call dotnet nuget push nupkgs\Aeron.Driver.*.nupkg -s %nuget_source% +call dotnet nuget push nupkgs\Aeron.Cluster.*.nupkg -s %myget_source% popd \ No newline at end of file diff --git a/src/Adaptive.Aeron/Aeron.cs b/src/Adaptive.Aeron/Aeron.cs index 5f666d07..58dde5e0 100644 --- a/src/Adaptive.Aeron/Aeron.cs +++ b/src/Adaptive.Aeron/Aeron.cs @@ -1147,29 +1147,32 @@ public void Dispose() private void ConnectToDriver() { long startTimeMs = _epochClock.Time(); + long deadLineMs = startTimeMs + DriverTimeoutMs(); FileInfo cncFile = CncFile(); - while (true) + while (null == _toDriverBuffer) { + cncFile.Refresh(); + while (!cncFile.Exists || cncFile.Length <= 0) { - if (_epochClock.Time() > (startTimeMs + _driverTimeoutMs)) + if (_epochClock.Time() > deadLineMs) { throw new DriverTimeoutException("CnC file not created: " + cncFile.FullName); } - Sleep(16); + Sleep(IdleSleepMs); cncFile.Refresh(); } - _cncByteBuffer = IoUtil.MapExistingFile(CncFile(), CncFileDescriptor.CNC_FILE); + _cncByteBuffer = WaitForFileMapping(cncFile, deadLineMs, _epochClock); _cncMetaDataBuffer = CncFileDescriptor.CreateMetaDataBuffer(_cncByteBuffer); int cncVersion; while (0 == (cncVersion = _cncMetaDataBuffer.GetIntVolatile(CncFileDescriptor.CncVersionOffset(0)))) { - if (_epochClock.Time() > (startTimeMs + DriverTimeoutMs())) + if (_epochClock.Time() > deadLineMs) { throw new DriverTimeoutException("CnC file is created but not initialised."); } @@ -1188,7 +1191,7 @@ private void ConnectToDriver() while (0 == ringBuffer.ConsumerHeartbeatTime()) { - if (_epochClock.Time() > (startTimeMs + DriverTimeoutMs())) + if (_epochClock.Time() > deadLineMs) { throw new DriverTimeoutException("No driver heartbeat detected."); } @@ -1199,7 +1202,7 @@ private void ConnectToDriver() long timeMs = _epochClock.Time(); if (ringBuffer.ConsumerHeartbeatTime() < (timeMs - DriverTimeoutMs())) { - if (timeMs > (startTimeMs + DriverTimeoutMs())) + if (timeMs > deadLineMs) { throw new DriverTimeoutException("No driver heartbeat detected."); } @@ -1212,12 +1215,34 @@ private void ConnectToDriver() continue; } - if (null == _toDriverBuffer) + _toDriverBuffer = ringBuffer; + } + } + + private static MappedByteBuffer WaitForFileMapping(FileInfo cncFile, long deadLineMs, IEpochClock epochClock) + { + try + { + var fileAccess = FileAccess.ReadWrite; + var fileShare = FileShare.ReadWrite | FileShare.Delete; + + var fileStream = cncFile.Open(FileMode.Open, fileAccess, fileShare); + + while (fileStream.Length < CncFileDescriptor.CNC_VERSION_FIELD_OFFSET + BitUtil.SIZE_OF_INT) { - _toDriverBuffer = ringBuffer; + if (epochClock.Time() > deadLineMs) + { + throw new InvalidOperationException("CnC file is created but not populated."); + } + + Sleep(IdleSleepMs); } - break; + return IoUtil.MapExistingFile(fileStream); + } + catch (Exception ex) + { + throw new InvalidOperationException("cannot open CnC file", ex); } } @@ -1235,7 +1260,7 @@ public MappedByteBuffer MapExistingCncFile(Action logProgress) { FileInfo cncFile = new FileInfo(Path.Combine(_aeronDirectory.FullName, CncFileDescriptor.CNC_FILE)); - if (cncFile.Exists) + if (cncFile.Exists && cncFile.Length > 0) { if (null != logProgress) { @@ -1259,7 +1284,7 @@ public static bool IsDriverActive(DirectoryInfo directory, long driverTimeoutMs, { FileInfo cncFile = new FileInfo(Path.Combine(directory.FullName, CncFileDescriptor.CNC_FILE)); - if (cncFile.Exists) + if (cncFile.Exists && cncFile.Length > 0) { logger("INFO: Aeron CnC file " + cncFile + " exists"); @@ -1424,4 +1449,4 @@ internal static void Sleep(int durationMs) } } } -} +} \ No newline at end of file diff --git a/src/Adaptive.Aeron/Command/ControlProtocolEvents.cs b/src/Adaptive.Aeron/Command/ControlProtocolEvents.cs index ac51e5af..abf5c617 100644 --- a/src/Adaptive.Aeron/Command/ControlProtocolEvents.cs +++ b/src/Adaptive.Aeron/Command/ControlProtocolEvents.cs @@ -77,6 +77,16 @@ public class ControlProtocolEvents /// Close indication from Client. /// public const int CLIENT_CLOSE = 0x0B; + + /// + /// Add Destination for existing Subscription. + /// + public const int ADD_RCV_DESTINATION = 0x0C; + + /// + /// Remove Destination for existing Subscription. + /// + public const int REMOVE_RCV_DESTINATION = 0x0D; // Media Driver to Clients diff --git a/src/Adaptive.Agrona/IoUtil.cs b/src/Adaptive.Agrona/IoUtil.cs index 37b35de3..175430e2 100644 --- a/src/Adaptive.Agrona/IoUtil.cs +++ b/src/Adaptive.Agrona/IoUtil.cs @@ -15,7 +15,6 @@ */ using System; -using System.ComponentModel; using System.IO; using System.IO.MemoryMappedFiles; using Adaptive.Agrona.Util; @@ -78,7 +77,7 @@ public static MappedByteBuffer MapNewFile(FileInfo cncFile, long length, bool fi } - public static MappedByteBuffer MapNewOrExixtingFile(FileInfo cncFile, long length) + public static MappedByteBuffer MapNewOrExistingFile(FileInfo cncFile, long length) { var fileAccess = FileAccess.ReadWrite; var fileShare = FileShare.ReadWrite | FileShare.Delete; @@ -111,11 +110,15 @@ public static MemoryMappedFile OpenMemoryMappedFile(string path) var fileAccess = FileAccess.ReadWrite; var fileShare = FileShare.ReadWrite | FileShare.Delete; - var memoryMappedFileAccess = MemoryMappedFileAccess.ReadWrite; - var f = new FileStream(path, FileMode.Open, fileAccess, fileShare); + return OpenMemoryMappedFile(f); + } + + private static MemoryMappedFile OpenMemoryMappedFile(FileStream f) + { + var memoryMappedFileAccess = MemoryMappedFileAccess.ReadWrite; #if NETFULL return MemoryMappedFile.CreateFromFile(f, null, 0, memoryMappedFileAccess, new MemoryMappedFileSecurity(), HandleInheritability.None, false); #else @@ -124,6 +127,21 @@ public static MemoryMappedFile OpenMemoryMappedFile(string path) #endif } + /// + /// Return MappedByteBuffer for entire file + /// + /// The file itself will be closed, but the mapping will persist. + /// + /// + /// + /// of the file to map + /// for the file + public static MappedByteBuffer MapExistingFile(FileStream fileStream) + { + return new MappedByteBuffer(OpenMemoryMappedFile(fileStream)); + } + + /// /// Check that file exists, open file, and return MappedByteBuffer for entire file /// diff --git a/src/Adaptive.Agrona/MarkFile.cs b/src/Adaptive.Agrona/MarkFile.cs index 28310c2c..986eb964 100644 --- a/src/Adaptive.Agrona/MarkFile.cs +++ b/src/Adaptive.Agrona/MarkFile.cs @@ -322,7 +322,7 @@ public static MappedByteBuffer MapNewOrExistingCncFile(FileInfo cncFile, bool sh try { - cncByteBuffer = IoUtil.MapNewOrExixtingFile(cncFile, totalFileLength); + cncByteBuffer = IoUtil.MapNewOrExistingFile(cncFile, totalFileLength); UnsafeBuffer cncBuffer = new UnsafeBuffer(cncByteBuffer); diff --git a/src/Adaptive.Archiver/AeronArchive.cs b/src/Adaptive.Archiver/AeronArchive.cs index 7855f533..835b7401 100644 --- a/src/Adaptive.Archiver/AeronArchive.cs +++ b/src/Adaptive.Archiver/AeronArchive.cs @@ -29,6 +29,11 @@ public class AeronArchive : IDisposable /// Represents a position that has not been set. Can be used when the position is not known. /// public const long NULL_POSITION = -1L; + + /// + /// Represents a length that has not been set. If null length is provided then replay the whole recorded stream. + /// + public const long NULL_LENGTH = -1L; private const int FRAGMENT_LIMIT = 10; @@ -489,7 +494,7 @@ public void StopRecording(Publication publication) /// /// to be replayed. /// from which the replay should begin or if from the start. - /// of the stream to be replayed. Use to follow a live recording. + /// of the stream to be replayed. Use to follow a live recording or to replay the whole stream of unknown length. /// to which the replay should be sent. /// to which the replay should be sent. /// the id of the replay session which will be the same as the of the received diff --git a/src/Adaptive.Cluster/Service/BoundedLogAdapter.cs b/src/Adaptive.Cluster/Service/BoundedLogAdapter.cs index ffb67d7e..0ef0846e 100644 --- a/src/Adaptive.Cluster/Service/BoundedLogAdapter.cs +++ b/src/Adaptive.Cluster/Service/BoundedLogAdapter.cs @@ -45,11 +45,6 @@ public Image Image() return image; } - public int UpperBoundCounterId() - { - return upperBound.CounterId(); - } - public bool IsCaughtUp() { return image.Position() >= upperBound.Get(); diff --git a/src/Adaptive.Cluster/Service/ClientSession.cs b/src/Adaptive.Cluster/Service/ClientSession.cs index d6539f6e..f3eb2144 100644 --- a/src/Adaptive.Cluster/Service/ClientSession.cs +++ b/src/Adaptive.Cluster/Service/ClientSession.cs @@ -114,12 +114,10 @@ public long Offer(long correlationId, IDirectBuffer buffer, int offset, int leng internal void Connect(Aeron.Aeron aeron) { - if (null != _responsePublication) + if (null == _responsePublication) { - throw new InvalidOperationException("response publication already present"); + _responsePublication = aeron.AddPublication(_responseChannel, _responseStreamId); } - - _responsePublication = aeron.AddPublication(_responseChannel, _responseStreamId); } internal void MarkClosing() diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs index db48ada8..429af28a 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs @@ -34,7 +34,7 @@ internal sealed class ClusteredServiceAgent : IAgent, ICluster, IServiceControlL private long leadershipTermId; private long timestampMs; private BoundedLogAdapter logAdapter; - private ActiveLog activeLog; + private NewActiveLogEvent newActiveLogEvent; private ReadableCounter roleCounter; private AtomicCounter heartbeatCounter; private ClusterRole role = ClusterRole.Follower; @@ -63,31 +63,17 @@ internal ClusteredServiceAgent(ClusteredServiceContainer.Context ctx) public void OnStart() { - service.OnStart(this); - CountersReader counters = aeron.CountersReader(); - int recoveryCounterId = AwaitRecoveryCounter(counters); + roleCounter = AwaitClusterRoleCounter(counters); FindHeartbeatCounter(counters); - + + service.OnStart(this); isRecovering = true; + int recoveryCounterId = AwaitRecoveryCounter(counters); CheckForSnapshot(counters, recoveryCounterId); CheckForReplay(counters, recoveryCounterId); isRecovering = false; - service.OnReady(); - - JoinActiveLog(counters); - - roleCounter = AwaitClusterRoleCounter(counters); - role = (ClusterRole) roleCounter.Get(); - - if (ClusterRole.Leader == role) - { - foreach (ClientSession session in sessionByIdMap.Values) - { - session.Connect(aeron); - } - } } public void OnClose() @@ -107,22 +93,24 @@ public void OnClose() public int DoWork() { + int workCount = 0; + long nowMs = epochClock.Time(); if (cachedEpochClock.Time() != nowMs) { - markFile.UpdateActivityTimestamp(nowMs); cachedEpochClock.Update(nowMs); + markFile.UpdateActivityTimestamp(nowMs); CheckHealthAndUpdateHeartbeat(nowMs); - } - - int workCount = logAdapter.Poll(); - workCount += serviceControlAdapter.Poll(); + workCount += serviceControlAdapter.Poll(); - if (activeLog != null) - { - SwitchActiveLog(); + if (newActiveLogEvent != null) + { + JoinActiveLog(); + } } + workCount += null != logAdapter ? logAdapter.Poll() : 0; + return workCount; } @@ -212,7 +200,7 @@ public void OnJoinLog( bool ackBeforeImage, string logChannel) { - activeLog = new ActiveLog( + newActiveLogEvent = new NewActiveLogEvent( leadershipTermId, commitPositionId, logSessionId, logStreamId, ackBeforeImage, logChannel); } @@ -281,7 +269,7 @@ internal void AddSession(long clusterSessionId, int responseStreamId, string res private void CheckHealthAndUpdateHeartbeat(long nowMs) { - if (null != logAdapter && !logAdapter.Image().Closed) + if (null == logAdapter || !logAdapter.Image().Closed) { heartbeatCounter.SetOrdered(nowMs); } @@ -332,17 +320,17 @@ private void CheckForReplay(CountersReader counters, int recoveryCounterId) for (int i = 0; i < replayTermCount; i++) { AwaitActiveLog(); - int counterId = activeLog.commitPositionId; + int counterId = newActiveLogEvent.commitPositionId; leadershipTermId = CommitPos.GetLeadershipTermId(counters, counterId); termBaseLogPosition = CommitPos.GetTermBaseLogPosition(counters, counterId); // TODO MARK if (CommitPos.GetLeadershipTermLength(counters, counterId) > 0) { - using (Subscription subscription = aeron.AddSubscription(activeLog.channel, activeLog.streamId)) + using (Subscription subscription = aeron.AddSubscription(newActiveLogEvent.channel, newActiveLogEvent.streamId)) { serviceControlPublisher.AckAction(termBaseLogPosition, leadershipTermId, serviceId, ClusterAction.READY); - Image image = AwaitImage(activeLog.sessionId, subscription); + Image image = AwaitImage(newActiveLogEvent.sessionId, subscription); ReadableCounter limit = new ReadableCounter(counters, counterId); BoundedLogAdapter adapter = new BoundedLogAdapter(image, limit, this); @@ -352,6 +340,7 @@ private void CheckForReplay(CountersReader counters, int recoveryCounterId) } } + newActiveLogEvent = null; serviceControlPublisher.AckAction(termBaseLogPosition, leadershipTermId, serviceId, ClusterAction.REPLAY); } @@ -360,18 +349,10 @@ private void CheckForReplay(CountersReader counters, int recoveryCounterId) private void AwaitActiveLog() { - activeLog = null; - idleStrategy.Reset(); - while (true) + while (null == newActiveLogEvent) { - int fragments = serviceControlAdapter.Poll(); - if (activeLog != null) - { - break; - } - CheckInterruptedStatus(); - idleStrategy.Idle(fragments); + idleStrategy.Idle(serviceControlAdapter.Poll()); } } @@ -399,28 +380,6 @@ private void ConsumeImage(Image image, BoundedLogAdapter adapter) } } - private void SwitchActiveLog() - { - if (logAdapter.IsCaughtUp()) - { - logAdapter.Dispose(); - - var counters = aeron.CountersReader(); - var counterId = activeLog.commitPositionId; - - leadershipTermId = activeLog.leadershipTermId; - termBaseLogPosition = CommitPos.GetTermBaseLogPosition(counters, counterId); - - Subscription subscription = aeron.AddSubscription(activeLog.channel, activeLog.streamId); - Image image = AwaitImage(activeLog.sessionId, subscription); - serviceControlPublisher.AckAction(termBaseLogPosition, leadershipTermId, serviceId, ClusterAction.READY); - - logAdapter = new BoundedLogAdapter(image, new ReadableCounter(counters, counterId), this); - activeLog = null; - Role((ClusterRole) roleCounter.Get()); - } - } - private int AwaitRecoveryCounter(CountersReader counters) { idleStrategy.Reset(); @@ -436,23 +395,35 @@ private int AwaitRecoveryCounter(CountersReader counters) return counterId; } - private void JoinActiveLog(CountersReader counters) + private void JoinActiveLog() { - AwaitActiveLog(); + if (null != logAdapter) + { + if (!logAdapter.IsCaughtUp()) + { + return; + } + + logAdapter.Dispose(); + logAdapter = null; + } + + + CountersReader counters = aeron.CountersReader(); - int commitPositionId = activeLog.commitPositionId; + int commitPositionId = newActiveLogEvent.commitPositionId; if (!CommitPos.IsActive(counters, commitPositionId)) { - throw new InvalidOperationException("CommitPos counter not active: " + commitPositionId); + throw new System.InvalidOperationException("CommitPos counter not active: " + commitPositionId); } - int logSessionId = activeLog.sessionId; - leadershipTermId = activeLog.leadershipTermId; + int logSessionId = newActiveLogEvent.sessionId; + leadershipTermId = newActiveLogEvent.leadershipTermId; termBaseLogPosition = CommitPos.GetTermBaseLogPosition(counters, commitPositionId); - Subscription logSubscription = aeron.AddSubscription(activeLog.channel, activeLog.streamId); + Subscription logSubscription = aeron.AddSubscription(newActiveLogEvent.channel, newActiveLogEvent.streamId); - if (activeLog.ackBeforeImage) + if (newActiveLogEvent.ackBeforeImage) { serviceControlPublisher.AckAction(termBaseLogPosition, leadershipTermId, serviceId, ClusterAction.READY); } @@ -460,13 +431,27 @@ private void JoinActiveLog(CountersReader counters) Image image = AwaitImage(logSessionId, logSubscription); heartbeatCounter.SetOrdered(epochClock.Time()); - if (!activeLog.ackBeforeImage) + if (!newActiveLogEvent.ackBeforeImage) { serviceControlPublisher.AckAction(termBaseLogPosition, leadershipTermId, serviceId, ClusterAction.READY); } + newActiveLogEvent = null; logAdapter = new BoundedLogAdapter(image, new ReadableCounter(counters, commitPositionId), this); - activeLog = null; + + Role((ClusterRole) roleCounter.Get()); + + foreach (ClientSession session in sessionByIdMap.Values) + { + if (ClusterRole.Leader == role) + { + session.Connect(aeron); + } + else + { + session.Disconnect(); + } + } } private Image AwaitImage(int sessionId, Subscription subscription) @@ -500,17 +485,9 @@ private void LoadSnapshot(long recordingId) { using (AeronArchive archive = AeronArchive.Connect(archiveCtx)) { - RecordingExtent recordingExtent = new RecordingExtent(); - if (0 == archive.ListRecording(recordingId, recordingExtent)) - { - throw new InvalidOperationException("could not find recordingId: " + recordingId); - } - string channel = ctx.ReplayChannel(); int streamId = ctx.ReplayStreamId(); - - long length = recordingExtent.stopPosition - recordingExtent.startPosition; - int sessionId = (int) archive.StartReplay(recordingId, 0, length, channel, streamId); + int sessionId = (int) archive.StartReplay(recordingId, 0, AeronArchive.NULL_LENGTH, channel, streamId); string replaySessionChannel = ChannelUri.AddSessionId(channel, sessionId); using (Subscription subscription = aeron.AddSubscription(replaySessionChannel, streamId)) @@ -679,31 +656,5 @@ private static void CheckInterruptedStatus() throw new AgentTerminationException("unexpected interrupt during operation"); } } - - private class ActiveLog - { - internal readonly long leadershipTermId; - internal readonly int commitPositionId; - internal readonly int sessionId; - internal readonly int streamId; - internal readonly bool ackBeforeImage; - internal readonly string channel; - - internal ActiveLog( - long leadershipTermId, - int commitPositionId, - int sessionId, - int streamId, - bool ackBeforeImage, - string channel) - { - this.leadershipTermId = leadershipTermId; - this.commitPositionId = commitPositionId; - this.sessionId = sessionId; - this.streamId = streamId; - this.ackBeforeImage = ackBeforeImage; - this.channel = channel; - } - } } } \ No newline at end of file diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs b/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs index 179f2a75..a7098f9e 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceContainer.cs @@ -181,7 +181,7 @@ public class Configuration /// /// Directory to use for the clustered service. /// - public const string CLUSTERED_SERVICE_DIR_PROP_NAME = "aeron.clustered.service.dir"; + public const string CLUSTERED_SERVICE_DIR_PROP_NAME = "aeron.cluster.service.dir"; /// /// Directory to use for the cluster container. diff --git a/src/Adaptive.Cluster/Service/NewActiveLogEvent.cs b/src/Adaptive.Cluster/Service/NewActiveLogEvent.cs new file mode 100644 index 00000000..faeeac79 --- /dev/null +++ b/src/Adaptive.Cluster/Service/NewActiveLogEvent.cs @@ -0,0 +1,40 @@ +namespace Adaptive.Cluster.Service +{ + internal class NewActiveLogEvent + { + public long leadershipTermId { get; } + public int commitPositionId { get; } + public int sessionId { get; } + public int streamId { get; } + public bool ackBeforeImage { get; } + public string channel { get; } + + internal NewActiveLogEvent( + long leadershipTermId, + int commitPositionId, + int sessionId, + int streamId, + bool ackBeforeImage, + string channel) + { + this.leadershipTermId = leadershipTermId; + this.commitPositionId = commitPositionId; + this.sessionId = sessionId; + this.streamId = streamId; + this.ackBeforeImage = ackBeforeImage; + this.channel = channel; + } + + public override string ToString() + { + return "NewActiveLogEvent{" + + "leadershipTermId=" + leadershipTermId + + ", commitPositionId=" + commitPositionId + + ", sessionId=" + sessionId + + ", streamId=" + streamId + + ", ackBeforeImage=" + ackBeforeImage + + ", channel='" + channel + "'" + + '}'; + } + } +} \ No newline at end of file