Skip to content

Commit

Permalink
fix: Close Jetty h2 streams with RST_STREAM and no error code (deepha…
Browse files Browse the repository at this point in the history
…ven#6424)

This is a Jetty-specific workaround to avoid irritating the Python gRPC
client into failing calls that had already half-closed successfully.

Since we're now using ServletOutputStream.close() in place of
AsyncContext.complete(), we need to apply the same wrapping for trailers
to close() - that is, when the stream is clsoed, we can't rely on the
servlet container sending our trailers because grpc-web trailers are
actually a DATA frame which must be explicitly written.

See deephaven#6400
Fixes deephaven#5996
Reapplies deephaven#6401
  • Loading branch information
niloc132 authored Nov 26, 2024
1 parent b93333f commit 35f1030
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,23 @@
import jakarta.servlet.WriteListener;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

/**
* Wraps the usual ServletOutputStream so as to allow downstream writers to use it according to the servlet spec, but
* still make it easy to write trailers as a payload instead of using HTTP trailers at the end of a stream.
*/
public class GrpcWebOutputStream extends ServletOutputStream implements WriteListener {
private final ServletOutputStream wrapped;
private final GrpcWebServletResponse grpcWebServletResponse;

// Access to these are guarded by synchronized
private Runnable waiting;
private WriteListener writeListener;

public GrpcWebOutputStream(ServletOutputStream wrapped) {
public GrpcWebOutputStream(ServletOutputStream wrapped, GrpcWebServletResponse grpcWebServletResponse) {
this.wrapped = wrapped;
this.grpcWebServletResponse = grpcWebServletResponse;
}

@Override
Expand Down Expand Up @@ -97,7 +100,21 @@ public void flush() throws IOException {

@Override
public void close() throws IOException {
wrapped.close();
// Since we're a grpc-web response, we must write trailers on our way out as part of close - but trailers
// for grpc-web are a data frame, not HTTP trailers. Call up to the response to write the trailer frame,
// then close the underlying stream.
AtomicReference<IOException> exception = new AtomicReference<>();
grpcWebServletResponse.writeTrailers(() -> {
try {
wrapped.close();
} catch (IOException e) {
exception.set(e);
}
});
IOException ex = exception.get();
if (ex != null) {
throw ex;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public AsyncContext startAsync() throws IllegalStateException {
public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse)
throws IllegalStateException {
AsyncContext delegate = super.startAsync(servletRequest, servletResponse);
// Note that this anonymous class has no purpose while our workaround for
// https://github.com/deephaven/deephaven-core/issues/6400 is in place.
return new DelegatingAsyncContext(delegate) {
private void safelyComplete() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Supplier<Map<String, String>> getTrailerFields() {
public synchronized GrpcWebOutputStream getOutputStream() throws IOException {
if (outputStream == null) {
// Provide our own output stream instance, so we can control/monitor the write listener
outputStream = new GrpcWebOutputStream(super.getOutputStream());
outputStream = new GrpcWebOutputStream(super.getOutputStream(), this);
}
return outputStream;
}
Expand Down

0 comments on commit 35f1030

Please sign in to comment.