diff --git a/src/Grpc.Net.Client/Balancer/Subchannel.cs b/src/Grpc.Net.Client/Balancer/Subchannel.cs index 12161e14a..a1c2f9323 100644 --- a/src/Grpc.Net.Client/Balancer/Subchannel.cs +++ b/src/Grpc.Net.Client/Balancer/Subchannel.cs @@ -17,6 +17,7 @@ #endregion #if SUPPORT_LOAD_BALANCING +using System.Diagnostics; using System.Net; using Grpc.Core; using Grpc.Net.Client.Balancer.Internal; @@ -173,6 +174,10 @@ public void UpdateAddresses(IReadOnlyList addresses) return; } + // Get a copy of the current address before updating addresses. + // Updating addresses to not contain this value changes the property to return null. + var currentAddress = CurrentAddress; + _addresses.Clear(); _addresses.AddRange(addresses); @@ -186,11 +191,11 @@ public void UpdateAddresses(IReadOnlyList addresses) requireReconnect = true; break; case ConnectivityState.Ready: - // Transport uses the subchannel lock but take copy in an abundance of caution. - var currentAddress = CurrentAddress; + // Check if the subchannel is connected to an address that's not longer present. + // In this situation require the subchannel to reconnect to a new address. if (currentAddress != null) { - if (GetAddressByEndpoint(_addresses, currentAddress.EndPoint) != null) + if (GetAddressByEndpoint(_addresses, currentAddress.EndPoint) is null) { SubchannelLog.ConnectedAddressNotInUpdatedAddresses(_logger, Id, currentAddress); requireReconnect = true; diff --git a/src/Grpc.Net.Client/Balancer/SubchannelsLoadBalancer.cs b/src/Grpc.Net.Client/Balancer/SubchannelsLoadBalancer.cs index c152796df..f5fccf828 100644 --- a/src/Grpc.Net.Client/Balancer/SubchannelsLoadBalancer.cs +++ b/src/Grpc.Net.Client/Balancer/SubchannelsLoadBalancer.cs @@ -17,6 +17,7 @@ #endregion #if SUPPORT_LOAD_BALANCING +using System.Diagnostics; using Grpc.Core; using Grpc.Net.Client.Balancer.Internal; using Microsoft.Extensions.Logging; @@ -144,8 +145,11 @@ public override void UpdateChannelState(ChannelState state) // The new subchannel address has the same endpoint so the connection isn't impacted. if (!BalancerAddressEqualityComparer.Instance.Equals(address, newOrCurrentSubchannel.Address)) { + newOrCurrentSubchannel = new AddressSubchannel( + newOrCurrentSubchannel.Subchannel, + address, + newOrCurrentSubchannel.LastKnownState); newOrCurrentSubchannel.Subchannel.UpdateAddresses(new[] { address }); - newOrCurrentSubchannel = new AddressSubchannel(newOrCurrentSubchannel.Subchannel, address); } SubchannelLog.SubchannelPreserved(_logger, newOrCurrentSubchannel.Subchannel.Id, address); @@ -306,15 +310,16 @@ protected override void Dispose(bool disposing) /// A subchannel picker. protected abstract SubchannelPicker CreatePicker(IReadOnlyList readySubchannels); + [DebuggerDisplay("Subchannel = {Subchannel.Id}, Address = {Address}, LastKnownState = {LastKnownState}")] private sealed class AddressSubchannel { private ConnectivityState _lastKnownState; - public AddressSubchannel(Subchannel subchannel, BalancerAddress address) + public AddressSubchannel(Subchannel subchannel, BalancerAddress address, ConnectivityState lastKnownState = ConnectivityState.Idle) { Subchannel = subchannel; Address = address; - _lastKnownState = ConnectivityState.Idle; + _lastKnownState = lastKnownState; } // Track connectivity state that has been updated to load balancer. diff --git a/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs index f2c3bdd6b..598bfda7c 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/PickFirstBalancerTests.cs @@ -56,7 +56,18 @@ public async Task ChangeAddresses_HasReadySubchannel_OldSubchannelShutdown() }); services.AddSingleton(new TestResolverFactory(resolver)); - services.AddSingleton(new TestSubchannelTransportFactory()); + + var subChannelConnections = new List(); + var transportFactory = new TestSubchannelTransportFactory((s, c) => + { + lock (subChannelConnections) + { + subChannelConnections.Add(s); + } + return Task.FromResult(new TryConnectResult(ConnectivityState.Ready)); + }); + services.AddSingleton(transportFactory); + var serviceProvider = services.BuildServiceProvider(); var logger = serviceProvider.GetRequiredService().CreateLogger(GetType().FullName!); @@ -95,6 +106,12 @@ public async Task ChangeAddresses_HasReadySubchannel_OldSubchannelShutdown() Assert.AreEqual(1, subchannels[0]._addresses.Count); Assert.AreEqual(new DnsEndPoint("localhost", 81), subchannels[0]._addresses[0].EndPoint); Assert.AreEqual(ConnectivityState.Ready, subchannels[0].State); + + lock (subChannelConnections) + { + Assert.AreEqual(2, subChannelConnections.Count); + Assert.AreSame(subChannelConnections[0], subChannelConnections[1]); + } } [Test] diff --git a/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs b/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs index 692a875a4..7bcef90b2 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/RoundRobinBalancerTests.cs @@ -300,7 +300,15 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged() var connectState = ConnectivityState.Ready; - var transportFactory = new TestSubchannelTransportFactory((s, c) => Task.FromResult(new TryConnectResult(connectState))); + var subChannelConnections = new List(); + var transportFactory = new TestSubchannelTransportFactory((s, c) => + { + lock (subChannelConnections) + { + subChannelConnections.Add(s); + } + return Task.FromResult(new TryConnectResult(connectState)); + }); services.AddSingleton(s => { return new TestResolver( @@ -351,9 +359,15 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged() Assert.AreEqual(new DnsEndPoint("localhost", 82), subchannels[2]._addresses[0].EndPoint); // Preserved because port 81, 82 is in both refresh results + var discardedSubchannel = subchannels[0]; var preservedSubchannel1 = subchannels[1]; var preservedSubchannel2 = subchannels[2]; + await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync( + serviceProvider.GetRequiredService().CreateLogger(GetType()), + channel.ConnectionManager, + expectedCount: 3).DefaultTimeout(); + var address2 = new BalancerAddress("localhost", 82); address2.Attributes.Set(new BalancerAttributesKey("test"), 1); @@ -364,7 +378,13 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged() new BalancerAddress("localhost", 83) }); + await BalancerWaitHelpers.WaitForSubchannelsToBeReadyAsync( + serviceProvider.GetRequiredService().CreateLogger(GetType()), + channel.ConnectionManager, + expectedCount: 3).DefaultTimeout(); + subchannels = channel.ConnectionManager.GetSubchannels(); + var newSubchannel = subchannels[2]; Assert.AreEqual(3, subchannels.Count); Assert.AreEqual(1, subchannels[0]._addresses.Count); @@ -379,6 +399,22 @@ public async Task HasSubchannels_ResolverRefresh_MatchingSubchannelUnchanged() // Test that the channel's address was updated with new attribute with new attributes. Assert.AreSame(preservedSubchannel2.CurrentAddress, address2); + + lock (subChannelConnections) + { + try + { + Assert.AreEqual(4, subChannelConnections.Count); + Assert.Contains(discardedSubchannel, subChannelConnections); + Assert.Contains(preservedSubchannel1, subChannelConnections); + Assert.Contains(preservedSubchannel2, subChannelConnections); + Assert.Contains(newSubchannel, subChannelConnections); + } + catch (Exception ex) + { + throw new Exception("Connected subchannels: " + Environment.NewLine + string.Join(Environment.NewLine, subChannelConnections), ex); + } + } } } #endif