diff --git a/src/Adaptive.Cluster/Client/AeronCluster.cs b/src/Adaptive.Cluster/Client/AeronCluster.cs
index 443af224..3faf36b4 100644
--- a/src/Adaptive.Cluster/Client/AeronCluster.cs
+++ b/src/Adaptive.Cluster/Client/AeronCluster.cs
@@ -225,37 +225,26 @@ public void Dispose()
/// Get the context used to launch this cluster client.
///
/// the context used to launch this cluster client.
- public Context Ctx()
- {
- return _ctx;
- }
+ public Context Ctx => _ctx;
///
/// Cluster session id for the session that was opened as the result of a successful connect.
///
/// session id for the session that was opened as the result of a successful connect.
- public long ClusterSessionId()
- {
- return _clusterSessionId;
- }
+ public long ClusterSessionId => _clusterSessionId;
///
/// Leadership term identity for the cluster. Advances with changing leadership.
///
/// leadership term identity for the cluster.
- public long LeadershipTermId()
- {
- return _leadershipTermId;
- }
+ public long LeadershipTermId => _leadershipTermId;
+
///
/// Get the current leader member id for the cluster.
///
/// the current leader member id for the cluster.
- public int LeaderMemberId()
- {
- return _leaderMemberId;
- }
+ public int LeaderMemberId => _leaderMemberId;
///
/// Get the raw for sending to the cluster.
@@ -267,10 +256,7 @@ public int LeaderMemberId()
///
///
/// the raw for connecting to the cluster.
- public Publication IngressPublication()
- {
- return _publication;
- }
+ public Publication IngressPublication => _publication;
///
/// Get the raw for receiving from the cluster.
@@ -279,10 +265,7 @@ public Publication IngressPublication()
///
///
/// the raw for receiving from the cluster.
- public Subscription EgressSubscription()
- {
- return _subscription;
- }
+ public Subscription EgressSubscription => _subscription;
///
/// Non-blocking publish of a partial buffer containing a message plus session header to a cluster.
@@ -685,7 +668,7 @@ private void CheckDeadline(long deadlineNs, String errorMessage)
throw new TimeoutException(errorMessage);
}
}
-
+
private static void CheckResult(long result)
{
if (result == Publication.NOT_CONNECTED || result == Publication.CLOSED ||
diff --git a/src/Adaptive.Cluster/Service/ClientSession.cs b/src/Adaptive.Cluster/Service/ClientSession.cs
index 29526ea0..6ff19177 100644
--- a/src/Adaptive.Cluster/Service/ClientSession.cs
+++ b/src/Adaptive.Cluster/Service/ClientSession.cs
@@ -13,14 +13,8 @@ public class ClientSession
///
public const long MOCKED_OFFER = 1;
- private readonly long _id;
- private readonly int _responseStreamId;
- private readonly string _responseChannel;
- private readonly byte[] _encodedPrincipal;
-
private readonly ClusteredServiceAgent _cluster;
private Publication _responsePublication;
- private bool _isClosing;
internal ClientSession(
long sessionId,
@@ -29,10 +23,10 @@ internal ClientSession(
byte[] encodedPrincipal,
ClusteredServiceAgent cluster)
{
- _id = sessionId;
- _responseStreamId = responseStreamId;
- _responseChannel = responseChannel;
- _encodedPrincipal = encodedPrincipal;
+ Id = sessionId;
+ ResponseStreamId = responseStreamId;
+ ResponseChannel = responseChannel;
+ EncodedPrincipal = encodedPrincipal;
_cluster = cluster;
}
@@ -40,43 +34,31 @@ internal ClientSession(
/// Cluster session identity uniquely allocated when the session was opened.
///
/// the cluster session identity uniquely allocated when the session was opened.
- public long Id()
- {
- return _id;
- }
+ public long Id { get; }
///
/// The response channel stream id for responding to the client.
///
/// response channel stream id for responding to the client.
- public int ResponseStreamId()
- {
- return _responseStreamId;
- }
+ public int ResponseStreamId { get; }
///
/// The response channel for responding to the client.
///
/// response channel for responding to the client.
- public string ResponseChannel()
- {
- return _responseChannel;
- }
+ public string ResponseChannel { get; }
///
/// Cluster session encoded principal from when the session was authenticated.
///
/// The encoded Principal passed. May be 0 length to indicate none present.
- public byte[] EncodedPrincipal()
- {
- return _encodedPrincipal;
- }
+ public byte[] EncodedPrincipal { get; }
///
/// Indicates that a request to close this session has been made.
///
/// whether a request to close this session has been made.
- public bool IsClosing => _isClosing;
+ public bool IsClosing { get; private set; }
///
/// Non-blocking publish of a partial buffer containing a message to a cluster.
@@ -88,25 +70,25 @@ public byte[] EncodedPrincipal()
/// otherwise .
public long Offer(IDirectBuffer buffer, int offset, int length)
{
- return _cluster.Offer(_id, _responsePublication, buffer, offset, length);
+ return _cluster.Offer(Id, _responsePublication, buffer, offset, length);
}
internal void Connect(Aeron.Aeron aeron)
{
if (null == _responsePublication)
{
- _responsePublication = aeron.AddExclusivePublication(_responseChannel, _responseStreamId);
+ _responsePublication = aeron.AddExclusivePublication(ResponseChannel, ResponseStreamId);
}
}
internal void MarkClosing()
{
- _isClosing = true;
+ IsClosing = true;
}
internal void ResetClosing()
{
- _isClosing = false;
+ IsClosing = false;
}
internal void Disconnect()
diff --git a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs
index 2c1fb474..0cc15c63 100644
--- a/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs
+++ b/src/Adaptive.Cluster/Service/ClusteredServiceAgent.cs
@@ -17,9 +17,9 @@ namespace Adaptive.Cluster.Service
{
internal sealed class ClusteredServiceAgent : IAgent, ICluster
{
- public static readonly int SESSION_HEADER_LENGTH =
+ public static readonly int SESSION_HEADER_LENGTH =
MessageHeaderEncoder.ENCODED_LENGTH + SessionHeaderEncoder.BLOCK_LENGTH;
-
+
private readonly int serviceId;
private bool isRecovering;
private readonly AeronArchive.Context archiveCtx;
@@ -64,7 +64,7 @@ internal ClusteredServiceAgent(ClusteredServiceContainer.Context ctx)
var channel = ctx.ServiceControlChannel();
_consensusModuleProxy = new ConsensusModuleProxy(aeron.AddPublication(channel, ctx.ConsensusModuleStreamId()));
_serviceAdapter = new ServiceAdapter(aeron.AddSubscription(channel, ctx.ServiceStreamId()), this);
-
+
UnsafeBuffer headerBuffer = new UnsafeBuffer(new byte[SESSION_HEADER_LENGTH]);
_egressMessageHeaderEncoder.WrapAndApplyHeader(headerBuffer, 0, new MessageHeaderEncoder());
@@ -80,9 +80,9 @@ public void OnStart()
commitPosition = AwaitCommitPositionCounter(counters);
service.OnStart(this);
-
+
isRecovering = true;
-
+
int recoveryCounterId = AwaitRecoveryCounter(counters);
heartbeatCounter.SetOrdered(epochClock.Time());
CheckForSnapshot(counters, recoveryCounterId);
@@ -115,7 +115,7 @@ public int DoWork()
PollServiceAdapter();
workCount += 1;
}
-
+
if (null != logAdapter)
{
int polled = logAdapter.Poll();
@@ -135,35 +135,31 @@ public int DoWork()
return workCount;
}
- public string RoleName()
- {
- return ctx.ServiceName();
- }
+ public string RoleName() => ctx.ServiceName();
- public ClusterRole Role()
+ public ClusterRole Role
{
- return role;
+ get => role;
+ private set
+ {
+ if (value != role)
+ {
+ role = value;
+ service.OnRoleChange(value);
+ }
+ }
}
- public int MemberId()
- {
- return memberId;
- }
+ public int MemberId => memberId;
- public Aeron.Aeron Aeron()
- {
- return aeron;
- }
+ public Aeron.Aeron Aeron => aeron;
public ClientSession GetClientSession(long clusterSessionId)
{
return sessionByIdMap[clusterSessionId];
}
- public ICollection GetClientSessions()
- {
- return sessionByIdMap.Values;
- }
+ public ICollection ClientSessions => sessionByIdMap.Values;
public bool CloseSession(long clusterSessionId)
{
@@ -188,10 +184,7 @@ public bool CloseSession(long clusterSessionId)
return false;
}
- public long TimeMs()
- {
- return clusterTimeMs;
- }
+ public long TimeMs => clusterTimeMs;
public bool ScheduleTimer(long correlationId, long deadlineMs)
{
@@ -216,7 +209,7 @@ public void Idle(int workCount)
CheckForClockTick();
idleStrategy.Idle(workCount);
}
-
+
public long Offer(
long clusterSessionId,
Publication publication,
@@ -253,22 +246,22 @@ public void OnJoinLog(
logAdapter.Dispose();
logAdapter = null;
}
-
+
activeLogEvent = new ActiveLogEvent(
leadershipTermId, logPosition, maxLogPosition, memberId, logSessionId, logStreamId, logChannel);
}
internal void OnSessionMessage(
- long clusterSessionId,
- long timestampMs,
- IDirectBuffer buffer,
- int offset,
- int length,
+ long clusterSessionId,
+ long timestampMs,
+ IDirectBuffer buffer,
+ int offset,
+ int length,
Header header)
{
clusterTimeMs = timestampMs;
var clientSession = sessionByIdMap[clusterSessionId];
-
+
service.OnSessionMessage(clientSession, timestampMs, buffer, offset, length, header);
}
@@ -325,7 +318,7 @@ internal void OnNewLeadershipTermEvent(
_egressMessageHeaderEncoder.LeadershipTermId(leadershipTermId);
clusterTimeMs = timestampMs;
}
-
+
internal void OnClusterChange(
long leadershipTermId,
long logPosition,
@@ -357,15 +350,6 @@ internal void AddSession(
sessionByIdMap[clusterSessionId] = session;
}
- private void Role(ClusterRole newRole)
- {
- if (newRole != role)
- {
- role = newRole;
- service.OnRoleChange(newRole);
- }
- }
-
private void CheckForSnapshot(CountersReader counters, int recoveryCounterId)
{
clusterTimeMs = RecoveryState.GetTimestamp(counters, recoveryCounterId);
@@ -445,7 +429,7 @@ private int AwaitRecoveryCounter(CountersReader counters)
{
CheckInterruptedStatus();
idleStrategy.Idle();
-
+
heartbeatCounter.SetOrdered(epochClock.Time());
counterId = RecoveryState.FindCounterId(counters);
}
@@ -468,7 +452,7 @@ private void JoinActiveLog()
activeLogEvent = null;
logAdapter = new BoundedLogAdapter(image, commitPosition, this);
- Role((ClusterRole) roleCounter.Get());
+ Role = (ClusterRole) roleCounter.Get();
foreach (ClientSession session in sessionByIdMap.Values)
{
@@ -476,9 +460,9 @@ private void JoinActiveLog()
{
if (ctx.IsRespondingService())
{
- session.Connect(aeron);
+ session.Connect(aeron);
}
-
+
session.ResetClosing();
}
else
@@ -514,7 +498,7 @@ private ReadableCounter AwaitClusterRoleCounter(CountersReader counters)
return new ReadableCounter(counters, counterId);
}
-
+
private ReadableCounter AwaitCommitPositionCounter(CountersReader counters)
{
idleStrategy.Reset();
@@ -529,7 +513,7 @@ private ReadableCounter AwaitCommitPositionCounter(CountersReader counters)
return new ReadableCounter(counters, counterId);
}
-
+
private AtomicCounter AwaitHeartbeatCounter(CountersReader counters)
{
idleStrategy.Reset();
@@ -591,12 +575,12 @@ private long OnTakeSnapshot(long logPosition, long leadershipTermId)
{
long recordingId;
- using (AeronArchive archive = AeronArchive.Connect(archiveCtx))
- using(Publication publication = aeron.AddExclusivePublication(ctx.SnapshotChannel(), ctx.SnapshotStreamId()))
+ using (AeronArchive archive = AeronArchive.Connect(archiveCtx))
+ using (Publication publication = aeron.AddExclusivePublication(ctx.SnapshotChannel(), ctx.SnapshotStreamId()))
{
var channel = ChannelUri.AddSessionId(ctx.SnapshotChannel(), publication.SessionId);
long subscriptionId = archive.StartRecording(channel, ctx.SnapshotStreamId(), SourceLocation.LOCAL);
-
+
try
{
CountersReader counters = aeron.CountersReader;
@@ -708,7 +692,7 @@ private static void CheckInterruptedStatus()
private bool CheckForClockTick()
{
long nowMs = epochClock.Time();
-
+
if (cachedTimeMs != nowMs)
{
cachedTimeMs = nowMs;
diff --git a/src/Adaptive.Cluster/Service/ICluster.cs b/src/Adaptive.Cluster/Service/ICluster.cs
index f6b2bca9..62f6fa30 100644
--- a/src/Adaptive.Cluster/Service/ICluster.cs
+++ b/src/Adaptive.Cluster/Service/ICluster.cs
@@ -12,19 +12,19 @@ public interface ICluster
/// The unique id for the hosting member of the cluster. Useful only for debugging purposes.
///
/// unique id for the hosting member of the cluster.
- int MemberId();
+ int MemberId { get; }
///
/// The role the cluster node is playing.
///
/// the role the cluster node is playing.
- ClusterRole Role();
+ ClusterRole Role { get; }
///
/// Get the client used by the cluster.
///
/// the client used by the cluster.
- Aeron.Aeron Aeron();
+ Aeron.Aeron Aeron { get; }
///
/// Get the for a given cluster session id.
@@ -37,7 +37,7 @@ public interface ICluster
/// Get all s.
///
/// the s.
- ICollection GetClientSessions();
+ ICollection ClientSessions { get; }
///
/// Request the close of a by sending the request to the consensus module.
@@ -51,7 +51,7 @@ public interface ICluster
/// Current Epoch time in milliseconds.
///
/// Epoch time in milliseconds.
- long TimeMs();
+ long TimeMs { get; }
///
/// Schedule a timer for a given deadline and provide a correlation id to identify the timer when it expires or
diff --git a/src/Adaptive.Cluster/Service/ServiceSnapshotTaker.cs b/src/Adaptive.Cluster/Service/ServiceSnapshotTaker.cs
index 3329d870..feca6e39 100644
--- a/src/Adaptive.Cluster/Service/ServiceSnapshotTaker.cs
+++ b/src/Adaptive.Cluster/Service/ServiceSnapshotTaker.cs
@@ -15,8 +15,8 @@ internal ServiceSnapshotTaker(Publication publication, IIdleStrategy idleStrateg
public void SnapshotSession(ClientSession session)
{
- string responseChannel = session.ResponseChannel();
- byte[] encodedPrincipal = session.EncodedPrincipal();
+ string responseChannel = session.ResponseChannel;
+ byte[] encodedPrincipal = session.EncodedPrincipal;
int length = MessageHeaderEncoder.ENCODED_LENGTH + ClientSessionEncoder.BLOCK_LENGTH +
ClientSessionEncoder.ResponseChannelHeaderLength() + responseChannel.Length +
ClientSessionEncoder.EncodedPrincipalHeaderLength() + encodedPrincipal.Length;
@@ -28,8 +28,8 @@ public void SnapshotSession(ClientSession session)
if (result > 0)
{
_clientSessionEncoder.WrapAndApplyHeader(bufferClaim.Buffer, bufferClaim.Offset, messageHeaderEncoder)
- .ClusterSessionId(session.Id())
- .ResponseStreamId(session.ResponseStreamId())
+ .ClusterSessionId(session.Id)
+ .ResponseStreamId(session.ResponseStreamId)
.ResponseChannel(responseChannel)
.PutEncodedPrincipal(encodedPrincipal, 0, encodedPrincipal.Length);
diff --git a/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs b/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs
index 6bae3895..f35cc0e9 100644
--- a/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs
+++ b/src/Samples/Adaptive.Aeron.Samples.ClusterService/EchoService.cs
@@ -18,17 +18,17 @@ public void OnStart(ICluster cluster)
public void OnSessionOpen(ClientSession session, long timestampMs)
{
- Console.WriteLine($"OnSessionOpen: sessionId={session.Id()}, timestamp={timestampMs}");
+ Console.WriteLine($"OnSessionOpen: sessionId={session.Id}, timestamp={timestampMs}");
}
public void OnSessionClose(ClientSession session, long timestampMs, CloseReason closeReason)
{
- Console.WriteLine($"OnSessionClose: sessionId={session.Id()}, timestamp={timestampMs}");
+ Console.WriteLine($"OnSessionClose: sessionId={session.Id}, timestamp={timestampMs}");
}
public void OnSessionMessage(ClientSession session, long timestampMs, IDirectBuffer buffer, int offset, int length, Header header)
{
- Console.WriteLine($"OnSessionMessage: sessionId={session.Id()}, timestamp={timestampMs}, length={length}");
+ Console.WriteLine($"OnSessionMessage: sessionId={session.Id}, timestamp={timestampMs}, length={length}");
Console.WriteLine("Received Message: " + buffer.GetStringWithoutLengthUtf8(offset, length));