New streaming backend #135

Open · wants to merge 132 commits into master
Conversation

@flashmob (Owner) commented Feb 21, 2019

Problem

The current 'backend' processes emails by buffering the entire message in memory, then applying each Processor using the decorator pattern. This approach works OK - most emails are under 1MB anyway, and the buffers are recycled after processing, keeping them allocated for the next message (being nice to the garbage collector). However, some things could be done more efficiently - for example, the message could be compressed on-the-fly while it's being saved. This is the main idea of 'streaming'.

Solution

The Go way of streaming is to use the io.Writer and io.Reader interfaces. What if we could take the current decorator pattern that we use for the backends and extend it by making the processors implement the io.Writer interface? 🤔

Yes, we can do just that. A little bit of background first: did you know that the io.Writer way is usually a decorator pattern? When we make an instance of a writer, we usually pass some underlying writer to it, allowing us to wire multiple writers together. Some people call this pattern 'chaining'.

Normally, when using io.Writer, if you would like to create a chain, you need to wire the writers manually with a few lines of code. This solution takes it further by allowing you to wire the Writers via configuration.
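For comparison, here is the manual wiring this refers to, using only the standard library: a zlib writer decorating a bytes.Buffer, compressed on the way in and decompressed on the way out. (This is an illustration of the general pattern, not code from this PR.)

```go
package main

import (
	"bytes"
	"compress/zlib"
	"fmt"
	"io"
)

// roundTrip pushes s through a manually wired writer chain
// (zlib writer -> buffer), then decompresses it again to show
// that the chain worked.
func roundTrip(s string) string {
	var sink bytes.Buffer

	// Manual wiring: zw decorates sink; writes to zw are
	// compressed and then passed down to sink.
	zw := zlib.NewWriter(&sink)
	io.WriteString(zw, s)
	zw.Close() // flush the compressed stream

	zr, err := zlib.NewReader(&sink)
	if err != nil {
		panic(err)
	}
	defer zr.Close()
	out, err := io.ReadAll(zr)
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	fmt.Println(roundTrip("Hello, stream!")) // Hello, stream!
}
```

Every extra stage in such a chain means another explicit wrapping call at the call site - which is exactly what wiring by configuration avoids.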

Technical Details

Each Writer is an instance of StreamDecorator, a struct that implements io.Writer. Additionally, the struct contains two callback functions, Open and Close; both can be set when the StreamDecorator is being initialized, and they are called back at the start and end of the stream. The Open callback is also used to pass down the *mail.Envelope, which can be used to keep state for the email currently being processed.

type StreamDecorator struct {
	p     func(StreamProcessor) StreamProcessor // wraps the next processor in the chain
	e     *mail.Envelope                        // the envelope currently being processed
	Close streamCloseWith                       // called at the end of the stream
	Open  streamOpenWith                        // called at the start of the stream
}

type streamOpenWith func(e *mail.Envelope) error

type streamCloseWith func() error

In gateway.go there's a new newStreamStack method that instantiates the StreamDecorator structs and wires them up.

A new method was added to the Backend interface

ProcessStream(r io.Reader, e *mail.Envelope) (Result, error)

A new configuration option was also added to the config file: stream_save_process.
The value is a string with the names of each StreamDecorator to chain, delimited by a pipe |.
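For illustration, the setting could look like this in the config file (the surrounding nesting is an assumption for this sketch; only the stream_save_process key and its pipe-delimited value come from this PR):

```json
{
  "backend_config": {
    "stream_save_process": "Header|compress|Decompress|debug"
  }
}
```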

This is how the io.Reader is passed from the DATA command down to the backend. The ProcessStream method calls all the Open methods on our writers and then begins streaming the data using io.Copy. At the end of the stream, it calls Close() on our decorators in the order they were wired.

Examples

Perhaps the best way to understand this is to look at some example code.

There are four examples of StreamDecorator implementations in the backends dir:

  • s_header.go - adds a 'delivery header' to the front of the stream
  • s_compress.go - uses zlib to compress the stream
  • s_decompress.go - uses zlib to decompress the stream
  • s_debug.go - logs the stream

You will notice that each of the files contains a function that looks just like the io.Writer interface,
without the Write keyword, i.e. StreamProcessWith(func(p []byte) (int, error)).
This is an anonymous function which is converted to an io.Writer when it is returned. Here is the code
of s_debug.go:

func StreamDebug() *StreamDecorator {
	sd := &StreamDecorator{}
	sd.p =
		func(sp StreamProcessor) StreamProcessor {
			sd.Open = func(e *mail.Envelope) error {
				return nil
			}
			return StreamProcessWith(func(p []byte) (int, error) {
				fmt.Println(string(p))
				Log().WithField("p", string(p)).Info("Debug stream")
				return sp.Write(p)
			})
		}
	return sd
}

The most important detail here is that the sp identifier refers to the next io.Writer in the chain.
In other words, sp holds a reference to the underlying writer.

(The sd.Open assignment does nothing; it's just there as an example / to be used as a template.)

In the api_test.go file, there is a test called TestStreamProcessor. The writers are chained with the
following config setting:

"stream_save_process": "Header|compress|Decompress|debug"

This means it will call the Write method on Header first, and then on each underlying writer down the chain.

Todo:

  • Configurable stream buffer size & ability to recycle it.
  • Write more advanced processors (parse headers, streaming version of the maildir processor, sql database, save in chunks & deduplicate on-the-fly)
  • Fuzz testing
  • Test in production

@flashmob flashmob mentioned this pull request Feb 21, 2019
…_parser.go)

- use a 4KB buffer to process the stream (io.CopyBuffer instead of io.Copy)
- Add a new 'process' stream decorator
flashmob added 4 commits July 7, 2020 01:11
stream processor decorators have a new Shutdown function, switched to use this instead of Svc
start developing background processing
…spatcher is started for each processor type

- new ValidatingProcessor type
- removed "gw" prefix from gateway config options
@flashmob flashmob mentioned this pull request Jul 12, 2020
flashmob added 2 commits July 14, 2020 11:43
background processing: borrow a new envelope and copy the existing protocol params to it.
@flashmob (Owner, Author)
4 done

@flashmob (Owner, Author)
Still have a lot of small problems, but once that's fixed then real testing can start!

@flashmob (Owner, Author) commented Aug 3, 2020

mysql driver taking shape.

envelope: added mime parse error to envelope
fixed tests
make ChunkPrefetchCount & ChunkMaxBytes configurable
update comments to parseInfo
chunkPrefetchCountDefault configurable (add chunkPrefetchMax)
@kushsharma

@flashmob Are you still planning to finish this PR? I see you have invested quite some time, but then the motivation died down? 😄
We need you back.
