Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make use of more performant System.Threading.Lock where possible #2501

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<NoWarn>$(NoWarn);NETSDK1195;NETSDK1210</NoWarn>

<EmbedUntrackedSources>true</EmbedUntrackedSources>
<LangVersion>12.0</LangVersion>
<LangVersion>preview</LangVersion>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>

Expand Down
2 changes: 1 addition & 1 deletion perf/benchmarkapps/QpsWorker/Infrastructure/Histogram.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace QpsWorker.Infrastructure;
/// </summary>
public class Histogram
{
private readonly object _myLock = new object();
private readonly Lock _myLock = new Lock();
private readonly double _multiplier;
private readonly double _oneOnLogMultiplier;
private readonly double _maxPossible;
Expand Down
2 changes: 1 addition & 1 deletion perf/benchmarkapps/QpsWorker/Infrastructure/TimeStats.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace QpsWorker.Infrastructure;
/// </summary>
public class TimeStats
{
private readonly object _myLock = new object();
private readonly Lock _myLock = new Lock();
private DateTime _lastWallClock;
private TimeSpan _lastUserTime;
private TimeSpan _lastPrivilegedTime;
Expand Down
11 changes: 8 additions & 3 deletions src/Grpc.AspNetCore.Server/Internal/HttpContextStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ internal class HttpContextStreamWriter<TResponse> : IServerStreamWriter<TRespons
private readonly Action<TResponse, SerializationContext> _serializer;
private readonly PipeWriter _bodyWriter;
private readonly IHttpRequestLifetimeFeature _requestLifetimeFeature;
private readonly object _writeLock;
private readonly Lock _writeLock;
private Task? _writeTask;
private bool _completed;
private long _writeCount;
Expand All @@ -42,7 +42,7 @@ public HttpContextStreamWriter(HttpContextServerCallContext context, Action<TRes
{
_context = context;
_serializer = serializer;
_writeLock = new object();
_writeLock = new Lock();

// Copy HttpContext values.
// This is done to avoid a race condition when reading them from HttpContext later when running in a separate thread.
Expand Down Expand Up @@ -92,7 +92,8 @@ private async Task WriteCoreAsync(TResponse message, CancellationToken cancellat
throw new InvalidOperationException("Can't write the message because the request is complete.");
}

lock (_writeLock)
_writeLock.Enter();
try
{
// Pending writes need to be awaited first
if (IsWriteInProgressUnsynchronized)
Expand All @@ -103,6 +104,10 @@ private async Task WriteCoreAsync(TResponse message, CancellationToken cancellat
// Save write task to track whether it is complete. Must be set inside lock.
_writeTask = _bodyWriter.WriteStreamedMessageAsync(message, _context, _serializer, cancellationToken);
}
finally
{
_writeLock.Exit();
}

await _writeTask;
Interlocked.Increment(ref _writeCount);
Expand Down
7 changes: 6 additions & 1 deletion src/Grpc.HealthCheck/Grpc.HealthCheck.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<IsGrpcPublishedPackage>true</IsGrpcPublishedPackage>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<TargetFrameworks>net462;netstandard2.0;net6.0;net7.0;net8.0</TargetFrameworks>
<TargetFrameworks>net462;netstandard2.0;net6.0;net7.0;net8.0;net9.0</TargetFrameworks>
<PackageReadmeFile>README.md</PackageReadmeFile>
</PropertyGroup>

Expand All @@ -26,11 +26,16 @@

<ItemGroup Condition=" '$(TargetFramework)' == 'net462' or '$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="System.Threading.Channels" />
<Compile Include="..\Shared\Lock.cs" Link="Internal\Lock.cs" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net462' ">
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net6.0' or '$(TargetFramework)' == 'net7.0' or '$(TargetFramework)' == 'net8.0'">
<Compile Include="..\Shared\Lock.cs" Link="Internal\Lock.cs" />
</ItemGroup>

</Project>
11 changes: 8 additions & 3 deletions src/Grpc.HealthCheck/HealthServiceImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
// The maximum number of statuses to buffer on the server.
internal const int MaxStatusBufferSize = 5;

private readonly object statusLock = new object();
private readonly Lock statusLock = new Lock();
private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();

private readonly object watchersLock = new object();
private readonly Lock watchersLock = new Lock();
private readonly Dictionary<string, List<ChannelWriter<HealthCheckResponse>>> watchers =
new Dictionary<string, List<ChannelWriter<HealthCheckResponse>>>();

Expand Down Expand Up @@ -155,7 +155,8 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter
FullMode = BoundedChannelFullMode.DropOldest
});

lock (watchersLock)
watchersLock.Enter();
try
{
if (!watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>>? channelWriters))
{
Expand All @@ -165,6 +166,10 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter

channelWriters.Add(channel.Writer);
}
finally
{
watchersLock.Exit();
}

// Watch calls run until ended by the client canceling them.
context.CancellationToken.Register(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace Grpc.Net.Client.Balancer.Internal;

internal class BalancerHttpHandler : DelegatingHandler
{
private static readonly object SetupLock = new object();
private static readonly Lock SetupLock = new Lock();

internal const string WaitForReadyKey = "WaitForReady";
internal const string SubchannelKey = "Subchannel";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Copyright notice and license
#region Copyright notice and license

// Copyright 2019 The gRPC Authors
//
Expand Down Expand Up @@ -53,7 +53,7 @@ internal sealed class ChildHandlerLoadBalancer : LoadBalancer
private readonly IChannelControlHelper _controller;
private readonly ServiceConfig? _initialServiceConfig;
private readonly ConnectionManager _connectionManager;
private readonly object _lock = new object();
private readonly Lock _lock = new Lock();

internal (LoadBalancer LoadBalancer, string Name)? _current;
internal (LoadBalancer LoadBalancer, string Name)? _pending;
Expand Down
11 changes: 8 additions & 3 deletions src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ internal sealed class ConnectionManager : IDisposable, IChannelControlHelper
public static readonly BalancerAttributesKey<string> HostOverrideKey = new BalancerAttributesKey<string>("HostOverride");
private static readonly ChannelIdProvider _channelIdProvider = new ChannelIdProvider();

private readonly object _lock;
private readonly Lock _lock;
internal readonly Resolver _resolver;
private readonly ISubchannelTransportFactory _subchannelTransportFactory;
private readonly List<Subchannel> _subchannels;
Expand All @@ -56,7 +56,7 @@ internal ConnectionManager(
ISubchannelTransportFactory subchannelTransportFactory,
LoadBalancerFactory[] loadBalancerFactories)
{
_lock = new object();
_lock = new Lock();
_nextPickerTcs = new TaskCompletionSource<SubchannelPicker>(TaskCreationOptions.RunContinuationsAsynchronously);
_resolverStartedTcs = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
_channelId = _channelIdProvider.GetNextChannelId();
Expand Down Expand Up @@ -221,7 +221,8 @@ public async Task ConnectAsync(bool waitForReady, CancellationToken cancellation
else
{
Task waitForReadyTask;
lock (_lock)
_lock.Enter();
try
{
var state = State;
if (state == ConnectivityState.Ready)
Expand All @@ -232,6 +233,10 @@ public async Task ConnectAsync(bool waitForReady, CancellationToken cancellation
waitForReadyTask = WaitForStateChangedAsync(state, waitForState: ConnectivityState.Ready, cancellationToken);
_balancer?.RequestConnection();
}
finally
{
_lock.Exit();
}

await waitForReadyTask.ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public SocketConnectivitySubchannelTransport(
_socketConnectedTimer = NonCapturingTimer.Create(OnCheckSocketConnection, state: null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
}

private object Lock => _subchannel.Lock;
private Lock Lock => _subchannel.Lock;
public DnsEndPoint? CurrentEndPoint => _currentEndPoint;
public TimeSpan? ConnectTimeout { get; }
public TransportStatus TransportStatus
Expand Down Expand Up @@ -132,7 +132,7 @@ public void Disconnect()

private void DisconnectUnsynchronized()
{
Debug.Assert(Monitor.IsEntered(Lock));
Debug.Assert(Lock.IsHeldByCurrentThread);
Debug.Assert(!_disposed);

_initialSocket?.Dispose();
Expand Down Expand Up @@ -170,7 +170,8 @@ public async ValueTask<ConnectResult> TryConnectAsync(ConnectContext context, in
await _socketConnect(socket, currentEndPoint, context.CancellationToken).ConfigureAwait(false);
SocketConnectivitySubchannelTransportLog.ConnectedSocket(_logger, _subchannel.Id, currentEndPoint);

lock (Lock)
Lock.Enter();
try
{
_currentEndPoint = currentEndPoint;
_lastEndPointIndex = currentIndex;
Expand All @@ -184,6 +185,10 @@ public async ValueTask<ConnectResult> TryConnectAsync(ConnectContext context, in
// Instead, the socket timer target method reschedules the next run after it has finished.
_socketConnectedTimer.Change(_socketPingInterval, Timeout.InfiniteTimeSpan);
}
finally
{
Lock.Exit();
}

_subchannel.UpdateConnectivityState(ConnectivityState.Ready, "Successfully connected to socket.");
return ConnectResult.Success;
Expand Down Expand Up @@ -223,13 +228,18 @@ public async ValueTask<ConnectResult> TryConnectAsync(ConnectContext context, in
_subchannel.UpdateConnectivityState(
ConnectivityState.TransientFailure,
new Status(StatusCode.Unavailable, "Error connecting to subchannel.", firstConnectionError));
lock (Lock)
Lock.Enter();
try
{
if (!_disposed)
{
_socketConnectedTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
}
}
finally
{
Lock.Exit();
}
return result;
}

Expand Down Expand Up @@ -319,7 +329,8 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
DnsEndPoint? socketEndPoint = null;
List<ReadOnlyMemory<byte>>? socketData = null;
DateTime? socketCreatedTime = null;
lock (Lock)
Lock.Enter();
try
{
if (_initialSocket != null)
{
Expand All @@ -345,6 +356,10 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
_socketConnectedTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
}
}
finally
{
Lock.Exit();
}

if (socket != null)
{
Expand Down Expand Up @@ -384,11 +399,16 @@ public async ValueTask<Stream> GetStreamAsync(DnsEndPoint endPoint, Cancellation
// This stream wrapper intercepts dispose.
var stream = new StreamWrapper(networkStream, OnStreamDisposed, socketData);

lock (Lock)
Lock.Enter();
try
{
_activeStreams.Add(new ActiveStream(endPoint, socket, stream));
SocketConnectivitySubchannelTransportLog.StreamCreated(_logger, _subchannel.Id, endPoint, CalculateInitialSocketDataLength(socketData), _activeStreams.Count);
}
finally
{
Lock.Exit();
}

return stream;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Grpc.Net.Client/Balancer/PollingResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public abstract class PollingResolver : Resolver
private bool _disposed;
private bool _resolveSuccessful;

private readonly object _lock = new object();
private readonly Lock _lock = new Lock();
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly ILogger _logger;
private readonly IBackoffPolicyFactory? _backoffPolicyFactory;
Expand Down
29 changes: 22 additions & 7 deletions src/Grpc.Net.Client/Balancer/Subchannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace Grpc.Net.Client.Balancer;
public sealed class Subchannel : IDisposable
{
internal readonly List<BalancerAddress> _addresses;
internal readonly object Lock;
internal readonly Lock Lock;
internal ISubchannelTransport Transport => _transport;
internal string Id { get; }

Expand Down Expand Up @@ -96,7 +96,7 @@ public BalancerAddress? CurrentAddress

internal Subchannel(ConnectionManager manager, IReadOnlyList<BalancerAddress> addresses)
{
Lock = new object();
Lock = new Lock();
_logger = manager.LoggerFactory.CreateLogger(GetType());
_connectSemaphore = new SemaphoreSlim(1);

Expand Down Expand Up @@ -283,7 +283,7 @@ public void RequestConnection()

private void CancelInProgressConnectUnsynchronized()
{
Debug.Assert(Monitor.IsEntered(Lock));
Debug.Assert(Lock.IsHeldByCurrentThread);

if (_connectContext != null && !_connectContext.Disposed)
{
Expand All @@ -299,7 +299,7 @@ private void CancelInProgressConnectUnsynchronized()

private ConnectContext GetConnectContextUnsynchronized()
{
Debug.Assert(Monitor.IsEntered(Lock));
Debug.Assert(Lock.IsHeldByCurrentThread);

// There shouldn't be a previous connect in progress, but cancel the CTS to ensure they're no longer running.
CancelInProgressConnectUnsynchronized();
Expand All @@ -312,7 +312,8 @@ private async Task ConnectTransportAsync()
{
ConnectContext connectContext;
Task? waitSemaporeTask = null;
lock (Lock)
Lock.Enter();
try
{
// Don't start connecting if the subchannel has been shutdown. Transport/semaphore will be disposed if shutdown.
if (_state == ConnectivityState.Shutdown)
Expand All @@ -333,6 +334,10 @@ private async Task ConnectTransportAsync()
waitSemaporeTask = _connectSemaphore.WaitAsync(connectContext.CancellationToken);
}
}
finally
{
Lock.Exit();
}

if (waitSemaporeTask != null)
{
Expand All @@ -355,13 +360,18 @@ private async Task ConnectTransportAsync()

for (var attempt = 0; ; attempt++)
{
lock (Lock)
Lock.Enter();
try
{
if (_state == ConnectivityState.Shutdown)
{
return;
}
}
finally
{
Lock.Exit();
}

switch (await _transport.TryConnectAsync(connectContext, attempt).ConfigureAwait(false))
{
Expand Down Expand Up @@ -425,7 +435,8 @@ private async Task ConnectTransportAsync()
}
finally
{
lock (Lock)
Lock.Enter();
try
{
// Dispose context because it might have been created with a connect timeout.
// Want to clean up the connect timeout timer.
Expand All @@ -438,6 +449,10 @@ private async Task ConnectTransportAsync()
_connectSemaphore.Release();
}
}
finally
{
Lock.Exit();
}
}
}

Expand Down
Loading