diff --git a/Sources/NIOFileSystem/CopyStrategy.swift b/Sources/NIOFileSystem/CopyStrategy.swift index ae8b4770e1..5703d9a341 100644 --- a/Sources/NIOFileSystem/CopyStrategy.swift +++ b/Sources/NIOFileSystem/CopyStrategy.swift @@ -69,8 +69,8 @@ public struct CopyStrategy: Hashable, Sendable { } extension CopyStrategy { - // A copy fundamentally can't work without two descriptors unless you copy - // everything into memory which is infeasible/inefficient for large copies. + // A copy fundamentally can't work without two descriptors unless you copy everything into + // memory which is infeasible/inefficient for large copies. private static let minDescriptorsAllowed = 2 /// Operate in whatever manner is deemed a reasonable default for the platform. This will limit @@ -85,19 +85,21 @@ extension CopyStrategy { /// only way to guarantee only one callback to the `shouldCopyItem` will happen at a time. public static let sequential: Self = Self(.sequential) - /// Allow multiple IO operations to run concurrently, including file copies/directory creation and scanning + /// Allow multiple I/O operations to run concurrently, including file copies/directory creation + /// and scanning. /// - /// - Parameter maxDescriptors: a conservative limit on the number of concurrently open - /// file descriptors involved in the copy. This number must be >= 2 though, if you are using a value that low - /// you should use ``sequential`` + /// - Parameter maxDescriptors: a conservative limit on the number of concurrently open file + /// descriptors involved in the copy. This number must be >= 2 though, if you are using a + /// value that low you should use ``sequential`` /// - /// - Throws: ``FileSystemError/Code-swift.struct/invalidArgument`` if `maxDescriptors` - /// is less than 2. + /// - Throws: ``FileSystemError/Code-swift.struct/invalidArgument`` if `maxDescriptors` is less + /// than 2. 
/// public static func parallel(maxDescriptors: Int) throws -> Self { guard maxDescriptors >= Self.minDescriptorsAllowed else { - // 2 is not quite the same as sequential, you could have two concurrent directory listings for example - // less than 2 and you can't actually do a _copy_ though so it's non-sensical. + // 2 is not quite the same as sequential, you could have two concurrent directory + // listings for example less than 2 and you can't actually do a _copy_ though so it's + // non-sensical. throw FileSystemError( code: .invalidArgument, message: "Can't do a copy operation without at least 2 file descriptors '\(maxDescriptors)' is illegal", diff --git a/Sources/NIOFileSystem/FileSystem.swift b/Sources/NIOFileSystem/FileSystem.swift index 33bf8c605e..5438417109 100644 --- a/Sources/NIOFileSystem/FileSystem.swift +++ b/Sources/NIOFileSystem/FileSystem.swift @@ -392,6 +392,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { @discardableResult public func removeItem( at path: FilePath, + strategy removalStrategy: RemovalStrategy, recursively removeItemRecursively: Bool ) async throws -> Int { // Try to remove the item: we might just get lucky. 
@@ -421,39 +422,52 @@ public struct FileSystem: Sendable, FileSystemProtocol { ) } - var (subdirectories, filesRemoved) = try await self.withDirectoryHandle( - atPath: path - ) { directory in - var subdirectories = [FilePath]() - var filesRemoved = 0 + switch removalStrategy.wrapped { + case .sequential: + return try await self.removeItemSequentially(at: path) + case .parallel: + return try await self.removeConcurrently(at: path) + } - for try await batch in directory.listContents().batched() { - for entry in batch { - switch entry.type { - case .directory: - subdirectories.append(entry.path) + case let .failure(errno): + throw FileSystemError.remove(errno: errno, path: path, location: .here()) + } + } - default: - filesRemoved += try await self.removeOneItem(at: entry.path) - } + @discardableResult + private func removeItemSequentially( + at path: FilePath + ) async throws -> Int { + var (subdirectories, filesRemoved) = try await self.withDirectoryHandle( + atPath: path + ) { directory in + var subdirectories = [FilePath]() + var filesRemoved = 0 + + for try await batch in directory.listContents().batched() { + for entry in batch { + switch entry.type { + case .directory: + subdirectories.append(entry.path) + + default: + filesRemoved += try await self.removeOneItem(at: entry.path) } } - - return (subdirectories, filesRemoved) } - for subdirectory in subdirectories { - filesRemoved += try await self.removeItem(at: subdirectory) - } + return (subdirectories, filesRemoved) + } - // The directory should be empty now. Remove ourself. - filesRemoved += try await self.removeOneItem(at: path) + for subdirectory in subdirectories { + filesRemoved += try await self.removeItemSequentially(at: subdirectory) + } - return filesRemoved + // The directory should be empty now. Remove ourself. 
+ filesRemoved += try await self.removeOneItem(at: path) + + return filesRemoved - case let .failure(errno): - throw FileSystemError.remove(errno: errno, path: path, location: .here()) - } } /// Moves the named file or directory to a new location. @@ -490,7 +504,7 @@ public struct FileSystem: Sendable, FileSystemProtocol { case .differentLogicalDevices: // Fall back to copy and remove. try await self.copyItem(at: sourcePath, to: destinationPath) - try await self.removeItem(at: sourcePath) + try await self.removeItem(at: sourcePath, strategy: .platformDefault) } } @@ -518,9 +532,9 @@ public struct FileSystem: Sendable, FileSystemProtocol { withItemAt existingPath: FilePath ) async throws { do { - try await self.removeItem(at: destinationPath) + try await self.removeItem(at: destinationPath, strategy: .platformDefault) try await self.moveItem(at: existingPath, to: destinationPath) - try await self.removeItem(at: existingPath) + try await self.removeItem(at: existingPath, strategy: .platformDefault) } catch let error as FileSystemError { throw FileSystemError( message: "Can't replace '\(destinationPath)' with '\(existingPath)'.", diff --git a/Sources/NIOFileSystem/FileSystemProtocol.swift b/Sources/NIOFileSystem/FileSystemProtocol.swift index b5bf701435..97bd38b8a2 100644 --- a/Sources/NIOFileSystem/FileSystemProtocol.swift +++ b/Sources/NIOFileSystem/FileSystemProtocol.swift @@ -262,6 +262,7 @@ public protocol FileSystemProtocol: Sendable { @discardableResult func removeItem( at path: FilePath, + strategy removalStrategy: RemovalStrategy, recursively removeItemRecursively: Bool ) async throws -> Int @@ -597,9 +598,10 @@ extension FileSystemProtocol { /// - Returns: The number of deleted items which may be zero if `path` did not exist. 
@discardableResult public func removeItem( - at path: FilePath + at path: FilePath, + strategy removalStrategy: RemovalStrategy ) async throws -> Int { - try await self.removeItem(at: path, recursively: true) + try await self.removeItem(at: path, strategy: removalStrategy, recursively: true) } /// Create a directory at the given path. @@ -659,7 +661,7 @@ extension FileSystemProtocol { try await execute(handle, directory) } } tearDown: { _ in - try await self.removeItem(at: directory, recursively: true) + try await self.removeItem(at: directory, strategy: .platformDefault, recursively: true) } } } diff --git a/Sources/NIOFileSystem/Internal/ParallelRemoval.swift b/Sources/NIOFileSystem/Internal/ParallelRemoval.swift new file mode 100644 index 0000000000..9700ab9125 --- /dev/null +++ b/Sources/NIOFileSystem/Internal/ParallelRemoval.swift @@ -0,0 +1,212 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension FileSystem { + enum RemoveItem: Hashable, Sendable { + // A directory with dependent objects inside of it. We need to delete all of the objects (and + // their nested objects), before we can tackle deleting the directory itself. + case directory(entry: DirectoryEntry, numberOfObjects: UInt64) + // An object that can be removed right away (i.e., symlink, file, empty directory). We need + // this special case to do the accounting of dependent files for the parent directory. 
+ case immediatelyRemovableObject(entry: DirectoryEntry) + // An object that has been identified to be deletable now. This is used as a signal to + // actually perform the deletion. This case will be sent after all accounting for the parent + // has been completed. + case deleteObject(entry: DirectoryEntry) + } + + fileprivate struct DirectoryReferences { + var root: FilePath + var state: [FilePath: Int64] = [:] + var yield: @Sendable ([RemoveItem]) -> Void + + mutating func act(on item: RemoveItem) { + switch item { + case .directory(let directoryEntry, let numberOfObjects): + // Initialize accounting for directory. + if !self.state.keys.contains(directoryEntry.path) { + self.state[directoryEntry.path] = Int64(numberOfObjects) + break + } + + // Directory was initialized earlier by other objects. Add the number of dependent + // objects we found for accounting purposes. + self.state[directoryEntry.path]! += Int64(numberOfObjects) + + // Send directory off for deletion (and accounting) if it is empty now. + if self.state[directoryEntry.path] == 0 { + yield([.immediatelyRemovableObject(entry: .init(path: directoryEntry.path, type: .directory)!)]) + } + + case .immediatelyRemovableObject(let directoryEntry): + // We are actually deleting the root directory, and we're done. No need to look + // at the parent directory. + if directoryEntry.path == root { + yield([.deleteObject(entry: directoryEntry)]) + break + } + + // Need to do accounting for the parent directory, and not the current object we are + // looking at. + let parent = directoryEntry.path.removingLastComponent() + + // If we have not started accounting for the parent directory, let's initialize it. + if !self.state.keys.contains(parent) { + self.state[parent] = 0 + } + + // Signal deletion of the object, and account for the parent now having one fewer + // dependent. + yield([.deleteObject(entry: directoryEntry)]) + self.state[parent]! -= 1 + + // Accounting. Send parent for deletion if it is empty.
+ if self.state[parent] == 0 { + yield([.immediatelyRemovableObject(entry: .init(path: parent, type: .directory)!)]) + } + case .deleteObject: + // This should not happen, because we handle this case outside this function call. + // Why does the Swift compiler not notice that we handle this case in the only place + // that this function is called, such that we should not be forced to handle this + // inside here as well. + // + // TODO: something better to do here than break? + break + } + } + } + + func removeConcurrently( + at path: FilePath + ) async throws -> Int { + let removalQueue = NIOAsyncSequenceProducer.makeSequence( + elementType: RemoveItem.self, + backPressureStrategy: NoBackPressureStrategy(), + finishOnDeinit: false, + delegate: DirCopyDelegate() + ) + + @Sendable func yield(_ contentsOf: [RemoveItem]) { + _ = removalQueue.source.yield(contentsOf: contentsOf) + } + + // Keep track of how many references are found to a directory (files, subdirectories, etc + // contained within a directory). At the end of processing, all of these should be zero. + // When any directory arrives at zero, we intend to delete it. This should then allow us to + // eventually cascade all the way up and delete the root directory. + var references: DirectoryReferences = .init(root: path, yield: yield) + + var deletedFiles: Int = 0 + try await withThrowingTaskGroup(of: Void.self) { group in + // Seed discovery of directories. We will recursively call this function from within. + group.addTask { + try await self.walkTreeForRemovalConcurrently(at: path, yield: yield) + } + + // Sequentially process stream of discovered objects. Kicks off more tasks if needed, + // all producing into this same sequence. 
+ let iter = removalQueue.sequence.makeAsyncIterator() + var continueConsuming: Bool = true + while continueConsuming { + let item = await iter.next() + if let item = item { + switch item { + case .deleteObject(let directoryEntry): + deletedFiles += try await self.removeOneItem(at: directoryEntry.path) + + // We just deleted the root directory. Let's close the queue. + if directoryEntry.path == path { + removalQueue.source.finish() + } + + default: references.act(on: item) + } + } else { + continueConsuming = false + } + } + + try await group.waitForAll() + } + + return deletedFiles + } +} + +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +extension FileSystem { + func walkTreeForRemovalConcurrently( + at path: FilePath, + yield: @escaping @Sendable ([RemoveItem]) -> Void + ) async throws { + try await self.withDirectoryHandle(atPath: path) { directory in + var numberOfObjectsInDirectory: UInt64 = 0 + try await withThrowingTaskGroup(of: Void.self) { group in + for try await batch in directory.listContents().batched() { + for entry in batch { + numberOfObjectsInDirectory += 1 + switch entry.type { + case .directory: + // Recurse into ourself to discover the next subdirectory, but do that + // in a separate child task. + group.addTask { + try await walkTreeForRemovalConcurrently( + at: entry.path, + yield: yield + ) + } + default: + // Fire deletion events for anything that is not a directory. + yield([.immediatelyRemovableObject(entry: entry)]) + } + } + } + } + + if numberOfObjectsInDirectory == 0 { + // Directory is empty, so we can immediately send it off for deletion. + yield([.immediatelyRemovableObject(entry: .init(path: path, type: .directory)!)]) + } else { + // Once we have seen the number of objects inside this directory, dispatch an event + // with that information.
+ yield([ + .directory(entry: .init(path: path, type: .directory)!, numberOfObjects: numberOfObjectsInDirectory) + ]) + } + } + } +} + +// TODO: the following two are verbatim copies from ParallelDirCopy.swift. +// +// An 'always ask for more' no back-pressure strategy for a ``NIOAsyncSequenceProducer``. +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +private struct NoBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategy { + mutating func didYield(bufferDepth: Int) -> Bool { true } + + mutating func didConsume(bufferDepth: Int) -> Bool { true } +} + +/// We ignore back pressure, the inherent handle limiting in copyDirectoryParallel means it is unnecessary. +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +private struct DirCopyDelegate: NIOAsyncSequenceProducerDelegate, Sendable { + @inlinable + func produceMore() {} + + @inlinable + func didTerminate() {} +} diff --git a/Sources/NIOFileSystem/RemovalStrategy.swift b/Sources/NIOFileSystem/RemovalStrategy.swift new file mode 100644 index 0000000000..43fbfee65e --- /dev/null +++ b/Sources/NIOFileSystem/RemovalStrategy.swift @@ -0,0 +1,66 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftNIO open source project +// +// Copyright (c) 2024 Apple Inc. 
and the SwiftNIO project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftNIO project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +// TODO: This is pretty much a verbatim copy of CopyStrategy.swift +public struct RemovalStrategy: Hashable, Sendable { + // Avoid exposing to prevent breaking changes + internal enum Wrapped: Hashable, Sendable { + case sequential + case parallel + } + + internal let wrapped: Wrapped + private init(_ wrapped: Wrapped) { + self.wrapped = wrapped + } + + internal static func determinePlatformDefault() -> Wrapped { + #if os(macOS) || os(Linux) || os(Windows) + return .parallel + #elseif os(iOS) || os(tvOS) || os(watchOS) || os(Android) + return .parallel + #else + return .sequential + #endif + } +} + +extension RemovalStrategy { + /// Operate in whatever manner is deemed a reasonable default for the platform. This selects + /// either sequential or parallel removal depending on the platform. + /// + /// Current assumptions (which are subject to change): + /// - Only one removal operation would be performed at once + /// - The removal operation is not intended to be the primary activity on the device + public static let platformDefault: Self = Self(Self.determinePlatformDefault()) + + /// The removal is done asynchronously, but only one operation will occur at a time: each + /// item is removed before the next removal is started. + public static let sequential: Self = Self(.sequential) + + /// Allow multiple I/O operations to run concurrently, including file and directory removal + /// and scanning.
+ public static let parallel: Self = Self(.parallel) +} + +extension RemovalStrategy: CustomStringConvertible { + public var description: String { + switch self.wrapped { + case .sequential: + return "sequential" + case .parallel: + return "parallel" + } + } +} diff --git a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift index 2e828ff9d5..b38ed78c6b 100644 --- a/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift +++ b/Tests/NIOFileSystemIntegrationTests/FileHandleTests.swift @@ -305,7 +305,7 @@ final class FileHandleTests: XCTestCase { func testWriteAndReadUnseekableFile() async throws { let privateTempDirPath = try await FileSystem.shared.createTemporaryDirectory(template: "test-XXX") self.addTeardownBlock { - try await FileSystem.shared.removeItem(at: privateTempDirPath, recursively: true) + try await FileSystem.shared.removeItem(at: privateTempDirPath, strategy: .platformDefault, recursively: true) } guard mkfifo(privateTempDirPath.appending("fifo").string, 0o644) == 0 else { @@ -326,7 +326,7 @@ final class FileHandleTests: XCTestCase { func testWriteAndReadUnseekableFileOverMaximumSizeAllowedThrowsError() async throws { let privateTempDirPath = try await FileSystem.shared.createTemporaryDirectory(template: "test-XXX") self.addTeardownBlock { - try await FileSystem.shared.removeItem(at: privateTempDirPath, recursively: true) + try await FileSystem.shared.removeItem(at: privateTempDirPath, strategy: .platformDefault, recursively: true) } guard mkfifo(privateTempDirPath.appending("fifo").string, 0o644) == 0 else { @@ -350,7 +350,7 @@ final class FileHandleTests: XCTestCase { func testWriteAndReadUnseekableFileWithOffsetsThrows() async throws { let privateTempDirPath = try await FileSystem.shared.createTemporaryDirectory(template: "test-XXX") self.addTeardownBlock { - try await FileSystem.shared.removeItem(at: privateTempDirPath, recursively: true) + try await 
FileSystem.shared.removeItem(at: privateTempDirPath, strategy: .platformDefault, recursively: true) } guard mkfifo(privateTempDirPath.appending("fifo").string, 0o644) == 0 else { diff --git a/Tests/NIOFileSystemIntegrationTests/FileSystemTests.swift b/Tests/NIOFileSystemIntegrationTests/FileSystemTests.swift index 5de5846603..e99666970d 100644 --- a/Tests/NIOFileSystemIntegrationTests/FileSystemTests.swift +++ b/Tests/NIOFileSystemIntegrationTests/FileSystemTests.swift @@ -222,7 +222,7 @@ final class FileSystemTests: XCTestCase { // Avoid dirtying the current working directory. if path.isRelative { self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: path) + try await fileSystem.removeItem(at: path, strategy: .platformDefault) } } @@ -351,7 +351,7 @@ final class FileSystemTests: XCTestCase { // Avoid dirtying the current working directory. if directoryPath.isRelative { self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: directoryPath) + try await fileSystem.removeItem(at: directoryPath, strategy: .platformDefault) } } @@ -399,7 +399,7 @@ final class FileSystemTests: XCTestCase { if directoryPath.isRelative { self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: directoryPath, recursively: true) + try await fileSystem.removeItem(at: directoryPath, strategy: .platformDefault, recursively: true) } } @@ -586,8 +586,8 @@ final class FileSystemTests: XCTestCase { let sourcePath = try await self.fs.temporaryFilePath() let destPath = try await self.fs.temporaryFilePath() self.addTeardownBlock { - _ = try? await self.fs.removeItem(at: sourcePath) - _ = try? await self.fs.removeItem(at: destPath) + _ = try? await self.fs.removeItem(at: sourcePath, strategy: .platformDefault) + _ = try? 
await self.fs.removeItem(at: destPath, strategy: .platformDefault) } let sourceInfo = try await self.fs.withFileHandle( @@ -991,7 +991,7 @@ final class FileSystemTests: XCTestCase { let infoAfterCreation = try await self.fs.info(forFileAt: path) XCTAssertNotNil(infoAfterCreation) - let removed = try await self.fs.removeItem(at: path) + let removed = try await self.fs.removeItem(at: path, strategy: .platformDefault) XCTAssertEqual(removed, 1) let infoAfterRemoval = try await self.fs.info(forFileAt: path) @@ -1002,11 +1002,11 @@ final class FileSystemTests: XCTestCase { let path = try await self.fs.temporaryFilePath() let info = try await self.fs.info(forFileAt: path) XCTAssertNil(info) - let removed = try await self.fs.removeItem(at: path) + let removed = try await self.fs.removeItem(at: path, strategy: .platformDefault) XCTAssertEqual(removed, 0) } - func testRemoveDirectory() async throws { + func testRemoveDirectorySequentially() async throws { let path = try await self.fs.temporaryFilePath() let created = try await self.generateDirectoryStructure( root: path, @@ -1019,12 +1019,37 @@ final class FileSystemTests: XCTestCase { // Removing a non-empty directory recursively should throw 'notEmpty' await XCTAssertThrowsFileSystemErrorAsync { - try await self.fs.removeItem(at: path, recursively: false) + try await self.fs.removeItem(at: path, strategy: .sequential, recursively: false) } onError: { error in XCTAssertEqual(error.code, .notEmpty) } - let removed = try await self.fs.removeItem(at: path) + let removed = try await self.fs.removeItem(at: path, strategy: .sequential) + XCTAssertEqual(created, removed) + + let infoAfterRemoval = try await self.fs.info(forFileAt: path) + XCTAssertNil(infoAfterRemoval) + } + + func testRemoveDirectoryConcurrently() async throws { + let path = try await self.fs.temporaryFilePath() + let created = try await self.generateDirectoryStructure( + root: path, + maxDepth: 3, + maxFilesPerDirectory: 10 + ) + + let infoAfterCreation = try 
await self.fs.info(forFileAt: path) + XCTAssertNotNil(infoAfterCreation) + + // Removing a non-empty directory non-recursively should throw 'notEmpty' + await XCTAssertThrowsFileSystemErrorAsync { + try await self.fs.removeItem(at: path, strategy: .parallel, recursively: false) + } onError: { error in + XCTAssertEqual(error.code, .notEmpty) + } + + let removed = try await self.fs.removeItem(at: path, strategy: .parallel) + XCTAssertEqual(created, removed) let infoAfterRemoval = try await self.fs.info(forFileAt: path) @@ -1602,7 +1627,7 @@ extension FileSystemTests { // Clean up after ourselves. self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: temporaryDirectoryPath) + try await fileSystem.removeItem(at: temporaryDirectoryPath, strategy: .platformDefault) } guard let info = try await self.fs.info(forFileAt: temporaryDirectoryPath) else { @@ -1640,7 +1665,7 @@ extension FileSystemTests { let temporaryDirectoryPath = try await self.fs.createTemporaryDirectory(template: template) self.addTeardownBlock { [fileSystem = self.fs] in - try await fileSystem.removeItem(at: templateRoot, recursively: true) + try await fileSystem.removeItem(at: templateRoot, strategy: .platformDefault, recursively: true) } guard