diff --git a/akka-http-bench-jmh/src/main/scala/akka/http/CorsBenchmark.scala b/akka-http-bench-jmh/src/main/scala/akka/http/CorsBenchmark.scala index d273ab0e90..5466cb45cd 100644 --- a/akka-http-bench-jmh/src/main/scala/akka/http/CorsBenchmark.scala +++ b/akka-http-bench-jmh/src/main/scala/akka/http/CorsBenchmark.scala @@ -6,7 +6,6 @@ package akka.http import akka.actor.ActorSystem -import akka.dispatch.ExecutionContexts import akka.http.scaladsl.model.HttpResponse import akka.http.scaladsl.model.headers.HttpOrigin import akka.http.scaladsl.model.headers.Origin @@ -123,5 +122,5 @@ class CorsBenchmark extends Directives { } private def responseBody(response: Future[HttpResponse]): String = - Await.result(response.flatMap(_.entity.toStrict(3.seconds)).map(_.data.utf8String)(ExecutionContexts.parasitic), 3.seconds) + Await.result(response.flatMap(_.entity.toStrict(3.seconds)).map(_.data.utf8String)(ExecutionContext.parasitic), 3.seconds) } diff --git a/akka-http-bench-jmh/src/main/scala/akka/http/impl/engine/ConnectionPoolBenchmark.scala b/akka-http-bench-jmh/src/main/scala/akka/http/impl/engine/ConnectionPoolBenchmark.scala index f5e64f7528..82cb17d7c7 100644 --- a/akka-http-bench-jmh/src/main/scala/akka/http/impl/engine/ConnectionPoolBenchmark.scala +++ b/akka-http-bench-jmh/src/main/scala/akka/http/impl/engine/ConnectionPoolBenchmark.scala @@ -8,7 +8,6 @@ import java.net.InetSocketAddress import java.util.concurrent.CountDownLatch import akka.actor.ActorSystem -import akka.dispatch.ExecutionContexts import akka.http.CommonBenchmark import akka.http.impl.util.enhanceString_ import akka.http.scaladsl.model.HttpRequest @@ -47,7 +46,7 @@ class ConnectionPoolBenchmark extends CommonBenchmark { .onComplete { case Success(_) => latch.countDown() case Failure(_) => throw new IllegalStateException - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } latch.await() diff --git a/akka-http-bench-jmh/src/main/scala/akka/http/impl/engine/HttpEntityBenchmark.scala b/akka-http-bench-jmh/src/main/scala/akka/http/impl/engine/HttpEntityBenchmark.scala index 5b0b83a743..0fee6a33f6 100644 --- a/akka-http-bench-jmh/src/main/scala/akka/http/impl/engine/HttpEntityBenchmark.scala +++ b/akka-http-bench-jmh/src/main/scala/akka/http/impl/engine/HttpEntityBenchmark.scala @@ -6,8 +6,9 @@ package akka.http.impl.engine import java.util.concurrent.CountDownLatch +import scala.concurrent.ExecutionContext + import akka.actor.ActorSystem -import akka.dispatch.ExecutionContexts import akka.http.CommonBenchmark import akka.http.scaladsl.model.{ ContentTypes, HttpEntity } import akka.stream.scaladsl.Source @@ -28,7 +29,7 @@ class HttpEntityBenchmark extends CommonBenchmark { val latch = new CountDownLatch(1) entity.discardBytes() .future - .onComplete(_ => latch.countDown())(ExecutionContexts.parasitic) + .onComplete(_ => latch.countDown())(ExecutionContext.parasitic) latch.await() } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala index c80caa733a..afc37cea23 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolMasterActor.scala @@ -7,11 +7,11 @@ package akka.http.impl.engine.client import akka.Done import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, Deploy, ExtendedActorSystem, NoSerializationVerificationNeeded, Props } import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.http.impl.engine.client.PoolInterface.ShutdownReason import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } import akka.stream.Materializer +import scala.concurrent.ExecutionContext import scala.concurrent.{ Future, Promise } import scala.util.Failure import scala.util.Success @@ -170,7 +170,7 @@ private[http] final class PoolMasterActor extends Actor with ActorLogging { // to this actor by the pool actor, they will be retried once the shutdown // has completed. val completed = pool.shutdown()(context.dispatcher) - shutdownCompletedPromise.tryCompleteWith(completed.map(_ => Done)(ExecutionContexts.parasitic)) + shutdownCompletedPromise.tryCompleteWith(completed.map(_ => Done)(ExecutionContext.parasitic)) statusById += poolId -> PoolInterfaceShuttingDown(shutdownCompletedPromise) case Some(PoolInterfaceShuttingDown(formerPromise)) => // Pool is already shutting down, mirror the existing promise. diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/pool/NewHostConnectionPool.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/pool/NewHostConnectionPool.scala index 3f3000bf99..4c69d88491 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/pool/NewHostConnectionPool.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/pool/NewHostConnectionPool.scala @@ -10,7 +10,6 @@ import java.util import akka.NotUsed import akka.actor.Cancellable import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter import akka.http.impl.engine.client.PoolFlow.{ RequestContext, ResponseContext } import akka.http.impl.engine.client.pool.SlotState._ @@ -23,6 +22,7 @@ import akka.stream.scaladsl.{ Flow, Keep, Sink, Source } import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.util.OptionVal +import scala.concurrent.ExecutionContext import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.concurrent.duration._ @@ -450,7 +450,7 @@ private[client] object NewHostConnectionPool { entityComplete.onComplete(safely { case Success(_) => withSlot(_.onRequestEntityCompleted()) case Failure(cause) => withSlot(_.onRequestEntityFailed(cause)) - })(ExecutionContexts.parasitic) + })(ExecutionContext.parasitic) request.withEntity(newEntity) } @@ -504,9 +504,9 @@ private[client] object NewHostConnectionPool { ongoingResponseEntity = None ongoingResponseEntityKillSwitch = None } - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) case Failure(_) => throw new IllegalStateException("Should never fail") - })(ExecutionContexts.parasitic) + })(ExecutionContext.parasitic) withSlot(_.onResponseReceived(response.withEntity(newEntity))) } @@ -588,7 +588,7 @@ private[client] object NewHostConnectionPool { onConnectionAttemptFailed(currentEmbargoLevel) sl.onConnectionAttemptFailed(cause) } - })(ExecutionContexts.parasitic) + })(ExecutionContext.parasitic) slotCon } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/Http2.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/Http2.scala index 78b0a2d6f0..00a1082003 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/Http2.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/Http2.scala @@ -6,7 +6,6 @@ package akka.http.impl.engine.http2 import akka.actor.{ ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter import akka.http.impl.engine.HttpConnectionIdleTimeoutBidi import akka.http.impl.engine.server.{ GracefulTerminatorStage, MasterServerTerminator, ServerTerminator, UpgradeToOtherProtocolResponseHeader } @@ -27,9 +26,9 @@ import akka.stream.scaladsl.{ Flow, Keep, Sink, Source, TLS, TLSPlacebo, Tcp } import akka.stream.{ IgnoreComplete, Materializer } import akka.util.ByteString import akka.Done - import javax.net.ssl.SSLEngine import scala.collection.immutable +import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.util.control.NonFatal @@ -103,7 +102,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) // See https://github.com/akka/akka/issues/17992 case NonFatal(ex) => Done - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } catch { case NonFatal(e) => log.error(e, "Could not materialize handling flow for {}", incoming) @@ -121,7 +120,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) try { handler(request).recover { case NonFatal(ex) => handleHandlerError(log, ex) - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } catch { case NonFatal(ex) => Future.successful(handleHandlerError(log, ex)) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/OutgoingConnectionBuilderImpl.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/OutgoingConnectionBuilderImpl.scala index 6427884107..ec0061d9cd 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/OutgoingConnectionBuilderImpl.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/OutgoingConnectionBuilderImpl.scala @@ -8,7 +8,6 @@ import java.util.concurrent.CompletionStage import akka.NotUsed import akka.actor.ClassicActorSystemProvider import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter import akka.http.impl.engine.http2.client.PersistentConnection import akka.http.scaladsl.Http.OutgoingConnection @@ -24,6 +23,7 @@ import akka.http.scaladsl.HttpsConnectionContext import akka.http.scaladsl.OutgoingConnectionBuilder import akka.stream.javadsl.{ Flow => JFlow } +import scala.concurrent.ExecutionContext import scala.concurrent.Future /** @@ -131,7 +131,7 @@ private[akka] object OutgoingConnectionBuilderImpl { private def javaFlow(flow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]]): JFlow[javadsl.model.HttpRequest, javadsl.model.HttpResponse, CompletionStage[javadsl.OutgoingConnection]] = { import scala.jdk.FutureConverters._ - javaFlowKeepMatVal(flow.mapMaterializedValue(f => f.map(oc => new javadsl.OutgoingConnection(oc))(ExecutionContexts.parasitic).asJava)) + javaFlowKeepMatVal(flow.mapMaterializedValue(f => f.map(oc => new javadsl.OutgoingConnection(oc))(ExecutionContext.parasitic).asJava)) } private def javaFlowKeepMatVal[M](flow: Flow[HttpRequest, HttpResponse, M]): JFlow[javadsl.model.HttpRequest, javadsl.model.HttpResponse, M] = diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala index 1a7af1e4b1..c4e606a4d1 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/http2/client/PersistentConnection.scala @@ -6,7 +6,6 @@ package akka.http.impl.engine.http2.client import akka.NotUsed import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.http.scaladsl.Http.OutgoingConnection import akka.http.scaladsl.model.{ AttributeKey, HttpRequest, HttpResponse, RequestResponseAssociation, StatusCodes } import akka.http.scaladsl.settings.Http2ClientSettings @@ -17,9 +16,11 @@ import akka.stream.{ Attributes, FlowShape, Inlet, Outlet, StreamTcpException } import akka.util.PrettyDuration import java.util.concurrent.ThreadLocalRandom + import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationLong import scala.concurrent.duration.FiniteDuration +import scala.concurrent.ExecutionContext import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success } @@ -103,7 +104,7 @@ private[http2] object PersistentConnection { onConnected.invoke(()) case Failure(cause) => onFailed.invoke(cause) - })(ExecutionContexts.parasitic) + })(ExecutionContext.parasitic) var requestOutPulled = false requestOut.setHandler(new OutHandler { diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index f1fc2632a8..7fe395a6a8 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -12,7 +12,6 @@ import scala.util.control.{ NoStackTrace, NonFatal } import akka.NotUsed import akka.actor.Cancellable import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.japi.Function import akka.event.LoggingAdapter import akka.http.ParsingErrorHandler @@ -35,6 +34,7 @@ import akka.http.scaladsl.model._ import akka.http.impl.util.LogByteStringTools._ import scala.annotation.nowarn +import scala.concurrent.ExecutionContext import scala.util.Failure /** @@ -460,7 +460,7 @@ private[http] object HttpServerBluePrint { case Failure(ex) => log.error(ex, s"Response stream for [${requestStart.debugString}] failed with '${ex.getMessage}'. Aborting connection.") case _ => // ignore - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) newEntity } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 2e5149871c..bb632f953d 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -7,7 +7,6 @@ package akka.http.impl.util import akka.NotUsed import akka.actor.Cancellable import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.http.scaladsl.model.HttpEntity import akka.stream._ import akka.stream.impl.fusing.GraphInterpreter @@ -72,7 +71,7 @@ private[http] object StreamUtils { materializationPromise.trySuccess(()) killResult.future.value match { case Some(res) => handleKill(res) - case None => killResult.future.onComplete(killCallback.invoke)(ExecutionContexts.parasitic) + case None => killResult.future.onComplete(killCallback.invoke)(ExecutionContext.parasitic) } } diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ServerBinding.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ServerBinding.scala index 9cfe0537a4..87645242b0 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/ServerBinding.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ServerBinding.scala @@ -9,10 +9,10 @@ import java.util.concurrent.{ CompletionStage, TimeUnit } import akka.actor.ClassicActorSystemProvider import akka.annotation.DoNotInherit -import akka.dispatch.ExecutionContexts import akka.Done import scala.concurrent.duration.FiniteDuration +import scala.concurrent.ExecutionContext import scala.jdk.FutureConverters._ import scala.jdk.DurationConverters._ @@ -78,7 +78,7 @@ class ServerBinding private[http] (delegate: akka.http.scaladsl.Http.ServerBindi def terminate(hardDeadline: java.time.Duration): CompletionStage[HttpTerminated] = { delegate.terminate(FiniteDuration.apply(hardDeadline.toMillis, TimeUnit.MILLISECONDS)) - .map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.parasitic) + .map(_.asInstanceOf[HttpTerminated])(ExecutionContext.parasitic) .asJava } @@ -92,7 +92,7 @@ class ServerBinding private[http] (delegate: akka.http.scaladsl.Http.ServerBindi */ def whenTerminationSignalIssued: CompletionStage[java.time.Duration] = delegate.whenTerminationSignalIssued - .map(deadline => deadline.time.toJava)(ExecutionContexts.parasitic) + .map(deadline => deadline.time.toJava)(ExecutionContext.parasitic) .asJava /** @@ -109,7 +109,7 @@ class ServerBinding private[http] (delegate: akka.http.scaladsl.Http.ServerBindi */ def whenTerminated: CompletionStage[HttpTerminated] = delegate.whenTerminated - .map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.parasitic) + .map(_.asInstanceOf[HttpTerminated])(ExecutionContext.parasitic) .asJava /** diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala index a35c57bf40..6bd9e4f156 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala @@ -7,7 +7,6 @@ package akka.http.javadsl import java.util.concurrent.CompletionStage import akka.actor.ClassicActorSystemProvider -import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter import akka.http.impl.util.JavaMapping.Implicits._ import akka.http.javadsl.model.{ HttpRequest, HttpResponse } @@ -185,6 +184,6 @@ object ServerBuilder { def connectionSource(): Source[IncomingConnection, CompletionStage[ServerBinding]] = http.bindImpl(interface, port, context.asScala, settings.asScala, log) .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ExecutionContexts.parasitic).asJava).asJava + .mapMaterializedValue(_.map(new ServerBinding(_))(ExecutionContext.parasitic).asJava).asJava } } diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 7e95acea2b..738d18d3bb 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -9,7 +9,6 @@ import java.util.concurrent.CompletionStage import javax.net.ssl._ import akka.actor._ import akka.annotation.{ DoNotInherit, InternalApi, InternalStableApi } -import akka.dispatch.ExecutionContexts import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.http.impl.engine.HttpConnectionIdleTimeoutBidi import akka.http.impl.engine.client._ @@ -120,7 +119,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme .watchTermination() { (termWatchBefore, termWatchAfter) => // flag termination when the user handler has gotten (or has emitted) termination // signals in both directions - termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContexts.parasitic) + termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContext.parasitic) } .joinMat(baseFlow)(Keep.both) ) @@ -259,7 +258,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme // from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow. // See https://github.com/akka/akka/issues/17992 case NonFatal(ex) => Done - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } catch { case NonFatal(e) => log.error(e, "Could not materialize handling flow for {}", incoming) @@ -799,7 +798,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme // The user should keep control over how much parallelism is required. val parallelism = settings.pipeliningLimit * settings.maxConnections Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism) { - case (request, userContext) => poolInterface(request).transform(response => Success(response -> userContext))(ExecutionContexts.parasitic) + case (request, userContext) => poolInterface(request).transform(response => Success(response -> userContext))(ExecutionContext.parasitic) } } @@ -909,7 +908,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { * Note: rather than unbinding explicitly you can also use [[addToCoordinatedShutdown]] to add this task to Akka's coordinated shutdown. */ def unbind(): Future[Done] = - unbindAction().map(_ => Done)(ExecutionContexts.parasitic) + unbindAction().map(_ => Done)(ExecutionContext.parasitic) /** * Triggers "graceful" termination request being handled on this connection. @@ -958,7 +957,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { require(hardDeadline > Duration.Zero, "deadline must be greater than 0, was: " + hardDeadline) _whenTerminationSignalIssued.trySuccess(hardDeadline.fromNow) - val terminated = unbindAction().flatMap(_ => terminateAction(hardDeadline))(ExecutionContexts.parasitic) + val terminated = unbindAction().flatMap(_ => terminateAction(hardDeadline))(ExecutionContext.parasitic) _whenTerminated.completeWith(terminated) whenTerminated } @@ -998,7 +997,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { unbind() } shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, s"http-terminate-${localAddress}") { () => - terminate(hardTerminationDeadline).map(_ => Done)(ExecutionContexts.parasitic) + terminate(hardTerminationDeadline).map(_ => Done)(ExecutionContext.parasitic) } this } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala index 3545374369..ccba2468d1 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebSocketIntegrationSpec.scala @@ -25,6 +25,7 @@ import akka.util.ByteString import akka.stream.testkit.scaladsl.TestSink import akka.testkit._ +import scala.concurrent.ExecutionContext import scala.util.{ Failure, Success } class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer( @@ -86,7 +87,7 @@ class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer( override def onPull(): Unit = pull(shape.in) override def preStart(): Unit = { - promise.future.foreach(_ => getAsyncCallback[Done](_ => complete(shape.out)).invoke(Done))(akka.dispatch.ExecutionContexts.parasitic) + promise.future.foreach(_ => getAsyncCallback[Done](_ => complete(shape.out)).invoke(Done))(ExecutionContext.parasitic) } setHandlers(shape.in, shape.out, this) diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/RequestContext.scala b/akka-http/src/main/scala/akka/http/javadsl/server/RequestContext.scala index a82b4bbc96..ecc0c5d972 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/RequestContext.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/RequestContext.scala @@ -25,6 +25,7 @@ import java.util.concurrent.CompletionStage import java.util.function.{ Function => JFunction } import scala.annotation.varargs +import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContextExecutor import scala.jdk.FutureConverters._ @@ -48,18 +49,18 @@ class RequestContext private (val delegate: scaladsl.server.RequestContext) { def complete[T](value: T, marshaller: Marshaller[T, HttpResponse]): CompletionStage[RouteResult] = { delegate.complete(ToResponseMarshallable(value)(marshaller)) - .fast.map(r => r: RouteResult)(akka.dispatch.ExecutionContexts.parasitic).asJava + .fast.map(r => r: RouteResult)(ExecutionContext.parasitic).asJava } def completeWith(response: HttpResponse): CompletionStage[RouteResult] = { delegate.complete(response.asScala) - .fast.map(r => r: RouteResult)(akka.dispatch.ExecutionContexts.parasitic).asJava + .fast.map(r => r: RouteResult)(ExecutionContext.parasitic).asJava } @varargs def reject(rejections: Rejection*): CompletionStage[RouteResult] = { val scalaRejections = rejections.map(_.asScala) delegate.reject(scalaRejections: _*) - .fast.map(r => r: RouteResult)(akka.dispatch.ExecutionContexts.parasitic).asJava + .fast.map(r => r: RouteResult)(ExecutionContext.parasitic).asJava } def redirect(uri: Uri, redirectionType: StatusCode): CompletionStage[RouteResult] = { @@ -68,7 +69,7 @@ class RequestContext private (val delegate: scaladsl.server.RequestContext) { def fail(error: Throwable): CompletionStage[RouteResult] = delegate.fail(error) - .fast.map(r => r: RouteResult)(akka.dispatch.ExecutionContexts.parasitic).asJava + .fast.map(r => r: RouteResult)(ExecutionContext.parasitic).asJava def withRequest(req: HttpRequest): RequestContext = wrap(delegate.withRequest(req.asScala)) def withExecutionContext(ec: ExecutionContextExecutor): RequestContext = wrap(delegate.withExecutionContext(ec)) diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala index cafe5426b6..662a36a59f 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/BasicDirectives.scala @@ -35,11 +35,11 @@ import java.lang.{ Iterable => JIterable } import java.util.concurrent.CompletionStage import java.util.function.Predicate -import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter import akka.http.javadsl.server import scala.concurrent.duration.FiniteDuration +import scala.concurrent.ExecutionContext import scala.jdk.FutureConverters._ abstract class BasicDirectives { @@ -84,17 +84,17 @@ abstract class BasicDirectives { def mapRouteResultFuture(f: JFunction[CompletionStage[RouteResult], CompletionStage[RouteResult]], inner: Supplier[Route]): Route = RouteAdapter { D.mapRouteResultFuture(stage => - f(stage.fast.map(_.asJava)(ExecutionContexts.parasitic).asJava).asScala.fast.map(_.asScala)(ExecutionContexts.parasitic)) { + f(stage.fast.map(_.asJava)(ExecutionContext.parasitic).asJava).asScala.fast.map(_.asScala)(ExecutionContext.parasitic)) { inner.get.delegate } } def mapRouteResultWith(f: JFunction[RouteResult, CompletionStage[RouteResult]], inner: Supplier[Route]): Route = RouteAdapter { - D.mapRouteResultWith(r => f(r.asJava).asScala.fast.map(_.asScala)(ExecutionContexts.parasitic)) { inner.get.delegate } + D.mapRouteResultWith(r => f(r.asJava).asScala.fast.map(_.asScala)(ExecutionContext.parasitic)) { inner.get.delegate } } def mapRouteResultWithPF(f: PartialFunction[RouteResult, CompletionStage[RouteResult]], inner: Supplier[Route]): Route = RouteAdapter { - D.mapRouteResultWith(r => f(r.asJava).asScala.fast.map(_.asScala)(ExecutionContexts.parasitic)) { inner.get.delegate } + D.mapRouteResultWith(r => f(r.asJava).asScala.fast.map(_.asScala)(ExecutionContext.parasitic)) { inner.get.delegate } } /** @@ -148,7 +148,7 @@ abstract class BasicDirectives { } def recoverRejectionsWith(f: JFunction[JIterable[Rejection], CompletionStage[RouteResult]], inner: Supplier[Route]): Route = RouteAdapter { - D.recoverRejectionsWith(rs => f.apply(Util.javaArrayList(rs.map(_.asJava))).asScala.fast.map(_.asScala)(ExecutionContexts.parasitic)) { inner.get.delegate } + D.recoverRejectionsWith(rs => f.apply(Util.javaArrayList(rs.map(_.asJava))).asScala.fast.map(_.asScala)(ExecutionContext.parasitic)) { inner.get.delegate } } /** diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/RouteDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/RouteDirectives.scala index 69b484ad03..e2f5d1b647 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/RouteDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/RouteDirectives.scala @@ -5,7 +5,6 @@ package akka.http.javadsl.server.directives import java.util.concurrent.{ CompletionException, CompletionStage } -import akka.dispatch.ExecutionContexts import akka.http.javadsl.marshalling.Marshaller import scala.annotation.varargs @@ -29,7 +28,7 @@ abstract class RouteDirectives extends RespondWithDirectives { import RoutingJavaMapping.Implicits._ // Don't try this at home – we only use it here for the java -> scala conversions - private implicit val conversionExecutionContext: ExecutionContext = ExecutionContexts.parasitic + private implicit val conversionExecutionContext: ExecutionContext = ExecutionContext.parasitic /** * Java-specific call added so you can chain together multiple alternate routes using comma, diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala index 74fa25858a..7e18e8f56b 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/FileUploadDirectives.scala @@ -4,8 +4,9 @@ package akka.http.scaladsl.server.directives +import scala.concurrent.ExecutionContext + import akka.Done -import akka.dispatch.ExecutionContexts import akka.http.javadsl import akka.http.scaladsl.model.ContentType import akka.http.scaladsl.model.Multipart @@ -54,7 +55,7 @@ trait FileUploadDirectives { val uploadedF: Future[(FileInfo, File)] = bytes .runWith(FileIO.toPath(dest.toPath)) - .map(_ => (fileInfo, dest))(ExecutionContexts.parasitic) + .map(_ => (fileInfo, dest))(ExecutionContext.parasitic) .recoverWith { case ex => dest.delete() @@ -90,7 +91,7 @@ trait FileUploadDirectives { val dest = destFn(fileInfo) part.entity.dataBytes.runWith(FileIO.toPath(dest.toPath)) - .map(_ => (fileInfo, dest))(ExecutionContexts.parasitic) + .map(_ => (fileInfo, dest))(ExecutionContext.parasitic) } val uploadedF = uploaded.runWith(Sink.seq[(FileInfo, File)])