Skip to content

Commit

Permalink
Decouple Future initialization from its computation
Browse files Browse the repository at this point in the history
  • Loading branch information
mbovel committed Nov 25, 2024
1 parent db7f170 commit 5722529
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
32 changes: 21 additions & 11 deletions shared/src/main/scala/async/futures.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ import scala.util.{Failure, Success, Try}
trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable

object Future:
trait DeferredFuture[+T] extends Future[T]:
def start(): Unit

/** A future that is completed explicitly by calling its `complete` method. There are three public implementations
*
* - RunnableFuture: Completion is done by running a block of code
Expand Down Expand Up @@ -107,7 +110,7 @@ object Future:

/** A future that is completed by evaluating `body` as a separate asynchronous operation in the given `scheduler`
*/
private class RunnableFuture[+T](body: Async.Spawn ?=> T)(using ac: Async) extends CoreFuture[T]:
private class RunnableFuture[+T](body: Async.Spawn ?=> T)(using ac: Async) extends CoreFuture[T], DeferredFuture[T]:

/** RunnableFuture maintains its own inner [[CompletionGroup]], that is separated from the provided Async
* instance's. When the future is cancelled, we only cancel this CompletionGroup. This effectively means any
Expand Down Expand Up @@ -205,24 +208,28 @@ object Future:

override def cancel(): Unit = if setCancelled() then this.innerGroup.cancel()

def start(): Unit =
ac.support.scheduleBoundary:
val result = Async.withNewCompletionGroup(innerGroup)(Try({
val r = body
checkCancellation()
r
}).recoverWith { case _: InterruptedException | _: CancellationException =>
Failure(new CancellationException())
})(using FutureAsync(CompletionGroup.Unlinked))
complete(result)

link()
ac.support.scheduleBoundary:
val result = Async.withNewCompletionGroup(innerGroup)(Try({
val r = body
checkCancellation()
r
}).recoverWith { case _: InterruptedException | _: CancellationException =>
Failure(new CancellationException())
})(using FutureAsync(CompletionGroup.Unlinked))
complete(result)

end RunnableFuture

/** Create a future that asynchronously executes `body` that wraps its execution in a [[scala.util.Try]]. The returned
* future is linked to the given [[Async.Spawn]] scope by default, i.e. it is cancelled when this scope ends.
*/
def apply[T](body: Async.Spawn ?=> T)(using async: Async, spawnable: Async.Spawn & async.type): Future[T] =
RunnableFuture(body)
val future = RunnableFuture(body)
future.start()
future

/** A future that is immediately completed with the given result. */
def now[T](result: Try[T]): Future[T] =
Expand All @@ -239,6 +246,9 @@ object Future:
/** A future that immediately rejects with the given exception. Similar to `Future.now(Failure(exception))`. */
inline def rejected(exception: Throwable): Future[Nothing] = now(Failure(exception))

def deferred[T](body: Async.Spawn ?=> T)(using async: Async, spawnable: Async.Spawn & async.type): DeferredFuture[T] =
RunnableFuture(body)

extension [T](f1: Future[T])
/** Parallel composition of two futures. If both futures succeed, succeed with their values in a pair. Otherwise,
* fail with the failure that was returned first.
Expand Down
24 changes: 24 additions & 0 deletions shared/src/test/scala/FutureBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,28 @@ class FutureBehavior extends munit.FunSuite {
reader.awaitResult
assertEquals(ch.read(), Right(2))
}

test("deferred futures") {
Async.blocking:
val counter = AtomicInteger(0)
val a = new Array[Future.DeferredFuture[Int]](4)

a(0) = Future.deferred:
counter.incrementAndGet()
a(1).await + a(2).await
a(1) = Future.deferred:
counter.incrementAndGet()
a(3).await + 4
a(2) = Future.deferred:
counter.incrementAndGet()
a(3).await + 2
a(3) = Future.deferred:
counter.incrementAndGet()
1

a.foreach(_.start())

assertEquals(a(0).await, 8)
assertEquals(counter.get(), 4)
}
}

0 comments on commit 5722529

Please sign in to comment.