-
Notifications
You must be signed in to change notification settings - Fork 528
/
apex.go
152 lines (122 loc) · 3.18 KB
/
apex.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
// Package apex provides Lambda support for Go via a
// Node.js shim and this package for operating over
// stdio.
package apex
import (
"encoding/json"
"io"
"log"
"os"
"sync"
)
// Handler handles Lambda events.
//
// Implementations receive the raw, still-encoded JSON event together with the
// invocation Context, and return either a value or an error. The returned
// value is later JSON-encoded as the "value" of the response; a non-nil error
// is reported through its Error() string instead.
type Handler interface {
// Handle processes a single event. The *Context is whatever was decoded
// from the incoming message's "context" member, so it is nil when that
// member is absent or null.
Handle(json.RawMessage, *Context) (interface{}, error)
}
// HandlerFunc is an adapter that allows an ordinary function with the
// matching signature to be used as a Handler: its Handle method simply calls
// the function itself.
type HandlerFunc func(json.RawMessage, *Context) (interface{}, error)
// Handle satisfies Handler by calling the underlying function h with the
// given event and context and passing its results straight through.
func (h HandlerFunc) Handle(event json.RawMessage, ctx *Context) (interface{}, error) {
	value, err := h(event, ctx)
	return value, err
}
// Context represents the context data provided by a Lambda invocation.
//
// It is decoded from the "context" member of each incoming message; the JSON
// key each field is read from is given by its struct tag. Keys missing from
// the message leave the corresponding field at its zero value.
type Context struct {
// Identifiers of the invocation and of the function being invoked.
InvokeID string `json:"invokeid"`
RequestID string `json:"awsRequestId"`
FunctionName string `json:"functionName"`
FunctionVersion string `json:"functionVersion"`
// Names of the log group and log stream associated with the invocation.
LogGroupName string `json:"logGroupName"`
LogStreamName string `json:"logStreamName"`
// MemoryLimitInMB is decoded as a string, not a number.
// NOTE(review): presumably the sender serializes it as a string - confirm.
MemoryLimitInMB string `json:"memoryLimitInMB"`
IsDefaultFunctionVersion bool `json:"isDefaultFunctionVersion"`
// ClientContext is kept as raw JSON; this package does not decode it.
ClientContext json.RawMessage `json:"clientContext"`
}
// Handle runs the given handler over the process's standard streams: events
// are read as JSON from os.Stdin and responses are written as JSON to
// os.Stdout.
//
// The call blocks until input is exhausted (or fails to decode) and every
// response for the events already received has been written.
func Handle(h Handler) {
	stdio := manager{
		Handler: h,
		Writer:  os.Stdout,
		Reader:  os.Stdin,
	}

	stdio.Start()
}
// HandleFunc is the function-typed convenience form of Handle: it adapts h to
// the Handler interface and handles Lambda events with it.
func HandleFunc(h HandlerFunc) {
	var handler Handler = h
	Handle(handler)
}
// input is a single message decoded from the manager's Reader. It carries the
// raw event payload and the invocation context.
// NOTE(review): presumably these messages are written by the Node.js shim
// mentioned in the package comment - confirm against the shim.
type input struct {
// Event is the still-encoded JSON event, handed to the Handler untouched.
Event json.RawMessage `json:"event"`
// Context is nil when the message has no "context" member (or it is null).
Context *Context `json:"context"`
}
// output is a single response message encoded to the manager's Writer.
// NOTE(review): presumably these messages are consumed by the Node.js shim
// mentioned in the package comment - confirm against the shim.
//
// Both fields are tagged omitempty, so a response with no error and a nil
// value is encoded as an empty JSON object.
type output struct {
// Error is the handler error's message; empty when the handler succeeded.
Error string `json:"error,omitempty"`
// Value is whatever the handler returned on success.
Value interface{} `json:"value,omitempty"`
}
// manager for operating over stdio.
//
// It ties together a source of JSON input messages, a sink for JSON output
// messages, and the Handler invoked for each input. Handle populates it with
// os.Stdin and os.Stdout; the fields are plain io interfaces, so any
// Reader/Writer pair can be substituted.
type manager struct {
// Reader is the stream input messages are decoded from.
Reader io.Reader
// Writer is the stream output messages are encoded to.
Writer io.Writer
// Handler is invoked once per decoded input message.
Handler Handler
}
// Start runs the manager's pipeline: decode messages from the Reader, invoke
// the Handler for each of them, and encode the results to the Writer.
//
// It returns once the stream of results has been closed and fully written.
func (m *manager) Start() {
	requests := m.input()
	responses := m.handle(requests)
	m.output(responses)
}
// input starts a goroutine that decodes a stream of JSON messages from the
// Reader and delivers each one on the returned channel.
//
// The channel is unbuffered and is closed when the Reader reaches EOF or when
// a message fails to decode; a decode failure is logged before stopping.
func (m *manager) input() <-chan *input {
	msgs := make(chan *input)
	decoder := json.NewDecoder(m.Reader)

	go func() {
		defer close(msgs)

		for {
			var next input

			switch err := decoder.Decode(&next); {
			case err == io.EOF:
				// Clean end of stream: nothing more to deliver.
				return
			case err != nil:
				log.Printf("error decoding input: %s", err)
				return
			}

			msgs <- &next
		}
	}()

	return msgs
}
// handle invokes the handler for every message received on in and sends each
// response on the returned channel.
//
// Every message is invoked in its own goroutine, so responses are sent in the
// order the invocations finish, which need not be the order the messages
// arrived. The returned channel is closed once in has been closed and all
// outstanding invocations have sent their response.
func (m *manager) handle(in <-chan *input) <-chan *output {
	results := make(chan *output)

	go func() {
		var pending sync.WaitGroup

		for req := range in {
			pending.Add(1)
			go func(req *input) {
				defer pending.Done()
				results <- m.invoke(req)
			}(req)
		}

		// Do not close results while an invocation may still send on it.
		pending.Wait()
		close(results)
	}()

	return results
}
// invoke calls the handler with the event and context carried by `msg` and
// wraps the outcome in an output: the error's message when the handler fails
// (any returned value is then discarded), otherwise the returned value.
func (m *manager) invoke(msg *input) *output {
	result := new(output)

	value, err := m.Handler.Handle(msg.Event, msg.Context)
	if err == nil {
		result.Value = value
	} else {
		result.Error = err.Error()
	}

	return result
}
// output encodes each message received on ch as JSON and writes it to the
// Writer, returning once ch is closed.
//
// Encoding failures are logged. If the failed message carried a Value (the
// only field that can be unencodable, e.g. a handler returned a channel, a
// func or a NaN), an error response describing the failure is written in its
// place. Previously such a failure was only logged, so the reader on the other
// end got no reply at all for that message.
func (m *manager) output(ch <-chan *output) {
	enc := json.NewEncoder(m.Writer)

	for msg := range ch {
		err := enc.Encode(msg)
		if err == nil {
			continue
		}
		log.Printf("error encoding output: %s", err)

		// Error is a plain string, so without a Value the failure must have
		// come from the Writer itself; retrying would fail the same way.
		if msg == nil || msg.Value == nil {
			continue
		}

		// Report the encoding problem to the peer as an ordinary error reply.
		fallback := &output{Error: err.Error()}
		if err := enc.Encode(fallback); err != nil {
			log.Printf("error encoding output: %s", err)
		}
	}
}