Skip to content

Commit

Permalink
Backport 10000286390ac9b0288cee25a4f3551d09475fdc
Browse files Browse the repository at this point in the history
  • Loading branch information
MBaesken committed Aug 6, 2024
1 parent 3acdebe commit f55652e
Show file tree
Hide file tree
Showing 7 changed files with 618 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.IOException;
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.ProxySelector;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -410,10 +411,62 @@ private CompletableFuture<Response> sendRequestBody(ExchangeImpl<T> ex) {
CompletableFuture<Response> cf = ex.sendBodyAsync()
.thenCompose(exIm -> exIm.getResponseAsync(parentExecutor));
cf = wrapForUpgrade(cf);
// after 101 is handled we check for other 1xx responses
cf = cf.thenCompose(this::ignore1xxResponse);
cf = wrapForLog(cf);
return cf;
}

/**
* Checks whether the passed Response has a status code between 102 and 199 (both inclusive).
* If so, then that {@code Response} is considered intermediate informational response and is
* ignored by the client. This method then creates a new {@link CompletableFuture} which
* completes when a subsequent response is sent by the server. Such newly constructed
* {@link CompletableFuture} will not complete till a "final" response (one which doesn't have
* a response code between 102 and 199 inclusive) is sent by the server. The returned
* {@link CompletableFuture} is thus capable of handling multiple subsequent intermediate
* informational responses from the server.
* <p>
* If the passed Response doesn't have a status code between 102 and 199 (both inclusive) then
* this method immediately returns back a completed {@link CompletableFuture} with the passed
* {@code Response}.
* </p>
*
* @param rsp The response
* @return A {@code CompletableFuture} with the final response from the server
*/
private CompletableFuture<Response> ignore1xxResponse(final Response rsp) {
final int statusCode = rsp.statusCode();
// we ignore any response code which is 1xx.
// For 100 (with the request configured to expect-continue) and 101, we handle it
// specifically as defined in the RFC-9110, outside of this method.
// As noted in RFC-9110, section 15.2.1, if response code is 100 and if the request wasn't
// configured with expectContinue, then we ignore the 100 response and wait for the final
// response (just like any other 1xx response).
// Any other response code between 102 and 199 (both inclusive) aren't specified in the
// "HTTP semantics" RFC-9110. The spec states that these 1xx response codes are informational
// and interim and the client can choose to ignore them and continue to wait for the
// final response (headers)
if ((statusCode >= 102 && statusCode <= 199)
|| (statusCode == 100 && !request.expectContinue)) {
Log.logTrace("Ignoring (1xx informational) response code {0}", rsp.statusCode());
if (debug.on()) {
debug.log("Ignoring (1xx informational) response code "
+ rsp.statusCode());
}
assert exchImpl != null : "Illegal state - current exchange isn't set";
// ignore this Response and wait again for the subsequent response headers
final CompletableFuture<Response> cf = exchImpl.getResponseAsync(parentExecutor);
// we recompose the CF again into the ignore1xxResponse check/function because
// the 1xx response is allowed to be sent multiple times for a request, before
// a final response arrives
return cf.thenCompose(this::ignore1xxResponse);
} else {
// return the already completed future
return MinimalFuture.completedFuture(rsp);
}
}

CompletableFuture<Response> responseAsyncImpl0(HttpConnection connection) {
Function<ExchangeImpl<T>, CompletableFuture<Response>> after407Check;
bodyIgnored = null;
Expand Down Expand Up @@ -444,7 +497,30 @@ private CompletableFuture<Response> wrapForUpgrade(CompletableFuture<Response> c
if (upgrading) {
return cf.thenCompose(r -> checkForUpgradeAsync(r, exchImpl));
}
return cf;
// websocket requests use "Connection: Upgrade" and "Upgrade: websocket" headers.
// however, the "upgrading" flag we maintain in this class only tracks a h2 upgrade
// that we internally triggered. So it will be false in the case of websocket upgrade, hence
// this additional check. If it's a websocket request we allow 101 responses and we don't
// require any additional checks when a response arrives.
if (request.isWebSocket()) {
return cf;
}
// not expecting an upgrade, but if the server sends a 101 response then we fail the
// request and also let the ExchangeImpl deal with it as a protocol error
return cf.thenCompose(r -> {
if (r.statusCode == 101) {
final ProtocolException protoEx = new ProtocolException("Unexpected 101 " +
"response, when not upgrading");
assert exchImpl != null : "Illegal state - current exchange isn't set";
try {
exchImpl.onProtocolError(protoEx);
} catch (Throwable ignore){
// ignored
}
return MinimalFuture.failedFuture(protoEx);
}
return MinimalFuture.completedFuture(r);
});
}

private CompletableFuture<Response> wrapForLog(CompletableFuture<Response> cf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@ abstract CompletableFuture<T> readBodyAsync(HttpResponse.BodyHandler<T> handler,
*/
abstract void cancel(IOException cause);

/**
* Invoked whenever there is a (HTTP) protocol error when dealing with the response
* from the server. The implementations of {@code ExchangeImpl} are then expected to
* take necessary action that is expected by the corresponding specifications whenever
* a protocol error happens. For example, in HTTP/1.1, such protocol error would result
* in the connection being closed.
* @param cause The cause of the protocol violation
*/
abstract void onProtocolError(IOException cause);

/**
* Called when the exchange is released, so that cleanup actions may be
* performed - such as deregistering callbacks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,15 @@ void cancel(IOException cause) {
cancelImpl(cause);
}

@Override
void onProtocolError(final IOException cause) {
if (debug.on()) {
debug.log("cancelling exchange due to protocol error: %s", cause.getMessage());
}
Log.logError("cancelling exchange due to protocol error: {0}\n", cause);
cancelImpl(cause);
}

private void cancelImpl(Throwable cause) {
LinkedList<CompletableFuture<?>> toComplete = null;
int count = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ private Http2Connection(HttpConnection connection,
sendConnectionPreface();
if (!opened) {
debug.log("ensure reset frame is sent to cancel initial stream");
initialStream.sendCancelStreamFrame();
initialStream.sendResetStreamFrame(ResetFrame.CANCEL);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.net.Authenticator;
import java.net.ConnectException;
import java.net.CookieHandler;
import java.net.ProtocolException;
import java.net.ProxySelector;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpTimeoutException;
Expand Down Expand Up @@ -561,6 +562,8 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
ConnectException ce = new ConnectException(msg);
ce.initCause(throwable);
throw ce;
} else if (throwable instanceof ProtocolException) {
throw new ProtocolException(msg);
} else if (throwable instanceof IOException) {
throw new IOException(msg, throwable);
} else {
Expand Down
26 changes: 20 additions & 6 deletions src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -1213,6 +1213,16 @@ void cancel(IOException cause) {
cancelImpl(cause);
}

@Override
void onProtocolError(final IOException cause) {
if (debug.on()) {
debug.log("cancelling exchange on stream %d due to protocol error: %s", streamid, cause.getMessage());
}
Log.logError("cancelling exchange on stream {0} due to protocol error: {1}\n", streamid, cause);
// send a RESET frame and close the stream
cancelImpl(cause, ResetFrame.PROTOCOL_ERROR);
}

void connectionClosing(Throwable cause) {
Flow.Subscriber<?> subscriber =
responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
Expand All @@ -1224,6 +1234,10 @@ void connectionClosing(Throwable cause) {

// This method sends a RST_STREAM frame
void cancelImpl(Throwable e) {
cancelImpl(e, ResetFrame.CANCEL);
}

private void cancelImpl(final Throwable e, final int resetFrameErrCode) {
errorRef.compareAndSet(null, e);
if (debug.on()) {
if (streamid == 0) debug.log("cancelling stream: %s", (Object)e);
Expand Down Expand Up @@ -1255,25 +1269,25 @@ void cancelImpl(Throwable e) {
try {
// will send a RST_STREAM frame
if (streamid != 0 && streamState == 0) {
e = Utils.getCompletionCause(e);
if (e instanceof EOFException) {
final Throwable cause = Utils.getCompletionCause(e);
if (cause instanceof EOFException) {
// read EOF: no need to try & send reset
connection.decrementStreamsCount(streamid);
connection.closeStream(streamid);
} else {
// no use to send CANCEL if already closed.
sendCancelStreamFrame();
sendResetStreamFrame(resetFrameErrCode);
}
}
} catch (Throwable ex) {
Log.logError(ex);
}
}

void sendCancelStreamFrame() {
void sendResetStreamFrame(final int resetFrameErrCode) {
// do not reset a stream until it has a streamid.
if (streamid > 0 && markStream(ResetFrame.CANCEL) == 0) {
connection.resetStream(streamid, ResetFrame.CANCEL);
if (streamid > 0 && markStream(resetFrameErrCode) == 0) {
connection.resetStream(streamid, resetFrameErrCode);
}
close();
}
Expand Down
Loading

0 comments on commit f55652e

Please sign in to comment.