Skip to content

Commit

Permalink
Change subchannel BalancerAddress when attributes change (#2228)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Aug 30, 2023
1 parent b421751 commit 311f878
Show file tree
Hide file tree
Showing 14 changed files with 299 additions and 158 deletions.
24 changes: 21 additions & 3 deletions src/Grpc.Net.Client/Balancer/BalancerAddress.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
Expand Down Expand Up @@ -30,7 +30,9 @@ namespace Grpc.Net.Client.Balancer;
/// </summary>
public sealed class BalancerAddress
{
private BalancerAttributes? _attributes;
// Internal so address attributes can be compared without using the Attributes property.
// The property allocates an empty collection if one isn't already present.
internal BalancerAttributes? _attributes;

/// <summary>
/// Initializes a new instance of the <see cref="BalancerAddress"/> class with the specified <see cref="DnsEndPoint"/>.
Expand All @@ -48,7 +50,7 @@ public BalancerAddress(DnsEndPoint endPoint)
/// <param name="host">The host.</param>
/// <param name="port">The port.</param>
[DebuggerStepThrough]
public BalancerAddress(string host, int port) : this(new DnsEndPoint(host, port))
public BalancerAddress(string host, int port) : this(new BalancerEndPoint(host, port))
{
}

Expand All @@ -69,5 +71,21 @@ public override string ToString()
{
return $"{EndPoint.Host}:{EndPoint.Port}";
}

private sealed class BalancerEndPoint : DnsEndPoint
{
private string? _cachedToString;

public BalancerEndPoint(string host, int port) : base(host, port)
{
}

public override string ToString()
{
// Improve ToString performance when logging by caching ToString.
// Don't include DnsEndPoint address family.
return _cachedToString ??= $"{Host}:{Port}";
}
}
}
#endif
93 changes: 81 additions & 12 deletions src/Grpc.Net.Client/Balancer/BalancerAttributes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,22 @@ public sealed class BalancerAttributes : IDictionary<string, object?>, IReadOnly
/// <summary>
/// Gets a read-only collection of metadata attributes.
/// </summary>
public static readonly BalancerAttributes Empty = new BalancerAttributes(new ReadOnlyDictionary<string, object?>(new Dictionary<string, object?>()));
public static readonly BalancerAttributes Empty = new BalancerAttributes(new Dictionary<string, object?>(), readOnly: true);

private readonly IDictionary<string, object?> _attributes;
private readonly Dictionary<string, object?> _attributes;
private readonly bool _readOnly;

/// <summary>
/// Initializes a new instance of the <see cref="BalancerAttributes"/> class.
/// </summary>
public BalancerAttributes() : this(new Dictionary<string, object?>())
public BalancerAttributes() : this(new Dictionary<string, object?>(), readOnly: false)
{
}

private BalancerAttributes(IDictionary<string, object?> attributes)
private BalancerAttributes(Dictionary<string, object?> attributes, bool readOnly)
{
_attributes = attributes;
_readOnly = readOnly;
}

object? IDictionary<string, object?>.this[string key]
Expand All @@ -62,28 +64,49 @@ private BalancerAttributes(IDictionary<string, object?> attributes)
}
set
{
ValidateReadOnly();
_attributes[key] = value;
}
}

ICollection<string> IDictionary<string, object?>.Keys => _attributes.Keys;
ICollection<object?> IDictionary<string, object?>.Values => _attributes.Values;
int ICollection<KeyValuePair<string, object?>>.Count => _attributes.Count;
bool ICollection<KeyValuePair<string, object?>>.IsReadOnly => _attributes.IsReadOnly;
bool ICollection<KeyValuePair<string, object?>>.IsReadOnly => _readOnly || ((ICollection<KeyValuePair<string, object?>>)_attributes).IsReadOnly;
IEnumerable<string> IReadOnlyDictionary<string, object?>.Keys => _attributes.Keys;
IEnumerable<object?> IReadOnlyDictionary<string, object?>.Values => _attributes.Values;
int IReadOnlyCollection<KeyValuePair<string, object?>>.Count => _attributes.Count;
object? IReadOnlyDictionary<string, object?>.this[string key] => _attributes[key];
void IDictionary<string, object?>.Add(string key, object? value) => _attributes.Add(key, value);
void ICollection<KeyValuePair<string, object?>>.Add(KeyValuePair<string, object?> item) => _attributes.Add(item);
void ICollection<KeyValuePair<string, object?>>.Clear() => _attributes.Clear();
void IDictionary<string, object?>.Add(string key, object? value)
{
ValidateReadOnly();
_attributes.Add(key, value);
}
void ICollection<KeyValuePair<string, object?>>.Add(KeyValuePair<string, object?> item)
{
ValidateReadOnly();
((ICollection<KeyValuePair<string, object?>>)_attributes).Add(item);
}
void ICollection<KeyValuePair<string, object?>>.Clear()
{
ValidateReadOnly();
_attributes.Clear();
}
bool ICollection<KeyValuePair<string, object?>>.Contains(KeyValuePair<string, object?> item) => _attributes.Contains(item);
bool IDictionary<string, object?>.ContainsKey(string key) => _attributes.ContainsKey(key);
void ICollection<KeyValuePair<string, object?>>.CopyTo(KeyValuePair<string, object?>[] array, int arrayIndex) => _attributes.CopyTo(array, arrayIndex);
void ICollection<KeyValuePair<string, object?>>.CopyTo(KeyValuePair<string, object?>[] array, int arrayIndex) => ((ICollection<KeyValuePair<string, object?>>)_attributes).CopyTo(array, arrayIndex);
IEnumerator<KeyValuePair<string, object?>> IEnumerable<KeyValuePair<string, object?>>.GetEnumerator() => _attributes.GetEnumerator();
IEnumerator System.Collections.IEnumerable.GetEnumerator() => ((System.Collections.IEnumerable)_attributes).GetEnumerator();
bool IDictionary<string, object?>.Remove(string key) => _attributes.Remove(key);
bool ICollection<KeyValuePair<string, object?>>.Remove(KeyValuePair<string, object?> item) => _attributes.Remove(item);
bool IDictionary<string, object?>.Remove(string key)
{
ValidateReadOnly();
return _attributes.Remove(key);
}
bool ICollection<KeyValuePair<string, object?>>.Remove(KeyValuePair<string, object?> item)
{
ValidateReadOnly();
return ((ICollection<KeyValuePair<string, object?>>)_attributes).Remove(item);
}
bool IDictionary<string, object?>.TryGetValue(string key, out object? value) => _attributes.TryGetValue(key, out value);
bool IReadOnlyDictionary<string, object?>.ContainsKey(string key) => _attributes.ContainsKey(key);
bool IReadOnlyDictionary<string, object?>.TryGetValue(string key, out object? value) => _attributes.TryGetValue(key, out value);
Expand Down Expand Up @@ -121,6 +144,7 @@ public bool TryGetValue<TValue>(BalancerAttributesKey<TValue> key, [MaybeNullWhe
/// <param name="value">The value.</param>
public void Set<TValue>(BalancerAttributesKey<TValue> key, TValue value)
{
ValidateReadOnly();
_attributes[key.Key] = value;
}

Expand All @@ -135,10 +159,55 @@ public void Set<TValue>(BalancerAttributesKey<TValue> key, TValue value)
/// </returns>
public bool Remove<TValue>(BalancerAttributesKey<TValue> key)
{
ValidateReadOnly();
return _attributes.Remove(key.Key);
}

internal string DebuggerToString()
private void ValidateReadOnly()
{
if (_readOnly)
{
throw new NotSupportedException("Collection is read-only.");
}
}

internal static bool DeepEquals(BalancerAttributes? x, BalancerAttributes? y)
{
var xValues = x?._attributes;
var yValues = y?._attributes;

if (ReferenceEquals(xValues, yValues))
{
return true;
}

if (xValues == null || yValues == null)
{
return false;
}

if (xValues.Count != yValues.Count)
{
return false;
}

foreach (var kvp in xValues)
{
if (!yValues.TryGetValue(kvp.Key, out var value))
{
return false;
}

if (!Equals(kvp.Value, value))
{
return false;
}
}

return true;
}

private string DebuggerToString()
{
return $"Count = {_attributes.Count}";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
Expand All @@ -17,8 +17,6 @@
#endregion

#if SUPPORT_LOAD_BALANCING
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

namespace Grpc.Net.Client.Balancer.Internal;
Expand All @@ -44,7 +42,7 @@ public bool Equals(BalancerAddress? x, BalancerAddress? y)
return false;
}

return true;
return BalancerAttributes.DeepEquals(x._attributes, y._attributes);
}

public int GetHashCode([DisallowNull] BalancerAddress obj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ internal async ValueTask<Stream> OnConnect(SocketsHttpConnectionContext context,
}

Debug.Assert(context.DnsEndPoint.Equals(currentAddress.EndPoint), "Context endpoint should equal address endpoint.");
return await subchannel.Transport.GetStreamAsync(currentAddress, cancellationToken).ConfigureAwait(false);
return await subchannel.Transport.GetStreamAsync(currentAddress.EndPoint, cancellationToken).ConfigureAwait(false);
}
#endif

Expand Down
5 changes: 3 additions & 2 deletions src/Grpc.Net.Client/Balancer/Internal/ISubchannelTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#endregion

#if SUPPORT_LOAD_BALANCING
using System.Net;
using Grpc.Shared;

namespace Grpc.Net.Client.Balancer.Internal;
Expand All @@ -28,11 +29,11 @@ namespace Grpc.Net.Client.Balancer.Internal;
/// </summary>
internal interface ISubchannelTransport : IDisposable
{
BalancerAddress? CurrentAddress { get; }
DnsEndPoint? CurrentEndPoint { get; }
TimeSpan? ConnectTimeout { get; }
TransportStatus TransportStatus { get; }

ValueTask<Stream> GetStreamAsync(BalancerAddress address, CancellationToken cancellationToken);
ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, CancellationToken cancellationToken);
ValueTask<ConnectResult> TryConnectAsync(ConnectContext context);

void Disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,43 +35,43 @@ namespace Grpc.Net.Client.Balancer.Internal;
internal class PassiveSubchannelTransport : ISubchannelTransport, IDisposable
{
private readonly Subchannel _subchannel;
private BalancerAddress? _currentAddress;
private DnsEndPoint? _currentEndPoint;

public PassiveSubchannelTransport(Subchannel subchannel)
{
_subchannel = subchannel;
}

public BalancerAddress? CurrentAddress => _currentAddress;
public DnsEndPoint? CurrentEndPoint => _currentEndPoint;
public TimeSpan? ConnectTimeout { get; }
public TransportStatus TransportStatus => TransportStatus.Passive;

public void Disconnect()
{
_currentAddress = null;
_currentEndPoint = null;
_subchannel.UpdateConnectivityState(ConnectivityState.Idle, "Disconnected.");
}

public ValueTask<ConnectResult> TryConnectAsync(ConnectContext context)
{
Debug.Assert(_subchannel._addresses.Count == 1);
Debug.Assert(CurrentAddress == null);
Debug.Assert(CurrentEndPoint == null);

var currentAddress = _subchannel._addresses[0];

_subchannel.UpdateConnectivityState(ConnectivityState.Connecting, "Passively connecting.");
_currentAddress = currentAddress;
_currentEndPoint = currentAddress.EndPoint;
_subchannel.UpdateConnectivityState(ConnectivityState.Ready, "Passively connected.");

return new ValueTask<ConnectResult>(ConnectResult.Success);
}

public void Dispose()
{
_currentAddress = null;
_currentEndPoint = null;
}

public ValueTask<Stream> GetStreamAsync(BalancerAddress address, CancellationToken cancellationToken)
public ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, CancellationToken cancellationToken)
{
throw new NotSupportedException();
}
Expand Down
Loading

0 comments on commit 311f878

Please sign in to comment.