Skip to content

Commit

Permalink
chore: drop usage of akka.dispatch.ExecutionContexts (#4422)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-alfers authored Sep 18, 2024
1 parent 589173f commit d00ebdb
Show file tree
Hide file tree
Showing 18 changed files with 51 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package akka.http

import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.headers.HttpOrigin
import akka.http.scaladsl.model.headers.Origin
Expand Down Expand Up @@ -123,5 +122,5 @@ class CorsBenchmark extends Directives {
}

private def responseBody(response: Future[HttpResponse]): String =
Await.result(response.flatMap(_.entity.toStrict(3.seconds)).map(_.data.utf8String)(ExecutionContexts.parasitic), 3.seconds)
Await.result(response.flatMap(_.entity.toStrict(3.seconds)).map(_.data.utf8String)(ExecutionContext.parasitic), 3.seconds)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import java.net.InetSocketAddress
import java.util.concurrent.CountDownLatch

import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.http.CommonBenchmark
import akka.http.impl.util.enhanceString_
import akka.http.scaladsl.model.HttpRequest
Expand Down Expand Up @@ -47,7 +46,7 @@ class ConnectionPoolBenchmark extends CommonBenchmark {
.onComplete {
case Success(_) => latch.countDown()
case Failure(_) => throw new IllegalStateException
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

latch.await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ package akka.http.impl.engine

import java.util.concurrent.CountDownLatch

import scala.concurrent.ExecutionContext

import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.http.CommonBenchmark
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity }
import akka.stream.scaladsl.Source
Expand All @@ -28,7 +29,7 @@ class HttpEntityBenchmark extends CommonBenchmark {
val latch = new CountDownLatch(1)
entity.discardBytes()
.future
.onComplete(_ => latch.countDown())(ExecutionContexts.parasitic)
.onComplete(_ => latch.countDown())(ExecutionContext.parasitic)
latch.await()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ package akka.http.impl.engine.client
import akka.Done
import akka.actor.{ Actor, ActorLogging, ActorRef, DeadLetterSuppression, Deploy, ExtendedActorSystem, NoSerializationVerificationNeeded, Props }
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.http.impl.engine.client.PoolInterface.ShutdownReason
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.Materializer

import scala.concurrent.ExecutionContext
import scala.concurrent.{ Future, Promise }
import scala.util.Failure
import scala.util.Success
Expand Down Expand Up @@ -170,7 +170,7 @@ private[http] final class PoolMasterActor extends Actor with ActorLogging {
// to this actor by the pool actor, they will be retried once the shutdown
// has completed.
val completed = pool.shutdown()(context.dispatcher)
shutdownCompletedPromise.tryCompleteWith(completed.map(_ => Done)(ExecutionContexts.parasitic))
shutdownCompletedPromise.tryCompleteWith(completed.map(_ => Done)(ExecutionContext.parasitic))
statusById += poolId -> PoolInterfaceShuttingDown(shutdownCompletedPromise)
case Some(PoolInterfaceShuttingDown(formerPromise)) =>
// Pool is already shutting down, mirror the existing promise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import java.util
import akka.NotUsed
import akka.actor.Cancellable
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.http.impl.engine.client.PoolFlow.{ RequestContext, ResponseContext }
import akka.http.impl.engine.client.pool.SlotState._
Expand All @@ -23,6 +22,7 @@ import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.util.OptionVal

import scala.concurrent.ExecutionContext
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration._
Expand Down Expand Up @@ -450,7 +450,7 @@ private[client] object NewHostConnectionPool {
entityComplete.onComplete(safely {
case Success(_) => withSlot(_.onRequestEntityCompleted())
case Failure(cause) => withSlot(_.onRequestEntityFailed(cause))
})(ExecutionContexts.parasitic)
})(ExecutionContext.parasitic)
request.withEntity(newEntity)
}

Expand Down Expand Up @@ -504,9 +504,9 @@ private[client] object NewHostConnectionPool {
ongoingResponseEntity = None
ongoingResponseEntityKillSwitch = None
}
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
case Failure(_) => throw new IllegalStateException("Should never fail")
})(ExecutionContexts.parasitic)
})(ExecutionContext.parasitic)

withSlot(_.onResponseReceived(response.withEntity(newEntity)))
}
Expand Down Expand Up @@ -588,7 +588,7 @@ private[client] object NewHostConnectionPool {
onConnectionAttemptFailed(currentEmbargoLevel)
sl.onConnectionAttemptFailed(cause)
}
})(ExecutionContexts.parasitic)
})(ExecutionContext.parasitic)

slotCon
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.http.impl.engine.http2

import akka.actor.{ ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.http.impl.engine.HttpConnectionIdleTimeoutBidi
import akka.http.impl.engine.server.{ GracefulTerminatorStage, MasterServerTerminator, ServerTerminator, UpgradeToOtherProtocolResponseHeader }
Expand All @@ -27,9 +26,9 @@ import akka.stream.scaladsl.{ Flow, Keep, Sink, Source, TLS, TLSPlacebo, Tcp }
import akka.stream.{ IgnoreComplete, Materializer }
import akka.util.ByteString
import akka.Done

import javax.net.ssl.SSLEngine
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
Expand Down Expand Up @@ -103,7 +102,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) =>
Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
Expand All @@ -121,7 +120,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
try {
handler(request).recover {
case NonFatal(ex) => handleHandlerError(log, ex)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
} catch {
case NonFatal(ex) => Future.successful(handleHandlerError(log, ex))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import java.util.concurrent.CompletionStage
import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.http.impl.engine.http2.client.PersistentConnection
import akka.http.scaladsl.Http.OutgoingConnection
Expand All @@ -24,6 +23,7 @@ import akka.http.scaladsl.HttpsConnectionContext
import akka.http.scaladsl.OutgoingConnectionBuilder
import akka.stream.javadsl.{ Flow => JFlow }

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

/**
Expand Down Expand Up @@ -131,7 +131,7 @@ private[akka] object OutgoingConnectionBuilderImpl {

private def javaFlow(flow: Flow[HttpRequest, HttpResponse, Future[OutgoingConnection]]): JFlow[javadsl.model.HttpRequest, javadsl.model.HttpResponse, CompletionStage[javadsl.OutgoingConnection]] = {
import scala.jdk.FutureConverters._
javaFlowKeepMatVal(flow.mapMaterializedValue(f => f.map(oc => new javadsl.OutgoingConnection(oc))(ExecutionContexts.parasitic).asJava))
javaFlowKeepMatVal(flow.mapMaterializedValue(f => f.map(oc => new javadsl.OutgoingConnection(oc))(ExecutionContext.parasitic).asJava))
}

private def javaFlowKeepMatVal[M](flow: Flow[HttpRequest, HttpResponse, M]): JFlow[javadsl.model.HttpRequest, javadsl.model.HttpResponse, M] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.http.impl.engine.http2.client

import akka.NotUsed
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.http.scaladsl.Http.OutgoingConnection
import akka.http.scaladsl.model.{ AttributeKey, HttpRequest, HttpResponse, RequestResponseAssociation, StatusCodes }
import akka.http.scaladsl.settings.Http2ClientSettings
Expand All @@ -17,9 +16,11 @@ import akka.stream.{ Attributes, FlowShape, Inlet, Outlet, StreamTcpException }
import akka.util.PrettyDuration

import java.util.concurrent.ThreadLocalRandom

import scala.concurrent.duration.Duration
import scala.concurrent.duration.DurationLong
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success }

Expand Down Expand Up @@ -103,7 +104,7 @@ private[http2] object PersistentConnection {
onConnected.invoke(())
case Failure(cause) =>
onFailed.invoke(cause)
})(ExecutionContexts.parasitic)
})(ExecutionContext.parasitic)

var requestOutPulled = false
requestOut.setHandler(new OutHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import scala.util.control.{ NoStackTrace, NonFatal }
import akka.NotUsed
import akka.actor.Cancellable
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.japi.Function
import akka.event.LoggingAdapter
import akka.http.ParsingErrorHandler
Expand All @@ -35,6 +34,7 @@ import akka.http.scaladsl.model._
import akka.http.impl.util.LogByteStringTools._

import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.util.Failure

/**
Expand Down Expand Up @@ -460,7 +460,7 @@ private[http] object HttpServerBluePrint {
case Failure(ex) =>
log.error(ex, s"Response stream for [${requestStart.debugString}] failed with '${ex.getMessage}'. Aborting connection.")
case _ => // ignore
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
newEntity
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package akka.http.impl.util
import akka.NotUsed
import akka.actor.Cancellable
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.http.scaladsl.model.HttpEntity
import akka.stream._
import akka.stream.impl.fusing.GraphInterpreter
Expand Down Expand Up @@ -72,7 +71,7 @@ private[http] object StreamUtils {
materializationPromise.trySuccess(())
killResult.future.value match {
case Some(res) => handleKill(res)
case None => killResult.future.onComplete(killCallback.invoke)(ExecutionContexts.parasitic)
case None => killResult.future.onComplete(killCallback.invoke)(ExecutionContext.parasitic)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import java.util.concurrent.{ CompletionStage, TimeUnit }

import akka.actor.ClassicActorSystemProvider
import akka.annotation.DoNotInherit
import akka.dispatch.ExecutionContexts
import akka.Done

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext
import scala.jdk.FutureConverters._
import scala.jdk.DurationConverters._

Expand Down Expand Up @@ -78,7 +78,7 @@ class ServerBinding private[http] (delegate: akka.http.scaladsl.Http.ServerBindi

def terminate(hardDeadline: java.time.Duration): CompletionStage[HttpTerminated] = {
delegate.terminate(FiniteDuration.apply(hardDeadline.toMillis, TimeUnit.MILLISECONDS))
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.parasitic)
.map(_.asInstanceOf[HttpTerminated])(ExecutionContext.parasitic)
.asJava
}

Expand All @@ -92,7 +92,7 @@ class ServerBinding private[http] (delegate: akka.http.scaladsl.Http.ServerBindi
*/
def whenTerminationSignalIssued: CompletionStage[java.time.Duration] =
delegate.whenTerminationSignalIssued
.map(deadline => deadline.time.toJava)(ExecutionContexts.parasitic)
.map(deadline => deadline.time.toJava)(ExecutionContext.parasitic)
.asJava

/**
Expand All @@ -109,7 +109,7 @@ class ServerBinding private[http] (delegate: akka.http.scaladsl.Http.ServerBindi
*/
def whenTerminated: CompletionStage[HttpTerminated] =
delegate.whenTerminated
.map(_.asInstanceOf[HttpTerminated])(ExecutionContexts.parasitic)
.map(_.asInstanceOf[HttpTerminated])(ExecutionContext.parasitic)
.asJava

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package akka.http.javadsl
import java.util.concurrent.CompletionStage

import akka.actor.ClassicActorSystemProvider
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.http.impl.util.JavaMapping.Implicits._
import akka.http.javadsl.model.{ HttpRequest, HttpResponse }
Expand Down Expand Up @@ -185,6 +184,6 @@ object ServerBuilder {
def connectionSource(): Source[IncomingConnection, CompletionStage[ServerBinding]] =
http.bindImpl(interface, port, context.asScala, settings.asScala, log)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ExecutionContexts.parasitic).asJava).asJava
.mapMaterializedValue(_.map(new ServerBinding(_))(ExecutionContext.parasitic).asJava).asJava
}
}
13 changes: 6 additions & 7 deletions akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import java.util.concurrent.CompletionStage
import javax.net.ssl._
import akka.actor._
import akka.annotation.{ DoNotInherit, InternalApi, InternalStableApi }
import akka.dispatch.ExecutionContexts
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.http.impl.engine.HttpConnectionIdleTimeoutBidi
import akka.http.impl.engine.client._
Expand Down Expand Up @@ -120,7 +119,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
.watchTermination() { (termWatchBefore, termWatchAfter) =>
// flag termination when the user handler has gotten (or has emitted) termination
// signals in both directions
termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContexts.parasitic)
termWatchBefore.flatMap(_ => termWatchAfter)(ExecutionContext.parasitic)
}
.joinMat(baseFlow)(Keep.both)
)
Expand Down Expand Up @@ -259,7 +258,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
// from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) => Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
Expand Down Expand Up @@ -799,7 +798,7 @@ class HttpExt @InternalStableApi /* constructor signature is hardcoded in Teleme
// The user should keep control over how much parallelism is required.
val parallelism = settings.pipeliningLimit * settings.maxConnections
Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism) {
case (request, userContext) => poolInterface(request).transform(response => Success(response -> userContext))(ExecutionContexts.parasitic)
case (request, userContext) => poolInterface(request).transform(response => Success(response -> userContext))(ExecutionContext.parasitic)
}
}

Expand Down Expand Up @@ -909,7 +908,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
* Note: rather than unbinding explicitly you can also use [[addToCoordinatedShutdown]] to add this task to Akka's coordinated shutdown.
*/
def unbind(): Future[Done] =
unbindAction().map(_ => Done)(ExecutionContexts.parasitic)
unbindAction().map(_ => Done)(ExecutionContext.parasitic)

/**
* Triggers "graceful" termination request being handled on this connection.
Expand Down Expand Up @@ -958,7 +957,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
require(hardDeadline > Duration.Zero, "deadline must be greater than 0, was: " + hardDeadline)

_whenTerminationSignalIssued.trySuccess(hardDeadline.fromNow)
val terminated = unbindAction().flatMap(_ => terminateAction(hardDeadline))(ExecutionContexts.parasitic)
val terminated = unbindAction().flatMap(_ => terminateAction(hardDeadline))(ExecutionContext.parasitic)
_whenTerminated.completeWith(terminated)
whenTerminated
}
Expand Down Expand Up @@ -998,7 +997,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
unbind()
}
shutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, s"http-terminate-${localAddress}") { () =>
terminate(hardTerminationDeadline).map(_ => Done)(ExecutionContexts.parasitic)
terminate(hardTerminationDeadline).map(_ => Done)(ExecutionContext.parasitic)
}
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import akka.util.ByteString
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit._

import scala.concurrent.ExecutionContext
import scala.util.{ Failure, Success }

class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer(
Expand Down Expand Up @@ -86,7 +87,7 @@ class WebSocketIntegrationSpec extends AkkaSpecWithMaterializer(
override def onPull(): Unit = pull(shape.in)

override def preStart(): Unit = {
promise.future.foreach(_ => getAsyncCallback[Done](_ => complete(shape.out)).invoke(Done))(akka.dispatch.ExecutionContexts.parasitic)
promise.future.foreach(_ => getAsyncCallback[Done](_ => complete(shape.out)).invoke(Done))(ExecutionContext.parasitic)
}

setHandlers(shape.in, shape.out, this)
Expand Down
Loading

0 comments on commit d00ebdb

Please sign in to comment.