Skip to content

Commit

Permalink
Make cluster API more idomatic
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Oct 30, 2018
1 parent 3182262 commit 634d400
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 124 deletions.
33 changes: 8 additions & 25 deletions src/Adaptive.Cluster/Client/AeronCluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,37 +225,26 @@ public void Dispose()
/// Get the context used to launch this cluster client.
/// </summary>
/// <returns> the context used to launch this cluster client. </returns>
public Context Ctx()
{
return _ctx;
}
public Context Ctx => _ctx;

/// <summary>
/// Cluster session id for the session that was opened as the result of a successful connect.
/// </summary>
/// <returns> session id for the session that was opened as the result of a successful connect. </returns>
public long ClusterSessionId()
{
return _clusterSessionId;
}
public long ClusterSessionId => _clusterSessionId;

/// <summary>
/// Leadership term identity for the cluster. Advances with changing leadership.
/// </summary>
/// <returns> leadership term identity for the cluster. </returns>
public long LeadershipTermId()
{
return _leadershipTermId;
}
public long LeadershipTermId => _leadershipTermId;


/// <summary>
/// Get the current leader member id for the cluster.
/// </summary>
/// <returns> the current leader member id for the cluster. </returns>
public int LeaderMemberId()
{
return _leaderMemberId;
}
public int LeaderMemberId => _leaderMemberId;

/// <summary>
/// Get the raw <seealso cref="Publication"/> for sending to the cluster.
Expand All @@ -267,10 +256,7 @@ public int LeaderMemberId()
/// </para>
/// </summary>
/// <returns> the raw <seealso cref="Publication"/> for connecting to the cluster. </returns>
public Publication IngressPublication()
{
return _publication;
}
public Publication IngressPublication => _publication;

/// <summary>
/// Get the raw <seealso cref="Subscription"/> for receiving from the cluster.
Expand All @@ -279,10 +265,7 @@ public Publication IngressPublication()
///
/// </summary>
/// <returns> the raw <seealso cref="Subscription"/> for receiving from the cluster. </returns>
public Subscription EgressSubscription()
{
return _subscription;
}
public Subscription EgressSubscription => _subscription;

/// <summary>
/// Non-blocking publish of a partial buffer containing a message plus session header to a cluster.
Expand Down Expand Up @@ -685,7 +668,7 @@ private void CheckDeadline(long deadlineNs, String errorMessage)
throw new TimeoutException(errorMessage);
}
}

private static void CheckResult(long result)
{
if (result == Publication.NOT_CONNECTED || result == Publication.CLOSED ||
Expand Down
44 changes: 13 additions & 31 deletions src/Adaptive.Cluster/Service/ClientSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,8 @@ public class ClientSession
/// </summary>
public const long MOCKED_OFFER = 1;

private readonly long _id;
private readonly int _responseStreamId;
private readonly string _responseChannel;
private readonly byte[] _encodedPrincipal;

private readonly ClusteredServiceAgent _cluster;
private Publication _responsePublication;
private bool _isClosing;

internal ClientSession(
long sessionId,
Expand All @@ -29,54 +23,42 @@ internal ClientSession(
byte[] encodedPrincipal,
ClusteredServiceAgent cluster)
{
_id = sessionId;
_responseStreamId = responseStreamId;
_responseChannel = responseChannel;
_encodedPrincipal = encodedPrincipal;
Id = sessionId;
ResponseStreamId = responseStreamId;
ResponseChannel = responseChannel;
EncodedPrincipal = encodedPrincipal;
_cluster = cluster;
}

/// <summary>
/// Cluster session identity uniquely allocated when the session was opened.
/// </summary>
/// <returns> the cluster session identity uniquely allocated when the session was opened. </returns>
public long Id()
{
return _id;
}
public long Id { get; }

/// <summary>
/// The response channel stream id for responding to the client.
/// </summary>
/// <returns> response channel stream id for responding to the client. </returns>
public int ResponseStreamId()
{
return _responseStreamId;
}
public int ResponseStreamId { get; }

/// <summary>
/// The response channel for responding to the client.
/// </summary>
/// <returns> response channel for responding to the client. </returns>
public string ResponseChannel()
{
return _responseChannel;
}
public string ResponseChannel { get; }

/// <summary>
/// Cluster session encoded principal from when the session was authenticated.
/// </summary>
/// <returns> The encoded Principal passed. May be 0 length to indicate none present. </returns>
public byte[] EncodedPrincipal()
{
return _encodedPrincipal;
}
public byte[] EncodedPrincipal { get; }

/// <summary>
/// Indicates that a request to close this session has been made.
/// </summary>
/// <returns> whether a request to close this session has been made. </returns>
public bool IsClosing => _isClosing;
public bool IsClosing { get; private set; }

/// <summary>
/// Non-blocking publish of a partial buffer containing a message to a cluster.
Expand All @@ -88,25 +70,25 @@ public byte[] EncodedPrincipal()
/// otherwise <see cref="MOCKED_OFFER"/>. </returns>
public long Offer(IDirectBuffer buffer, int offset, int length)
{
return _cluster.Offer(_id, _responsePublication, buffer, offset, length);
return _cluster.Offer(Id, _responsePublication, buffer, offset, length);
}

internal void Connect(Aeron.Aeron aeron)
{
if (null == _responsePublication)
{
_responsePublication = aeron.AddExclusivePublication(_responseChannel, _responseStreamId);
_responsePublication = aeron.AddExclusivePublication(ResponseChannel, ResponseStreamId);
}
}

internal void MarkClosing()
{
_isClosing = true;
IsClosing = true;
}

internal void ResetClosing()
{
_isClosing = false;
IsClosing = false;
}

internal void Disconnect()
Expand Down
Loading

0 comments on commit 634d400

Please sign in to comment.