Skip to content

Commit

Permalink
manager event stream working?
Browse files Browse the repository at this point in the history
  • Loading branch information
samdeane committed Sep 30, 2024
1 parent 2065096 commit e044b4b
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 34 deletions.
10 changes: 4 additions & 6 deletions Sources/Logger/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,7 @@ public actor Channel {
self.sequence = s

Task {
await manager.register(channel: self)
if autoRun {
await run()
}
await manager.add(channel: self, runImmediately: autoRun)
}
}

Expand Down Expand Up @@ -131,7 +128,7 @@ public actor Channel {
/// when the channel is created. For testing purposes it's useful to
/// be able to call it manually, in order to wait for all logged items
/// to be processed.
public func run() async {
internal func run() async {
for await item in sequence {
await handler.log(item.value, context: item.context)
}
Expand All @@ -141,7 +138,8 @@ public actor Channel {
/// Logging items to the channel after this call
/// will do nothing.
/// NB: Under normal conditions this function does not need to be called.
public func shutdown() async {
public func shutdown() {
print("shutdown \(name)")
sequence.continuation.finish()
}
}
Expand Down
3 changes: 3 additions & 0 deletions Sources/Logger/Handler/BasicHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public actor BasicHandler: Handler {
/// Log something.
public func log(_ value: Sendable, context: Context) async { await logger(value, context, self) }

public func shutdown() async {
}

/// Calculate a text tag indicating the context.
/// Provided as a utility for logger callbacks to use as they need.
internal func tag(for context: Context) -> String {
Expand Down
1 change: 1 addition & 0 deletions Sources/Logger/Handler/Handler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@
public protocol Handler: Sendable {
var name: String { get }
func log(_ value: Sendable, context: Context) async
func shutdown() async
}
3 changes: 3 additions & 0 deletions Sources/Logger/Handler/OSLogHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

}
}

public func shutdown() async {
}
}

#endif
2 changes: 1 addition & 1 deletion Sources/Logger/Handler/StreamHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public actor StreamHandler: Handler {
continuation.yield(value)
}

public func finish() {
public func shutdown() {
continuation.finish()
}
}
Expand Down
52 changes: 34 additions & 18 deletions Sources/Logger/Manager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ public actor Manager {
private var changedChannels: Channels?
nonisolated(unsafe) var fatalHandler: FatalHandler = defaultFatalHandler

public enum Event {
public enum Event: Sendable {
case started
case shuttingDown
case shutdown
case channelAdded
case channelStarted
Expand All @@ -40,10 +41,7 @@ public actor Manager {
/// Stream of manager events. Clients can watch this if they are
/// interested in changes to the state of channels. When this stream
/// ends, the manager has shut down, along with all channels.
public let events: AsyncStream<Event>

/// Private continuation used to yield events to the stream.
private let eventSender: AsyncStream<Event>.Continuation
public let events: YieldingSequence<Event>

/**
An array of the names of the log channels
Expand All @@ -57,23 +55,23 @@ public actor Manager {
self.settings = settings
let enabled = settings.enabledChannelIDs
self.channelsEnabledInSettings = enabled
var c: AsyncStream<Event>.Continuation? = nil
let s = AsyncStream<Event> { continuation in
c = continuation
}
self.events = s
self.eventSender = c!
self.events = YieldingSequence()

logStartup(channels: enabled)
}

/// Initiate a shutdown of the manager.
public func shutdown() async {
for channel in channels {
await channel.shutdown()
events.yield(.shuttingDown)
await withTaskGroup(of: Void.self) { group in
for channel in channels {
group.addTask {
await channel.shutdown()
}
}
}
eventSender.yield(.shutdown)
eventSender.finish()
events.yield(.shutdown)
events.finish()
}

/**
Expand Down Expand Up @@ -138,7 +136,7 @@ public actor Manager {
channels.isEmpty
? "All channels currently disabled.\n" : "Enabled log channels: \(channels)\n")
}
eventSender.yield(.started)
events.yield(.started)
}

/**
Expand Down Expand Up @@ -193,8 +191,26 @@ extension Manager {
}

extension Manager {
func register(channel: Channel) {
/// Add a channel to the manager.
///
/// We emit a channelAdded event.
/// If `runImmediately` is true we kick off running the channel.
func add(channel: Channel, runImmediately: Bool) async {
channels.insert(channel)
eventSender.yield(.channelAdded)
events.yield(.channelAdded)
if runImmediately {
await run(channel: channel)
}
}

/// Run a channel.
/// We emit a `Event.channelStarted` event,
/// run the channel until it is done, then
/// emit a `Event.channelShutdown` event.
func run(channel: Channel) async {
events.yield(.channelStarted)
await channel.run()
events.yield(.channelShutdown)
}

}
25 changes: 16 additions & 9 deletions Tests/LoggerTests/LoggerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
// // For licensing terms, see http://elegantchaos.com/license/liberal/.
// // -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-

import Logger
import Testing

@testable import Logger

/// Settings supplier for the tests.
struct TestSettings: ManagerSettings {
var enabledChannelIDs: Set<Channel.ID> { [] }
Expand All @@ -25,23 +26,29 @@ func withTestChannel(action: @escaping @Sendable (Channel, StreamHandler) async
{
let st =
LogStream { continuation in
let handler = StreamHandler("test", continuation: continuation)
let manager = Manager(settings: TestSettings())
let channel = Channel(
"test", handler: handler, alwaysEnabled: true, manager: manager)
Task {
do {
let handler = StreamHandler("test", continuation: continuation)
let manager = Manager(settings: TestSettings())
let channel = Channel(
"test", handler: handler, alwaysEnabled: true, manager: manager, autoRun: false)
print("running action")
try await action(channel, handler)
print("done")
await channel.shutdown()
await channel.run()
print("flushed")
await handler.finish()
await manager.shutdown()
continuation.finish()
// await channel.shutdown()
// await manager.run(channel: channel)
} catch {
continuation.finish(throwing: error)
}
}
Task {
for await event in await manager.events {
print("» \(event)")
}
print("events done")
}
}

print("returned stream")
Expand Down

0 comments on commit e044b4b

Please sign in to comment.