diff --git a/.github/workflows/ci-workflow.yaml b/.github/workflows/ci-workflow.yaml index 4af864c9..cdfd870d 100644 --- a/.github/workflows/ci-workflow.yaml +++ b/.github/workflows/ci-workflow.yaml @@ -114,7 +114,7 @@ jobs: # We upload this to the artifacts that are attached to the action just to make it easy for # someone to pull down a build from another branch. - name: "Upload Binaries" - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: "gazette-x86_64-linux-gnu.zip" path: ".build-ci/gazette-x86_64-linux-gnu.zip" diff --git a/Makefile b/Makefile index b671508d..200a493d 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,9 @@ ci-release-gazette-examples-targets = \ # Targets of protobufs which must be compiled. protobuf-targets = \ ./broker/protocol/protocol.pb.go \ + ./broker/protocol/protocol.pb.gw.go \ ./consumer/protocol/protocol.pb.go \ + ./consumer/protocol/protocol.pb.gw.go \ ./consumer/recoverylog/recorded_op.pb.go \ ./examples/word-count/word_count.pb.go diff --git a/broker/protocol/protocol.pb.gw.go b/broker/protocol/protocol.pb.gw.go new file mode 100644 index 00000000..70261898 --- /dev/null +++ b/broker/protocol/protocol.pb.gw.go @@ -0,0 +1,200 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: broker/protocol/protocol.proto + +/* +Package protocol is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package protocol + +import ( + "context" + "io" + "net/http" + + "github.com/golang/protobuf/descriptor" + "github.com/golang/protobuf/proto" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/grpc-ecosystem/grpc-gateway/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// Suppress "imported and not used" errors +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray +var _ = descriptor.ForMessage +var _ = metadata.Join + +func request_Journal_List_0(ctx context.Context, marshaler runtime.Marshaler, client JournalClient, req *http.Request, pathParams map[string]string) (Journal_ListClient, runtime.ServerMetadata, error) { + var protoReq ListRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + stream, err := client.List(ctx, &protoReq) + if err != nil { + return nil, metadata, err + } + header, err := stream.Header() + if err != nil { + return nil, metadata, err + } + metadata.HeaderMD = header + return stream, metadata, nil + +} + +func request_Journal_Read_0(ctx context.Context, marshaler runtime.Marshaler, client JournalClient, req *http.Request, pathParams map[string]string) (Journal_ReadClient, runtime.ServerMetadata, error) { + var protoReq ReadRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + stream, err := client.Read(ctx, &protoReq) + if err != nil { + return nil, metadata, err + } + header, err := stream.Header() + if err != nil { + return nil, metadata, err + } + metadata.HeaderMD = header + return stream, metadata, nil + +} + +// RegisterJournalHandlerServer registers the http handlers for service Journal to "mux". +// UnaryRPC :call JournalServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterJournalHandlerFromEndpoint instead. +func RegisterJournalHandlerServer(ctx context.Context, mux *runtime.ServeMux, server JournalServer) error { + + mux.Handle("POST", pattern_Journal_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") + _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + }) + + mux.Handle("POST", pattern_Journal_Read_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + err := status.Error(codes.Unimplemented, "streaming calls are not yet supported in the in-process transport") + _, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + }) + + return nil +} + +// RegisterJournalHandlerFromEndpoint is same as RegisterJournalHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterJournalHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.Dial(endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterJournalHandler(ctx, mux, conn) +} + +// RegisterJournalHandler registers the http handlers for service Journal to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterJournalHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterJournalHandlerClient(ctx, mux, NewJournalClient(conn)) +} + +// RegisterJournalHandlerClient registers the http handlers for service Journal +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "JournalClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "JournalClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "JournalClient" to call the correct interceptors. +func RegisterJournalHandlerClient(ctx context.Context, mux *runtime.ServeMux, client JournalClient) error { + + mux.Handle("POST", pattern_Journal_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Journal_List_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Journal_List_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) + + }) + + mux.Handle("POST", pattern_Journal_Read_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Journal_Read_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Journal_Read_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_Journal_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "journals", "list"}, "", runtime.AssumeColonVerbOpt(true))) + + pattern_Journal_Read_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "journals", "read"}, "", runtime.AssumeColonVerbOpt(true))) +) + +var ( + forward_Journal_List_0 = runtime.ForwardResponseStream + + forward_Journal_Read_0 = runtime.ForwardResponseStream +) diff --git a/broker/protocol/protocol_gateway.yaml b/broker/protocol/protocol_gateway.yaml new file mode 100644 index 00000000..d75c93ea --- /dev/null +++ b/broker/protocol/protocol_gateway.yaml @@ -0,0 +1,11 @@ +type: google.api.Service +config_version: 3 + +http: + rules: + - selector: protocol.Journal.List + post: /v1/journals/list + body: "*" + - selector: protocol.Journal.Read + post: /v1/journals/read + body: "*" diff --git a/cmd/gazette/main.go b/cmd/gazette/main.go index 2fdbcd77..bc1ae2b1 100644 --- a/cmd/gazette/main.go +++ b/cmd/gazette/main.go @@ -8,6 +8,8 @@ import ( "syscall" "time" + "github.com/gogo/gateway" + "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/jessevdk/go-flags" log "github.com/sirupsen/logrus" "go.gazette.dev/core/allocator" @@ -85,7 +87,7 @@ func (cmdServe) Execute(args []string) error { } // Bind our server listener, grabbing a random available port if Port is zero. - srv, err := server.New("", Config.Broker.Host, Config.Broker.Port, serverTLS, peerTLS, Config.Broker.MaxGRPCRecvSize) + srv, err := server.New("", Config.Broker.Host, Config.Broker.Port, serverTLS, peerTLS, Config.Broker.MaxGRPCRecvSize, nil) mbp.Must(err, "building Server instance") // If a file:// root was provided, ensure it exists and apply it. @@ -116,6 +118,13 @@ func (cmdServe) Execute(args []string) error { signalCh = make(chan os.Signal, 1) ) pb.RegisterJournalServer(srv.GRPCServer, pb.NewVerifiedJournalServer(service, verifier)) + + var mux *runtime.ServeMux = runtime.NewServeMux( + runtime.WithMarshalerOption(runtime.MIMEWildcard, new(gateway.JSONPb)), + runtime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler), + ) + pb.RegisterJournalHandler(tasks.Context(), mux, srv.GRPCLoopback) + srv.HTTPMux.Handle("/v1/", Config.Broker.CORSWrapper(mux)) srv.HTTPMux.Handle("/", http_gateway.NewGateway(pb.NewRoutedJournalClient(lo, pb.NoopDispatchRouter{}))) ks.WatchApplyDelay = Config.Broker.WatchDelay diff --git a/consumer/protocol/protocol.pb.gw.go b/consumer/protocol/protocol.pb.gw.go new file mode 100644 index 00000000..e597cbd4 --- /dev/null +++ b/consumer/protocol/protocol.pb.gw.go @@ -0,0 +1,250 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: consumer/protocol/protocol.proto + +/* +Package protocol is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package protocol + +import ( + "context" + "io" + "net/http" + + "github.com/golang/protobuf/descriptor" + "github.com/golang/protobuf/proto" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/grpc-ecosystem/grpc-gateway/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// Suppress "imported and not used" errors +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray +var _ = descriptor.ForMessage +var _ = metadata.Join + +func request_Shard_Stat_0(ctx context.Context, marshaler runtime.Marshaler, client ShardClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq StatRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := client.Stat(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Shard_Stat_0(ctx context.Context, marshaler runtime.Marshaler, server ShardServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq StatRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := server.Stat(ctx, &protoReq) + return msg, metadata, err + +} + +func request_Shard_List_0(ctx context.Context, marshaler runtime.Marshaler, client ShardClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq ListRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := client.List(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) + return msg, metadata, err + +} + +func local_request_Shard_List_0(ctx context.Context, marshaler runtime.Marshaler, server ShardServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { + var protoReq ListRequest + var metadata runtime.ServerMetadata + + newReader, berr := utilities.IOReaderFactory(req.Body) + if berr != nil { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", berr) + } + if err := marshaler.NewDecoder(newReader()).Decode(&protoReq); err != nil && err != io.EOF { + return nil, metadata, status.Errorf(codes.InvalidArgument, "%v", err) + } + + msg, err := server.List(ctx, &protoReq) + return msg, metadata, err + +} + +// RegisterShardHandlerServer registers the http handlers for service Shard to "mux". +// UnaryRPC :call ShardServer directly. +// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. +// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterShardHandlerFromEndpoint instead. +func RegisterShardHandlerServer(ctx context.Context, mux *runtime.ServeMux, server ShardServer) error { + + mux.Handle("POST", pattern_Shard_Stat_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Shard_Stat_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Shard_Stat_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + mux.Handle("POST", pattern_Shard_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + var stream runtime.ServerTransportStream + ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := local_request_Shard_List_0(rctx, inboundMarshaler, server, req, pathParams) + md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Shard_List_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +// RegisterShardHandlerFromEndpoint is same as RegisterShardHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterShardHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.Dial(endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterShardHandler(ctx, mux, conn) +} + +// RegisterShardHandler registers the http handlers for service Shard to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterShardHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterShardHandlerClient(ctx, mux, NewShardClient(conn)) +} + +// RegisterShardHandlerClient registers the http handlers for service Shard +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "ShardClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "ShardClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "ShardClient" to call the correct interceptors. +func RegisterShardHandlerClient(ctx context.Context, mux *runtime.ServeMux, client ShardClient) error { + + mux.Handle("POST", pattern_Shard_Stat_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Shard_Stat_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Shard_Stat_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + mux.Handle("POST", pattern_Shard_List_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_Shard_List_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_Shard_List_0(ctx, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_Shard_Stat_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "shards", "stat"}, "", runtime.AssumeColonVerbOpt(true))) + + pattern_Shard_List_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2}, []string{"v1", "shards", "list"}, "", runtime.AssumeColonVerbOpt(true))) +) + +var ( + forward_Shard_Stat_0 = runtime.ForwardResponseMessage + + forward_Shard_List_0 = runtime.ForwardResponseMessage +) diff --git a/consumer/protocol/protocol_gateway.yaml b/consumer/protocol/protocol_gateway.yaml new file mode 100644 index 00000000..b62f7bbd --- /dev/null +++ b/consumer/protocol/protocol_gateway.yaml @@ -0,0 +1,11 @@ +type: google.api.Service +config_version: 3 + +http: + rules: + - selector: consumer.Shard.List + post: /v1/shards/list + body: "*" + - selector: consumer.Shard.Stat + post: /v1/shards/stat + body: "*" diff --git a/go.mod b/go.mod index 6aa1da5c..b5c3db32 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/aws/aws-sdk-go v1.40.35 github.com/dustin/go-humanize v1.0.0 github.com/dustinkirkland/golang-petname v0.0.0-20191129215211-8e5a1ed0cff0 + github.com/gogo/gateway v1.1.0 github.com/gogo/protobuf v1.3.2 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/golang/protobuf v1.5.4 @@ -20,6 +21,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/schema v1.4.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 + github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/hashicorp/golang-lru v0.5.4 github.com/jessevdk/go-flags v1.5.0 github.com/jgraettinger/cockroach-encoding v1.1.0 @@ -62,6 +64,8 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96 // indirect + github.com/ghodss/yaml v1.0.0 // indirect + github.com/golang/glog v1.2.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/gofuzz v1.0.0 // indirect github.com/google/s2a-go v0.1.4 // indirect diff --git a/go.sum b/go.sum index dfd6aa30..47ceaccc 100644 --- a/go.sum +++ b/go.sum @@ -148,6 +148,7 @@ github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTg github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -165,8 +166,11 @@ github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nA github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gogo/gateway v1.1.0 h1:u0SuhL9+Il+UbjM9VIE3ntfRujKbvVpFvNB4HbjeVQ0= +github.com/gogo/gateway v1.1.0/go.mod h1:S7rR8FRQyG3QFESeSv4l2WnsyzlCLG0CzBbUUo/mbic= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -174,6 +178,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= +github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -261,6 +267,8 @@ github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:Fecb github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.8.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -379,6 +387,7 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= @@ -501,6 +510,7 @@ golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -568,6 +578,7 @@ golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -760,6 +771,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240528184218-531527333157/go. google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 h1:Zy9XzmMEflZ/MAaA7vNcoebnRAld7FsPW1EeBB7V0m8= google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.19.1/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= @@ -806,7 +818,9 @@ gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKW gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/mainboilerplate/runconsumer/run_consumer.go b/mainboilerplate/runconsumer/run_consumer.go index 6b044cdb..a8924741 100644 --- a/mainboilerplate/runconsumer/run_consumer.go +++ b/mainboilerplate/runconsumer/run_consumer.go @@ -8,11 +8,14 @@ import ( "context" "crypto/tls" "fmt" + "net" "os" "os/signal" "syscall" "time" + "github.com/gogo/gateway" + "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/jessevdk/go-flags" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -25,6 +28,10 @@ import ( mbp "go.gazette.dev/core/mainboilerplate" "go.gazette.dev/core/server" "go.gazette.dev/core/task" + + // This import isn't required, but it convinces `go mod tidy` to not remove + // packages which are required for building the protoc-gen-grpc-gateway plugin. + _ "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway/descriptor" ) // Application is the user-defined consumer Application which is executed @@ -106,8 +113,9 @@ const iniFilename = "gazette.ini" // Cmd wraps a Config and Application to provide an Execute entry-point. type Cmd struct { - Cfg Config - App Application + Cfg Config + App Application + WrapListener func(net.Listener, *tls.Config) (net.Listener, error) } func (sc Cmd) Execute(args []string) error { @@ -149,7 +157,14 @@ func (sc Cmd) Execute(args []string) error { } // Bind our server listener, grabbing a random available port if Port is zero. - srv, err := server.New("", bc.Consumer.Host, bc.Consumer.Port, serverTLS, peerTLS, bc.Consumer.MaxGRPCRecvSize) + srv, err := server.New( + "", // Bind all interfaces + bc.Consumer.Host, + bc.Consumer.Port, + serverTLS, peerTLS, + bc.Consumer.MaxGRPCRecvSize, + sc.WrapListener, + ) mbp.Must(err, "building Server instance") if bc.Broker.Cache.Size <= 0 { @@ -182,6 +197,13 @@ func (sc Cmd) Execute(args []string) error { signalCh = make(chan os.Signal, 1) ) pc.RegisterShardServer(srv.GRPCServer, pc.NewVerifiedShardServer(service, service.Verifier)) + + var mux *runtime.ServeMux = runtime.NewServeMux( + runtime.WithMarshalerOption(runtime.MIMEWildcard, new(gateway.JSONPb)), + runtime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler), + ) + pc.RegisterShardHandler(tasks.Context(), mux, srv.GRPCLoopback) + srv.HTTPMux.Handle("/v1/", bc.Consumer.CORSWrapper(mux)) ks.WatchApplyDelay = bc.Consumer.WatchDelay // Register Resolver as a prometheus.Collector for tracking shard status diff --git a/mainboilerplate/service.go b/mainboilerplate/service.go index c91086fd..43e6e4c1 100644 --- a/mainboilerplate/service.go +++ b/mainboilerplate/service.go @@ -1,6 +1,8 @@ package mainboilerplate import ( + "net/http" + petname "github.com/dustinkirkland/golang-petname" "go.gazette.dev/core/broker/protocol" "go.gazette.dev/core/server" @@ -14,16 +16,17 @@ type ZoneConfig struct { // ServiceConfig represents identification and addressing configuration of the process. type ServiceConfig struct { ZoneConfig - ID string `long:"id" env:"ID" description:"Unique ID of this process. Auto-generated if not set"` - Host string `long:"host" env:"HOST" description:"Addressable, advertised hostname or IP of this process. Hostname is used if not set"` - Port string `long:"port" env:"PORT" description:"Service port for HTTP and gRPC requests. A random port is used if not set. Port may also take the form 'unix:///path/to/socket' to use a Unix Domain Socket"` - ServerCertFile string `long:"server-cert-file" env:"SERVER_CERT_FILE" default:"" description:"Path to the server TLS certificate. This option toggles whether TLS is used. If absent, all other TLS settings are ignored."` - ServerCertKeyFile string `long:"server-cert-key-file" env:"SERVER_CERT_KEY_FILE" default:"" description:"Path to the server TLS private key"` - ServerCAFile string `long:"server-ca-file" env:"SERVER_CA_FILE" default:"" description:"Path to the trusted CA for server verification of client certificates. When present, client certificates are required and verified against this CA. When absent, client certificates are not required but are verified against the system CA pool if presented."` - PeerCertFile string `long:"peer-cert-file" env:"PEER_CERT_FILE" default:"" description:"Path to the client TLS certificate for peer-to-peer requests"` - PeerCertKeyFile string `long:"peer-cert-key-file" env:"PEER_CERT_KEY_FILE" default:"" description:"Path to the client TLS private key for peer-to-peer requests"` - PeerCAFile string `long:"peer-ca-file" env:"PEER_CA_FILE" default:"" description:"Path to the trusted CA for client verification of peer server certificates. When absent, the system CA pool is used instead."` - MaxGRPCRecvSize uint32 `long:"max-grpc-recv-size" env:"MAX_GRPC_RECV_SIZE" default:"4194304" description:"Maximum size of gRPC messages accepted by this server, in bytes"` + ID string `long:"id" env:"ID" description:"Unique ID of this process. Auto-generated if not set"` + Host string `long:"host" env:"HOST" description:"Addressable, advertised hostname or IP of this process. Hostname is used if not set"` + Port string `long:"port" env:"PORT" description:"Service port for HTTP and gRPC requests. A random port is used if not set. Port may also take the form 'unix:///path/to/socket' to use a Unix Domain Socket"` + ServerCertFile string `long:"server-cert-file" env:"SERVER_CERT_FILE" default:"" description:"Path to the server TLS certificate. This option toggles whether TLS is used. If absent, all other TLS settings are ignored."` + ServerCertKeyFile string `long:"server-cert-key-file" env:"SERVER_CERT_KEY_FILE" default:"" description:"Path to the server TLS private key"` + ServerCAFile string `long:"server-ca-file" env:"SERVER_CA_FILE" default:"" description:"Path to the trusted CA for server verification of client certificates. When present, client certificates are required and verified against this CA. When absent, client certificates are not required but are verified against the system CA pool if presented."` + PeerCertFile string `long:"peer-cert-file" env:"PEER_CERT_FILE" default:"" description:"Path to the client TLS certificate for peer-to-peer requests"` + PeerCertKeyFile string `long:"peer-cert-key-file" env:"PEER_CERT_KEY_FILE" default:"" description:"Path to the client TLS private key for peer-to-peer requests"` + PeerCAFile string `long:"peer-ca-file" env:"PEER_CA_FILE" default:"" description:"Path to the trusted CA for client verification of peer server certificates. When absent, the system CA pool is used instead."` + MaxGRPCRecvSize uint32 `long:"max-grpc-recv-size" env:"MAX_GRPC_RECV_SIZE" default:"4194304" description:"Maximum size of gRPC messages accepted by this server, in bytes"` + AllowOrigin []string `long:"allow-origin" env:"ALLOW_ORIGIN" description:"Origin to allow in CORS contexts"` } // ProcessSpec of the ServiceConfig. @@ -37,3 +40,22 @@ func (cfg ServiceConfig) BuildProcessSpec(srv *server.Server) protocol.ProcessSp Endpoint: srv.Endpoint(), } } + +func (cfg ServiceConfig) CORSWrapper(wrapped http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var origin = r.Header.Get("origin") + + for _, match := range cfg.AllowOrigin { + if origin == match { + w.Header().Set("Access-Control-Allow-Origin", origin) + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Cache-Control, Content-Language, Content-Length, Content-Type, Expires, Last-Modified, Pragma, Authorization") + } + } + + if r.Method == "OPTIONS" { + return + } + wrapped.ServeHTTP(w, r.WithContext(protocol.WithDispatchDefault(r.Context()))) + }) +} diff --git a/mk/common-build.mk b/mk/common-build.mk index a0d1af77..4577fccf 100644 --- a/mk/common-build.mk +++ b/mk/common-build.mk @@ -111,13 +111,25 @@ ${WORKDIR}/rocksdb-v%/librocksdb.so: %.pb.go: %.proto ${WORKDIR}/protoc-gen-gogo PATH=${WORKDIR}:$${PATH} ;\ protoc -I . $(foreach module, $(PROTOC_INC_MODULES), -I$(module_path)) \ - --gogo_out=paths=source_relative,plugins=grpc:. $*.proto + --gogo_out=paths=source_relative,plugins=grpc:. \ + $*.proto + +%.pb.gw.go: %.proto ${WORKDIR}/protoc-gen-grpc-gateway + PATH=${WORKDIR}:$${PATH} ;\ + protoc -I . $(foreach module, $(PROTOC_INC_MODULES), -I$(module_path)) \ + --grpc-gateway_out=logtostderr=true,paths=source_relative,generate_unbound_methods=false,grpc_api_configuration=$*_gateway.yaml:. \ + $*.proto # Rule to build protoc-gen-gogo. ${WORKDIR}/protoc-gen-gogo: go mod download github.com/golang/protobuf go build -o $@ github.com/gogo/protobuf/protoc-gen-gogo +# Rule to build protoc-gen-grpc-gateway. +${WORKDIR}/protoc-gen-grpc-gateway: + go mod download github.com/grpc-ecosystem/grpc-gateway + go build -o $@ github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway + # Rule for generic go-install-able Go binaries. ${WORKDIR}/go-path/bin/%: go-install diff --git a/server/server.go b/server/server.go index 8f3d38fb..c5f8dedd 100644 --- a/server/server.go +++ b/server/server.go @@ -51,7 +51,12 @@ type Server struct { // and `port` for serving traffic directed at `host`. // `port` may be empty, in which case a random free port is assigned. // if `tlsConfig` is non-nil, the Server uses TLS (and is otherwise in the clear). -func New(iface, host, port string, serverTLS, peerTLS *tls.Config, maxSize uint32) (*Server, error) { +func New( + iface, host, port string, + serverTLS, peerTLS *tls.Config, + maxGRPCRecvSize uint32, + wrapListener func(net.Listener, *tls.Config) (net.Listener, error), +) (*Server, error) { var network, bind string if port == "" { network, bind = "tcp", fmt.Sprintf("%s:0", iface) // Assign a random free port. @@ -100,10 +105,14 @@ func New(iface, host, port string, serverTLS, peerTLS *tls.Config, maxSize uint3 GRPCServer: grpc.NewServer( grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), - grpc.MaxRecvMsgSize(int(maxSize)), + grpc.MaxRecvMsgSize(int(maxGRPCRecvSize)), ), } - if serverTLS != nil { + if wrapListener != nil { + if listener, err = wrapListener(listener, serverTLS); err != nil { + return nil, fmt.Errorf("failed to wrap listener: %w", err) + } + } else if serverTLS != nil { listener = tls.NewListener(listener, serverTLS) } srv.CMux = cmux.New(listener) @@ -193,7 +202,7 @@ func BuildTLSConfig(certPath, keyPath, trustedCAPath string) (*tls.Config, error // MustLoopback builds and returns a new Server instance bound to a random // port on the loopback interface. It panics on error. func MustLoopback() *Server { - if srv, err := New("127.0.0.1", "127.0.0.1", "", nil, nil, 1<<20); err != nil { + if srv, err := New("127.0.0.1", "127.0.0.1", "", nil, nil, 1<<20, nil); err != nil { log.WithField("err", err).Panic("failed to build Server") panic("not reached") } else {