Skip to content

Commit

Permalink
Merge pull request #174 from Schniz/schniz/support-email-triggering
Browse files Browse the repository at this point in the history
feat: add support for email triggers
  • Loading branch information
evanderkoogh authored Dec 2, 2024
2 parents 4fc00a8 + 4ac6f73 commit 0f2b9c6
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 3 deletions.
15 changes: 15 additions & 0 deletions .changeset/thirty-ties-float.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
'@microlabs/otel-cf-workers': minor
---

add support for `email` handlers

Example usage:

```ts
export default {
async email(message, env, ctx) {
// this is running in a trace!
},
};
```
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ One of the advantages of using Open Telemetry is that it makes it easier to do d

Triggers:

- [ ] Email (`handler.email`)
- [x] Email (`handler.email`)
- [x] HTTP (`handler.fetch`)
- [x] Queue (`handler.queue`)
- [x] Cron (`handler.scheduled`)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"scripts": {
"clean": "rimraf ./dist versions.json",
"format": "prettier --ignore-unknown --write .",
"build:src": "tsup src/index.ts --format cjs,esm --dts --clean --sourcemap",
"build:src": "tsup",
"build:versions": "pnpm version --json > versions.json",
"build": "run-s -l build:versions build:src",
"cs-version": "changeset version",
Expand Down
76 changes: 76 additions & 0 deletions src/instrumentation/email.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { setConfig, type Initialiser } from '../config'
import { wrap } from '../wrap'
import { exportSpans, proxyExecutionContext } from './common'
import { context as api_context, Exception, SpanKind, type SpanOptions, trace } from '@opentelemetry/api'
import { instrumentEnv } from './env'
import { versionAttributes } from './version'
import {
ATTR_FAAS_TRIGGER,
ATTR_MESSAGING_DESTINATION_NAME,
ATTR_RPC_MESSAGE_ID,
} from '@opentelemetry/semantic-conventions/incubating'

type EmailHandler = EmailExportedHandler
export type EmailHandlerArgs = Parameters<EmailHandler>

export function createEmailHandler(emailFn: EmailHandler, initialiser: Initialiser): EmailHandler {
const emailHandler: ProxyHandler<EmailHandler> = {
async apply(target, _thisArg, argArray: Parameters<EmailHandler>): Promise<void> {
const [message, orig_env, orig_ctx] = argArray
const config = initialiser(orig_env as Record<string, unknown>, message)
const env = instrumentEnv(orig_env as Record<string, unknown>)
const { ctx, tracker } = proxyExecutionContext(orig_ctx)
const context = setConfig(config)

try {
const args: EmailHandlerArgs = [message, env, ctx]
return await api_context.with(context, executeEmailHandler, undefined, target, args)
} catch (error) {
throw error
} finally {
orig_ctx.waitUntil(exportSpans(tracker))
}
},
}
return wrap(emailFn, emailHandler)
}

/**
* Converts the message headers into a record ready to be injected
* as OpenTelemetry attributes
*
* @example
* ```ts
* const headers = new Headers({ "Subject": "Hello!", From: "[email protected]" })
* headerAttributes({ headers })
* // => {"email.header.Subject": "Hello!", "email.header.From": "[email protected]"}
* ```
*/
function headerAttributes(message: { headers: Headers }): Record<string, unknown> {
return Object.fromEntries([...message.headers].map(([key, value]) => [`email.header.${key}`, value] as const))
}

async function executeEmailHandler(emailFn: EmailHandler, [message, env, ctx]: EmailHandlerArgs): Promise<void> {
const tracer = trace.getTracer('emailHandler')
const options = {
attributes: {
[ATTR_FAAS_TRIGGER]: 'other',
[ATTR_RPC_MESSAGE_ID]: message.headers.get('Message-Id') ?? undefined,
[ATTR_MESSAGING_DESTINATION_NAME]: message.to,
},
kind: SpanKind.CONSUMER,
} satisfies SpanOptions
Object.assign(options.attributes!, headerAttributes(message), versionAttributes(env))
const promise = tracer.startActiveSpan(`emailHandler ${message.to}`, options, async (span) => {
try {
const result = await emailFn(message, env, ctx)
span.end()
return result
} catch (error) {
span.recordException(error as Exception)
span.end()
throw error
}
})
return promise
}
8 changes: 8 additions & 0 deletions src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import { DOClass, instrumentDOClass } from './instrumentation/do.js'
import { createScheduledHandler } from './instrumentation/scheduled.js'
//@ts-ignore
import * as versions from '../versions.json'
import { createEmailHandler } from './instrumentation/email.js'

type FetchHandler = ExportedHandlerFetchHandler<unknown, unknown>
type ScheduledHandler = ExportedHandlerScheduledHandler<unknown>
type QueueHandler = ExportedHandlerQueueHandler
type EmailHandler = EmailExportedHandler

export type ResolveConfigFn<Env = any> = (env: Env, trigger: Trigger) => TraceConfig
export type ConfigurationOption = TraceConfig | ResolveConfigFn
Expand Down Expand Up @@ -106,6 +108,12 @@ export function instrument<E, Q, C>(
const queuer = unwrap(handler.queue) as QueueHandler
handler.queue = createQueueHandler(queuer, initialiser)
}

if (handler.email) {
const emailer = unwrap(handler.email) as EmailHandler
handler.email = createEmailHandler(emailer, initialiser)
}

return handler
}

Expand Down
8 changes: 7 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,10 @@ export interface DOConstructorTrigger {
name?: string
}

export type Trigger = Request | MessageBatch | ScheduledController | DOConstructorTrigger | 'do-alarm'
export type Trigger =
| Request
| MessageBatch
| ScheduledController
| DOConstructorTrigger
| 'do-alarm'
| ForwardableEmailMessage
9 changes: 9 additions & 0 deletions tsup.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { defineConfig } from 'tsup'

export default defineConfig({
entry: ['src/index.ts'],
format: ['cjs', 'esm'],
dts: true,
clean: true,
sourcemap: true,
})

0 comments on commit 0f2b9c6

Please sign in to comment.