Skip to content

Commit

Permalink
Fix timer not removing listeners on tick
Browse files Browse the repository at this point in the history
  • Loading branch information
natsukagami committed Dec 7, 2023
1 parent 22fcade commit 7fcbbed
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions shared/src/main/scala/async/Timer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,26 @@ import scala.annotation.tailrec
import java.util.concurrent.CancellationException

/**
* Timer exposes a steady Async.Source of ticks that happens every `tickDuration` milliseconds.
* Timer exposes a steady [[Async.Source]] of ticks that happens every `tickDuration` milliseconds.
* Note that the timer does not start ticking until `start` is called (which is a blocking operation, until the timer is cancelled).
*
* You might want to manually `cancel` the timer, so that it gets garbage collected (before the enclosing `Async` scope ends).
* You might want to manually `cancel` the timer, so that it gets garbage collected (before the enclosing [[Async]] scope ends).
*/
class Timer(tickDuration: Duration) extends Cancellable {
var isCancelled = false
enum TimerEvent:
case Tick
case Cancelled

private class Source extends Async.OriginalSource[this.TimerEvent] {
private var isCancelled = false

private object Source extends Async.OriginalSource[this.TimerEvent] {
val listeners = mutable.Set[Listener[TimerEvent]]()
def tick() = synchronized {
listeners.foreach(_.completeNow(TimerEvent.Tick, this))
listeners.filterInPlace(l =>
l.completeNow(TimerEvent.Tick, this)
false
)
}
val listeners = mutable.Set[Listener[TimerEvent]]()
override def poll(k: Listener[TimerEvent]): Boolean =
if isCancelled then k.completeNow(TimerEvent.Cancelled, this) else false // subscribing to a timer always takes you to the next tick
override def dropListener(k: Listener[TimerEvent]): Unit = listeners -= k
Expand All @@ -36,36 +43,30 @@ class Timer(tickDuration: Duration) extends Cancellable {
if isCancelled then k.completeNow(TimerEvent.Cancelled, this)
else listeners += k
}
private val _src = Source()

/** Ticks of the timer are delivered through this source. Note that ticks are ephemeral. */
val src: Async.Source[this.TimerEvent] = _src
inline final def src: Async.Source[this.TimerEvent] = Source

/** Starts the timer. Suspends until the timer is cancelled. */
def start()(using Async, AsyncOperations): Unit =
cancellationScope(this):
loop()

@tailrec private def loop()(using Async, AsyncOperations): Unit =
if !isCancelled then
try
// println(s"Sleeping at ${new java.util.Date()}, ${isCancelled}, ${this}")
sleep(tickDuration.toMillis)
try sleep(tickDuration.toMillis)
catch
case _: CancellationException => cancel()
if !isCancelled then
_src.tick()
Source.tick()
loop()


override def cancel(): Unit =
synchronized { isCancelled = true }
src.synchronized {
_src.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src))
_src.listeners.clear()
Source.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src))
Source.listeners.clear()
}

enum TimerEvent:
case Tick
case Cancelled
}

0 comments on commit 7fcbbed

Please sign in to comment.