-
Notifications
You must be signed in to change notification settings - Fork 439
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
contrib/envoyproxy: envoy external processing support #2895
base: main
Are you sure you want to change the base?
Conversation
fcbd354
to
a587a09
Compare
BenchmarksBenchmark execution time: 2024-12-11 11:03:14 Comparing candidate commit ee0cd57 in PR branch Found 0 performance improvements and 0 performance regressions! Performance is the same for 59 metrics, 0 unstable metrics. |
4134743
to
a96cc2a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for contributing 🙏. Couple of requests:
- Normally you should not have to import any
listener
packages, nor to export functions from it. - Could you link to a manual CI run from the
System Tests
workflow pointing on your system-tests branch where the workflow run is green ? You probably have to add your scenario in the workflow for it to run - I suspect we need to add tests for the changes you did in the
emitter/waf/actions
package. Let's see if this would make sense to separate it in another PR - Let's review together the code of
envoy.go
on fridayso we can structure it better
bdd1915
to
1f2b791
Compare
9630788
to
0929de5
Compare
Consolidated system-test run: https://github.com/DataDog/dd-trace-go/actions/runs/11797728094 |
// ResetStatusCode resets the status code of the response writer. | ||
func ResetStatusCode(w http.ResponseWriter) { | ||
if rw, ok := w.(*responseWriter); ok { | ||
rw.status = 0 | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This required because of this if
statement.
0929de5
to
e3bb1e5
Compare
c7063d8
to
b0844b8
Compare
Signed-off-by: Eliott Bouhana <[email protected]>
* Add support for context propagation * Normalize span tag use Co-authored-by: Flavien Darche <[email protected]> Signed-off-by: Eliott Bouhana <[email protected]>
b0844b8
to
961d73d
Compare
Datadog ReportBranch report: ✅ 0 Failed, 5110 Passed, 70 Skipped, 2m 52.72s Total Time |
wrappedResponseWriter http.ResponseWriter | ||
} | ||
|
||
func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerInterceptor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a godoc to the exported stuff 🙏 Also it would be great to briefly explain what this actually does, as it took me some time to figure out.
func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerInterceptor { | |
// StreamServerInterceptor returns a new grpc.StreamServerInterceptor intended to be used with an envoyproxy grpc server. | |
// It will use the regular grpc tracing package for all methods except from `envoy.service.ext_proc.v3.ExternalProcessor/Process` where it will [...] | |
func StreamServerInterceptor(opts ...grpctrace.Option) grpc.StreamServerInterceptor { |
(please feel free to modify the comment if I wrote anything incorrect, and add more context as you did in the PR description).
// Create a listener for the server. | ||
ln, err := net.Listen("tcp", ":50051") | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
// Create the server interceptor using the envoy go control plane package. | ||
si := go_control_plane.StreamServerInterceptor() | ||
|
||
// Initialize the grpc server as normal, using the envoy server interceptor. | ||
s := grpc.NewServer(grpc.StreamInterceptor(si)) | ||
|
||
// ... register your services | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a more "real-world" like example? similar to https://github.com/envoyproxy/go-control-plane/blob/main/examples/dyplomat/main.go#L43-L53
(currently this example is just a generic grpc server without any envoyproxy stuff)
} | ||
} | ||
|
||
func ProcessRequestHeaders(ctx context.Context, req *envoyextproc.ProcessingRequest_RequestHeaders) (*envoyextproc.ProcessingResponse, *currentRequest, bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add a godoc comment, or consider unexporting this function if this is not intended to be used directly.
} | ||
}() | ||
|
||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this pattern a little bit odd for an interceptor / middleware.
Since it seems this is pretty much specifically intended to override the behaviour of ext_procv3.ExternalProcessorServer.Process
, have you considered exporting this functionality as an implementation of this interface instead of a middleware? This way, users could just do:
import envoytrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/envoyproxy/go-control-plane"
// srv would be the user provided implementation of `ext_procv3.ExternalProcessorServer`
appsecBlockSrv := envoytrace.AppsecBlockingProcessorServer(srv) // internally you would call srv.Process() when the request is not blocked
ext_procv3.RegisterExternalProcessorServer(grpcServer, appsecBlockSrv)
if err != nil { | ||
// Note: Envoy is inconsistent with the "end_of_stream" value of its headers responses, | ||
// so we can't fully rely on it to determine when it will close (cancel) the stream. | ||
if err == io.EOF || err.(interface{ GRPCStatus() *status.Status }).GRPCStatus().Code() == codes.Canceled { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use s, ok := status.FromError(err)
to do the assertion safely.
}, nil | ||
|
||
case *envoyextproc.ProcessingRequest_ResponseBody: | ||
r := req.Request.(*envoyextproc.ProcessingRequest_ResponseBody) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this line is not necessary, the type asserted variable is already available in v
r := req.Request.(*envoyextproc.ProcessingRequest_ResponseBody) | |
r := v |
Motivation
This is the part 1 PR to support Envoy's External Processing.
You can find all related document for this implementation in Confluence ASM - GCP Services Extensions.
You can find the part 2 of this PR here.
What does this PR do?
This PR adds a new gRPC Interceptor (
StreamServerInterceptor
) to support the interception of ext_proc v3 calls to gRPC server. When the interceptor is applied, all messages of the external processing protocol are instrumented without returning an handle to the original server code. The implementation of a server using this instrumentation can be found in the part 2.The implementation includes:
content-type
andredirect
)http.Request
object.Tests
This PR includes unit testing in the
envoy_tests.go
, simulating scenarios of malicious or benign requests, validating span tags, security events and blocking results.System-tests have been implemented on this PR. A new
external-processing
scenario has been added in thegolang
stage.Reviewer's Checklist
Unsure? Have a question? Request a review!