diff --git a/Directory.Build.props b/Directory.Build.props index d60c28751..8a926277d 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -21,7 +21,7 @@ $(NoWarn);NETSDK1195;NETSDK1210 true - 12.0 + preview enable enable diff --git a/perf/benchmarkapps/QpsWorker/Infrastructure/Histogram.cs b/perf/benchmarkapps/QpsWorker/Infrastructure/Histogram.cs index b0574893c..2eb689586 100644 --- a/perf/benchmarkapps/QpsWorker/Infrastructure/Histogram.cs +++ b/perf/benchmarkapps/QpsWorker/Infrastructure/Histogram.cs @@ -28,7 +28,7 @@ namespace QpsWorker.Infrastructure; /// public class Histogram { - private readonly object _myLock = new object(); + private readonly Lock _myLock = new Lock(); private readonly double _multiplier; private readonly double _oneOnLogMultiplier; private readonly double _maxPossible; diff --git a/perf/benchmarkapps/QpsWorker/Infrastructure/TimeStats.cs b/perf/benchmarkapps/QpsWorker/Infrastructure/TimeStats.cs index e3bc0454f..d38a65811 100644 --- a/perf/benchmarkapps/QpsWorker/Infrastructure/TimeStats.cs +++ b/perf/benchmarkapps/QpsWorker/Infrastructure/TimeStats.cs @@ -27,7 +27,7 @@ namespace QpsWorker.Infrastructure; /// public class TimeStats { - private readonly object _myLock = new object(); + private readonly Lock _myLock = new Lock(); private DateTime _lastWallClock; private TimeSpan _lastUserTime; private TimeSpan _lastPrivilegedTime; diff --git a/src/Grpc.AspNetCore.Server/Internal/HttpContextStreamWriter.cs b/src/Grpc.AspNetCore.Server/Internal/HttpContextStreamWriter.cs index b6a67f9fd..7fd6ab904 100644 --- a/src/Grpc.AspNetCore.Server/Internal/HttpContextStreamWriter.cs +++ b/src/Grpc.AspNetCore.Server/Internal/HttpContextStreamWriter.cs @@ -33,7 +33,7 @@ internal class HttpContextStreamWriter : IServerStreamWriter _serializer; private readonly PipeWriter _bodyWriter; private readonly IHttpRequestLifetimeFeature _requestLifetimeFeature; - private readonly object _writeLock; + private readonly Lock _writeLock; private Task? _writeTask; private bool _completed; private long _writeCount; @@ -42,7 +42,7 @@ public HttpContextStreamWriter(HttpContextServerCallContext context, Actiontrue true - net462;netstandard2.0;net6.0;net7.0;net8.0 + net462;netstandard2.0;net6.0;net7.0;net8.0;net9.0 README.md @@ -26,6 +26,7 @@ + @@ -33,4 +34,8 @@ + + + + diff --git a/src/Grpc.HealthCheck/HealthServiceImpl.cs b/src/Grpc.HealthCheck/HealthServiceImpl.cs index 80003ee68..e02095088 100644 --- a/src/Grpc.HealthCheck/HealthServiceImpl.cs +++ b/src/Grpc.HealthCheck/HealthServiceImpl.cs @@ -37,11 +37,11 @@ public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase // The maximum number of statuses to buffer on the server. internal const int MaxStatusBufferSize = 5; - private readonly object statusLock = new object(); + private readonly Lock statusLock = new Lock(); private readonly Dictionary statusMap = new Dictionary(); - private readonly object watchersLock = new object(); + private readonly Lock watchersLock = new Lock(); private readonly Dictionary>> watchers = new Dictionary>>(); @@ -155,7 +155,8 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter FullMode = BoundedChannelFullMode.DropOldest }); - lock (watchersLock) + watchersLock.Enter(); + try { if (!watchers.TryGetValue(service, out List>? channelWriters)) { @@ -165,6 +166,10 @@ public override async Task Watch(HealthCheckRequest request, IServerStreamWriter channelWriters.Add(channel.Writer); } + finally + { + watchersLock.Exit(); + } // Watch calls run until ended by the client canceling them. context.CancellationToken.Register(() => { diff --git a/src/Grpc.Net.Client/Balancer/Internal/BalancerHttpHandler.cs b/src/Grpc.Net.Client/Balancer/Internal/BalancerHttpHandler.cs index 33bb1d8b5..06f7627e1 100644 --- a/src/Grpc.Net.Client/Balancer/Internal/BalancerHttpHandler.cs +++ b/src/Grpc.Net.Client/Balancer/Internal/BalancerHttpHandler.cs @@ -31,7 +31,7 @@ namespace Grpc.Net.Client.Balancer.Internal; internal class BalancerHttpHandler : DelegatingHandler { - private static readonly object SetupLock = new object(); + private static readonly Lock SetupLock = new Lock(); internal const string WaitForReadyKey = "WaitForReady"; internal const string SubchannelKey = "Subchannel"; diff --git a/src/Grpc.Net.Client/Balancer/Internal/ChildHandlerLoadBalancer.cs b/src/Grpc.Net.Client/Balancer/Internal/ChildHandlerLoadBalancer.cs index bace88872..f11a40573 100644 --- a/src/Grpc.Net.Client/Balancer/Internal/ChildHandlerLoadBalancer.cs +++ b/src/Grpc.Net.Client/Balancer/Internal/ChildHandlerLoadBalancer.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -53,7 +53,7 @@ internal sealed class ChildHandlerLoadBalancer : LoadBalancer private readonly IChannelControlHelper _controller; private readonly ServiceConfig? _initialServiceConfig; private readonly ConnectionManager _connectionManager; - private readonly object _lock = new object(); + private readonly Lock _lock = new Lock(); internal (LoadBalancer LoadBalancer, string Name)? _current; internal (LoadBalancer LoadBalancer, string Name)? _pending; diff --git a/src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs b/src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs index b64a4557c..51bb0f66f 100644 --- a/src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs +++ b/src/Grpc.Net.Client/Balancer/Internal/ConnectionManager.cs @@ -30,7 +30,7 @@ internal sealed class ConnectionManager : IDisposable, IChannelControlHelper public static readonly BalancerAttributesKey HostOverrideKey = new BalancerAttributesKey("HostOverride"); private static readonly ChannelIdProvider _channelIdProvider = new ChannelIdProvider(); - private readonly object _lock; + private readonly Lock _lock; internal readonly Resolver _resolver; private readonly ISubchannelTransportFactory _subchannelTransportFactory; private readonly List _subchannels; @@ -56,7 +56,7 @@ internal ConnectionManager( ISubchannelTransportFactory subchannelTransportFactory, LoadBalancerFactory[] loadBalancerFactories) { - _lock = new object(); + _lock = new Lock(); _nextPickerTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _resolverStartedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); _channelId = _channelIdProvider.GetNextChannelId(); @@ -221,7 +221,8 @@ public async Task ConnectAsync(bool waitForReady, CancellationToken cancellation else { Task waitForReadyTask; - lock (_lock) + _lock.Enter(); + try { var state = State; if (state == ConnectivityState.Ready) @@ -232,6 +233,10 @@ public async Task ConnectAsync(bool waitForReady, CancellationToken cancellation waitForReadyTask = WaitForStateChangedAsync(state, waitForState: ConnectivityState.Ready, cancellationToken); _balancer?.RequestConnection(); } + finally + { + _lock.Exit(); + } await waitForReadyTask.ConfigureAwait(false); } diff --git a/src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs b/src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs index b54666d88..bb071c0ab 100644 --- a/src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs +++ b/src/Grpc.Net.Client/Balancer/Internal/SocketConnectivitySubchannelTransport.cs @@ -79,7 +79,7 @@ public SocketConnectivitySubchannelTransport( _socketConnectedTimer = NonCapturingTimer.Create(OnCheckSocketConnection, state: null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); } - private object Lock => _subchannel.Lock; + private Lock Lock => _subchannel.Lock; public DnsEndPoint? CurrentEndPoint => _currentEndPoint; public TimeSpan? ConnectTimeout { get; } public TransportStatus TransportStatus @@ -132,7 +132,7 @@ public void Disconnect() private void DisconnectUnsynchronized() { - Debug.Assert(Monitor.IsEntered(Lock)); + Debug.Assert(Lock.IsHeldByCurrentThread); Debug.Assert(!_disposed); _initialSocket?.Dispose(); @@ -170,7 +170,8 @@ public async ValueTask TryConnectAsync(ConnectContext context, in await _socketConnect(socket, currentEndPoint, context.CancellationToken).ConfigureAwait(false); SocketConnectivitySubchannelTransportLog.ConnectedSocket(_logger, _subchannel.Id, currentEndPoint); - lock (Lock) + Lock.Enter(); + try { _currentEndPoint = currentEndPoint; _lastEndPointIndex = currentIndex; @@ -184,6 +185,10 @@ public async ValueTask TryConnectAsync(ConnectContext context, in // Instead, the socket timer target method reschedules the next run after it has finished. _socketConnectedTimer.Change(_socketPingInterval, Timeout.InfiniteTimeSpan); } + finally + { + Lock.Exit(); + } _subchannel.UpdateConnectivityState(ConnectivityState.Ready, "Successfully connected to socket."); return ConnectResult.Success; @@ -223,13 +228,18 @@ public async ValueTask TryConnectAsync(ConnectContext context, in _subchannel.UpdateConnectivityState( ConnectivityState.TransientFailure, new Status(StatusCode.Unavailable, "Error connecting to subchannel.", firstConnectionError)); - lock (Lock) + Lock.Enter(); + try { if (!_disposed) { _socketConnectedTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); } } + finally + { + Lock.Exit(); + } return result; } @@ -319,7 +329,8 @@ public async ValueTask GetStreamAsync(DnsEndPoint endPoint, Cancellation DnsEndPoint? socketEndPoint = null; List>? socketData = null; DateTime? socketCreatedTime = null; - lock (Lock) + Lock.Enter(); + try { if (_initialSocket != null) { @@ -345,6 +356,10 @@ public async ValueTask GetStreamAsync(DnsEndPoint endPoint, Cancellation _socketConnectedTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); } } + finally + { + Lock.Exit(); + } if (socket != null) { @@ -384,11 +399,16 @@ public async ValueTask GetStreamAsync(DnsEndPoint endPoint, Cancellation // This stream wrapper intercepts dispose. var stream = new StreamWrapper(networkStream, OnStreamDisposed, socketData); - lock (Lock) + Lock.Enter(); + try { _activeStreams.Add(new ActiveStream(endPoint, socket, stream)); SocketConnectivitySubchannelTransportLog.StreamCreated(_logger, _subchannel.Id, endPoint, CalculateInitialSocketDataLength(socketData), _activeStreams.Count); } + finally + { + Lock.Exit(); + } return stream; } diff --git a/src/Grpc.Net.Client/Balancer/PollingResolver.cs b/src/Grpc.Net.Client/Balancer/PollingResolver.cs index 0973f9339..9bf16c1f5 100644 --- a/src/Grpc.Net.Client/Balancer/PollingResolver.cs +++ b/src/Grpc.Net.Client/Balancer/PollingResolver.cs @@ -44,7 +44,7 @@ public abstract class PollingResolver : Resolver private bool _disposed; private bool _resolveSuccessful; - private readonly object _lock = new object(); + private readonly Lock _lock = new Lock(); private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private readonly ILogger _logger; private readonly IBackoffPolicyFactory? _backoffPolicyFactory; diff --git a/src/Grpc.Net.Client/Balancer/Subchannel.cs b/src/Grpc.Net.Client/Balancer/Subchannel.cs index dcfdcd8a5..3d2bd3e2d 100644 --- a/src/Grpc.Net.Client/Balancer/Subchannel.cs +++ b/src/Grpc.Net.Client/Balancer/Subchannel.cs @@ -40,7 +40,7 @@ namespace Grpc.Net.Client.Balancer; public sealed class Subchannel : IDisposable { internal readonly List _addresses; - internal readonly object Lock; + internal readonly Lock Lock; internal ISubchannelTransport Transport => _transport; internal string Id { get; } @@ -96,7 +96,7 @@ public BalancerAddress? CurrentAddress internal Subchannel(ConnectionManager manager, IReadOnlyList addresses) { - Lock = new object(); + Lock = new Lock(); _logger = manager.LoggerFactory.CreateLogger(GetType()); _connectSemaphore = new SemaphoreSlim(1); @@ -283,7 +283,7 @@ public void RequestConnection() private void CancelInProgressConnectUnsynchronized() { - Debug.Assert(Monitor.IsEntered(Lock)); + Debug.Assert(Lock.IsHeldByCurrentThread); if (_connectContext != null && !_connectContext.Disposed) { @@ -299,7 +299,7 @@ private void CancelInProgressConnectUnsynchronized() private ConnectContext GetConnectContextUnsynchronized() { - Debug.Assert(Monitor.IsEntered(Lock)); + Debug.Assert(Lock.IsHeldByCurrentThread); // There shouldn't be a previous connect in progress, but cancel the CTS to ensure they're no longer running. CancelInProgressConnectUnsynchronized(); @@ -312,7 +312,8 @@ private async Task ConnectTransportAsync() { ConnectContext connectContext; Task? waitSemaporeTask = null; - lock (Lock) + Lock.Enter(); + try { // Don't start connecting if the subchannel has been shutdown. Transport/semaphore will be disposed if shutdown. if (_state == ConnectivityState.Shutdown) @@ -333,6 +334,10 @@ private async Task ConnectTransportAsync() waitSemaporeTask = _connectSemaphore.WaitAsync(connectContext.CancellationToken); } } + finally + { + Lock.Exit(); + } if (waitSemaporeTask != null) { @@ -355,13 +360,18 @@ private async Task ConnectTransportAsync() for (var attempt = 0; ; attempt++) { - lock (Lock) + Lock.Enter(); + try { if (_state == ConnectivityState.Shutdown) { return; } } + finally + { + Lock.Exit(); + } switch (await _transport.TryConnectAsync(connectContext, attempt).ConfigureAwait(false)) { @@ -425,7 +435,8 @@ private async Task ConnectTransportAsync() } finally { - lock (Lock) + Lock.Enter(); + try { // Dispose context because it might have been created with a connect timeout. // Want to clean up the connect timeout timer. @@ -438,6 +449,10 @@ private async Task ConnectTransportAsync() _connectSemaphore.Release(); } } + finally + { + Lock.Exit(); + } } } diff --git a/src/Grpc.Net.Client/Grpc.Net.Client.csproj b/src/Grpc.Net.Client/Grpc.Net.Client.csproj index a810fe41d..0aceef90d 100644 --- a/src/Grpc.Net.Client/Grpc.Net.Client.csproj +++ b/src/Grpc.Net.Client/Grpc.Net.Client.csproj @@ -6,7 +6,7 @@ true true - net462;netstandard2.0;netstandard2.1;net6.0;net7.0;net8.0 + net462;netstandard2.0;netstandard2.1;net6.0;net7.0;net8.0;net9.0 README.md diff --git a/src/Grpc.Net.Client/GrpcChannel.cs b/src/Grpc.Net.Client/GrpcChannel.cs index e84459040..dcda9acfc 100644 --- a/src/Grpc.Net.Client/GrpcChannel.cs +++ b/src/Grpc.Net.Client/GrpcChannel.cs @@ -50,7 +50,7 @@ public sealed class GrpcChannel : ChannelBase, IDisposable internal const long DefaultMaxRetryBufferSize = 1024 * 1024 * 16; // 16 MB internal const long DefaultMaxRetryBufferPerCallSize = 1024 * 1024; // 1 MB - private readonly object _lock; + private readonly Lock _lock; private readonly ConcurrentDictionary _methodInfoCache; private readonly Func _createMethodInfoFunc; private readonly Dictionary? _serviceConfigMethods; @@ -104,7 +104,7 @@ public sealed class GrpcChannel : ChannelBase, IDisposable internal GrpcChannel(Uri address, GrpcChannelOptions channelOptions) : base(address.Authority) { - _lock = new object(); + _lock = new Lock(); _methodInfoCache = new ConcurrentDictionary(); // Dispose the HTTP client/handler if... diff --git a/src/Grpc.Net.Client/Internal/ClientStreamWriterBase.cs b/src/Grpc.Net.Client/Internal/ClientStreamWriterBase.cs index a70049eac..de8218efd 100644 --- a/src/Grpc.Net.Client/Internal/ClientStreamWriterBase.cs +++ b/src/Grpc.Net.Client/Internal/ClientStreamWriterBase.cs @@ -26,13 +26,13 @@ internal abstract class ClientStreamWriterBase : IClientStreamWriter : IAsyncStream private readonly GrpcCall _call; private readonly ILogger _logger; - private readonly object _moveNextLock; + private readonly Lock _moveNextLock; public TaskCompletionSource<(HttpResponseMessage, Status?)> HttpResponseTcs { get; } @@ -49,7 +49,7 @@ public HttpContentClientStreamReader(GrpcCall call) { _call = call; _logger = call.Channel.LoggerFactory.CreateLogger(LoggerName); - _moveNextLock = new object(); + _moveNextLock = new Lock(); HttpResponseTcs = new TaskCompletionSource<(HttpResponseMessage, Status?)>(TaskCreationOptions.RunContinuationsAsynchronously); } diff --git a/src/Grpc.Net.Client/Internal/Retry/ChannelRetryThrottling.cs b/src/Grpc.Net.Client/Internal/Retry/ChannelRetryThrottling.cs index 2cbe4d3fd..d7a347b4d 100644 --- a/src/Grpc.Net.Client/Internal/Retry/ChannelRetryThrottling.cs +++ b/src/Grpc.Net.Client/Internal/Retry/ChannelRetryThrottling.cs @@ -23,7 +23,7 @@ namespace Grpc.Net.Client.Internal.Retry; internal class ChannelRetryThrottling { - private readonly object _lock = new object(); + private readonly Lock _lock = new Lock(); private readonly double _tokenRatio; private readonly int _maxTokens; private readonly ILogger _logger; @@ -72,7 +72,7 @@ public void CallFailure() private void UpdateRetryThrottlingActive() { - Debug.Assert(Monitor.IsEntered(_lock)); + Debug.Assert(_lock.IsHeldByCurrentThread); var newRetryThrottlingActive = _tokenCount <= _tokenThreshold; diff --git a/src/Grpc.Net.Client/Internal/Retry/HedgingCall.cs b/src/Grpc.Net.Client/Internal/Retry/HedgingCall.cs index 4eb134548..add43ef3d 100644 --- a/src/Grpc.Net.Client/Internal/Retry/HedgingCall.cs +++ b/src/Grpc.Net.Client/Internal/Retry/HedgingCall.cs @@ -64,7 +64,8 @@ private async Task StartCall(Action> startCallFunc try { - lock (Lock) + Lock.Enter(); + try { if (CommitedCallTask.IsCompletedSuccessfully()) { @@ -89,6 +90,10 @@ private async Task StartCall(Action> startCallFunc return; } } + finally + { + Lock.Exit(); + } Status? responseStatus; @@ -138,7 +143,8 @@ private async Task StartCall(Action> startCallFunc return; } - lock (Lock) + Lock.Enter(); + try { var status = responseStatus.Value; if (IsDeadlineExceeded()) @@ -193,6 +199,10 @@ private async Task StartCall(Action> startCallFunc } } } + finally + { + Lock.Exit(); + } } catch (Exception ex) { @@ -221,7 +231,7 @@ private async Task StartCall(Action> startCallFunc protected override void OnCommitCall(IGrpcCall call) { - Debug.Assert(Monitor.IsEntered(Lock)); + Debug.Assert(Lock.IsHeldByCurrentThread); _activeCalls.Remove(call); @@ -230,7 +240,7 @@ protected override void OnCommitCall(IGrpcCall call) private void CleanUpUnsynchronized() { - Debug.Assert(Monitor.IsEntered(Lock)); + Debug.Assert(Lock.IsHeldByCurrentThread); while (_activeCalls.Count > 0) { @@ -293,7 +303,8 @@ private async Task CreateHedgingCalls(Action> star } else { - lock (Lock) + Lock.Enter(); + try { if (IsRetryThrottlingActive()) { @@ -314,6 +325,10 @@ private async Task CreateHedgingCalls(Action> star break; } } + finally + { + Lock.Exit(); + } } } } @@ -350,7 +365,8 @@ private async Task HedgingDelayAsync(TimeSpan hedgingDelay) _hedgingDelayCts.Cancel(); } - lock (Lock) + Lock.Enter(); + try { // If we reaching this point then the delay was interrupted. // Need to recreate the delay TCS/CTS for the next cycle. @@ -371,6 +387,10 @@ private async Task HedgingDelayAsync(TimeSpan hedgingDelay) return; } } + finally + { + Lock.Exit(); + } } } @@ -465,13 +485,18 @@ await DoClientStreamActionAsync(async calls => } catch { - lock (Lock) + Lock.Enter(); + try { if (CommitedCallTask.IsCompletedSuccessfully()) { throw; } } + finally + { + Lock.Exit(); + } // Flag indicates whether buffered message was successfully written. var success = await _writeClientMessageTcs.Task.ConfigureAwait(false); @@ -490,10 +515,15 @@ await DoClientStreamActionAsync(async calls => _writeClientMessageTcs = null; } - lock (Lock) + Lock.Enter(); + try { BufferedCurrentMessage = false; } + finally + { + Lock.Exit(); + } } private Task DoClientStreamActionAsync(Func>, Task> action) diff --git a/src/Grpc.Net.Client/Internal/Retry/RetryCall.cs b/src/Grpc.Net.Client/Internal/Retry/RetryCall.cs index f007a917b..3c479399b 100644 --- a/src/Grpc.Net.Client/Internal/Retry/RetryCall.cs +++ b/src/Grpc.Net.Client/Internal/Retry/RetryCall.cs @@ -104,7 +104,8 @@ private async Task StartRetry(Action> startCallFun while (true) { GrpcCall currentCall; - lock (Lock) + Lock.Enter(); + try { // Start new call. OnStartingAttempt(); @@ -121,6 +122,10 @@ private async Task StartRetry(Action> startCallFun return; } } + finally + { + Lock.Exit(); + } Status? responseStatus; @@ -284,7 +289,7 @@ private bool IsSuccessfulStreamingCall(Status responseStatus) protected override void OnCommitCall(IGrpcCall call) { - Debug.Assert(Monitor.IsEntered(Lock)); + Debug.Assert(Lock.IsHeldByCurrentThread); _activeCall = null; } @@ -344,10 +349,9 @@ await DoClientStreamActionAsync(async call => registration?.Dispose(); } - lock (Lock) - { - BufferedCurrentMessage = false; - } + Lock.Enter(); + BufferedCurrentMessage = false; + Lock.Exit(); if (ClientStreamComplete) { @@ -390,7 +394,8 @@ private async Task DoClientStreamActionAsync(Func { Debug.Assert(NewActiveCallTcs != null); - lock (Lock) + Lock.Enter(); + try { // Return currently active call if there is one, and its not the previous call. if (_activeCall != null && previousCall != _activeCall) @@ -401,5 +406,9 @@ private async Task DoClientStreamActionAsync(Func // Wait to see whether new call will be made return GetActiveCallUnsynchronizedAsync(previousCall); } + finally + { + Lock.Exit(); + } } } diff --git a/src/Grpc.Net.Client/Internal/Retry/RetryCallBase.cs b/src/Grpc.Net.Client/Internal/Retry/RetryCallBase.cs index 53cf2da2d..07f166b12 100644 --- a/src/Grpc.Net.Client/Internal/Retry/RetryCallBase.cs +++ b/src/Grpc.Net.Client/Internal/Retry/RetryCallBase.cs @@ -42,7 +42,7 @@ internal abstract partial class RetryCallBase : IGrpcCall Method { get; } protected CallOptions Options { get; } @@ -214,7 +214,8 @@ private PushStreamContent CreatePushStreamContent(GrpcCall< return new PushStreamContent(clientStreamWriter, async requestStream => { Task writeTask; - lock (Lock) + Lock.Enter(); + try { Log.SendingBufferedMessages(Logger, BufferedMessages.Count); @@ -229,6 +230,10 @@ private PushStreamContent CreatePushStreamContent(GrpcCall< writeTask = WriteBufferedMessages(call, requestStream, bufferedMessageCopy); } } + finally + { + Lock.Exit(); + } await writeTask.ConfigureAwait(false); @@ -323,7 +328,8 @@ protected async Task WriteNewMessage(GrpcCall call, Stream // Serialize current message and add to the buffer. ReadOnlyMemory messageData; - lock (Lock) + Lock.Enter(); + try { if (!BufferedCurrentMessage) { @@ -361,6 +367,10 @@ protected async Task WriteNewMessage(GrpcCall call, Stream messageData = BufferedMessages[BufferedMessages.Count - 1]; } } + finally + { + Lock.Exit(); + } await call.WriteMessageAsync(writeStream, messageData, callOptions.CancellationToken).ConfigureAwait(false); MessagesWritten++; @@ -414,7 +424,7 @@ protected bool HasResponseStream() protected void SetNewActiveCallUnsynchronized(IGrpcCall call) { - Debug.Assert(Monitor.IsEntered(Lock), "Should be called with lock."); + Debug.Assert(Lock.IsHeldByCurrentThread, "Should be called with lock."); if (NewActiveCallTcs != null) { @@ -435,7 +445,7 @@ Task IGrpcCall.WriteClientStreamAsync(Functrue true - netstandard2.0;netstandard2.1;net6.0;net7.0;net8.0 + netstandard2.0;netstandard2.1;net6.0;net7.0;net8.0;net9.0 @@ -18,6 +18,11 @@ + + + + + diff --git a/src/Shared/Lock.cs b/src/Shared/Lock.cs new file mode 100644 index 000000000..9d6c26f98 --- /dev/null +++ b/src/Shared/Lock.cs @@ -0,0 +1,126 @@ +// Credit: Mark Cilia Vincenti, 2024 +// Taken from: https://github.com/MarkCiliaVincenti/Backport.System.Threading.Lock +// NuGet package: https://www.nuget.org/packages/Backport.System.Threading.Lock + +using System.Runtime.CompilerServices; +#if NET9_0_OR_GREATER +[assembly: TypeForwardedTo(typeof(System.Threading.Lock))] +#else +namespace System.Threading; +/// +/// A backport of .NET 9.0+'s System.Threading.Lock. +/// +internal sealed class Lock +{ +#pragma warning disable CS9216 // A value of type 'System.Threading.Lock' converted to a different type will use likely unintended monitor-based locking in 'lock' statement. + /// + /// + /// + /// +#if !PARTIAL_SUPPORT + [MethodImpl(MethodImplOptions.AggressiveInlining)] +#endif + public void Enter() => Monitor.Enter(this); + + /// + /// + /// + /// + /// + /// + /// +#if !PARTIAL_SUPPORT + [MethodImpl(MethodImplOptions.AggressiveInlining)] +#endif + public bool TryEnter() => Monitor.TryEnter(this); + + /// + /// + /// + /// + /// + /// + /// + /// +#if !PARTIAL_SUPPORT + [MethodImpl(MethodImplOptions.AggressiveInlining)] +#endif + public bool TryEnter(TimeSpan timeout) => Monitor.TryEnter(this, timeout); + + /// + /// + /// + /// + /// + /// + /// + /// +#if !PARTIAL_SUPPORT + [MethodImpl(MethodImplOptions.AggressiveInlining)] +#endif + public bool TryEnter(int millisecondsTimeout) => Monitor.TryEnter(this, millisecondsTimeout); + + /// + /// + /// + /// + /// +#if !PARTIAL_SUPPORT + [MethodImpl(MethodImplOptions.AggressiveInlining)] +#endif + public void Exit() => Monitor.Exit(this); + + /// + /// Determines whether the current thread holds this lock. + /// + /// + /// true if the current thread holds this lock; otherwise, false. + /// + /// +#if !PARTIAL_SUPPORT + public bool IsHeldByCurrentThread => Monitor.IsEntered(this); +#else + public bool IsHeldByCurrentThread => throw new NotSupportedException("IsHeldByCurrentThread is only supported on .NET Framework 4.5 or greater."); +#endif +#pragma warning restore CS9216 // A value of type 'System.Threading.Lock' converted to a different type will use likely unintended monitor-based locking in 'lock' statement. + + /// + /// Enters the lock and returns a that may be disposed to exit the lock. Once the method returns, + /// the calling thread would be the only thread that holds the lock. This method is intended to be used along with a + /// language construct that would automatically dispose the , such as with the C# using + /// statement. + /// + /// + /// A that may be disposed to exit the lock. + /// + /// + /// If the lock cannot be entered immediately, the calling thread waits for the lock to be exited. If the lock is + /// already held by the calling thread, the lock is entered again. The calling thread should exit the lock, such as by + /// disposing the returned , as many times as it had entered the lock to fully exit the lock and + /// allow other threads to enter the lock. + /// +#if !PARTIAL_SUPPORT + [MethodImpl(MethodImplOptions.AggressiveInlining)] +#endif + public Scope EnterScope() + { + Enter(); + return new Scope(this); + } + + /// + /// A disposable structure that is returned by , which when disposed, exits the lock. + /// + public ref struct Scope(Lock @lock) + { + /// + /// Exits the lock. + /// + /// +#if !PARTIAL_SUPPORT + [MethodImpl(MethodImplOptions.AggressiveInlining)] +#endif + public readonly void Dispose() => @lock.Exit(); + } +} +#endif diff --git a/test/FunctionalTests/Balancer/PickFirstBalancerTests.cs b/test/FunctionalTests/Balancer/PickFirstBalancerTests.cs index 23c542491..beacef1bc 100644 --- a/test/FunctionalTests/Balancer/PickFirstBalancerTests.cs +++ b/test/FunctionalTests/Balancer/PickFirstBalancerTests.cs @@ -355,7 +355,7 @@ public async Task UnaryCall_MultipleStreams_UnavailableAddress_FallbackToWorking return true; }); - object l = new object(); + Lock l = new Lock(); int callsOnServer = 0; int callsToServer = 150; @@ -364,7 +364,8 @@ public async Task UnaryCall_MultipleStreams_UnavailableAddress_FallbackToWorking string? host = null; async Task UnaryMethod(HelloRequest request, ServerCallContext context) { - lock (l) + l.Enter(); + try { callsOnServer++; if (callsOnServer == callsToServer) @@ -372,6 +373,10 @@ async Task UnaryMethod(HelloRequest request, ServerCallContext conte allOnServerTcs.SetResult(null); } } + finally + { + l.Exit(); + } await tcs.Task; host = context.Host; return new HelloReply { Message = request.Name }; diff --git a/test/FunctionalTests/Client/UnaryTests.cs b/test/FunctionalTests/Client/UnaryTests.cs index bc58012ff..29f58a647 100644 --- a/test/FunctionalTests/Client/UnaryTests.cs +++ b/test/FunctionalTests/Client/UnaryTests.cs @@ -152,11 +152,12 @@ private async Task RunConcurrentStreams(bool writeResponseHeaders) var count = 0; var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var connectionIds = new List(); - var l = new object(); + var l = new Lock(); async Task UnaryThrowError(HelloRequest request, ServerCallContext context) { - lock (l) + l.Enter(); + try { count++; @@ -166,6 +167,10 @@ async Task UnaryThrowError(HelloRequest request, ServerCallContext c connectionIds.Add(connectionId); } } + finally + { + l.Exit(); + } Logger.LogInformation($"Received message '{request.Name}'"); diff --git a/test/FunctionalTests/Infrastructure/GrpcTestContext.cs b/test/FunctionalTests/Infrastructure/GrpcTestContext.cs index 737ef572b..6cbcadfee 100644 --- a/test/FunctionalTests/Infrastructure/GrpcTestContext.cs +++ b/test/FunctionalTests/Infrastructure/GrpcTestContext.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -27,7 +27,7 @@ public sealed class GrpcTestContext : IDisposable { private readonly ServiceProvider _serviceProvider; private readonly ConcurrentDictionary _serverLoggers; - private readonly object _lock = new object(); + private readonly Lock _lock = new Lock(); public ILoggerFactory LoggerFactory { get; } public ILogger Logger { get; } diff --git a/test/FunctionalTests/Infrastructure/TestEventListener.cs b/test/FunctionalTests/Infrastructure/TestEventListener.cs index 031e77f48..de2b326d3 100644 --- a/test/FunctionalTests/Infrastructure/TestEventListener.cs +++ b/test/FunctionalTests/Infrastructure/TestEventListener.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -27,7 +27,7 @@ namespace Grpc.AspNetCore.FunctionalTests.Infrastructure; /// public class TestEventListener : EventListener { - private readonly object _lock = new object(); + private readonly Lock _lock = new Lock(); private readonly List _subscriptions; private readonly ILogger _logger; private readonly int _eventId; diff --git a/test/FunctionalTests/Linker/Helpers/DotNetProcess.cs b/test/FunctionalTests/Linker/Helpers/DotNetProcess.cs index fc9c4a5e7..482058da5 100644 --- a/test/FunctionalTests/Linker/Helpers/DotNetProcess.cs +++ b/test/FunctionalTests/Linker/Helpers/DotNetProcess.cs @@ -25,7 +25,7 @@ public class DotNetProcess : IDisposable { private readonly TaskCompletionSource _exitedTcs; private readonly StringBuilder _output; - private readonly object _outputLock = new object(); + private readonly Lock _outputLock = new Lock(); protected Process Process { get; } diff --git a/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs b/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs index 03af800f3..b03bc075c 100644 --- a/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs +++ b/test/Grpc.Net.Client.Tests/Balancer/ResolverTests.cs @@ -83,7 +83,7 @@ public async Task Refresh_BlockInsideResolveAsync_ResolverNotBlocked() private class LockingPollingResolver : PollingResolver { private ManualResetEvent? _waitHandle; - private readonly object _lock = new(); + private readonly Lock _lock = new(); public LockingPollingResolver(ILoggerFactory loggerFactory, ManualResetEvent waitHandle) : base(loggerFactory) { diff --git a/test/Grpc.Net.Client.Tests/Retry/HedgingCallTests.cs b/test/Grpc.Net.Client.Tests/Retry/HedgingCallTests.cs index 987aa524b..6755115d1 100644 --- a/test/Grpc.Net.Client.Tests/Retry/HedgingCallTests.cs +++ b/test/Grpc.Net.Client.Tests/Retry/HedgingCallTests.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -89,7 +89,7 @@ public async Task ActiveCalls_FatalStatusCode_CleansUpActiveCalls() // Arrange var allCallsOnServerSyncPoint = new SyncPoint(runContinuationsAsynchronously: true); var waitUntilFinishedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var callLock = new object(); + var callLock = new Lock(); var callCount = 0; var httpClient = ClientTestHelpers.CreateTestClient(async request => @@ -98,7 +98,8 @@ public async Task ActiveCalls_FatalStatusCode_CleansUpActiveCalls() // All calls are in-progress at once. bool allCallsOnServer = false; - lock (callLock) + callLock.Enter(); + try { callCount++; if (callCount == 5) @@ -106,6 +107,10 @@ public async Task ActiveCalls_FatalStatusCode_CleansUpActiveCalls() allCallsOnServer = true; } } + finally + { + callLock.Exit(); + } if (allCallsOnServer) { await allCallsOnServerSyncPoint.WaitToContinue(); diff --git a/test/Grpc.Net.Client.Tests/Retry/HedgingTests.cs b/test/Grpc.Net.Client.Tests/Retry/HedgingTests.cs index 1369fa9c9..568771028 100644 --- a/test/Grpc.Net.Client.Tests/Retry/HedgingTests.cs +++ b/test/Grpc.Net.Client.Tests/Retry/HedgingTests.cs @@ -550,14 +550,15 @@ public async Task AsyncServerStreamingCall_SuccessAfterRetry_RequestContentSent( public async Task AsyncClientStreamingCall_SuccessAfterRetry_RequestContentSent(int hedgingDelayMS) { // Arrange - var callLock = new object(); + var callLock = new Lock(); var requestContent = new MemoryStream(); var callCount = 0; var httpClient = ClientTestHelpers.CreateTestClient(async request => { var firstCall = false; - lock (callLock) + callLock.Enter(); + try { callCount++; if (callCount == 1) @@ -565,6 +566,10 @@ public async Task AsyncClientStreamingCall_SuccessAfterRetry_RequestContentSent( firstCall = true; } } + finally + { + callLock.Exit(); + } if (firstCall) { await request.Content!.CopyToAsync(new MemoryStream()); diff --git a/test/Shared/HttpEventSourceListener.cs b/test/Shared/HttpEventSourceListener.cs index 56fdd5e2a..7fd997937 100644 --- a/test/Shared/HttpEventSourceListener.cs +++ b/test/Shared/HttpEventSourceListener.cs @@ -50,7 +50,7 @@ public abstract class EventSourceListenerBase : EventListener { private readonly StringBuilder _messageBuilder = new StringBuilder(); private readonly ILogger? _logger; - private readonly object _lock = new object(); + private readonly Lock _lock = new Lock(); private bool _disposed; public EventSourceListenerBase(ILoggerFactory loggerFactory) diff --git a/testassets/BenchmarkWorkerWebsite/TimeStats.cs b/testassets/BenchmarkWorkerWebsite/TimeStats.cs index 69ec211de..4fd21cf78 100644 --- a/testassets/BenchmarkWorkerWebsite/TimeStats.cs +++ b/testassets/BenchmarkWorkerWebsite/TimeStats.cs @@ -24,7 +24,7 @@ namespace BenchmarkWorkerWebsite; // Copied from https://github.com/grpc/grpc/blob/master/src/csharp/Grpc.IntegrationTesting/TimeStats.cs public class TimeStats { - readonly object myLock = new object(); + readonly Lock myLock = new Lock(); DateTime lastWallClock; TimeSpan lastUserTime; TimeSpan lastPrivilegedTime; diff --git a/testassets/InteropTestsGrpcWebClient/Pages/Index.razor.cs b/testassets/InteropTestsGrpcWebClient/Pages/Index.razor.cs index e82a80f0a..9c072a248 100644 --- a/testassets/InteropTestsGrpcWebClient/Pages/Index.razor.cs +++ b/testassets/InteropTestsGrpcWebClient/Pages/Index.razor.cs @@ -1,4 +1,4 @@ -#region Copyright notice and license +#region Copyright notice and license // Copyright 2019 The gRPC Authors // @@ -23,7 +23,7 @@ namespace InteropTestsGrpcWebClient.Pages; public partial class Index { - private readonly object _lock = new object(); + private readonly Lock _lock = new Lock(); private InteropTestInvoker? _interopTestInvoker; public List Messages { get; } = new List();