From 634d400109c7178e88066a1ab72ce1a1ec18af56 Mon Sep 17 00:00:00 2001 From: James Watson Date: Tue, 30 Oct 2018 23:29:21 +0000 Subject: [PATCH] Make cluster API more idomatic --- src/Adaptive.Cluster/Client/AeronCluster.cs | 33 ++----- src/Adaptive.Cluster/Service/ClientSession.cs | 44 +++------ .../Service/ClusteredServiceAgent.cs | 96 ++++++++----------- src/Adaptive.Cluster/Service/ICluster.cs | 10 +- .../Service/ServiceSnapshotTaker.cs | 8 +- .../EchoService.cs | 6 +- 6 files changed, 73 insertions(+), 124 deletions(-) diff --git a/src/Adaptive.Cluster/Client/AeronCluster.cs b/src/Adaptive.Cluster/Client/AeronCluster.cs index 443af224..3faf36b4 100644 --- a/src/Adaptive.Cluster/Client/AeronCluster.cs +++ b/src/Adaptive.Cluster/Client/AeronCluster.cs @@ -225,37 +225,26 @@ public void Dispose() /// Get the context used to launch this cluster client. /// /// the context used to launch this cluster client. - public Context Ctx() - { - return _ctx; - } + public Context Ctx => _ctx; /// /// Cluster session id for the session that was opened as the result of a successful connect. /// /// session id for the session that was opened as the result of a successful connect. - public long ClusterSessionId() - { - return _clusterSessionId; - } + public long ClusterSessionId => _clusterSessionId; /// /// Leadership term identity for the cluster. Advances with changing leadership. /// /// leadership term identity for the cluster. - public long LeadershipTermId() - { - return _leadershipTermId; - } + public long LeadershipTermId => _leadershipTermId; + /// /// Get the current leader member id for the cluster. /// /// the current leader member id for the cluster. - public int LeaderMemberId() - { - return _leaderMemberId; - } + public int LeaderMemberId => _leaderMemberId; /// /// Get the raw for sending to the cluster. @@ -267,10 +256,7 @@ public int LeaderMemberId() /// /// /// the raw for connecting to the cluster. - public Publication IngressPublication() - { - return _publication; - } + public Publication IngressPublication => _publication; /// /// Get the raw for receiving from the cluster. @@ -279,10 +265,7 @@ public Publication IngressPublication() /// /// /// the raw for receiving from the cluster. - public Subscription EgressSubscription() - { - return _subscription; - } + public Subscription EgressSubscription => _subscription; /// /// Non-blocking publish of a partial buffer containing a message plus session header to a cluster. @@ -685,7 +668,7 @@ private void CheckDeadline(long deadlineNs, String errorMessage) throw new TimeoutException(errorMessage); } } - + private static void CheckResult(long result) { if (result == Publication.NOT_CONNECTED || result == Publication.CLOSED || diff --git a/src/Adaptive.Cluster/Service/ClientSession.cs b/src/Adaptive.Cluster/Service/ClientSession.cs index 29526ea0..6ff19177 100644 --- a/src/Adaptive.Cluster/Service/ClientSession.cs +++ b/src/Adaptive.Cluster/Service/ClientSession.cs @@ -13,14 +13,8 @@ public class ClientSession /// public const long MOCKED_OFFER = 1; - private readonly long _id; - private readonly int _responseStreamId; - private readonly string _responseChannel; - private readonly byte[] _encodedPrincipal; - private readonly ClusteredServiceAgent _cluster; private Publication _responsePublication; - private bool _isClosing; internal ClientSession( long sessionId, @@ -29,10 +23,10 @@ internal ClientSession( byte[] encodedPrincipal, ClusteredServiceAgent cluster) { - _id = sessionId; - _responseStreamId = responseStreamId; - _responseChannel = responseChannel; - _encodedPrincipal = encodedPrincipal; + Id = sessionId; + ResponseStreamId = responseStreamId; + ResponseChannel = responseChannel; + EncodedPrincipal = encodedPrincipal; _cluster = cluster; } @@ -40,43 +34,31 @@ internal ClientSession( /// Cluster session identity uniquely allocated when the session was opened. /// /// the cluster session identity uniquely allocated when the session was opened. - public long Id() - { - return _id; - } + public long Id { get; } /// /// The response channel stream id for responding to the client. /// /// response channel stream id for responding to the client. - public int ResponseStreamId() - { - return _responseStreamId; - } + public int ResponseStreamId { get; } /// /// The response channel for responding to the client. /// /// response channel for responding to the client. - public string ResponseChannel() - { - return _responseChannel; - } + public string ResponseChannel { get; } /// /// Cluster session encoded principal from when the session was authenticated. /// /// The encoded Principal passed. May be 0 length to indicate none present. - public byte[] EncodedPrincipal() - { - return _encodedPrincipal; - } + public byte[] EncodedPrincipal { get; } /// /// Indicates that a request to close this session has been made. /// /// whether a request to close this session has been made. - public bool IsClosing => _isClosing; + public bool IsClosing { get; private set; } /// /// Non-blocking publish of a partial buffer containing a message to a cluster. @@ -88,25 +70,25 @@ public byte[] EncodedPrincipal() /// otherwise . public long Offer(IDirectBuffer buffer, int offset, int length) { - return _cluster.Offer(_id, _responsePublication, buffer, offset, length); + return _cluster.Offer(Id, _responsePublication, buffer, offset, length); } internal void Connect(Aeron.Aeron aeron) { if (null == _responsePublication) { - _responsePublication = aeron.AddExclusivePublication(_responseChannel, _responseStreamId); + _responsePublication = aeron.AddExclusivePublication(ResponseChannel, ResponseStreamId); } } internal void MarkClosing() { - _isClosing = true; + IsClosing = true; } internal void ResetClosing() { - _isClosing = false; + IsClosing = false; } internal void Disconnect() diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs index 2c1fb474..0cc15c63 100644 --- a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs +++ b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs @@ -17,9 +17,9 @@ namespace Adaptive.Cluster.Service { internal sealed class ClusteredServiceAgent : IAgent, ICluster { - public static readonly int SESSION_HEADER_LENGTH = + public static readonly int SESSION_HEADER_LENGTH = MessageHeaderEncoder.ENCODED_LENGTH + SessionHeaderEncoder.BLOCK_LENGTH; - + private readonly int serviceId; private bool isRecovering; private readonly AeronArchive.Context archiveCtx; @@ -64,7 +64,7 @@ internal ClusteredServiceAgent(ClusteredServiceContainer.Context ctx) var channel = ctx.ServiceControlChannel(); _consensusModuleProxy = new ConsensusModuleProxy(aeron.AddPublication(channel, ctx.ConsensusModuleStreamId())); _serviceAdapter = new ServiceAdapter(aeron.AddSubscription(channel, ctx.ServiceStreamId()), this); - + UnsafeBuffer headerBuffer = new UnsafeBuffer(new byte[SESSION_HEADER_LENGTH]); _egressMessageHeaderEncoder.WrapAndApplyHeader(headerBuffer, 0, new MessageHeaderEncoder()); @@ -80,9 +80,9 @@ public void OnStart() commitPosition = AwaitCommitPositionCounter(counters); service.OnStart(this); - + isRecovering = true; - + int recoveryCounterId = AwaitRecoveryCounter(counters); heartbeatCounter.SetOrdered(epochClock.Time()); CheckForSnapshot(counters, recoveryCounterId); @@ -115,7 +115,7 @@ public int DoWork() PollServiceAdapter(); workCount += 1; } - + if (null != logAdapter) { int polled = logAdapter.Poll(); @@ -135,35 +135,31 @@ public int DoWork() return workCount; } - public string RoleName() - { - return ctx.ServiceName(); - } + public string RoleName() => ctx.ServiceName(); - public ClusterRole Role() + public ClusterRole Role { - return role; + get => role; + private set + { + if (value != role) + { + role = value; + service.OnRoleChange(value); + } + } } - public int MemberId() - { - return memberId; - } + public int MemberId => memberId; - public Aeron.Aeron Aeron() - { - return aeron; - } + public Aeron.Aeron Aeron => aeron; public ClientSession GetClientSession(long clusterSessionId) { return sessionByIdMap[clusterSessionId]; } - public ICollection GetClientSessions() - { - return sessionByIdMap.Values; - } + public ICollection ClientSessions => sessionByIdMap.Values; public bool CloseSession(long clusterSessionId) { @@ -188,10 +184,7 @@ public bool CloseSession(long clusterSessionId) return false; } - public long TimeMs() - { - return clusterTimeMs; - } + public long TimeMs => clusterTimeMs; public bool ScheduleTimer(long correlationId, long deadlineMs) { @@ -216,7 +209,7 @@ public void Idle(int workCount) CheckForClockTick(); idleStrategy.Idle(workCount); } - + public long Offer( long clusterSessionId, Publication publication, @@ -253,22 +246,22 @@ public void OnJoinLog( logAdapter.Dispose(); logAdapter = null; } - + activeLogEvent = new ActiveLogEvent( leadershipTermId, logPosition, maxLogPosition, memberId, logSessionId, logStreamId, logChannel); } internal void OnSessionMessage( - long clusterSessionId, - long timestampMs, - IDirectBuffer buffer, - int offset, - int length, + long clusterSessionId, + long timestampMs, + IDirectBuffer buffer, + int offset, + int length, Header header) { clusterTimeMs = timestampMs; var clientSession = sessionByIdMap[clusterSessionId]; - + service.OnSessionMessage(clientSession, timestampMs, buffer, offset, length, header); } @@ -325,7 +318,7 @@ internal void OnNewLeadershipTermEvent( _egressMessageHeaderEncoder.LeadershipTermId(leadershipTermId); clusterTimeMs = timestampMs; } - + internal void OnClusterChange( long leadershipTermId, long logPosition, @@ -357,15 +350,6 @@ internal void AddSession( sessionByIdMap[clusterSessionId] = session; } - private void Role(ClusterRole newRole) - { - if (newRole != role) - { - role = newRole; - service.OnRoleChange(newRole); - } - } - private void CheckForSnapshot(CountersReader counters, int recoveryCounterId) { clusterTimeMs = RecoveryState.GetTimestamp(counters, recoveryCounterId); @@ -445,7 +429,7 @@ private int AwaitRecoveryCounter(CountersReader counters) { CheckInterruptedStatus(); idleStrategy.Idle(); - + heartbeatCounter.SetOrdered(epochClock.Time()); counterId = RecoveryState.FindCounterId(counters); } @@ -468,7 +452,7 @@ private void JoinActiveLog() activeLogEvent = null; logAdapter = new BoundedLogAdapter(image, commitPosition, this); - Role((ClusterRole) roleCounter.Get()); + Role = (ClusterRole) roleCounter.Get(); foreach (ClientSession session in sessionByIdMap.Values) { @@ -476,9 +460,9 @@ private void JoinActiveLog() { if (ctx.IsRespondingService()) { - session.Connect(aeron); + session.Connect(aeron); } - + session.ResetClosing(); } else @@ -514,7 +498,7 @@ private ReadableCounter AwaitClusterRoleCounter(CountersReader counters) return new ReadableCounter(counters, counterId); } - + private ReadableCounter AwaitCommitPositionCounter(CountersReader counters) { idleStrategy.Reset(); @@ -529,7 +513,7 @@ private ReadableCounter AwaitCommitPositionCounter(CountersReader counters) return new ReadableCounter(counters, counterId); } - + private AtomicCounter AwaitHeartbeatCounter(CountersReader counters) { idleStrategy.Reset(); @@ -591,12 +575,12 @@ private long OnTakeSnapshot(long logPosition, long leadershipTermId) { long recordingId; - using (AeronArchive archive = AeronArchive.Connect(archiveCtx)) - using(Publication publication = aeron.AddExclusivePublication(ctx.SnapshotChannel(), ctx.SnapshotStreamId())) + using (AeronArchive archive = AeronArchive.Connect(archiveCtx)) + using (Publication publication = aeron.AddExclusivePublication(ctx.SnapshotChannel(), ctx.SnapshotStreamId())) { var channel = ChannelUri.AddSessionId(ctx.SnapshotChannel(), publication.SessionId); long subscriptionId = archive.StartRecording(channel, ctx.SnapshotStreamId(), SourceLocation.LOCAL); - + try { CountersReader counters = aeron.CountersReader; @@ -708,7 +692,7 @@ private static void CheckInterruptedStatus() private bool CheckForClockTick() { long nowMs = epochClock.Time(); - + if (cachedTimeMs != nowMs) { cachedTimeMs = nowMs; diff --git a/src/Adaptive.Cluster/Service/ICluster.cs b/src/Adaptive.Cluster/Service/ICluster.cs index f6b2bca9..62f6fa30 100644 --- a/src/Adaptive.Cluster/Service/ICluster.cs +++ b/src/Adaptive.Cluster/Service/ICluster.cs @@ -12,19 +12,19 @@ public interface ICluster /// The unique id for the hosting member of the cluster. Useful only for debugging purposes. /// /// unique id for the hosting member of the cluster. - int MemberId(); + int MemberId { get; } /// /// The role the cluster node is playing. /// /// the role the cluster node is playing. - ClusterRole Role(); + ClusterRole Role { get; } /// /// Get the client used by the cluster. /// /// the client used by the cluster. - Aeron.Aeron Aeron(); + Aeron.Aeron Aeron { get; } /// /// Get the for a given cluster session id. @@ -37,7 +37,7 @@ public interface ICluster /// Get all s. /// /// the s. - ICollection GetClientSessions(); + ICollection ClientSessions { get; } /// /// Request the close of a by sending the request to the consensus module. @@ -51,7 +51,7 @@ public interface ICluster /// Current Epoch time in milliseconds. /// /// Epoch time in milliseconds. - long TimeMs(); + long TimeMs { get; } /// /// Schedule a timer for a given deadline and provide a correlation id to identify the timer when it expires or diff --git a/src/Adaptive.Cluster/Service/ServiceSnapshotTaker.cs b/src/Adaptive.Cluster/Service/ServiceSnapshotTaker.cs index 3329d870..feca6e39 100644 --- a/src/Adaptive.Cluster/Service/ServiceSnapshotTaker.cs +++ b/src/Adaptive.Cluster/Service/ServiceSnapshotTaker.cs @@ -15,8 +15,8 @@ internal ServiceSnapshotTaker(Publication publication, IIdleStrategy idleStrateg public void SnapshotSession(ClientSession session) { - string responseChannel = session.ResponseChannel(); - byte[] encodedPrincipal = session.EncodedPrincipal(); + string responseChannel = session.ResponseChannel; + byte[] encodedPrincipal = session.EncodedPrincipal; int length = MessageHeaderEncoder.ENCODED_LENGTH + ClientSessionEncoder.BLOCK_LENGTH + ClientSessionEncoder.ResponseChannelHeaderLength() + responseChannel.Length + ClientSessionEncoder.EncodedPrincipalHeaderLength() + encodedPrincipal.Length; @@ -28,8 +28,8 @@ public void SnapshotSession(ClientSession session) if (result > 0) { _clientSessionEncoder.WrapAndApplyHeader(bufferClaim.Buffer, bufferClaim.Offset, messageHeaderEncoder) - .ClusterSessionId(session.Id()) - .ResponseStreamId(session.ResponseStreamId()) + .ClusterSessionId(session.Id) + .ResponseStreamId(session.ResponseStreamId) .ResponseChannel(responseChannel) .PutEncodedPrincipal(encodedPrincipal, 0, encodedPrincipal.Length); diff --git a/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs b/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs index 6bae3895..f35cc0e9 100644 --- a/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs +++ b/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs @@ -18,17 +18,17 @@ public void OnStart(ICluster cluster) public void OnSessionOpen(ClientSession session, long timestampMs) { - Console.WriteLine($"OnSessionOpen: sessionId={session.Id()}, timestamp={timestampMs}"); + Console.WriteLine($"OnSessionOpen: sessionId={session.Id}, timestamp={timestampMs}"); } public void OnSessionClose(ClientSession session, long timestampMs, CloseReason closeReason) { - Console.WriteLine($"OnSessionClose: sessionId={session.Id()}, timestamp={timestampMs}"); + Console.WriteLine($"OnSessionClose: sessionId={session.Id}, timestamp={timestampMs}"); } public void OnSessionMessage(ClientSession session, long timestampMs, IDirectBuffer buffer, int offset, int length, Header header) { - Console.WriteLine($"OnSessionMessage: sessionId={session.Id()}, timestamp={timestampMs}, length={length}"); + Console.WriteLine($"OnSessionMessage: sessionId={session.Id}, timestamp={timestampMs}, length={length}"); Console.WriteLine("Received Message: " + buffer.GetStringWithoutLengthUtf8(offset, length));