Skip to content

Commit

Permalink
Add parallel removal of items
Browse files Browse the repository at this point in the history
This PR refactors the logic to remove items. Instead of sequentially
discovering and deleting items one by one, we concurrently traverse all
directories that we find and then sequentially issue file deletions.

This new deletion logic is currently unbounded, and will spawn a bunch
of tasks no matter the underlying system.

Benchmark using `--configration=release` for both the `main` and
`parallel-removal` branches:
```
$ rm -rf .build && git switch main && swift build --configuration=release && cp .build/release/NIOTestCLI NIOTestCLI_main
[...]
Building for production...
[134/134] Linking NIOPerformanceTester
Build complete! (32.65s)
$ rm -rf .build && git switch parallel-removal && swift build --configuration=release
[...]
Building for production...
[134/134] Linking NIOPerformanceTester
Build complete! (32.19s)
$ hyperfine --max-runs 10 --prepare "rm -rf /tmp/swift-nio && cp -r $(pwd) /tmp/swift-nio" "./NIOTestCLI_main /tmp/swift-nio" ".build/release/NIOTestCLI /tmp/swift-nio" "rm -rf /tmp/swift-nio"
Benchmark 1: ./NIOTestCLI_main /tmp/swift-nio
  Time (mean ± σ):      3.019 s ±  0.032 s    [User: 0.601 s, System: 11.123 s]
  Range (min … max):    2.985 s …  3.072 s    10 runs

Benchmark 2: .build/release/NIOTestCLI /tmp/swift-nio
  Time (mean ± σ):      2.295 s ±  0.065 s    [User: 0.673 s, System: 7.677 s]
  Range (min … max):    2.212 s …  2.370 s    10 runs

Benchmark 3: rm -rf /tmp/swift-nio
  Time (mean ± σ):      1.517 s ±  0.037 s    [User: 0.015 s, System: 1.442 s]
  Range (min … max):    1.480 s …  1.590 s    10 runs

Summary
  rm -rf /tmp/swift-nio ran
    1.51 ± 0.06 times faster than .build/release/NIOTestCLI /tmp/swift-nio
    1.99 ± 0.05 times faster than ./NIOTestCLI_main /tmp/swift-nio
```

Resolves #2933
  • Loading branch information
mimischi committed Nov 29, 2024
1 parent 8a1523f commit eca6695
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 55 deletions.
22 changes: 12 additions & 10 deletions Sources/NIOFileSystem/CopyStrategy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public struct CopyStrategy: Hashable, Sendable {
}

extension CopyStrategy {
// A copy fundamentally can't work without two descriptors unless you copy
// everything into memory which is infeasible/inefficient for large copies.
// A copy fundamentally can't work without two descriptors unless you copy everything into
// memory which is infeasible/inefficient for large copies.
private static let minDescriptorsAllowed = 2

/// Operate in whatever manner is deemed a reasonable default for the platform. This will limit
Expand All @@ -85,19 +85,21 @@ extension CopyStrategy {
/// only way to guarantee only one callback to the `shouldCopyItem` will happen at a time.
public static let sequential: Self = Self(.sequential)

/// Allow multiple IO operations to run concurrently, including file copies/directory creation and scanning
/// Allow multiple I/O operations to run concurrently, including file copies/directory creation
/// and scanning.
///
/// - Parameter maxDescriptors: a conservative limit on the number of concurrently open
/// file descriptors involved in the copy. This number must be >= 2 though, if you are using a value that low
/// you should use ``sequential``
/// - Parameter maxDescriptors: a conservative limit on the number of concurrently open file
/// descriptors involved in the copy. This number must be >= 2 though, if you are using a
/// value that low you should use ``sequential``
///
/// - Throws: ``FileSystemError/Code-swift.struct/invalidArgument`` if `maxDescriptors`
/// is less than 2.
/// - Throws: ``FileSystemError/Code-swift.struct/invalidArgument`` if `maxDescriptors` is less
/// than 2.
///
public static func parallel(maxDescriptors: Int) throws -> Self {
guard maxDescriptors >= Self.minDescriptorsAllowed else {
// 2 is not quite the same as sequential, you could have two concurrent directory listings for example
// less than 2 and you can't actually do a _copy_ though so it's non-sensical.
// 2 is not quite the same as sequential, you could have two concurrent directory
// listings for example less than 2 and you can't actually do a _copy_ though so it's
// non-sensical.
throw FileSystemError(
code: .invalidArgument,
message: "Can't do a copy operation without at least 2 file descriptors '\(maxDescriptors)' is illegal",
Expand Down
68 changes: 41 additions & 27 deletions Sources/NIOFileSystem/FileSystem.swift
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ public struct FileSystem: Sendable, FileSystemProtocol {
@discardableResult
public func removeItem(
at path: FilePath,
strategy removalStrategy: RemovalStrategy,
recursively removeItemRecursively: Bool
) async throws -> Int {
// Try to remove the item: we might just get lucky.
Expand Down Expand Up @@ -421,39 +422,52 @@ public struct FileSystem: Sendable, FileSystemProtocol {
)
}

var (subdirectories, filesRemoved) = try await self.withDirectoryHandle(
atPath: path
) { directory in
var subdirectories = [FilePath]()
var filesRemoved = 0
switch removalStrategy.wrapped {
case .sequential:
return try await self.removeItemSequentially(at: path)
case .parallel:
return try await self.removeConcurrently(at: path)
}

for try await batch in directory.listContents().batched() {
for entry in batch {
switch entry.type {
case .directory:
subdirectories.append(entry.path)
case let .failure(errno):
throw FileSystemError.remove(errno: errno, path: path, location: .here())
}
}

default:
filesRemoved += try await self.removeOneItem(at: entry.path)
}
@discardableResult
private func removeItemSequentially(
at path: FilePath
) async throws -> Int {
var (subdirectories, filesRemoved) = try await self.withDirectoryHandle(
atPath: path
) { directory in
var subdirectories = [FilePath]()
var filesRemoved = 0

for try await batch in directory.listContents().batched() {
for entry in batch {
switch entry.type {
case .directory:
subdirectories.append(entry.path)

default:
filesRemoved += try await self.removeOneItem(at: entry.path)
}
}

return (subdirectories, filesRemoved)
}

for subdirectory in subdirectories {
filesRemoved += try await self.removeItem(at: subdirectory)
}
return (subdirectories, filesRemoved)
}

// The directory should be empty now. Remove ourself.
filesRemoved += try await self.removeOneItem(at: path)
for subdirectory in subdirectories {
filesRemoved += try await self.removeItemSequentially(at: subdirectory)
}

return filesRemoved
// The directory should be empty now. Remove ourself.
filesRemoved += try await self.removeOneItem(at: path)

return filesRemoved

case let .failure(errno):
throw FileSystemError.remove(errno: errno, path: path, location: .here())
}
}

/// Moves the named file or directory to a new location.
Expand Down Expand Up @@ -490,7 +504,7 @@ public struct FileSystem: Sendable, FileSystemProtocol {
case .differentLogicalDevices:
// Fall back to copy and remove.
try await self.copyItem(at: sourcePath, to: destinationPath)
try await self.removeItem(at: sourcePath)
try await self.removeItem(at: sourcePath, strategy: .platformDefault)
}
}

Expand Down Expand Up @@ -518,9 +532,9 @@ public struct FileSystem: Sendable, FileSystemProtocol {
withItemAt existingPath: FilePath
) async throws {
do {
try await self.removeItem(at: destinationPath)
try await self.removeItem(at: destinationPath, strategy: .platformDefault)
try await self.moveItem(at: existingPath, to: destinationPath)
try await self.removeItem(at: existingPath)
try await self.removeItem(at: existingPath, strategy: .platformDefault)
} catch let error as FileSystemError {
throw FileSystemError(
message: "Can't replace '\(destinationPath)' with '\(existingPath)'.",
Expand Down
8 changes: 5 additions & 3 deletions Sources/NIOFileSystem/FileSystemProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public protocol FileSystemProtocol: Sendable {
@discardableResult
func removeItem(
at path: FilePath,
strategy removalStrategy: RemovalStrategy,
recursively removeItemRecursively: Bool
) async throws -> Int

Expand Down Expand Up @@ -597,9 +598,10 @@ extension FileSystemProtocol {
/// - Returns: The number of deleted items which may be zero if `path` did not exist.
@discardableResult
public func removeItem(
at path: FilePath
at path: FilePath,
strategy removalStrategy: RemovalStrategy
) async throws -> Int {
try await self.removeItem(at: path, recursively: true)
try await self.removeItem(at: path, strategy: removalStrategy, recursively: true)
}

/// Create a directory at the given path.
Expand Down Expand Up @@ -659,7 +661,7 @@ extension FileSystemProtocol {
try await execute(handle, directory)
}
} tearDown: { _ in
try await self.removeItem(at: directory, recursively: true)
try await self.removeItem(at: directory, strategy: .platformDefault, recursively: true)
}
}
}
212 changes: 212 additions & 0 deletions Sources/NIOFileSystem/Internal/ParallelRemoval.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2024 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import NIOCore

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension FileSystem {
enum RemoveItem: Hashable, Sendable {
// A directory with dependent objects inside of it. We need to delete all of the objects (and
// their nested objects), before we can tackle deleting the directory itself.
case directory(entry: DirectoryEntry, numberOfObjects: UInt64)
// An object that can be removed right away (i.e., symlink, file, empty directory). We need
// this special case to do the accounting of dependent files for the parent directory.
case immediatelyRemovableObject(entry: DirectoryEntry)
// An object that has been identified to be deletable now. This is used as a signal to
// actually perform the deletion. This case will be sent after all accounting for the parent
// has been completed.
case deleteObject(entry: DirectoryEntry)
}

fileprivate struct DirectoryReferences {
var root: FilePath
var state: [FilePath: Int64] = [:]
var yield: @Sendable ([RemoveItem]) -> Void

mutating func act(on item: RemoveItem) {
switch item {
case .directory(let directoryEntry, let numberOfObjects):
// Initialize accounting for directory.
if !self.state.keys.contains(directoryEntry.path) {
self.state[directoryEntry.path] = Int64(numberOfObjects)
break
}

// Directory was initialized earlier by other objects. Add numbers of dependent
// files we found for acounting purposes.
self.state[directoryEntry.path]! += Int64(numberOfObjects)

// Send directory off for deletion (and accounting) if its empty now.
if self.state[directoryEntry.path] == 0 {
yield([.immediatelyRemovableObject(entry: .init(path: directoryEntry.path, type: .directory)!)])
}

case .immediatelyRemovableObject(let directoryEntry):
// We are actually deleting the root directory, and we're done. No need to look
// at the parent directory.
if directoryEntry.path == root {
yield([.deleteObject(entry: directoryEntry)])
break
}

// Need to do accounting for the parent directory, and not the current object we are
// looking at.
let parent = directoryEntry.path.removingLastComponent()

// If we have not started accounting for the parent directory, lets initialize it.
if !self.state.keys.contains(parent) {
self.state[parent] = 0
}

// Signal deletion of directory, and account for the parent not having one less
// dependent.
yield([.deleteObject(entry: directoryEntry)])
self.state[parent]! -= 1

// Accounting. Send parent for deletion if its empty.
if self.state[parent] == 0 {
yield([.immediatelyRemovableObject(entry: .init(path: parent, type: .directory)!)])
}
case .deleteObject:
// This should not happen, because we handle this case outside this function call.
// Why does the Swift compiler not notice that we handle this case in the only place
// that this function is called, such that we should not be forced to handle this
// inside here as well.
//
// TODO: something better to do here than break?
break
}
}
}

func removeConcurrently(
at path: FilePath
) async throws -> Int {
let removalQueue = NIOAsyncSequenceProducer.makeSequence(
elementType: RemoveItem.self,
backPressureStrategy: NoBackPressureStrategy(),
finishOnDeinit: false,
delegate: DirCopyDelegate()
)

@Sendable func yield(_ contentsOf: [RemoveItem]) {
_ = removalQueue.source.yield(contentsOf: contentsOf)
}

// Keep track of how many references are found to a directory (files, subdirectories, etc
// contained within a directory). At the end of processing, all of these should be zero.
// When any directory arrives at zero, we intend to delete it. This should then allow us to
// eventually cascade all the way up and delete the root directory.
var references: DirectoryReferences = .init(root: path, yield: yield)

var deletedFiles: Int = 0
try await withThrowingTaskGroup(of: Void.self) { group in
// Seed discovery of directories. We will recursively call this function from within.
group.addTask {
try await self.walkTreeForRemovalConcurrently(at: path, yield: yield)
}

// Sequentially process stream of discovered objects. Kicks off more tasks if needed,
// all producing into this same sequence.
let iter = removalQueue.sequence.makeAsyncIterator()
var continueConsuming: Bool = true
while continueConsuming {
let item = await iter.next()
if let item = item {
switch item {
case .deleteObject(let directoryEntry):
deletedFiles += try await self.removeOneItem(at: directoryEntry.path)

// We just deleted the root directry. Lets close the queue.
if directoryEntry.path == path {
removalQueue.source.finish()
}

default: references.act(on: item)
}
} else {
continueConsuming = false
}
}

try await group.waitForAll()
}

return deletedFiles
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension FileSystem {
func walkTreeForRemovalConcurrently(
at path: FilePath,
yield: @escaping @Sendable ([RemoveItem]) -> Void
) async throws {
try await self.withDirectoryHandle(atPath: path) { directory in
var numberOfObjectsInDirectory: UInt64 = 0
try await withThrowingTaskGroup(of: Void.self) { group in
for try await batch in directory.listContents().batched() {
for entry in batch {
numberOfObjectsInDirectory += 1
switch entry.type {
case .directory:
// Recurse into ourself to discover the next subdirectory, but do that
// on a separate thread.
group.addTask {
try await walkTreeForRemovalConcurrently(
at: entry.path,
yield: yield
)
}
default:
// Fire deletion events for anything that is not a directory with files in it.
yield([.immediatelyRemovableObject(entry: entry)])
}
}
}
}

if numberOfObjectsInDirectory == 0 {
// Directory is empty, so we can immediately send it off for deletion.
yield([.immediatelyRemovableObject(entry: .init(path: path, type: .directory)!)])
} else {
// Once we have seen the number of objects inside this directory, dispatch an event
// with that information.
yield([
.directory(entry: .init(path: path, type: .directory)!, numberOfObjects: numberOfObjectsInDirectory)
])
}
}
}
}

// TODO: the following two are verbatim copies from ParallelDirCopy.swift.
//
// An 'always ask for more' no back-pressure strategy for a ``NIOAsyncSequenceProducer``.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private struct NoBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategy {
mutating func didYield(bufferDepth: Int) -> Bool { true }

mutating func didConsume(bufferDepth: Int) -> Bool { true }
}

/// We ignore back pressure, the inherent handle limiting in copyDirectoryParallel means it is unnecessary.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
private struct DirCopyDelegate: NIOAsyncSequenceProducerDelegate, Sendable {
@inlinable
func produceMore() {}

@inlinable
func didTerminate() {}
}
Loading

0 comments on commit eca6695

Please sign in to comment.