Skip to content

Commit

Permalink
Complete health checks watch service on server shutting down (#2582)
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesNK authored Dec 12, 2024
1 parent b7af033 commit c9d2671
Show file tree
Hide file tree
Showing 6 changed files with 359 additions and 6 deletions.
18 changes: 18 additions & 0 deletions src/Grpc.AspNetCore.HealthChecks/GrpcHealthChecksOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#endregion

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;

namespace Grpc.AspNetCore.HealthChecks;

Expand All @@ -39,4 +40,21 @@ public sealed class GrpcHealthChecksOptions
/// published by <see cref="IHealthCheckPublisher"/> are returned.
/// </remarks>
public bool UseHealthChecksCache { get; set; }

/// <summary>
/// Gets or sets a value indicating whether to suppress completing <c>Watch</c> health check calls when the application begins shutting down.
/// The default value is <c>false</c>.
/// </summary>
/// <remarks>
/// <para>
/// When <c>false</c>, health checks <c>Watch</c> calls are completed with a status of NotServing when the server application begins shutting down.
/// Shutdown is indicated by the <see cref="IHostApplicationLifetime.ApplicationStopping"/> token being raised and causes <c>Watch</c> to complete.
/// When <c>true</c>, health checks <c>Watch</c> calls are left running. Running calls will eventually be forcefully aborted when the server finishes shutting down.
/// </para>
/// <para>
/// Completing the <c>Watch</c> call allows the server to gracefully exit. If <c>Watch</c> calls aren't shut down then the server runs until
/// <see cref="HostOptions.ShutdownTimeout"/> is exceeded and the server forcefully aborts remaining active requests.
/// </para>
/// </remarks>
public bool SuppressCompletionOnShutdown { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#endregion

using System.Linq;
using Grpc.Health.V1;
using Grpc.HealthCheck;
using Microsoft.Extensions.Diagnostics.HealthChecks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Grpc.HealthCheck;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;

namespace Grpc.AspNetCore.HealthChecks.Internal;
Expand All @@ -32,17 +33,20 @@ internal sealed class HealthServiceIntegration : Grpc.Health.V1.Health.HealthBas
// gRPC health check options; Watch reads SuppressCompletionOnShutdown and UseHealthChecksCache from it.
private readonly GrpcHealthChecksOptions _grpcHealthCheckOptions;
// Underlying health service implementation that Watch forwards the call to.
private readonly HealthServiceImpl _healthServiceImpl;
private readonly HealthCheckService _healthCheckService;
// Supplies the ApplicationStopping token that Watch uses to detect that the server is shutting down.
private readonly IHostApplicationLifetime _applicationLifetime;

/// <summary>
/// Initializes the service by unwrapping the supplied options (<c>.Value</c>) and storing the injected dependencies in fields.
/// </summary>
public HealthServiceIntegration(
HealthServiceImpl healthServiceImpl,
IOptions<HealthCheckOptions> healthCheckOptions,
IOptions<GrpcHealthChecksOptions> grpcHealthCheckOptions,
HealthCheckService healthCheckService)
HealthCheckService healthCheckService,
IHostApplicationLifetime applicationLifetime)
{
_healthCheckOptions = healthCheckOptions.Value;
_grpcHealthCheckOptions = grpcHealthCheckOptions.Value;
_healthServiceImpl = healthServiceImpl;
_healthCheckService = healthCheckService;
_applicationLifetime = applicationLifetime;
}

public override Task<HealthCheckResponse> Check(HealthCheckRequest request, ServerCallContext context)
Expand All @@ -57,15 +61,84 @@ public override Task<HealthCheckResponse> Check(HealthCheckRequest request, Serv
}
}

/// <summary>
/// Handles a health check <c>Watch</c> call by forwarding it to the underlying health service implementation.
/// </summary>
/// <remarks>
/// Unless <see cref="GrpcHealthChecksOptions.SuppressCompletionOnShutdown"/> is set, the call is also ended when
/// <see cref="IHostApplicationLifetime.ApplicationStopping"/> is raised, and a final NotServing status is written
/// if the call itself was not canceled.
/// </remarks>
/// <param name="request">The health check request.</param>
/// <param name="responseStream">The stream that health check responses are written to.</param>
/// <param name="context">The context of the current server call.</param>
/// <returns>A task that completes when the underlying watch has finished and any final status has been written.</returns>
public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse> responseStream, ServerCallContext context)
{
    ServerCallContext resolvedContext;
    IServerStreamWriter<HealthCheckResponse> resolvedResponseStream;

    // Tracked outside the try block so the linked source is always disposed. An undisposed linked source
    // keeps its registration on the long-lived ApplicationStopping token for every Watch call.
    CancellationTokenSource? linkedCts = null;

    try
    {
        if (!_grpcHealthCheckOptions.SuppressCompletionOnShutdown)
        {
            // Create a linked token source to cancel the request if the application is stopping.
            // This is required because the server won't shut down gracefully if the request is still open.
            // The context needs to be wrapped because HealthServiceImpl is in an assembly that can't reference IHostApplicationLifetime.
            linkedCts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken, _applicationLifetime.ApplicationStopping);
            resolvedContext = new WrappedServerCallContext(context, linkedCts);
        }
        else
        {
            resolvedContext = context;
        }

        if (!_grpcHealthCheckOptions.UseHealthChecksCache)
        {
            // Stream writer replaces first health checks results from the cache with newly calculated health check results.
            resolvedResponseStream = new WatchServerStreamWriter(this, request, responseStream, context.CancellationToken);
        }
        else
        {
            resolvedResponseStream = responseStream;
        }

        await _healthServiceImpl.Watch(request, resolvedResponseStream, resolvedContext);
    }
    finally
    {
        // The wrapped context is not used after this point, so the linked source can be released.
        linkedCts?.Dispose();
    }

    // If the request is not canceled and the application is stopping then return NotServing before finishing.
    if (!context.CancellationToken.IsCancellationRequested && _applicationLifetime.ApplicationStopping.IsCancellationRequested)
    {
        await responseStream.WriteAsync(new HealthCheckResponse { Status = HealthCheckResponse.Types.ServingStatus.NotServing });
    }
}

/// <summary>
/// A <see cref="ServerCallContext"/> that forwards every member to the wrapped call context, except for the
/// cancellation token, which is taken from the supplied <see cref="CancellationTokenSource"/>.
/// </summary>
private sealed class WrappedServerCallContext : ServerCallContext
{
    private readonly ServerCallContext _inner;
    private readonly CancellationTokenSource _tokenSource;

    public WrappedServerCallContext(ServerCallContext serverCallContext, CancellationTokenSource cancellationTokenSource)
    {
        _tokenSource = cancellationTokenSource;
        _inner = serverCallContext;
    }

    // The one member that is not forwarded: the token comes from the supplied token source.
    protected override CancellationToken CancellationTokenCore => _tokenSource.Token;

    // Read-only call information, forwarded unchanged.
    protected override string MethodCore => _inner.Method;
    protected override string HostCore => _inner.Host;
    protected override string PeerCore => _inner.Peer;
    protected override DateTime DeadlineCore => _inner.Deadline;
    protected override Metadata RequestHeadersCore => _inner.RequestHeaders;
    protected override Metadata ResponseTrailersCore => _inner.ResponseTrailers;
    protected override AuthContext AuthContextCore => _inner.AuthContext;
    protected override IDictionary<object, object> UserStateCore => _inner.UserState;

    // Mutable call state: reads and writes both go to the wrapped context.
    protected override Status StatusCore
    {
        get { return _inner.Status; }
        set { _inner.Status = value; }
    }

    protected override WriteOptions? WriteOptionsCore
    {
        get { return _inner.WriteOptions; }
        set { _inner.WriteOptions = value; }
    }

    protected override ContextPropagationToken CreatePropagationTokenCore(ContextPropagationOptions? options)
        => _inner.CreatePropagationToken(options);

    protected override Task WriteResponseHeadersAsyncCore(Metadata responseHeaders)
        => _inner.WriteResponseHeadersAsync(responseHeaders);
}

private async Task<HealthCheckResponse> GetHealthCheckResponseAsync(string service, bool throwOnNotFound, CancellationToken cancellationToken)
Expand Down
7 changes: 7 additions & 0 deletions src/Grpc.HealthCheck/HealthServiceImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ public override Task<HealthCheckResponse> Check(HealthCheckRequest request, Serv
/// <returns>A task indicating completion of the handler.</returns>
public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse> responseStream, ServerCallContext context)
{
// The call has already been canceled. Writing to the response will fail so immediately exit.
// In the real world this situation is unlikely to happen as the server would have prevented a canceled call from making it this far.
if (context.CancellationToken.IsCancellationRequested)
{
return;
}

string service = request.Service;

// Channel is used to marshal multiple callers updating status into a single queue.
Expand Down
Loading

0 comments on commit c9d2671

Please sign in to comment.