diff --git a/IntegrationTests/tests_04_performance/Thresholds/5.10.json b/IntegrationTests/tests_04_performance/Thresholds/5.10.json index 9f75ed151e..c3fc7078e3 100644 --- a/IntegrationTests/tests_04_performance/Thresholds/5.10.json +++ b/IntegrationTests/tests_04_performance/Thresholds/5.10.json @@ -1,8 +1,8 @@ { "10000000_asyncsequenceproducer": 19, "1000000_asyncwriter": 1000050, - "1000_addHandlers": 43050, - "1000_addHandlers_sync": 36050, + "1000_addHandlers": 44050, + "1000_addHandlers_sync": 37050, "1000_addRemoveHandlers_handlercontext": 8050, "1000_addRemoveHandlers_handlername": 8050, "1000_addRemoveHandlers_handlertype": 8050, @@ -11,7 +11,7 @@ "1000_copying_bytebufferview_to_array": 1050, "1000_copying_circularbuffer_to_array": 1050, "1000_getHandlers": 8050, - "1000_getHandlers_sync": 34, + "1000_getHandlers_sync": 35, "1000_reqs_1_conn": 26400, "1000_rst_connections": 145050, "1000_tcpbootstraps": 3050, diff --git a/IntegrationTests/tests_04_performance/Thresholds/5.9.json b/IntegrationTests/tests_04_performance/Thresholds/5.9.json index 2edeea7c88..b9e6d8faa7 100644 --- a/IntegrationTests/tests_04_performance/Thresholds/5.9.json +++ b/IntegrationTests/tests_04_performance/Thresholds/5.9.json @@ -1,8 +1,8 @@ { "10000000_asyncsequenceproducer": 19, "1000000_asyncwriter": 1000050, - "1000_addHandlers": 43050, - "1000_addHandlers_sync": 36050, + "1000_addHandlers": 44050, + "1000_addHandlers_sync": 37050, "1000_addRemoveHandlers_handlercontext": 8050, "1000_addRemoveHandlers_handlername": 8050, "1000_addRemoveHandlers_handlertype": 8050, @@ -11,7 +11,7 @@ "1000_copying_bytebufferview_to_array": 1050, "1000_copying_circularbuffer_to_array": 1050, "1000_getHandlers": 8050, - "1000_getHandlers_sync": 34, + "1000_getHandlers_sync": 35, "1000_reqs_1_conn": 26400, "1000_rst_connections": 147050, "1000_tcpbootstraps": 4050, diff --git a/IntegrationTests/tests_04_performance/Thresholds/6.0.json b/IntegrationTests/tests_04_performance/Thresholds/6.0.json index 9f75ed151e..c3fc7078e3 100644 --- a/IntegrationTests/tests_04_performance/Thresholds/6.0.json +++ b/IntegrationTests/tests_04_performance/Thresholds/6.0.json @@ -1,8 +1,8 @@ { "10000000_asyncsequenceproducer": 19, "1000000_asyncwriter": 1000050, - "1000_addHandlers": 43050, - "1000_addHandlers_sync": 36050, + "1000_addHandlers": 44050, + "1000_addHandlers_sync": 37050, "1000_addRemoveHandlers_handlercontext": 8050, "1000_addRemoveHandlers_handlername": 8050, "1000_addRemoveHandlers_handlertype": 8050, @@ -11,7 +11,7 @@ "1000_copying_bytebufferview_to_array": 1050, "1000_copying_circularbuffer_to_array": 1050, "1000_getHandlers": 8050, - "1000_getHandlers_sync": 34, + "1000_getHandlers_sync": 35, "1000_reqs_1_conn": 26400, "1000_rst_connections": 145050, "1000_tcpbootstraps": 3050, diff --git a/IntegrationTests/tests_04_performance/Thresholds/nightly-6.0.json b/IntegrationTests/tests_04_performance/Thresholds/nightly-6.0.json index 9f75ed151e..c3fc7078e3 100644 --- a/IntegrationTests/tests_04_performance/Thresholds/nightly-6.0.json +++ b/IntegrationTests/tests_04_performance/Thresholds/nightly-6.0.json @@ -1,8 +1,8 @@ { "10000000_asyncsequenceproducer": 19, "1000000_asyncwriter": 1000050, - "1000_addHandlers": 43050, - "1000_addHandlers_sync": 36050, + "1000_addHandlers": 44050, + "1000_addHandlers_sync": 37050, "1000_addRemoveHandlers_handlercontext": 8050, "1000_addRemoveHandlers_handlername": 8050, "1000_addRemoveHandlers_handlertype": 8050, @@ -11,7 +11,7 @@ "1000_copying_bytebufferview_to_array": 1050, "1000_copying_circularbuffer_to_array": 1050, "1000_getHandlers": 8050, - "1000_getHandlers_sync": 34, + "1000_getHandlers_sync": 35, "1000_reqs_1_conn": 26400, "1000_rst_connections": 145050, "1000_tcpbootstraps": 3050, diff --git a/IntegrationTests/tests_04_performance/Thresholds/nightly-main.json b/IntegrationTests/tests_04_performance/Thresholds/nightly-main.json index 9f75ed151e..f75dbda93d 100644 --- a/IntegrationTests/tests_04_performance/Thresholds/nightly-main.json +++ b/IntegrationTests/tests_04_performance/Thresholds/nightly-main.json @@ -1,8 +1,8 @@ { "10000000_asyncsequenceproducer": 19, "1000000_asyncwriter": 1000050, - "1000_addHandlers": 43050, - "1000_addHandlers_sync": 36050, + "1000_addHandlers": 44050, + "1000_addHandlers_sync": 37050, "1000_addRemoveHandlers_handlercontext": 8050, "1000_addRemoveHandlers_handlername": 8050, "1000_addRemoveHandlers_handlertype": 8050, @@ -11,7 +11,7 @@ "1000_copying_bytebufferview_to_array": 1050, "1000_copying_circularbuffer_to_array": 1050, "1000_getHandlers": 8050, - "1000_getHandlers_sync": 34, + "1000_getHandlers_sync": 35, "1000_reqs_1_conn": 26400, "1000_rst_connections": 145050, "1000_tcpbootstraps": 3050, @@ -34,10 +34,10 @@ "execute_hop_10000_tasks": 0, "future_erase_result": 4050, "future_lots_of_callbacks": 53050, - "get_100000_headers_canonical_form": 700050, - "get_100000_headers_canonical_form_trimming_whitespace": 700050, - "get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 700050, - "get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 700050, + "get_100000_headers_canonical_form": 500050, + "get_100000_headers_canonical_form_trimming_whitespace": 500050, + "get_100000_headers_canonical_form_trimming_whitespace_from_long_string": 500050, + "get_100000_headers_canonical_form_trimming_whitespace_from_short_string": 500050, "modifying_1000_circular_buffer_elements": 0, "modifying_byte_buffer_view": 6050, "ping_pong_1000_reqs_1_conn": 319, diff --git a/Package.swift b/Package.swift index 021ebf8e50..1b63eae193 100644 --- a/Package.swift +++ b/Package.swift @@ -105,7 +105,8 @@ let package = Package( "_NIODataStructures", swiftAtomics, swiftCollections, - ] + ], + swiftSettings: strictConcurrencySettings ), .target( name: "NIOPosix", @@ -429,7 +430,8 @@ let package = Package( "NIOConcurrencyHelpers", "NIOCore", "NIOEmbedded", - ] + ], + swiftSettings: strictConcurrencySettings ), .testTarget( name: "NIOPosixTests", diff --git a/Sources/NIOEmbedded/AsyncTestingChannel.swift b/Sources/NIOEmbedded/AsyncTestingChannel.swift index 75889e5c37..d3a4834f7e 100644 --- a/Sources/NIOEmbedded/AsyncTestingChannel.swift +++ b/Sources/NIOEmbedded/AsyncTestingChannel.swift @@ -199,24 +199,28 @@ public final class NIOAsyncTestingChannel: Channel { /// `nil` because ``NIOAsyncTestingChannel``s don't have parents. public let parent: Channel? = nil - // This is only written once, from a single thread, and never written again, so it's _technically_ thread-safe. Most methods cannot safely + // These two variables are only written once, from a single thread, and never written again, so they're _technically_ thread-safe. Most methods cannot safely // be used from multiple threads, but `isActive`, `isOpen`, `eventLoop`, and `closeFuture` can all safely be used from any thread. Just. + #if compiler(>=5.10) + @usableFromInline + nonisolated(unsafe) var channelcore: EmbeddedChannelCore! + nonisolated(unsafe) private var _pipeline: ChannelPipeline! + #else @usableFromInline var channelcore: EmbeddedChannelCore! + private var _pipeline: ChannelPipeline! + #endif - /// Guards any of the getters/setters that can be accessed from any thread. - private let stateLock: NIOLock = NIOLock() - - // Guarded by `stateLock` - private var _isWritable: Bool = true - - // Guarded by `stateLock` - private var _localAddress: SocketAddress? = nil - - // Guarded by `stateLock` - private var _remoteAddress: SocketAddress? = nil + private struct State { + var isWritable: Bool + var localAddress: SocketAddress? + var remoteAddress: SocketAddress? + } - private var _pipeline: ChannelPipeline! + /// Guards any of the getters/setters that can be accessed from any thread. + private let stateLock = NIOLockedValueBox( + State(isWritable: true, localAddress: nil, remoteAddress: nil) + ) /// - see: `Channel._channelCore` public var _channelCore: ChannelCore { @@ -231,11 +235,11 @@ public final class NIOAsyncTestingChannel: Channel { /// - see: `Channel.isWritable` public var isWritable: Bool { get { - self.stateLock.withLock { self._isWritable } + self.stateLock.withLockedValue { $0.isWritable } } set { - self.stateLock.withLock { () -> Void in - self._isWritable = newValue + self.stateLock.withLockedValue { + $0.isWritable = newValue } } } @@ -243,11 +247,11 @@ public final class NIOAsyncTestingChannel: Channel { /// - see: `Channel.localAddress` public var localAddress: SocketAddress? { get { - self.stateLock.withLock { self._localAddress } + self.stateLock.withLockedValue { $0.localAddress } } set { - self.stateLock.withLock { () -> Void in - self._localAddress = newValue + self.stateLock.withLockedValue { + $0.localAddress = newValue } } } @@ -255,11 +259,11 @@ public final class NIOAsyncTestingChannel: Channel { /// - see: `Channel.remoteAddress` public var remoteAddress: SocketAddress? { get { - self.stateLock.withLock { self._remoteAddress } + self.stateLock.withLockedValue { $0.remoteAddress } } set { - self.stateLock.withLock { () -> Void in - self._remoteAddress = newValue + self.stateLock.withLockedValue { + $0.remoteAddress = newValue } } } @@ -283,8 +287,11 @@ public final class NIOAsyncTestingChannel: Channel { /// - Parameters: /// - handler: The `ChannelHandler` to add to the `ChannelPipeline` before register. /// - loop: The ``NIOAsyncTestingEventLoop`` to use. - public convenience init(handler: ChannelHandler, loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop()) async - { + @preconcurrency + public convenience init( + handler: ChannelHandler & Sendable, + loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop() + ) async { await self.init(handlers: [handler], loop: loop) } @@ -295,8 +302,9 @@ public final class NIOAsyncTestingChannel: Channel { /// - Parameters: /// - handlers: The `ChannelHandler`s to add to the `ChannelPipeline` before register. /// - loop: The ``NIOAsyncTestingEventLoop`` to use. + @preconcurrency public convenience init( - handlers: [ChannelHandler], + handlers: [ChannelHandler & Sendable], loop: NIOAsyncTestingEventLoop = NIOAsyncTestingEventLoop() ) async { self.init(loop: loop) @@ -671,3 +679,7 @@ extension NIOAsyncTestingChannel.LeftOverState: @unchecked Sendable {} @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension NIOAsyncTestingChannel.BufferState: @unchecked Sendable {} #endif + +// Synchronous options are never Sendable. +@available(*, unavailable) +extension NIOAsyncTestingChannel.SynchronousOptions: Sendable {} diff --git a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift index 7997d38e36..3321a6a601 100644 --- a/Sources/NIOEmbedded/AsyncTestingEventLoop.swift +++ b/Sources/NIOEmbedded/AsyncTestingEventLoop.swift @@ -14,7 +14,13 @@ #if canImport(Dispatch) import Atomics + +#if canImport(Darwin) import Dispatch +#else +@preconcurrency import Dispatch +#endif + import NIOConcurrencyHelpers import NIOCore import _NIODataStructures @@ -125,7 +131,7 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { self.scheduledTasks.removeFirst { $0.id == taskID } } - private func insertTask( + private func insertTask( taskID: UInt64, deadline: NIODeadline, promise: EventLoopPromise, @@ -152,7 +158,11 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { /// - see: `EventLoop.scheduleTask(deadline:_:)` @discardableResult - public func scheduleTask(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled { + @preconcurrency + public func scheduleTask( + deadline: NIODeadline, + _ task: @escaping @Sendable () throws -> T + ) -> Scheduled { let promise: EventLoopPromise = self.makePromise() let taskID = self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed) @@ -190,7 +200,8 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { /// - see: `EventLoop.scheduleTask(in:_:)` @discardableResult - public func scheduleTask(in: TimeAmount, _ task: @escaping () throws -> T) -> Scheduled { + @preconcurrency + public func scheduleTask(in: TimeAmount, _ task: @escaping @Sendable () throws -> T) -> Scheduled { self.scheduleTask(deadline: self.now + `in`, task) } @@ -230,7 +241,8 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { /// On an `NIOAsyncTestingEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. Unlike with the other operations, this will /// immediately execute, to eliminate a common class of bugs. - public func execute(_ task: @escaping () -> Void) { + @preconcurrency + public func execute(_ task: @escaping @Sendable () -> Void) { if self.inEventLoop { self.scheduleTask(deadline: self.now, task) } else { @@ -359,7 +371,8 @@ public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable { } /// - see: `EventLoop.shutdownGracefully` - public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { + @preconcurrency + public func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping @Sendable (Error?) -> Void) { self.queue.async { self._shutdownGracefully() queue.async { diff --git a/Sources/NIOEmbedded/Embedded.swift b/Sources/NIOEmbedded/Embedded.swift index fed89fc1ae..865a97c88a 100644 --- a/Sources/NIOEmbedded/Embedded.swift +++ b/Sources/NIOEmbedded/Embedded.swift @@ -36,6 +36,22 @@ import WASILibc #error("Unknown C library.") #endif +private func printError(_ string: StaticString) { + string.withUTF8Buffer { buf in + var buf = buf + while buf.count > 0 { + // 2 is stderr + let rc = write(2, buf.baseAddress, buf.count) + if rc < 0 { + let err = errno + if err == EINTR { continue } + fatalError("Unexpected error writing: \(err)") + } + buf = .init(rebasing: buf.dropFirst(Int(rc))) + } + } +} + internal struct EmbeddedScheduledTask { let id: UInt64 let task: () -> Void @@ -146,14 +162,13 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { "EmbeddedEventLoop is not thread-safe. You can only use it from the thread you created it on." ) } else { - fputs( + printError( """ ERROR: NIO API misuse: EmbeddedEventLoop is not thread-safe. \ You can only use it from the thread you created it on. This problem will be upgraded to a forced \ crash in future versions of SwiftNIO. - """, - stderr + """ ) } return @@ -185,7 +200,7 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { insertOrder: self.nextTaskNumber(), task: { do { - promise.succeed(try task()) + promise.assumeIsolated().succeed(try task()) } catch let err { promise.fail(err) } @@ -365,6 +380,11 @@ public final class EmbeddedEventLoop: EventLoop, CustomStringConvertible { }() } +// EmbeddedEventLoop is extremely _not_ Sendable. However, the EventLoop protocol +// requires it to be. We are doing some runtime enforcement of correct use, but +// ultimately we can't have the compiler validating this usage. +extension EmbeddedEventLoop: @unchecked Sendable {} + @usableFromInline class EmbeddedChannelCore: ChannelCore { var isOpen: Bool { @@ -484,8 +504,11 @@ class EmbeddedChannelCore: ChannelCore { self.pipeline.syncOperations.fireChannelInactive() self.pipeline.syncOperations.fireChannelUnregistered() + let loopBoundSelf = NIOLoopBound(self, eventLoop: self.eventLoop) + eventLoop.execute { // ensure this is executed in a delayed fashion as the users code may still traverse the pipeline + let `self` = loopBoundSelf.value self.removeHandlers(pipeline: self.pipeline) self.closePromise.succeed(()) } @@ -583,6 +606,10 @@ class EmbeddedChannelCore: ChannelCore { } } +// ChannelCores are basically never Sendable. +@available(*, unavailable) +extension EmbeddedChannelCore: Sendable {} + /// `EmbeddedChannel` is a `Channel` implementation that does neither any /// actual IO nor has a proper eventing mechanism. The prime use-case for /// `EmbeddedChannel` is in unit tests when you want to feed the inbound events @@ -867,8 +894,8 @@ public final class EmbeddedChannel: Channel { @inlinable @discardableResult public func writeInbound(_ data: T) throws -> BufferState { self.embeddedEventLoop.checkCorrectThread() - self.pipeline.fireChannelRead(data) - self.pipeline.fireChannelReadComplete() + self.pipeline.syncOperations.fireChannelRead(NIOAny(data)) + self.pipeline.syncOperations.fireChannelReadComplete() try self.throwIfErrorCaught() return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.inboundBuffer)) } @@ -1086,5 +1113,16 @@ extension EmbeddedChannel { } } +// EmbeddedChannel is extremely _not_ Sendable. However, the Channel protocol +// requires it to be. We are doing some runtime enforcement of correct use, but +// ultimately we can't have the compiler validating this usage. +extension EmbeddedChannel: @unchecked Sendable {} + +@available(*, unavailable) +extension EmbeddedChannel.LeftOverState: @unchecked Sendable {} + +@available(*, unavailable) +extension EmbeddedChannel.BufferState: @unchecked Sendable {} + @available(*, unavailable) extension EmbeddedChannel.SynchronousOptions: Sendable {} diff --git a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift index 5fa112f6d1..5022dcd3ca 100644 --- a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift @@ -36,7 +36,7 @@ class AsyncTestingChannelTests: XCTestCase { } let channel = NIOAsyncTestingChannel() - XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).wait()) { e in + XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).map { _ in }.wait()) { e in XCTAssertEqual(e as? ChannelPipelineError, .notFound) } diff --git a/Tests/NIOEmbeddedTests/AsyncTestingEventLoopTests.swift b/Tests/NIOEmbeddedTests/AsyncTestingEventLoopTests.swift index 66c40c7414..86920495f0 100644 --- a/Tests/NIOEmbeddedTests/AsyncTestingEventLoopTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncTestingEventLoopTests.swift @@ -19,7 +19,7 @@ import XCTest @testable import NIOEmbedded -private class EmbeddedTestError: Error {} +private final class EmbeddedTestError: Error {} @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) final class NIOAsyncTestingEventLoopTests: XCTestCase { @@ -336,10 +336,12 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { // advanceTime(by:) is the same as on MultiThreadedEventLoopGroup: specifically, that tasks run via // schedule that expire "now" all run at the same time, and that any work they schedule is run // after all such tasks expire. + struct TestState { + var firstScheduled: Scheduled? + var secondScheduled: Scheduled? + } let loop = NIOAsyncTestingEventLoop() - let lock = NIOLock() - var firstScheduled: Scheduled? = nil - var secondScheduled: Scheduled? = nil + let lock = NIOLockedValueBox(TestState()) let orderingCounter = ManagedAtomic(0) // Here's the setup. First, we'll set up two scheduled tasks to fire in 5 nanoseconds. Each of these @@ -356,13 +358,13 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { // // To validate the ordering, we'll use a counter. - lock.withLock { () -> Void in - firstScheduled = loop.scheduleTask(in: .nanoseconds(5)) { - let second = lock.withLock { () -> Scheduled? in - XCTAssertNotNil(firstScheduled) - firstScheduled = nil - XCTAssertNotNil(secondScheduled) - return secondScheduled + lock.withLockedValue { + $0.firstScheduled = loop.scheduleTask(in: .nanoseconds(5)) { + let second = lock.withLockedValue { + XCTAssertNotNil($0.firstScheduled) + $0.firstScheduled = nil + XCTAssertNotNil($0.secondScheduled) + return $0.secondScheduled } if let partner = second { @@ -379,11 +381,11 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { } } - secondScheduled = loop.scheduleTask(in: .nanoseconds(5)) { - lock.withLock { () -> Void in - secondScheduled = nil - XCTAssertNil(firstScheduled) - XCTAssertNil(secondScheduled) + $0.secondScheduled = loop.scheduleTask(in: .nanoseconds(5)) { + lock.withLockedValue { + $0.secondScheduled = nil + XCTAssertNil($0.firstScheduled) + XCTAssertNil($0.secondScheduled) } XCTAssertCompareAndSwapSucceeds(storage: orderingCounter, expected: 2, desired: 3) @@ -482,6 +484,7 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { let eventLoop = NIOAsyncTestingEventLoop() let tasksRun = ManagedAtomic(0) + @Sendable func scheduleRecursiveTask( at taskStartTime: NIODeadline, andChildTaskAfter childTaskStartDelay: TimeAmount @@ -514,29 +517,33 @@ final class NIOAsyncTestingEventLoopTests: XCTestCase { func testShutdownCancelsRemainingScheduledTasks() async { let eventLoop = NIOAsyncTestingEventLoop() - var tasksRun = 0 + let tasksRun = ManagedAtomic(0) - let a = eventLoop.scheduleTask(in: .seconds(1)) { tasksRun += 1 } - let b = eventLoop.scheduleTask(in: .seconds(2)) { tasksRun += 1 } + let a = eventLoop.scheduleTask(in: .seconds(1)) { + tasksRun.wrappingIncrement(ordering: .sequentiallyConsistent) + } + let b = eventLoop.scheduleTask(in: .seconds(2)) { + tasksRun.wrappingIncrement(ordering: .sequentiallyConsistent) + } - XCTAssertEqual(tasksRun, 0) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 0) await eventLoop.advanceTime(by: .seconds(1)) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) XCTAssertNoThrow(try eventLoop.syncShutdownGracefully()) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) await eventLoop.advanceTime(by: .seconds(1)) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) await eventLoop.advanceTime(to: .distantFuture) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) XCTAssertNoThrow(try a.futureResult.wait()) await XCTAssertThrowsError(try await b.futureResult.get()) { error in XCTAssertEqual(error as? EventLoopError, .cancelled) - XCTAssertEqual(tasksRun, 1) + XCTAssertEqual(tasksRun.load(ordering: .sequentiallyConsistent), 1) } } diff --git a/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift b/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift index 3c77ca624a..3b48ed3b3f 100644 --- a/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift +++ b/Tests/NIOEmbeddedTests/EmbeddedChannelTest.swift @@ -12,12 +12,14 @@ // //===----------------------------------------------------------------------===// +import Atomics +import NIOConcurrencyHelpers import NIOCore import XCTest @testable import NIOEmbedded -class ChannelLifecycleHandler: ChannelInboundHandler { +final class ChannelLifecycleHandler: ChannelInboundHandler, Sendable { public typealias InboundIn = Any public enum ChannelState { @@ -27,17 +29,28 @@ class ChannelLifecycleHandler: ChannelInboundHandler { case active } - public var currentState: ChannelState - public var stateHistory: [ChannelState] + public var currentState: ChannelState { + get { + self._state.withLockedValue { $0.currentState } + } + } + public var stateHistory: [ChannelState] { + get { + self._state.withLockedValue { $0.stateHistory } + } + } + + private let _state: NIOLockedValueBox<(currentState: ChannelState, stateHistory: [ChannelState])> public init() { - currentState = .unregistered - stateHistory = [.unregistered] + self._state = NIOLockedValueBox((currentState: .unregistered, stateHistory: [.unregistered])) } private func updateState(_ state: ChannelState) { - currentState = state - stateHistory.append(state) + self._state.withLockedValue { + $0.currentState = state + $0.stateHistory.append(state) + } } public func channelRegistered(context: ChannelHandlerContext) { @@ -77,7 +90,7 @@ class EmbeddedChannelTest: XCTestCase { } let channel = EmbeddedChannel(handler: Handler()) - XCTAssertNoThrow(try channel.pipeline.handler(type: Handler.self).wait()) + XCTAssertNoThrow(try channel.pipeline.handler(type: Handler.self).map { _ in }.wait()) } func testSingleHandlerInitNil() { @@ -86,7 +99,7 @@ class EmbeddedChannelTest: XCTestCase { } let channel = EmbeddedChannel(handler: nil) - XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).wait()) { e in + XCTAssertThrowsError(try channel.pipeline.handler(type: Handler.self).map { _ in }.wait()) { e in XCTAssertEqual(e as? ChannelPipelineError, .notFound) } } @@ -104,13 +117,19 @@ class EmbeddedChannelTest: XCTestCase { let channel = EmbeddedChannel( handlers: [Handler(identifier: "0"), Handler(identifier: "1"), Handler(identifier: "2")] ) - XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "0")) + XCTAssertNoThrow( + XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).map { $0.identifier }.wait(), "0") + ) XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler0").wait()) - XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "1")) + XCTAssertNoThrow( + XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).map { $0.identifier }.wait(), "1") + ) XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler1").wait()) - XCTAssertNoThrow(XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).wait().identifier, "2")) + XCTAssertNoThrow( + XCTAssertEqual(try channel.pipeline.handler(type: Handler.self).map { $0.identifier }.wait(), "2") + ) XCTAssertNoThrow(try channel.pipeline.removeHandler(name: "handler2").wait()) } @@ -180,7 +199,7 @@ class EmbeddedChannelTest: XCTestCase { func testWriteInboundByteBufferReThrow() { let channel = EmbeddedChannel() - XCTAssertNoThrow(try channel.pipeline.addHandler(ExceptionThrowingInboundHandler()).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(ExceptionThrowingInboundHandler())) XCTAssertThrowsError(try channel.writeInbound("msg")) { error in XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError) } @@ -189,7 +208,7 @@ class EmbeddedChannelTest: XCTestCase { func testWriteOutboundByteBufferReThrow() { let channel = EmbeddedChannel() - XCTAssertNoThrow(try channel.pipeline.addHandler(ExceptionThrowingOutboundHandler()).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(ExceptionThrowingOutboundHandler())) XCTAssertThrowsError(try channel.writeOutbound("msg")) { error in XCTAssertEqual(ChannelError.operationUnsupported, error as? ChannelError) } @@ -323,7 +342,7 @@ class EmbeddedChannelTest: XCTestCase { func testCloseOnInactiveIsOk() throws { let channel = EmbeddedChannel() let inactiveHandler = CloseInChannelInactiveHandler() - XCTAssertNoThrow(try channel.pipeline.addHandler(inactiveHandler).wait()) + XCTAssertNoThrow(try channel.pipeline.syncOperations.addHandler(inactiveHandler)) XCTAssertTrue(try channel.finish().isClean) // channelInactive should fire only once. @@ -479,11 +498,12 @@ class EmbeddedChannelTest: XCTestCase { func testFinishWithRecursivelyScheduledTasks() throws { let channel = EmbeddedChannel() let tasks: NIOLoopBoundBox<[Scheduled]> = NIOLoopBoundBox([], eventLoop: channel.eventLoop) - var invocations = 0 + let invocations = ManagedAtomic(0) + @Sendable func recursivelyScheduleAndIncrement() { let task = channel.pipeline.eventLoop.scheduleTask(deadline: .distantFuture) { - invocations += 1 + invocations.wrappingIncrement(ordering: .sequentiallyConsistent) recursivelyScheduleAndIncrement() } tasks.value.append(task) @@ -494,7 +514,7 @@ class EmbeddedChannelTest: XCTestCase { try XCTAssertNoThrow(channel.finish()) // None of the tasks should have been executed, they were scheduled for distant future. - XCTAssertEqual(invocations, 0) + XCTAssertEqual(invocations.load(ordering: .sequentiallyConsistent), 0) // Because the root task didn't run, it should be the onnly one scheduled. XCTAssertEqual(tasks.value.count, 1) diff --git a/Tests/NIOEmbeddedTests/EmbeddedEventLoopTest.swift b/Tests/NIOEmbeddedTests/EmbeddedEventLoopTest.swift index b1690f7f86..989bef581f 100644 --- a/Tests/NIOEmbeddedTests/EmbeddedEventLoopTest.swift +++ b/Tests/NIOEmbeddedTests/EmbeddedEventLoopTest.swift @@ -12,13 +12,14 @@ // //===----------------------------------------------------------------------===// +import Atomics import NIOConcurrencyHelpers import NIOCore import XCTest @testable import NIOEmbedded -private class EmbeddedTestError: Error {} +private final class EmbeddedTestError: Error {} public final class EmbeddedEventLoopTest: XCTestCase { func testExecuteDoesNotImmediatelyRunTasks() throws { @@ -195,37 +196,38 @@ public final class EmbeddedEventLoopTest: XCTestCase { } func testScheduledTasksFuturesFire() throws { - var fired = false + let fired = ManagedAtomic(false) let loop = EmbeddedEventLoop() let task = loop.scheduleTask(in: .nanoseconds(5)) { true } - task.futureResult.whenSuccess { fired = $0 } + task.futureResult.whenSuccess { fired.store($0, ordering: .sequentiallyConsistent) } loop.advanceTime(by: .nanoseconds(4)) - XCTAssertFalse(fired) + XCTAssertFalse(fired.load(ordering: .sequentiallyConsistent)) loop.advanceTime(by: .nanoseconds(1)) - XCTAssertTrue(fired) + XCTAssertTrue(fired.load(ordering: .sequentiallyConsistent)) } func testScheduledTasksFuturesError() throws { - var err: EmbeddedTestError? = nil - var fired = false + let err: NIOLockedValueBox = NIOLockedValueBox(nil) + let fired = ManagedAtomic(false) let loop = EmbeddedEventLoop() let task = loop.scheduleTask(in: .nanoseconds(5)) { - err = EmbeddedTestError() - throw err! + let localErr = EmbeddedTestError() + err.withLockedValue { $0 = localErr } + throw localErr } task.futureResult.map { XCTFail("Scheduled future completed") }.recover { caughtErr in - XCTAssertTrue(err === caughtErr as? EmbeddedTestError) + XCTAssertTrue(err.withLockedValue { $0 === caughtErr as? EmbeddedTestError }) }.whenComplete { (_: Result) in - fired = true + fired.store(true, ordering: .sequentiallyConsistent) } loop.advanceTime(by: .nanoseconds(4)) - XCTAssertFalse(fired) + XCTAssertFalse(fired.load(ordering: .sequentiallyConsistent)) loop.advanceTime(by: .nanoseconds(1)) - XCTAssertTrue(fired) + XCTAssertTrue(fired.load(ordering: .sequentiallyConsistent)) } func testTaskOrdering() { @@ -511,21 +513,24 @@ public final class EmbeddedEventLoopTest: XCTestCase { func testScheduleRepeatedTask() { let eventLoop = EmbeddedEventLoop() - var counter = 0 + let counter = ManagedAtomic(0) eventLoop.scheduleRepeatedTask(initialDelay: .seconds(1), delay: .seconds(1)) { repeatedTask in - guard counter < 10 else { + guard counter.load(ordering: .sequentiallyConsistent) < 10 else { repeatedTask.cancel() return } - counter += 1 + counter.wrappingIncrement(ordering: .sequentiallyConsistent) } - XCTAssertEqual(counter, 0) + XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 0) eventLoop.advanceTime(by: .seconds(1)) - XCTAssertEqual(counter, 1) + XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 1) eventLoop.advanceTime(by: .seconds(9)) - XCTAssertEqual(counter, 10) + XCTAssertEqual(counter.load(ordering: .sequentiallyConsistent), 10) } } + +@available(*, unavailable) +extension EmbeddedEventLoopTest: Sendable {} diff --git a/Tests/NIOEmbeddedTests/TestUtils.swift b/Tests/NIOEmbeddedTests/TestUtils.swift index c25bc44a1b..3cefa81064 100644 --- a/Tests/NIOEmbeddedTests/TestUtils.swift +++ b/Tests/NIOEmbeddedTests/TestUtils.swift @@ -48,26 +48,25 @@ extension EventLoopFuture { if self.eventLoop.inEventLoop { // Easy, we're on the EventLoop. Let's just use our knowledge that we run completed future callbacks // immediately. - var fulfilled = false + let fulfilled = NIOLoopBoundBox(false, eventLoop: self.eventLoop) self.whenComplete { _ in - fulfilled = true + fulfilled.value = true } - return fulfilled + return fulfilled.value } else { - let lock = NIOLock() + let fulfilled = NIOLockedValueBox(false) let group = DispatchGroup() - var fulfilled = false // protected by lock group.enter() self.eventLoop.execute { let isFulfilled = self.isFulfilled // This will now enter the above branch. - lock.withLock { - fulfilled = isFulfilled + fulfilled.withLockedValue { + $0 = isFulfilled } group.leave() } group.wait() // this is very nasty but this is for tests only, so... - return lock.withLock { fulfilled } + return fulfilled.withLockedValue { $0 } } } }