-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Channels implementation with new listeners #23
Channels implementation with new listeners #23
Conversation
ede4896
to
d8f457c
Compare
... so that it no longer creates "spooky actions at a distance"
d8f457c
to
8f41bc3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The abstraction of channels is great!!
I also changed some things in place, have a look.
} | ||
end UnboundedChannel | ||
|
||
object Channel: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be possible to extract all listener completion outside of the synchronized parts to increase concurrency. But it is a tedious work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is possible (in fact it should be possible to implement completely lock-free channels) but not right now
subscribersCopy = subscribers.toList | ||
for (s <- subscribersCopy) s.send(Failure(ChannelClosedException())) | ||
shouldTerminate = true | ||
case Right(Message.Refresh) => () |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this exist? And if it is only quit left, then I guess the info channel is not needed anymore. You can implement the quit using a (classical) Promise instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refresh
is needed as like a continue
for the while loop. Happens when we made a change to the subscriber/publisher list.
b20f227
to
3fffabc
Compare
ChannelMultiplexer is very dependent on subscribing order, and should be either set up correctly or used by subscribers that don't really care about past events.
... to make it consistent with ChannelMultiplexer. Overall `start` sounds more like a background running thing w.r.t threads.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nicely done with map. LGTM.
Depends on #21 and #22.
Channel stuff
lockBoth
..sendImmediately
non-suspending.start()
method instead of spawning a background working.Stuff based on channels
Seq[Future[T]]
=>ReadableChannel[Future[T]]
, with values arriving as they are completed.Seq[Future[T]]
hasawaitAll
(OrCancel
) andaltAll
(WithCancel
)Other misc goodies
.await
syntax to sources.Source.values
to quickly create a new source based on a list of values.