diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift index 63d243b74e..149c0ff5ea 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift @@ -188,7 +188,7 @@ extension NIOAsyncChannelHandler: ChannelInboundHandler { // We are making sure to be on our event loop so we can safely use self in whenComplete channelReadTransformation(unwrapped) .hop(to: context.eventLoop) - .assumeIsolated() + .assumeIsolatedUnsafeUnchecked() .whenComplete { result in switch result { case .success: diff --git a/Sources/NIOCore/ChannelHandlers.swift b/Sources/NIOCore/ChannelHandlers.swift index 8e0da681ee..358332945b 100644 --- a/Sources/NIOCore/ChannelHandlers.swift +++ b/Sources/NIOCore/ChannelHandlers.swift @@ -114,7 +114,7 @@ public final class AcceptBackoffHandler: ChannelDuplexHandler, RemovableChannelH } private func scheduleRead(at: NIODeadline, context: ChannelHandlerContext) { - self.scheduledRead = context.eventLoop.assumeIsolated().scheduleTask(deadline: at) { + self.scheduledRead = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(deadline: at) { self.doRead(context) } } @@ -252,7 +252,9 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl } let writePromise = promise ?? context.eventLoop.makePromise() - writePromise.futureResult.assumeIsolated().whenComplete { (_: Result) in + writePromise.futureResult.hop( + to: context.eventLoop + ).assumeIsolatedUnsafeUnchecked().whenComplete { (_: Result) in self.lastWriteCompleteTime = .now() } context.write(data, promise: writePromise) @@ -272,7 +274,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl } if self.reading { - self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask( + self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask( in: timeout, self.makeReadTimeoutTask(context, timeout) ) @@ -282,7 +284,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl let diff = .now() - self.lastReadTime if diff >= timeout { // Reader is idle - set a new timeout and trigger an event through the pipeline - self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask( + self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask( in: timeout, self.makeReadTimeoutTask(context, timeout) ) @@ -290,7 +292,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl context.fireUserInboundEventTriggered(IdleStateEvent.read) } else { // Read occurred before the timeout - set a new timeout with shorter delay. - self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask( + self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask( deadline: self.lastReadTime + timeout, self.makeReadTimeoutTask(context, timeout) ) @@ -309,7 +311,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl if diff >= timeout { // Writer is idle - set a new timeout and notify the callback. - self.scheduledWriterTask = context.eventLoop.assumeIsolated().scheduleTask( + self.scheduledWriterTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask( in: timeout, self.makeWriteTimeoutTask(context, timeout) ) @@ -317,7 +319,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl context.fireUserInboundEventTriggered(IdleStateEvent.write) } else { // Write occurred before the timeout - set a new timeout with shorter delay. - self.scheduledWriterTask = context.eventLoop.assumeIsolated().scheduleTask( + self.scheduledWriterTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask( deadline: self.lastWriteCompleteTime + timeout, self.makeWriteTimeoutTask(context, timeout) ) @@ -332,7 +334,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl } if self.reading { - self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask( + self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask( in: timeout, self.makeAllTimeoutTask(context, timeout) ) @@ -345,7 +347,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl let diff = .now() - latestLast if diff >= timeout { // Reader is idle - set a new timeout and trigger an event through the pipeline - self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask( + self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask( in: timeout, self.makeAllTimeoutTask(context, timeout) ) @@ -353,7 +355,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl context.fireUserInboundEventTriggered(IdleStateEvent.all) } else { // Read occurred before the timeout - set a new timeout with shorter delay. - self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask( + self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask( deadline: latestLast + timeout, self.makeAllTimeoutTask(context, timeout) ) @@ -367,7 +369,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl _ body: @escaping (ChannelHandlerContext, TimeAmount) -> (() -> Void) ) -> Scheduled? { if let timeout = amount { - return context.eventLoop.assumeIsolated().scheduleTask(in: timeout, body(context, timeout)) + return context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(in: timeout, body(context, timeout)) } return nil } diff --git a/Sources/NIOCore/ChannelPipeline.swift b/Sources/NIOCore/ChannelPipeline.swift index 6920a2b0fd..7c4b0853a4 100644 --- a/Sources/NIOCore/ChannelPipeline.swift +++ b/Sources/NIOCore/ChannelPipeline.swift @@ -472,10 +472,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.assumeIsolated().completeWith(self.contextSync(handler: handler)) + promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(handler: handler)) } else { self.eventLoop.execute { - promise.assumeIsolated().completeWith(self.contextSync(handler: handler)) + promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(handler: handler)) } } @@ -501,10 +501,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.assumeIsolated().completeWith(self.contextSync(name: name)) + promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(name: name)) } else { self.eventLoop.execute { - promise.assumeIsolated().completeWith(self.contextSync(name: name)) + promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(name: name)) } } @@ -534,10 +534,10 @@ public final class ChannelPipeline: ChannelInvoker { let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self) if self.eventLoop.inEventLoop { - promise.assumeIsolated().completeWith(self._contextSync(handlerType: handlerType)) + promise.assumeIsolatedUnsafeUnchecked().completeWith(self._contextSync(handlerType: handlerType)) } else { self.eventLoop.execute { - promise.assumeIsolated().completeWith(self._contextSync(handlerType: handlerType)) + promise.assumeIsolatedUnsafeUnchecked().completeWith(self._contextSync(handlerType: handlerType)) } } diff --git a/Sources/NIOCore/Codec.swift b/Sources/NIOCore/Codec.swift index acb4fc1394..f38f2eb9e3 100644 --- a/Sources/NIOCore/Codec.swift +++ b/Sources/NIOCore/Codec.swift @@ -749,7 +749,7 @@ extension ByteToMessageHandler: RemovableChannelHandler { public func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) { precondition(self.removalState == .notBeingRemoved) self.removalState = .removalStarted - context.eventLoop.assumeIsolated().execute { + context.eventLoop.assumeIsolatedUnsafeUnchecked().execute { self.processLeftovers(context: context) assert(!self.state.isLeftoversNeedProcessing, "illegal state: \(self.state)") switch self.removalState { diff --git a/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md index b5cbffa4f1..a9af119ec0 100644 --- a/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md +++ b/Sources/NIOCore/Docs.docc/loops-futures-concurrency.md @@ -138,10 +138,11 @@ guaranteed to fire on the same isolation domain as the ``ChannelHandlerContext`` of data race is present. However, Swift Concurrency cannot guarantee this at compile time, as the specific isolation domain is determined only at runtime. -In these contexts, today users can make their callbacks safe using ``NIOLoopBound`` and -``NIOLoopBoundBox``. These values can only be constructed on the event loop, and only allow -access to their values on the same event loop. These constraints are enforced at runtime, -so at compile time these are unconditionally `Sendable`. +In these contexts, users that cannot require NIO 2.77.0 can make their callbacks +safe using ``NIOLoopBound`` and ``NIOLoopBoundBox``. These values can only be +constructed on the event loop, and only allow access to their values on the same +event loop. These constraints are enforced at runtime, so at compile time these are +unconditionally `Sendable`. > Warning: ``NIOLoopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks with runtime ones. This makes it possible to introduce crashes in your code. Please @@ -150,18 +151,43 @@ so at compile time these are unconditionally `Sendable`. ``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain before using these types. -> Note: In a future NIO release we intend to improve the ergonomics of this common problem - by offering a related type that can only be created from an ``EventLoopFuture`` on a - given ``EventLoop``. This minimises the number of runtime checks, and will make it - easier and more pleasant to write this kind of code. +From NIO 2.77.0, new types were introduced to make this common problem easier. These types are +``EventLoopFuture/Isolated`` and ``EventLoopPromise/Isolated``. These isolated view types +can only be constructed from an existing Future or Promise, and they can only be constructed +on the ``EventLoop`` to which those futures or promises are bound. Because they are not +`Sendable`, we can be confident that these values never escape the isolation domain in which +they are created, which must be the same isolation domain where the callbacks execute. + +As a result, these types can drop the `@Sendable` requirements on all the future closures, and +many of the `Sendable` requirements on the return types from future closures. They can also +drop the `Sendable` requirements from the promise completion functions. + +These isolated views can be obtained by calling ``EventLoopFuture/assumeIsolated()`` or +``EventLoopPromise/assumeIsolated()``. + +> Warning: ``EventLoopFuture/assumeIsolated()`` and ``EventLoopPromise/assumeIsolated()`` + supplement compile-time isolation checks with runtime ones. This makes it possible + to introduce crashes in your code. Please ensure that you are 100% confident that the + isolation domains align. If you are not sure that the ``EventLoopFuture`` or + ``EventLoopPromise`` you wish to attach a callback to is bound to your + ``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain + before using these types. + +> Warning: ``EventLoopFuture/assumeIsolated()`` and ``EventLoopPromise/assumeIsolated()`` + **must not** be called from a Swift concurrency context, either an async method or + from within an actor. This is because it uses runtime checking of the event loop + to confirm that the value is not being sent to a different concurrency domain. + + When using an ``EventLoop`` as a custom actor executor, this API can be used to retrieve + a value that region based isolation will then allow to be sent to another domain. ## Interacting with Event Loops on the Event Loop As with Futures, there are occasionally times where it is necessary to schedule ``EventLoop`` operations on the ``EventLoop`` where your code is currently executing. -Much like with ``EventLoopFuture``, you can use ``NIOLoopBound`` and ``NIOLoopBoundBox`` -to make these callbacks safe. +Much like with ``EventLoopFuture``, if you need to support NIO versions before 2.77.0 +you can use ``NIOLoopBound`` and ``NIOLoopBoundBox`` to make these callbacks safe. > Warning: ``NIOLoopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks with runtime ones. This makes it possible to introduce crashes in your code. Please @@ -170,7 +196,27 @@ to make these callbacks safe. ``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain before using these types. -> Note: In a future NIO release we intend to improve the ergonomics of this common problem - by offering a related type that can only be created from an ``EventLoopFuture`` on a - given ``EventLoop``. This minimises the number of runtime checks, and will make it - easier and more pleasant to write this kind of code. +From NIO 2.77.0, a new type was introduced to make this common problem easier. This type is +``NIOIsolatedEventLoop``. This isolated view type can only be constructed from an existing +``EventLoop``, and it can only be constructed on the ``EventLoop`` that is being wrapped. +Because this type is not `Sendable`, we can be confident that this value never escapes the +isolation domain in which it was created, which must be the same isolation domain where the +callbacks execute. + +As a result, this type can drop the `@Sendable` requirements on all the operation closures, and +many of the `Sendable` requirements on the return types from these closures. + +This isolated view can be obtained by calling ``EventLoop/assumeIsolated()``. + +> Warning: ``EventLoop/assumeIsolated()`` supplements compile-time isolation checks with + runtime ones. This makes it possible to introduce crashes in your code. Please ensure + that you are 100% confident that the isolation domains align. If you are not sure that + the your code is running on the relevant ``EventLoop``, prefer the non-isolated type. + +> Warning: ``EventLoop/assumeIsolated()`` **must not** be called from a Swift concurrency + context, either an async method or from within an actor. This is because it uses runtime + checking of the event loop to confirm that the value is not being sent to a different + concurrency domain. + + When using an ``EventLoop`` as a custom actor executor, this API can be used to retrieve + a value that region based isolation will then allow to be sent to another domain. diff --git a/Sources/NIOCore/EventLoopFuture+AssumeIsolated.swift b/Sources/NIOCore/EventLoopFuture+AssumeIsolated.swift index d2dded2cfd..4748ef320d 100644 --- a/Sources/NIOCore/EventLoopFuture+AssumeIsolated.swift +++ b/Sources/NIOCore/EventLoopFuture+AssumeIsolated.swift @@ -14,8 +14,14 @@ /// A struct wrapping an ``EventLoop`` that ensures all calls to any method on this struct /// are coming from the event loop. -@usableFromInline -struct IsolatedEventLoop { +/// +/// This type is explicitly not `Sendable`. It may only be constructed on an event loop, +/// using ``EventLoop/assumeIsolated()``, and may not subsequently be passed to other isolation +/// domains. +/// +/// Using this type relaxes the need to have the closures for ``EventLoop/execute(_:)``, +/// ``EventLoop/submit(_:)``, and ``EventLoop/scheduleTask(in:_:)`` to be `@Sendable`. +public struct NIOIsolatedEventLoop { @usableFromInline let _wrapped: EventLoop @@ -25,9 +31,9 @@ struct IsolatedEventLoop { } /// Submit a given task to be executed by the `EventLoop` + @available(*, noasync) @inlinable - func execute(_ task: @escaping () -> Void) { - self._wrapped.assertInEventLoop() + public func execute(_ task: @escaping () -> Void) { let unsafeTransfer = UnsafeTransfer(task) self._wrapped.execute { unsafeTransfer.wrappedValue() @@ -39,9 +45,9 @@ struct IsolatedEventLoop { /// - Parameters: /// - task: The closure that will be submitted to the `EventLoop` for execution. /// - Returns: `EventLoopFuture` that is notified once the task was executed. + @available(*, noasync) @inlinable - func submit(_ task: @escaping () throws -> T) -> EventLoopFuture { - self._wrapped.assertInEventLoop() + public func submit(_ task: @escaping () throws -> T) -> EventLoopFuture { let unsafeTransfer = UnsafeTransfer(task) return self._wrapped.submit { try unsafeTransfer.wrappedValue() @@ -51,18 +57,19 @@ struct IsolatedEventLoop { /// Schedule a `task` that is executed by this `EventLoop` at the given time. /// /// - Parameters: + /// - deadline: The time at which the task should run. /// - task: The synchronous task to run. As with everything that runs on the `EventLoop`, it must not block. /// - Returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait /// on the completion of the task. /// /// - Note: You can only cancel a task before it has started executing. @discardableResult + @available(*, noasync) @inlinable - func scheduleTask( + public func scheduleTask( deadline: NIODeadline, _ task: @escaping () throws -> T ) -> Scheduled { - self._wrapped.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(task) return self._wrapped.scheduleTask(deadline: deadline) { try unsafeTransfer.wrappedValue() @@ -72,6 +79,7 @@ struct IsolatedEventLoop { /// Schedule a `task` that is executed by this `EventLoop` after the given amount of time. /// /// - Parameters: + /// - delay: The time to wait before running the task. /// - task: The synchronous task to run. As with everything that runs on the `EventLoop`, it must not block. /// - Returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait /// on the completion of the task. @@ -79,12 +87,12 @@ struct IsolatedEventLoop { /// - Note: You can only cancel a task before it has started executing. /// - Note: The `in` value is clamped to a maximum when running on a Darwin-kernel. @discardableResult + @available(*, noasync) @inlinable - func scheduleTask( + public func scheduleTask( in delay: TimeAmount, _ task: @escaping () throws -> T ) -> Scheduled { - self._wrapped.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(task) return self._wrapped.scheduleTask(in: delay) { try unsafeTransfer.wrappedValue() @@ -97,20 +105,23 @@ struct IsolatedEventLoop { /// this event loop might differ. /// /// - Parameters: + /// - deadline: The time at which we should run the asynchronous task. + /// - file: The file in which the task is scheduled. + /// - line: The line of the `file` in which the task is scheduled. /// - task: The asynchronous task to run. As with everything that runs on the `EventLoop`, it must not block. /// - Returns: A `Scheduled` object which may be used to cancel the task if it has not yet run, or to wait /// on the full execution of the task, including its returned `EventLoopFuture`. /// /// - Note: You can only cancel a task before it has started executing. @discardableResult + @available(*, noasync) @inlinable - func flatScheduleTask( + public func flatScheduleTask( deadline: NIODeadline, file: StaticString = #file, line: UInt = #line, _ task: @escaping () throws -> EventLoopFuture ) -> Scheduled { - self._wrapped.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(task) return self._wrapped.flatScheduleTask(deadline: deadline, file: file, line: line) { try unsafeTransfer.wrappedValue() @@ -119,26 +130,54 @@ struct IsolatedEventLoop { /// Returns the wrapped event loop. @inlinable - func nonisolated() -> any EventLoop { + public func nonisolated() -> any EventLoop { self._wrapped } } + extension EventLoop { /// Assumes the calling context is isolated to the event loop. - @usableFromInline - func assumeIsolated() -> IsolatedEventLoop { - IsolatedEventLoop(self) + @inlinable + @available(*, noasync) + public func assumeIsolated() -> NIOIsolatedEventLoop { + self.preconditionInEventLoop() + return NIOIsolatedEventLoop(self) + } + + /// Assumes the calling context is isolated to the event loop. + /// + /// This version of ``EventLoop/assumeIsolated()`` omits the runtime + /// isolation check in release builds and doesn't prevent you using it + /// from using it in async contexts. + @inlinable + public func assumeIsolatedUnsafeUnchecked() -> NIOIsolatedEventLoop { + self.assertInEventLoop() + return NIOIsolatedEventLoop(self) } } +@available(*, unavailable) +extension NIOIsolatedEventLoop: Sendable {} + extension EventLoopFuture { /// A struct wrapping an ``EventLoopFuture`` that ensures all calls to any method on this struct /// are coming from the event loop of the future. - @usableFromInline - struct Isolated { + /// + /// This type is explicitly not `Sendable`. It may only be constructed on an event loop, + /// using ``EventLoopFuture/assumeIsolated()``, and may not subsequently be passed to other isolation + /// domains. + /// + /// Using this type relaxes the need to have the closures for the various ``EventLoopFuture`` + /// callback-attaching functions be `Sendable`. + public struct Isolated { @usableFromInline let _wrapped: EventLoopFuture + @inlinable + init(_wrapped: EventLoopFuture) { + self._wrapped = _wrapped + } + /// When the current `EventLoopFuture` is fulfilled, run the provided callback, /// which will provide a new `EventLoopFuture`. /// @@ -160,6 +199,9 @@ extension EventLoopFuture { /// } /// ``` /// + /// Note that the returned ``EventLoopFuture`` still needs a `Sendable` wrapped value, + /// as it may have been created on a different event loop. + /// /// Note: In a sense, the `EventLoopFuture` is returned before it's created. /// /// - Parameters: @@ -167,14 +209,14 @@ extension EventLoopFuture { /// a new `EventLoopFuture`. /// - Returns: A future that will receive the eventual value. @inlinable - func flatMap( + @available(*, noasync) + public func flatMap( _ callback: @escaping (Value) -> EventLoopFuture ) -> EventLoopFuture.Isolated { - self._wrapped.eventLoop.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.flatMap { unsafeTransfer.wrappedValue($0) - }.assumeIsolated() + }.assumeIsolatedUnsafeUnchecked() } /// When the current `EventLoopFuture` is fulfilled, run the provided callback, which @@ -192,14 +234,14 @@ extension EventLoopFuture { /// a new value lifted into a new `EventLoopFuture`. /// - Returns: A future that will receive the eventual value. @inlinable - func flatMapThrowing( + @available(*, noasync) + public func flatMapThrowing( _ callback: @escaping (Value) throws -> NewValue ) -> EventLoopFuture.Isolated { - self._wrapped.eventLoop.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.flatMapThrowing { try unsafeTransfer.wrappedValue($0) - }.assumeIsolated() + }.assumeIsolatedUnsafeUnchecked() } /// When the current `EventLoopFuture` is in an error state, run the provided callback, which @@ -217,14 +259,14 @@ extension EventLoopFuture { /// a new value lifted into a new `EventLoopFuture`. /// - Returns: A future that will receive the eventual value or a rethrown error. @inlinable - func flatMapErrorThrowing( + @available(*, noasync) + public func flatMapErrorThrowing( _ callback: @escaping (Error) throws -> Value ) -> EventLoopFuture.Isolated { - self._wrapped.eventLoop.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.flatMapErrorThrowing { try unsafeTransfer.wrappedValue($0) - }.assumeIsolated() + }.assumeIsolatedUnsafeUnchecked() } /// When the current `EventLoopFuture` is fulfilled, run the provided callback, which @@ -254,14 +296,14 @@ extension EventLoopFuture { /// a new value lifted into a new `EventLoopFuture`. /// - Returns: A future that will receive the eventual value. @inlinable - func map( + @available(*, noasync) + public func map( _ callback: @escaping (Value) -> (NewValue) ) -> EventLoopFuture.Isolated { - self._wrapped.eventLoop.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.map { unsafeTransfer.wrappedValue($0) - }.assumeIsolated() + }.assumeIsolatedUnsafeUnchecked() } /// When the current `EventLoopFuture` is in an error state, run the provided callback, which @@ -279,14 +321,14 @@ extension EventLoopFuture { /// a new value lifted into a new `EventLoopFuture`. /// - Returns: A future that will receive the recovered value. @inlinable - func flatMapError( + @available(*, noasync) + public func flatMapError( _ callback: @escaping (Error) -> EventLoopFuture ) -> EventLoopFuture.Isolated where Value: Sendable { - self._wrapped.eventLoop.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.flatMapError { unsafeTransfer.wrappedValue($0) - }.assumeIsolated() + }.assumeIsolatedUnsafeUnchecked() } /// When the current `EventLoopFuture` is fulfilled, run the provided callback, which @@ -303,14 +345,14 @@ extension EventLoopFuture { /// a new value or error lifted into a new `EventLoopFuture`. /// - Returns: A future that will receive the eventual value. @inlinable - func flatMapResult( + @available(*, noasync) + public func flatMapResult( _ body: @escaping (Value) -> Result ) -> EventLoopFuture.Isolated { - self._wrapped.eventLoop.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(body) return self._wrapped.flatMapResult { unsafeTransfer.wrappedValue($0) - }.assumeIsolated() + }.assumeIsolatedUnsafeUnchecked() } /// When the current `EventLoopFuture` is in an error state, run the provided callback, which @@ -326,14 +368,14 @@ extension EventLoopFuture { /// a new value lifted into a new `EventLoopFuture`. /// - Returns: A future that will receive the recovered value. @inlinable - func recover( + @available(*, noasync) + public func recover( _ callback: @escaping (Error) -> Value ) -> EventLoopFuture.Isolated { - self._wrapped.eventLoop.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.recover { unsafeTransfer.wrappedValue($0) - }.assumeIsolated() + }.assumeIsolatedUnsafeUnchecked() } /// Adds an observer callback to this `EventLoopFuture` that is called when the @@ -347,8 +389,8 @@ extension EventLoopFuture { /// - Parameters: /// - callback: The callback that is called with the successful result of the `EventLoopFuture`. @inlinable - func whenSuccess(_ callback: @escaping (Value) -> Void) { - self._wrapped.eventLoop.assertInEventLoop() + @available(*, noasync) + public func whenSuccess(_ callback: @escaping (Value) -> Void) { let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.whenSuccess { unsafeTransfer.wrappedValue($0) @@ -366,8 +408,8 @@ extension EventLoopFuture { /// - Parameters: /// - callback: The callback that is called with the failed result of the `EventLoopFuture`. @inlinable - func whenFailure(_ callback: @escaping (Error) -> Void) { - self._wrapped.eventLoop.assertInEventLoop() + @available(*, noasync) + public func whenFailure(_ callback: @escaping (Error) -> Void) { let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.whenFailure { unsafeTransfer.wrappedValue($0) @@ -380,10 +422,10 @@ extension EventLoopFuture { /// - Parameters: /// - callback: The callback that is called when the `EventLoopFuture` is fulfilled. @inlinable - func whenComplete( + @available(*, noasync) + public func whenComplete( _ callback: @escaping (Result) -> Void ) { - self._wrapped.eventLoop.assertInEventLoop() let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.whenComplete { unsafeTransfer.wrappedValue($0) @@ -397,14 +439,14 @@ extension EventLoopFuture { /// - callback: the callback that is called when the `EventLoopFuture` is fulfilled. /// - Returns: the current `EventLoopFuture` @inlinable - func always( + @available(*, noasync) + public func always( _ callback: @escaping (Result) -> Void - ) -> EventLoopFuture { - self._wrapped.eventLoop.assertInEventLoop() + ) -> EventLoopFuture.Isolated { let unsafeTransfer = UnsafeTransfer(callback) return self._wrapped.always { unsafeTransfer.wrappedValue($0) - } + }.assumeIsolatedUnsafeUnchecked() } /// Unwrap an `EventLoopFuture` where its type parameter is an `Optional`. @@ -416,10 +458,11 @@ extension EventLoopFuture { /// ``` /// /// - Parameters: - /// - orReplace: the value of the returned `EventLoopFuture` when then resolved future's value is `Optional.some()`. + /// - replacement: the value of the returned `EventLoopFuture` when then resolved future's value is `Optional.some()`. /// - Returns: an new `EventLoopFuture` with new type parameter `NewValue` and the value passed in the `orReplace` parameter. @inlinable - func unwrap( + @available(*, noasync) + public func unwrap( orReplace replacement: NewValue ) -> EventLoopFuture.Isolated where Value == NewValue? { self.map { (value) -> NewValue in @@ -440,12 +483,13 @@ extension EventLoopFuture { /// ``` /// /// - Parameters: - /// - orElse: a closure that returns the value of the returned `EventLoopFuture` when then resolved future's value + /// - callback: a closure that returns the value of the returned `EventLoopFuture` when then resolved future's value /// is `Optional.some()`. /// - Returns: an new `EventLoopFuture` with new type parameter `NewValue` and with the value returned by the closure /// passed in the `orElse` parameter. @inlinable - func unwrap( + @available(*, noasync) + public func unwrap( orElse callback: @escaping () -> NewValue ) -> EventLoopFuture.Isolated where Value == NewValue? { self.map { (value) -> NewValue in @@ -458,34 +502,73 @@ extension EventLoopFuture { /// Returns the wrapped event loop future. @inlinable - func nonisolated() -> EventLoopFuture { + public func nonisolated() -> EventLoopFuture { self._wrapped } } - /// Assumes the calling context is isolated to the future's event loop. - @usableFromInline - func assumeIsolated() -> Isolated { + /// Returns a variant of this ``EventLoopFuture`` with less strict + /// `Sendable` requirements. Can only be called from on the + /// ``EventLoop`` to which this ``EventLoopFuture`` is bound, will crash + /// if that invariant fails to be met. + @inlinable + @available(*, noasync) + public func assumeIsolated() -> Isolated { + self.eventLoop.preconditionInEventLoop() + return Isolated(_wrapped: self) + } + + /// Returns a variant of this ``EventLoopFuture`` with less strict + /// `Sendable` requirements. Can only be called from on the + /// ``EventLoop`` to which this ``EventLoopFuture`` is bound, will crash + /// if that invariant fails to be met. + /// + /// This is an unsafe version of ``EventLoopFuture/assumeIsolated()`` which + /// omits the runtime check in release builds and doesn't prevent you using it + /// from using it in async contexts. + @inlinable + @available(*, noasync) + public func assumeIsolatedUnsafeUnchecked() -> Isolated { self.eventLoop.assertInEventLoop() return Isolated(_wrapped: self) } } +@available(*, unavailable) +extension EventLoopFuture.Isolated: Sendable {} + extension EventLoopPromise { /// A struct wrapping an ``EventLoopPromise`` that ensures all calls to any method on this struct /// are coming from the event loop of the promise. - @usableFromInline - struct Isolated { + /// + /// This type is explicitly not `Sendable`. It may only be constructed on an event loop, + /// using ``EventLoopPromise/assumeIsolated()``, and may not subsequently be passed to other isolation + /// domains. + /// + /// Using this type relaxes the need to have the promise completion functions accept `Sendable` + /// values, as this type can only be handled on the ``EventLoop``. + /// + /// This type does not offer the full suite of completion functions that ``EventLoopPromise`` + /// does, as many of those functions do not require `Sendable` values already. It only offers + /// versions for the functions that do require `Sendable` types. If you have an + /// ``EventLoopPromise/Isolated`` but need a regular ``EventLoopPromise``, use + /// ``EventLoopPromise/Isolated/nonisolated()`` to unwrap the value. + public struct Isolated { @usableFromInline let _wrapped: EventLoopPromise + @inlinable + init(_wrapped: EventLoopPromise) { + self._wrapped = _wrapped + } + /// Deliver a successful result to the associated `EventLoopFuture` object. /// /// - Parameters: /// - value: The successful result of the operation. @inlinable - func succeed(_ value: Value) { - self._wrapped.futureResult.eventLoop.assertInEventLoop() + @available(*, noasync) + public func succeed(_ value: Value) { self._wrapped._setValue(value: .success(value))._run() } @@ -504,22 +587,43 @@ extension EventLoopPromise { /// - Parameters: /// - result: The result which will be used to succeed or fail this promise. @inlinable - func completeWith(_ result: Result) { - self._wrapped.futureResult.eventLoop.assertInEventLoop() + @available(*, noasync) + public func completeWith(_ result: Result) { self._wrapped._setValue(value: result)._run() } /// Returns the wrapped event loop promise. @inlinable - func nonisolated() -> EventLoopPromise { + public func nonisolated() -> EventLoopPromise { self._wrapped } } - /// Assumes the calling context is isolated to the promise's event loop. - @usableFromInline - func assumeIsolated() -> Isolated { + /// Returns a variant of this ``EventLoopPromise`` with less strict + /// `Sendable` requirements. Can only be called from on the + /// ``EventLoop`` to which this ``EventLoopPromise`` is bound, will crash + /// if that invariant fails to be met. + @inlinable + @available(*, noasync) + public func assumeIsolated() -> Isolated { + self.futureResult.eventLoop.preconditionInEventLoop() + return Isolated(_wrapped: self) + } + + /// Returns a variant of this ``EventLoopPromise`` with less strict + /// `Sendable` requirements. Can only be called from on the + /// ``EventLoop`` to which this ``EventLoopPromise`` is bound, will crash + /// if that invariant fails to be met. + /// + /// This is an unsafe version of ``EventLoopPromise/assumeIsolated()`` which + /// omits the runtime check in release builds and doesn't prevent you using it + /// from using it in async contexts. + @inlinable + public func assumeIsolatedUnsafeUnchecked() -> Isolated { self.futureResult.eventLoop.assertInEventLoop() return Isolated(_wrapped: self) } } + +@available(*, unavailable) +extension EventLoopPromise.Isolated: Sendable {} diff --git a/Sources/NIOEmbedded/Embedded.swift b/Sources/NIOEmbedded/Embedded.swift index 70148c4163..fed89fc1ae 100644 --- a/Sources/NIOEmbedded/Embedded.swift +++ b/Sources/NIOEmbedded/Embedded.swift @@ -304,6 +304,16 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { } #endif + public func preconditionInEventLoop(file: StaticString, line: UInt) { + self.checkCorrectThread() + // Currently, inEventLoop is always true so this always passes. + } + + public func preconditionNotInEventLoop(file: StaticString, line: UInt) { + // As inEventLoop always returns true, this must always preconditon. + preconditionFailure("Always in EmbeddedEventLoop", file: file, line: line) + } + public func _preconditionSafeToWait(file: StaticString, line: UInt) { self.checkCorrectThread() // EmbeddedEventLoop always allows a wait, as waiting will essentially always block diff --git a/Tests/NIOPosixTests/EventLoopFutureIsolatedTests.swift b/Tests/NIOPosixTests/EventLoopFutureIsolatedTests.swift new file mode 100644 index 0000000000..2388c28236 --- /dev/null +++ b/Tests/NIOPosixTests/EventLoopFutureIsolatedTests.swift @@ -0,0 +1,318 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore +import NIOPosix +import XCTest + +final class SuperNotSendable { + var x: Int = 5 +} + +@available(*, unavailable) +extension SuperNotSendable: Sendable {} + +final class EventLoopFutureIsolatedTest: XCTestCase { + func testCompletingPromiseWithNonSendableValue() throws { + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + try loop.flatSubmit { + let promise = loop.makePromise(of: SuperNotSendable.self) + let value = SuperNotSendable() + promise.assumeIsolated().succeed(value) + return promise.futureResult.assumeIsolated().map { val in + XCTAssertIdentical(val, value) + }.nonisolated() + }.wait() + } + + func testCompletingPromiseWithNonSendableResult() throws { + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + try loop.flatSubmit { + let promise = loop.makePromise(of: SuperNotSendable.self) + let value = SuperNotSendable() + promise.assumeIsolated().completeWith(.success(value)) + return promise.futureResult.assumeIsolated().map { val in + XCTAssertIdentical(val, value) + }.nonisolated() + }.wait() + } + + func testCompletingPromiseWithNonSendableValueUnchecked() throws { + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + try loop.flatSubmit { + let promise = loop.makePromise(of: SuperNotSendable.self) + let value = SuperNotSendable() + promise.assumeIsolatedUnsafeUnchecked().succeed(value) + return promise.futureResult.assumeIsolatedUnsafeUnchecked().map { val in + XCTAssertIdentical(val, value) + }.nonisolated() + }.wait() + } + + func testCompletingPromiseWithNonSendableResultUnchecked() throws { + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + try loop.flatSubmit { + let promise = loop.makePromise(of: SuperNotSendable.self) + let value = SuperNotSendable() + promise.assumeIsolatedUnsafeUnchecked().completeWith(.success(value)) + return promise.futureResult.assumeIsolatedUnsafeUnchecked().map { val in + XCTAssertIdentical(val, value) + }.nonisolated() + }.wait() + } + + func testBackAndForthUnwrapping() throws { + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + try loop.submit { + let promise = loop.makePromise(of: SuperNotSendable.self) + let future = promise.futureResult + + XCTAssertEqual(promise.assumeIsolated().nonisolated(), promise) + XCTAssertEqual(future.assumeIsolated().nonisolated(), future) + promise.assumeIsolated().succeed(SuperNotSendable()) + }.wait() + } + + func testBackAndForthUnwrappingUnchecked() throws { + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + try loop.submit { + let promise = loop.makePromise(of: SuperNotSendable.self) + let future = promise.futureResult + + XCTAssertEqual(promise.assumeIsolatedUnsafeUnchecked().nonisolated(), promise) + XCTAssertEqual(future.assumeIsolatedUnsafeUnchecked().nonisolated(), future) + promise.assumeIsolated().succeed(SuperNotSendable()) + }.wait() + } + + func testFutureChaining() throws { + enum TestError: Error { + case error + } + + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + try loop.flatSubmit { + let promise = loop.makePromise(of: SuperNotSendable.self) + let future = promise.futureResult.assumeIsolated() + let originalValue = SuperNotSendable() + + // Note that for this test it is _very important_ that all of these + // close over `originalValue`. This proves the non-Sendability of + // the closure. + + // This block is the main happy path. + let newFuture = future.flatMap { result in + XCTAssertIdentical(originalValue, result) + let promise = loop.makePromise(of: Int.self) + promise.succeed(4) + return promise.futureResult + }.map { (result: Int) in + XCTAssertEqual(result, 4) + return originalValue + }.flatMapThrowing { (result: SuperNotSendable) in + XCTAssertIdentical(originalValue, result) + return SuperNotSendable() + }.flatMapResult { (result: SuperNotSendable) -> Result in + XCTAssertNotIdentical(originalValue, result) + return .failure(TestError.error) + }.recover { err in + XCTAssertTrue(err is TestError) + return originalValue + }.always { val in + XCTAssertNotNil(try? val.get()) + } + + newFuture.whenComplete { result in + guard case .success(let r) = result else { + XCTFail("Unexpected error") + return + } + XCTAssertIdentical(r, originalValue) + } + newFuture.whenSuccess { result in + XCTAssertIdentical(result, originalValue) + } + + // This block covers the flatMapError and whenFailure tests + let throwingFuture = newFuture.flatMapThrowing { (_: SuperNotSendable) throws -> SuperNotSendable in + XCTAssertEqual(originalValue.x, 5) + throw TestError.error + } + throwingFuture.whenFailure { error in + // Supurious but forces the closure. + XCTAssertEqual(originalValue.x, 5) + guard let error = error as? TestError, error == .error else { + XCTFail("Invalid passed error: \(error)") + return + } + } + throwingFuture.flatMapErrorThrowing { error in + guard let error = error as? TestError, error == .error else { + XCTFail("Invalid passed error: \(error)") + throw error + } + return originalValue + }.whenComplete { result in + guard case .success(let r) = result else { + XCTFail("Unexpected error") + return + } + XCTAssertIdentical(r, originalValue) + } + throwingFuture.map { _ in 5 }.flatMapError { (error: any Error) -> EventLoopFuture in + guard let error = error as? TestError, error == .error else { + XCTFail("Invalid passed error: \(error)") + return loop.makeSucceededFuture(originalValue.x) + } + return loop.makeSucceededFuture(originalValue.x - 1) + }.whenComplete { (result: Result) in + guard case .success(let r) = result else { + XCTFail("Unexpected error") + return + } + XCTAssertEqual(r, originalValue.x - 1) + } + + // This block handles unwrap. + newFuture.map { x -> SuperNotSendable? in + XCTAssertEqual(originalValue.x, 5) + return nil + }.unwrap(orReplace: originalValue).unwrap( + orReplace: SuperNotSendable() + ).map { x -> SuperNotSendable? in + XCTAssertIdentical(x, originalValue) + return nil + }.unwrap(orElse: { + originalValue + }).unwrap(orElse: { + SuperNotSendable() + }).whenSuccess { x in + XCTAssertIdentical(x, originalValue) + } + + promise.assumeIsolated().succeed(originalValue) + return newFuture.map { _ in }.nonisolated() + }.wait() + } + + func testEventLoopIsolated() throws { + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + let result: Int = try loop.flatSubmit { + let value = SuperNotSendable() + + // Again, all of these need to close over value. In addition, + // many need to return it as well. + let isolated = loop.assumeIsolated() + XCTAssertIdentical(isolated.nonisolated(), loop) + isolated.execute { + XCTAssertEqual(value.x, 5) + } + let firstFuture = isolated.submit { + let val = SuperNotSendable() + val.x = value.x + 1 + return val + }.map { $0.x } + + let secondFuture = isolated.scheduleTask(deadline: .now() + .milliseconds(50)) { + let val = SuperNotSendable() + val.x = value.x + 1 + return val + }.futureResult.map { $0.x } + + let thirdFuture = isolated.scheduleTask(in: .milliseconds(50)) { + let val = SuperNotSendable() + val.x = value.x + 1 + return val + }.futureResult.map { $0.x } + + let fourthFuture = isolated.flatScheduleTask(deadline: .now() + .milliseconds(50)) { + let promise = loop.makePromise(of: Int.self) + promise.succeed(value.x + 1) + return promise.futureResult + }.futureResult.map { $0 } + + return EventLoopFuture.reduce( + into: 0, + [firstFuture, secondFuture, thirdFuture, fourthFuture], + on: loop + ) { $0 += $1 } + }.wait() + + XCTAssertEqual(result, 6 * 4) + } + + func testEventLoopIsolatedUnchecked() throws { + let group = MultiThreadedEventLoopGroup.singleton + let loop = group.next() + + let result: Int = try loop.flatSubmit { + let value = SuperNotSendable() + + // Again, all of these need to close over value. In addition, + // many need to return it as well. + let isolated = loop.assumeIsolatedUnsafeUnchecked() + XCTAssertIdentical(isolated.nonisolated(), loop) + isolated.execute { + XCTAssertEqual(value.x, 5) + } + let firstFuture = isolated.submit { + let val = SuperNotSendable() + val.x = value.x + 1 + return val + }.map { $0.x } + + let secondFuture = isolated.scheduleTask(deadline: .now() + .milliseconds(50)) { + let val = SuperNotSendable() + val.x = value.x + 1 + return val + }.futureResult.map { $0.x } + + let thirdFuture = isolated.scheduleTask(in: .milliseconds(50)) { + let val = SuperNotSendable() + val.x = value.x + 1 + return val + }.futureResult.map { $0.x } + + let fourthFuture = isolated.flatScheduleTask(deadline: .now() + .milliseconds(50)) { + let promise = loop.makePromise(of: Int.self) + promise.succeed(value.x + 1) + return promise.futureResult + }.futureResult.map { $0 } + + return EventLoopFuture.reduce( + into: 0, + [firstFuture, secondFuture, thirdFuture, fourthFuture], + on: loop + ) { $0 += $1 } + }.wait() + + XCTAssertEqual(result, 6 * 4) + } +}