FBus is a lightweight service-bus implementation written in F#.
It comes with default implementations for:
- Publish (broadcast), Send (direct) and Reply (direct)
- Conversation follow-up using headers (ConversationId and MessageId)
- RabbitMQ (with dead-letter support)
- Generic Host support with dependency injection
- System.Text.Json serialization
- Full testing capabilities using In-Memory mode
- Persistent queue/buffering across activation
- Sharding
Features that won't be implemented in FBus:
- Sagas: coordination is a big topic by itself - technically, everything required to handle this is available (ConversationId and MessageId). This can be handled outside of a service-bus.
Starting with version 0.25.0, you must install the `rabbitmq_consistent_hash_exchange` plugin if you plan to use the RabbitMQ transport.
To exchange messages using FBus, you first have to define messages. There are 2 types:
- events: messages that are broadcast (see `Publish`)
- commands: messages that are sent to one client (see `Send` or `Reply`)

It is advised to share the same type definitions across publishers and consumers. If you do not share types, ensure you expose them using the same namespaces.
To avoid mistakes, messages are marked with a dummy interface.
```fsharp
type EventMessage =
    { msg: string }
    interface FBus.IMessageEvent

type CommandMessage =
    { msg: string }
    interface FBus.IMessageCommand
```
For sharding scenarios, messages must also implement the `IMessageKey` interface:

```fsharp
type IMessageKey =
    abstract Key: string with get
```
NOTE: the default routing key is an empty string.
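
As a rough sketch of what a shardable message could look like, the record below implements both a marker interface and `IMessageKey`. The two interfaces are redeclared locally as stand-ins so the snippet runs without referencing FBus; in a real project, use the definitions provided by FBus instead. `OrderCommand`, `orderId` and `keyOf` are hypothetical names, and using an order id as the key is only an assumption about what a useful routing key might be.

```fsharp
// Local stand-ins mirroring the definitions above, so the sketch runs without FBus.
// In a real project, use the interfaces provided by FBus instead.
type IMessageCommand = interface end

type IMessageKey =
    abstract Key: string with get

// Hypothetical command: every command about the same order exposes the same key.
type OrderCommand =
    { orderId: string
      msg: string }
    interface IMessageCommand
    interface IMessageKey with
        member this.Key = this.orderId

let first = { orderId = "order-42"; msg = "create" }
let second = { orderId = "order-42"; msg = "ship" }

// Interface members are explicit in F#, so the key is read through IMessageKey.
let keyOf (m: IMessageKey) = m.Key

printfn "%s / %s" (keyOf first) (keyOf second)
```

Deriving the key from a stable business identifier keeps related messages under one key; how keys map to shards is left to the transport.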
Before using the bus, a configuration must be built:
FBus.Builder | Description | Default |
---|---|---|
`configure` | Start configuration with default parameters. | |
`withName` | Change service name. Used to identify a bus client (see `IBusInitiator.Send` and `IBusConversation.Send`). | Name based on computer name, pid and random number. |
`withShard` | Name the shard. Name must be provided to enable sharding. | None |
`withTransport` | Transport to use. | None |
`withContainer` | Container to use. | None |
`withSerializer` | Serializer to use. | None |
`withConsumer` | Add message consumer. | None |
`withHook` | Hook on consumer message processing. | None |
`withRecovery` | Connect to dead letter for recovery only. | false |
`build` | Returns a new bus instance (`FBus.IBusControl`) based on configuration. | n/a |
NOTE: bus clients are ephemeral by default. This is useful if you just want to connect to the bus to spy on messages or to send commands, for example. Assigning a name (see `withName`) makes the client public, so its queues are not deleted upon exit.
`IBusControl` is the primary interface to control the bus:
IBusControl | Description | Comments |
---|---|---|
`Start` | Start the bus. Returns `IBusInitiator`. | Must be called before sending messages. `Start` accepts a resolve context which can be used by the container. |
`Stop` | Stop the bus. | Bus can be restarted later on. |
`Dispose` | Dispose the bus instance. | Bus can't be reused. |
Once the bus is started, `IBusInitiator` is available:
IBusInitiator | Description |
---|---|
`Publish` | Broadcast an event message to all subscribers. |
`Send` | Send a command message to given client. |
NOTE: a new conversation is started when using this interface.
A consumer processes incoming messages: it is given a context (`IBusConversation`) and a message.

`IBusConversation` provides information to handlers and the means to interact with the bus:
IBusConversation | Description |
---|---|
`Sender` | Name of the client. |
`ConversationId` | Id of the conversation (the identifier flows from the initiator to subsequent consumers). |
`MessageId` | Id of this message. |
`Reply` | Provide a shortcut to reply to sender. |
`Publish` | Broadcast an event message to all subscribers. |
`Send` | Send a command message to given client. |
NOTE: the current conversation is used when using this interface.
There are two kinds of handlers:

- A class implementing the `IBusConsumer` interface. Multiple implementations on the same class are allowed as of F# 5.0. Use `withConsumer` to register the handlers.

```fsharp
type IBusConsumer<'t> =
    abstract Handle: IBusConversation -> 't -> unit
```

- A function, which enables partial application scenarios. Use `withFunConsumer` to register the handler.
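
The two styles can be sketched side by side as follows. This is only an illustration under stated assumptions: `IBusConversation` is reduced to a local stand-in exposing `Sender` (assumed to be a string), `IBusConsumer<'t>` is redeclared as shown above, and the function handler is assumed to have the same shape as `Handle` once its extra argument is applied. `Ping`, `PingConsumer`, `handlePing` and `log` are hypothetical names. Registration through `withConsumer` and `withFunConsumer` is left out.

```fsharp
// Local stand-ins so the sketch runs without FBus; the real interfaces expose more members.
type IBusConversation =
    abstract Sender: string

type IBusConsumer<'t> =
    abstract Handle: IBusConversation -> 't -> unit

// Hypothetical message type.
type Ping = { msg: string }

// Shared sink used to observe what the handlers received.
let log = ResizeArray<string>()

// Style 1: a class implementing IBusConsumer<'t>.
type PingConsumer() =
    interface IBusConsumer<Ping> with
        member this.Handle ctx ping =
            log.Add(sprintf "class:%s:%s" ctx.Sender ping.msg)

// Style 2: a plain function whose first argument is a dependency.
let handlePing (prefix: string) (ctx: IBusConversation) (ping: Ping) =
    log.Add(sprintf "%s:%s:%s" prefix ctx.Sender ping.msg)

// Partial application fixes the dependency; what remains is IBusConversation -> Ping -> unit.
let funHandler : IBusConversation -> Ping -> unit = handlePing "fun"

// Fake conversation used only to exercise both handlers locally.
let ctx = { new IBusConversation with member this.Sender = "client" }

(PingConsumer() :> IBusConsumer<Ping>).Handle ctx { msg = "ping" }
funHandler ctx { msg = "ping" }
```

The function style avoids a class when the handler only needs a few dependencies, since they can be supplied up front as leading arguments.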
FBus provides InMemory implementation for transport, serializer and activator. They only exist to help testing or to easily prototype.
FBus.InMemory | Description | Comments |
---|---|---|
`useTransport` | Register InMemory transport. | |
`useSerializer` | Register marshal by reference serializer. | Object is preserved and passed by reference. |
`useContainer` | Register default activator (see `System.Activator`). | Default constructor must exist. |
NOTE: InMemory serializer does leak messages. This is by design.
FBus can work in-memory, which is especially useful for unit testing. Before running a test in-memory, an `FBus.Testing.Session` instance has to be created. Multiple sessions can be created; they are completely isolated from each other.
FBus.Testing.Session | Description | Comments |
---|---|---|
`Use` | Configure FBus for unit-testing. | Configure transport, serializer and activator. |
`WaitForCompletion` | Wait for all messages to be processed. | This method blocks until completion. |
`ClearCache` | Clear InMemory serializer cache. | Shall not be used unless necessary. |
FBus is thread-safe, as are all existing extensions in this repository.
The following extension points are supported:
- Transports: which middleware is transporting messages.
- Serializers: how messages are exchanged on the wire.
- Containers: where and how consumers are allocated and hosted.
- Hooks: handlers in case of failures.
There are 2 types of messages:
- events: messages that are broadcast (see `Publish`)
- commands: messages that are sent to one client (see `Send`)
In order to avoid mistakes, messages are marked with a dummy interface.
For events:

```fsharp
type EventMessage =
    { msg: string }
    interface FBus.IMessageEvent
```

For commands:

```fsharp
type CommandMessage =
    { msg: string }
    interface FBus.IMessageCommand
```
Two transports are available out of the box: RabbitMQ and InMemory. Still, it's possible to easily add new middlewares.
See `FBus.IBusTransport`.
Containers are in charge of activating and running consumers.
See `FBus.IBusContainer`.
Serializers transform objects into byte streams and vice versa without relying on native middleware capabilities.
See `FBus.IBusSerializer`.
Consumers can be configured at will. There is one major restriction: only one handler per type is supported. If you want several subscribers, you will have to handle delegation.
See `FBus.IBusConsumer<>`.
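
One possible way to handle delegation is a single registered consumer that forwards each message to several in-process subscribers. The sketch below assumes local stand-ins for `IBusConversation` (reduced to `Sender`, assumed to be a string) and `IBusConsumer<'t>` so that it runs without FBus. `UserCreated`, `UserCreatedDispatcher`, `sendWelcome`, `updateStats` and `audit` are hypothetical names, not part of FBus.

```fsharp
// Local stand-ins (see FBus.IBusConversation and FBus.IBusConsumer<> for the real ones).
type IBusConversation =
    abstract Sender: string

type IBusConsumer<'t> =
    abstract Handle: IBusConversation -> 't -> unit

// Hypothetical event.
type UserCreated = { name: string }

// Hypothetical in-process subscribers, all interested in the same message type.
let audit = ResizeArray<string>()
let sendWelcome (_: IBusConversation) (evt: UserCreated) = audit.Add("welcome " + evt.name)
let updateStats (_: IBusConversation) (evt: UserCreated) = audit.Add("stats " + evt.name)

// The only consumer for UserCreated: it does nothing but delegate, in order.
type UserCreatedDispatcher() =
    let subscribers = [ sendWelcome; updateStats ]
    interface IBusConsumer<UserCreated> with
        member this.Handle ctx evt =
            for subscriber in subscribers do
                subscriber ctx evt

// Fake conversation used only to exercise the dispatcher locally.
let ctx = { new IBusConversation with member this.Sender = "client" }
(UserCreatedDispatcher() :> IBusConsumer<UserCreated>).Handle ctx { name = "alice" }
printfn "%A" (List.ofSeq audit)
```

In this sketch, an exception raised by one subscriber prevents the following ones from running; whether that is acceptable depends on the application.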
Hooks allow one to observe errors while processing messages.
See `FBus.IBusHook`.
FBus.IBusHook | Description | Comments |
---|---|---|
`OnStart` | Invoked after the bus is started. Use it to start additional services if required. | |
`OnStop` | Invoked before the bus is stopped. Use it to stop additional services if required. | |
`OnBeforeProcessing` | Invoked before processing a message. | Must not throw. Can return an `IDisposable` object released once processing is done. |
`OnError` | Invoked on error. | Must not throw. |
NOTE: you must install the `rabbitmq_consistent_hash_exchange` plugin if you plan to use the RabbitMQ transport.
FBus.RabbitMQ | Description | Comments |
---|---|---|
`useDefaults` | Configure RabbitMQ as transport. | Endpoint is set to `amqp://guest:guest@localhost`. |
`useWith` | Configure RabbitMQ as transport with provided URI. | |
The transport leverages exchanges (one for each message type) to distribute messages across consumers (each subscribing through a queue).
It supports only a simple concurrency model:
- No concurrency at bus level for receive. This does not mean you can't have concurrency; you just have to handle it explicitly: create multiple bus instances in-process, and it's up to you to synchronize correctly among threads if required.
- Sending is a thread-safe operation, but locking happens behind the scenes to access the underlying channel/connection.
- Automatic recovery is configured on connection.

The default implementation uses the following settings:
- messages are sent as persistent
- a consumer fetches one message at a time and ack/nack accordingly
- message goes to dead-letter on error
- prefetch size is 0
- prefetch count is 10
FBus.Json | Description | Comments |
---|---|---|
`useDefaults` | Configure System.Text.Json as serializer. | [FSharp.SystemTextJson](https://github.com/Tarmil/FSharp.SystemTextJson) is used to deal with F# types. |
`useWith` | Same as `useDefaults` but with provided configuration options. | |
FBus.QuickStart | Description | Comments |
---|---|---|
`configure` | Configure FBus with RabbitMQ, Json and In-Memory Activator. | |
FBus.GenericHost | Description | Comments |
---|---|---|
`AddFBus` | Inject FBus in GenericHost container. | `FBus.IBusControl` and `FBus.IBusInitiator` are available in injection context. |

NOTE: consumers are scoped (see `IServiceProvider.CreateScope()`).
```fsharp
open FBus
open FBus.Builder

use bus = FBus.QuickStart.configure() |> build

let busInitiator = bus.Start()
busInitiator.Send "hello from FBus !"
```
```fsharp
open FBus
open FBus.Builder

// Message is assumed to be a message type defined elsewhere.
type MessageConsumer() =
    interface IBusConsumer<Message> with
        member this.Handle context msg =
            printfn "Received message: %A" msg

use bus =
    FBus.QuickStart.configure()
    |> withConsumer<MessageConsumer>
    |> build

bus.Start() |> ignore
```
```fsharp
...
let configureBus builder =
    builder
    |> withName "server"
    |> withConsumer<MessageConsumer>
    |> Json.useDefaults
    |> RabbitMQ.useDefaults

Host.CreateDefaultBuilder(argv)
    .ConfigureServices(fun services -> services.AddFBus(configureBus) |> ignore)
    .UseConsoleLifetime()
    .Build()
    .Run()
```
A makefile is available:
- `make [build]`: build FBus
- `make test`: build and test FBus

If you prefer to build using your IDE, the solution file is named `fbus.sln`.
The project is licensed under MIT.
For more information on the license see the license file.