-
Notifications
You must be signed in to change notification settings - Fork 602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
interruptWhen
and friends shouldn't cancel downstream processing
#3202
Comments
interruptWhen
shouldn't cancel downstream processinginterruptWhen
and friends shouldn't cancel downstream processing
@Jasper-M Good morning. Thanks for reaching out. I am sorry that we have not taken a look at this for a while. Regarding the issue, I think that it is working as expected. The To obtain the desired behavior, that the action would not be cancelled from outside once started, that can be done using the Stream(1,2).covary[IO].interruptAfter(200.millis).evalMap( i =>
IO.println(i).delayBy(300.millis).onCancel(IO.println("CANCELED")).uncancelable)
).compile.drain.unsafeRunSync() |
I miss this part. Why must the The confusion here is that it seems that interruption applies to the entire stream, rather than specifically to the upstream. And maybe I'm missing it, but it's not obvious to me why this is fundamental. It does seem fundamental that when an upstream is interrupted, it will no longer emit elements, but why this forces the downstream to cancel processing of already-emitted elements is not clear. |
@armanbilge That is a good question that is not documented. Here is a vague guess, that I would like @SystemFw or @mpilquist to confirm:L Stream subsumes Resource. The time a stream is emitting elements is the lifetime of any resource acquired during that stream. Whenever you do for {
/*a*/ r <- Stream.resource(myResource).interruptAfter(2 seconds)
/*b*/ x <- streamThatEmitsOutputs
/*c*/ _ <- r.doActionWith(x)
} yield () Once the resource-acquiring stream in |
Thanks Diego! That was a nice explanation; it makes sense to me. So in this case, "interrupt" not only means stop emitting elements, but also immediately release resources (even if this requires canceling ongoing users). |
But I do understand why you would like to have these "extended" resource semantics. In fact you probably want these semantics in almost all cases where you use So let's assume that we want these semantics and that the |
A relevant discussion popped up on Discord, copying it here because I think there is useful insight. @SystemFw: Stream
.repeatEval(action)
.metered(8.minutes)
.interruptAfter(1.hour) ++ Stream.repeatEval(action2) ofc, you can make a case for always having control flow in IO and have a streaming library purely for streaming data, but that's a different library in my mind. Stream interruption cancelling ongoing effects is perfectly fine IMO, but why should logActionResult get cancelled in this case? Stream
.repeatEval(action)
.metered(8.minutes)
.interruptAfter(1.hour)
.evalMap(logActionResult) ++ Stream.repeatEval(action2) Especially since that interrupt could be hidden behind some interface def runAnActionFor1Hour: Stream[IO, ActionResult] =
Stream
.repeatEval(action)
.metered(8.minutes)
.interruptAfter(1.hour)
runAnActionFor1Hour.evalMap(logActionResult) ++ Stream.repeatEval(action2) IMHO wanting to cancel logActionResult is the corner case, specific for those instances where your stream is actually a resource. But in the general case you don't want that. @SystemFw If you want graceful stopping semantics, there are other tools: you could merge with a timeout
If we think this type of semantics is commonly useful, I wouldn't be opposed to facilitating it with more combinators |
That sounds like it wouldn't cancel the upstream processing either ( |
This program prints
CANCELED
even though this is happening downstream frominterruptAfter
.Only canceling upstream would give more control to the clients of this stream.
The text was updated successfully, but these errors were encountered: