Skip to content

Commit

Permalink
Port to 1.9.2-333-g721767e11
Browse files Browse the repository at this point in the history
  • Loading branch information
JPWatson committed Aug 1, 2018
1 parent 2bd619c commit add1d89
Show file tree
Hide file tree
Showing 118 changed files with 7,337 additions and 3,616 deletions.
2 changes: 1 addition & 1 deletion driver/Aeron.Driver.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<package >
<metadata>
<id>Aeron.Driver</id>
<version>1.9.4</version>
<version>1.10.0</version>
<title>Aeron Driver</title>
<authors>Adaptive Financial Consulting Ltd.</authors>
<owners>Adaptive Financial Consulting Ltd.</owners>
Expand Down
Binary file modified driver/media-driver.jar
Binary file not shown.
6 changes: 4 additions & 2 deletions driver/version.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
Driver source:
http://repo1.maven.org/maven2/io/aeron/aeron-all/1.9.3/aeron-all-1.9.3.jar
Driver built from source
Agrona: 0.9.18-11-gcbee425
SBE: 1.8.1-31-g337ef8bb
Aeron: 1.9.2-308-gb795fd261
12 changes: 12 additions & 0 deletions scripts/build-release-nuget-packages - Copy.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
@echo off
pushd %~dp0..

del nupkgs\*.nupkg

call dotnet pack src\Adaptive.Aeron\Adaptive.Aeron.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Agrona\Adaptive.Agrona.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Cluster\Adaptive.Cluster.csproj -c Release --output ..\..\nupkgs
call dotnet pack src\Adaptive.Archiver\Adaptive.Archiver.csproj -c Release --output ..\..\nupkgs
call .\scripts\nuget pack .\driver\Aeron.Driver.nuspec -OutputDirectory nupkgs

popd
2 changes: 1 addition & 1 deletion src/Adaptive.Aeron/Adaptive.Aeron.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net45</TargetFrameworks>
<PackageId>Aeron.Client</PackageId>
<VersionPrefix>1.9.4</VersionPrefix>
<VersionPrefix>1.10.0</VersionPrefix>
<Authors>Adaptive Financial Consulting Ltd.</Authors>
<Company>Adaptive Financial Consulting Ltd.</Company>
<Product>Aeron Client</Product>
Expand Down
12 changes: 11 additions & 1 deletion src/Adaptive.Aeron/Aeron.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ namespace Adaptive.Aeron
/// </summary>
public sealed class Aeron : IDisposable
{
/// <summary>
/// Used to represent a null value for when some value is not yet set.
/// </summary>
public const int NULL_VALUE = -1;

/// <summary>
/// The Default handler for Aeron runtime exceptions.
/// When a <seealso cref="DriverTimeoutException"/> is encountered, this handler will
Expand Down Expand Up @@ -440,7 +445,7 @@ public class Context : IDisposable
/// <summary>
/// Value to represent a sessionId that is not to be used.
/// </summary>
public const int NULL_SESSION_ID = -1;
public const int NULL_SESSION_ID = Aeron.NULL_VALUE;

/// <summary>
/// Initial term id to be used when creating an <seealso cref="ExclusivePublication"/>.
Expand Down Expand Up @@ -518,6 +523,11 @@ public class Context : IDisposable
/// </summary>
public const string RELIABLE_STREAM_PARAM_NAME = "reliable";

/// <summary>
/// Key for the tags for a channel
/// </summary>
public const string TAGS_PARAM_NAME = "tags";

/// <summary>
/// Get the default directory name to be used if <seealso cref="AeronDirectoryName(String)"/> is not set. This will take
/// the <seealso cref="AERON_DIR_PROP_NAME"/> if set and if not then <seealso cref="AERON_DIR_PROP_DEFAULT"/>.
Expand Down
128 changes: 126 additions & 2 deletions src/Adaptive.Aeron/ChannelUri.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Text;
using Adaptive.Aeron.LogBuffer;
using Adaptive.Agrona.Collections;
using Adaptive.Agrona.Concurrent;

namespace Adaptive.Aeron
{
Expand Down Expand Up @@ -37,11 +40,17 @@ private enum State
/// </summary>
public const string SPY_QUALIFIER = "aeron-spy";

public const long INVALID_TAG = Aeron.NULL_VALUE;

private const int CHANNEL_TAG_INDEX = 0;
private const int ENTITY_TAG_INDEX = 1;

private static readonly string AERON_PREFIX = AERON_SCHEME + ":";

private string _prefix;
private string _media;
private readonly IDictionary<string, string> _params;
private string[] _tags;

/// <summary>
/// Construct with the components provided to avoid parsing.
Expand All @@ -54,6 +63,8 @@ public ChannelUri(string prefix, string media, IDictionary<string, string> @para
_prefix = prefix;
_media = media;
_params = @params;

_tags = SplitTags(_params[Aeron.Context.TAGS_PARAM_NAME]);
}

/// <summary>
Expand Down Expand Up @@ -151,6 +162,24 @@ public string Put(string key, string value)
{
return _params[key] = value;
}

/// <summary>
/// Remove a key pair in the map of params.
/// </summary>
/// <param name="key"> of the param to be removed. </param>
/// <returns> the previous value of the param or null. </returns>
public string Remove(string key)
{
String ret = null;

if (_params.ContainsKey(key))
{
ret = _params[key];
}

_params.Remove(key);
return ret;
}

/// <summary>
/// Does the URI contain a value for the given key.
Expand All @@ -162,6 +191,24 @@ public bool ContainsKey(string key)
return _params.ContainsKey(key);
}

/// <summary>
/// Get the channel tag.
/// </summary>
/// <returns> channel tag. </returns>
public string ChannelTag()
{
return (_tags.Length > CHANNEL_TAG_INDEX) ? _tags[CHANNEL_TAG_INDEX] : null;
}

/// <summary>
/// Get the entity tag.
/// </summary>
/// <returns> entity tag. </returns>
public string EntityTag()
{
return (_tags.Length > ENTITY_TAG_INDEX) ? _tags[ENTITY_TAG_INDEX] : null;
}

/// <summary>
/// Generate a String representation of the URI that is valid for an Aeron channel.
/// </summary>
Expand Down Expand Up @@ -201,6 +248,25 @@ public override string ToString()
return sb.ToString();
}

/// <summary>
/// Initialise a channel for restarting a publication at a given position.
/// </summary>
/// <param name="position"> at which the publication should be started. </param>
/// <param name="initialTermId"> what which the stream would start. </param>
/// <param name="termLength"> for the stream. </param>
public void InitialPosition(long position, int initialTermId, int termLength)
{
int bitsToShift = LogBufferDescriptor.PositionBitsToShift(termLength);
int termId = LogBufferDescriptor.ComputeTermIdFromPosition(position, bitsToShift, initialTermId);
int termOffset = (int)(position & (termLength - 1));

Put(Aeron.Context.INITIAL_TERM_ID_PARAM_NAME, Convert.ToString(initialTermId));
Put(Aeron.Context.TERM_ID_PARAM_NAME, Convert.ToString(termId));
Put(Aeron.Context.TERM_OFFSET_PARAM_NAME, Convert.ToString(termOffset));
Put(Aeron.Context.TERM_LENGTH_PARAM_NAME, Convert.ToString(termLength));
}


/// <summary>
/// Parse a <seealso cref="string"/> which contains an Aeron URI.
/// </summary>
Expand Down Expand Up @@ -322,12 +388,31 @@ public static ChannelUri Parse(string cs)
/// <returns> new string that represents channel with sessionId added. </returns>
public static string AddSessionId(string channel, int sessionId)
{
ChannelUri channelUri = ChannelUri.Parse(channel);

ChannelUri channelUri = Parse(channel);
channelUri.Put(Aeron.Context.SESSION_ID_PARAM_NAME, Convert.ToString(sessionId));

return channelUri.ToString();
}

/// <summary>
/// Is the param tagged? That is does it start with the "tag:" prefix.
/// </summary>
/// <param name="paramValue"> to check if tagged. </param>
/// <returns> true if tagged or false if not. </returns>
public static bool IsTagged(string paramValue)
{
return StartsWith(paramValue, "tag:");
}

/// <summary>
/// Get the value of the tag from a given parameter.
/// </summary>
/// <param name="paramValue"> to extra the tag value from. </param>
/// <returns> the value of the tag or <seealso cref="INVALID_TAG"/> if not tagged. </returns>
public static long GetTag(string paramValue)
{
return IsTagged(paramValue) ? long.Parse(paramValue.Substring(4, paramValue.Length - 4)) : INVALID_TAG;
}

private static bool StartsWith(string input, int position, string prefix)
{
Expand All @@ -351,5 +436,44 @@ private static bool StartsWith(string input, string prefix)
{
return StartsWith(input, 0, prefix);
}

private static string[] SplitTags(string tags)
{
string[] stringArray = new string[0];

if (null != tags)
{
int currentStartIndex = 0;
int tagIndex = 0;
stringArray = new string[2];
int length = tags.Length;

for (int i = 0; i < length; i++)
{
if (tags[i] == ',')
{
string tag = null;

if (i - currentStartIndex > 0)
{
tag = tags.Substring(currentStartIndex, i - currentStartIndex);
currentStartIndex = i + 1;
}

stringArray = ArrayUtil.EnsureCapacity(stringArray, tagIndex + 1);
stringArray[tagIndex] = tag;
tagIndex++;
}
}

if ((length - currentStartIndex) > 0)
{
stringArray = ArrayUtil.EnsureCapacity(stringArray, tagIndex + 1);
stringArray[tagIndex] = tags.Substring(currentStartIndex, length - currentStartIndex);
}
}

return stringArray;
}
}
}
Loading

0 comments on commit add1d89

Please sign in to comment.