Skip to content

Commit

Permalink
consumer: add support for fine-grain Authorizations
Browse files Browse the repository at this point in the history
This change closely parallels the corersponding change to the Gazette
broker and uses much of the same infrastructure.

Built-in Shard APIs now expect and verify Authorizations which carry a
set of capabilities and a label selector which scopes the resources
(shards) which are authorized to the caller.

Shards which don't match the claim's selector are not visible to the
client and are indistinguishable from not existing at all.

AuthShardClient drives an Authorizer to obtain and attach a suitable
credential prior to starting an RPC. AuthShardServer uses a Verifier
to verify a caller's claims prior to dispatching a service handler.

Custom and application-specific APIs will likely want to use the
Service.Authorizer and Service.Verifier fields to power their own
authorizing client and server middleware.
  • Loading branch information
jgraettinger committed Aug 24, 2024
1 parent 98859d9 commit aaa6cd8
Show file tree
Hide file tree
Showing 13 changed files with 446 additions and 117 deletions.
178 changes: 178 additions & 0 deletions consumer/protocol/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package protocol

import (
context "context"
time "time"

pb "go.gazette.dev/core/broker/protocol"
grpc "google.golang.org/grpc"
)

// NewAuthShardClient returns an *AuthShardClient which uses the Authorizer
// to obtain and attach an Authorization bearer token to every issued request.
func NewAuthShardClient(sc ShardClient, auth pb.Authorizer) *AuthShardClient {
return &AuthShardClient{Authorizer: auth, Inner: sc}
}

type AuthShardClient struct {
Authorizer pb.Authorizer
Inner ShardClient
}

func (a *AuthShardClient) Stat(ctx context.Context, in *StatRequest, opts ...grpc.CallOption) (*StatResponse, error) {
var claims, ok = pb.GetClaims(ctx)
if !ok {
claims = pb.Claims{
Capability: pb.Capability_READ,
Selector: pb.LabelSelector{
Include: pb.MustLabelSet("id", in.Shard.String()),
},
}
}
if ctx, err := a.Authorizer.Authorize(ctx, claims, withExp(false)); err != nil {
return nil, err
} else {
return a.Inner.Stat(ctx, in, opts...)
}
}

func (a *AuthShardClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
var claims, ok = pb.GetClaims(ctx)
if !ok {
claims = pb.Claims{
Capability: pb.Capability_LIST,
Selector: in.Selector,
}
}
if ctx, err := a.Authorizer.Authorize(ctx, claims, withExp(false)); err != nil {
return nil, err
} else {
return a.Inner.List(ctx, in, opts...)
}
}

func (a *AuthShardClient) Apply(ctx context.Context, in *ApplyRequest, opts ...grpc.CallOption) (*ApplyResponse, error) {
var claims, ok = pb.GetClaims(ctx)
if !ok {
claims = pb.Claims{Capability: pb.Capability_APPLY}
}
if ctx, err := a.Authorizer.Authorize(ctx, claims, withExp(false)); err != nil {
return nil, err
} else {
return a.Inner.Apply(ctx, in, opts...)
}
}

func (a *AuthShardClient) GetHints(ctx context.Context, in *GetHintsRequest, opts ...grpc.CallOption) (*GetHintsResponse, error) {
var claims, ok = pb.GetClaims(ctx)
if !ok {
claims = pb.Claims{
Capability: pb.Capability_READ,
Selector: pb.LabelSelector{
Include: pb.MustLabelSet("id", in.Shard.String()),
},
}
}
if ctx, err := a.Authorizer.Authorize(ctx, claims, withExp(false)); err != nil {
return nil, err
} else {
return a.Inner.GetHints(ctx, in, opts...)
}
}

func (a *AuthShardClient) Unassign(ctx context.Context, in *UnassignRequest, opts ...grpc.CallOption) (*UnassignResponse, error) {
var claims, ok = pb.GetClaims(ctx)
if !ok {
claims = pb.Claims{Capability: pb.Capability_APPLY}
for _, id := range in.Shards {
claims.Selector.Include.AddValue("id", id.String())
}
}
if ctx, err := a.Authorizer.Authorize(ctx, claims, withExp(false)); err != nil {
return nil, err
} else {
return a.Inner.Unassign(ctx, in, opts...)
}
}

func withExp(blocking bool) time.Duration {
if blocking {
return time.Hour
} else {
return time.Minute
}
}

// AuthShardServer is similar to ShardServer except:
// - Requests have already been verified with accompanying Claims.
// - The Context or Stream.Context() argument may be subject to a deadline
// bound to the expiration of the user's Claims.
type AuthShardServer interface {
Stat(context.Context, pb.Claims, *StatRequest) (*StatResponse, error)
List(context.Context, pb.Claims, *ListRequest) (*ListResponse, error)
Apply(context.Context, pb.Claims, *ApplyRequest) (*ApplyResponse, error)
GetHints(context.Context, pb.Claims, *GetHintsRequest) (*GetHintsResponse, error)
Unassign(context.Context, pb.Claims, *UnassignRequest) (*UnassignResponse, error)
}

// NewVerifiedShardServer adapts an AuthShardServer into a ShardServer by
// using the provided Verifier to verify incoming request Authorizations.
func NewVerifiedShardServer(ajs AuthShardServer, verifier pb.Verifier) *VerifiedAuthServer {
return &VerifiedAuthServer{
Verifier: verifier,
Inner: ajs,
}
}

type VerifiedAuthServer struct {
Verifier pb.Verifier
Inner AuthShardServer
}

func (s *VerifiedAuthServer) Stat(ctx context.Context, req *StatRequest) (*StatResponse, error) {
if ctx, cancel, claims, err := s.Verifier.Verify(ctx, pb.Capability_READ); err != nil {
return nil, err
} else {
defer cancel()
return s.Inner.Stat(ctx, claims, req)
}
}

func (s *VerifiedAuthServer) List(ctx context.Context, req *ListRequest) (*ListResponse, error) {
if ctx, cancel, claims, err := s.Verifier.Verify(ctx, pb.Capability_LIST); err != nil {
return nil, err
} else {
defer cancel()
return s.Inner.List(ctx, claims, req)
}
}

func (s *VerifiedAuthServer) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) {
if ctx, cancel, claims, err := s.Verifier.Verify(ctx, pb.Capability_APPLY); err != nil {
return nil, err
} else {
defer cancel()
return s.Inner.Apply(ctx, claims, req)
}
}

func (s *VerifiedAuthServer) GetHints(ctx context.Context, req *GetHintsRequest) (*GetHintsResponse, error) {
if ctx, cancel, claims, err := s.Verifier.Verify(ctx, pb.Capability_READ); err != nil {
return nil, err
} else {
defer cancel()
return s.Inner.GetHints(ctx, claims, req)
}
}

func (s *VerifiedAuthServer) Unassign(ctx context.Context, req *UnassignRequest) (*UnassignResponse, error) {
if ctx, cancel, claims, err := s.Verifier.Verify(ctx, pb.Capability_APPLY); err != nil {
return nil, err
} else {
defer cancel()
return s.Inner.Unassign(ctx, claims, req)
}
}

var _ ShardServer = &VerifiedAuthServer{}
var _ ShardClient = &AuthShardClient{}
6 changes: 3 additions & 3 deletions consumer/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ func beginRecovery(s *shard) error {
var err error
var spec = s.Spec()

s.recovery.hints, err = s.svc.GetHints(s.ctx, &pc.GetHintsRequest{
Shard: spec.Id,
})
s.recovery.hints, err = s.svc.GetHints(s.ctx,
pb.Claims{Capability: pb.Capability_READ},
&pc.GetHintsRequest{Shard: spec.Id})

if err == nil && s.recovery.hints.Status != pc.Status_OK {
err = fmt.Errorf(s.recovery.hints.Status.String())
Expand Down
49 changes: 30 additions & 19 deletions consumer/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ func NewResolver(state *allocator.State, newShard func(keyspace.KeyValue) *shard
// responsible consumer process.
type ResolveArgs struct {
Context context.Context
// Authorized Claims under which we're resolving.
Claims pb.Claims
// ShardID to be resolved.
ShardID pc.ShardID
// Whether we may resolve to another consumer peer. If false and this
Expand Down Expand Up @@ -144,14 +146,24 @@ func (r *Resolver) Resolve(args ResolveArgs) (res Resolution, err error) {
}
res.Header.Etcd = pbx.FromEtcdResponseHeader(ks.Header)

// Extract Assignments.
var assignments = ks.KeyValues.Prefixed(
allocator.ItemAssignmentsPrefix(ks, args.ShardID.String()))

// Extract ShardSpec.
if item, ok := allocator.LookupItem(ks, args.ShardID.String()); ok {
res.Spec = item.ItemValue.(*pc.ShardSpec)
var spec = item.ItemValue.(*pc.ShardSpec)

// Is the caller authorized to the shard?
if args.Claims.Selector.Matches(spec.LabelSetExt(pb.LabelSet{})) {
res.Spec = spec
} else {
// Clear to act as if the shard doesn't exist.
assignments = keyspace.KeyValues{}
}
}
// Extract Route.
var assignments = ks.KeyValues.Prefixed(
allocator.ItemAssignmentsPrefix(ks, args.ShardID.String()))

// Build Route from extracted assignments.
pbx.Init(&res.Header.Route, assignments)
pbx.AttachEndpoints(&res.Header.Route, ks)

Expand Down Expand Up @@ -337,23 +349,22 @@ func (r *Resolver) updateLocalShards() {
// application must make an appropriate selection from among the returned
// ShardSpecs for its use case.
//
// var mapping message.MappingFunc = ...
// var mappedID pc.ShardID
//
// if journal, _, err := mapping(key); err != nil {
// // Handle error.
// } else if specs := resolver.ShardsWithSource(journal); len(specs) == 0 {
// err = fmt.Errorf("no ShardSpec is consuming mapped journal %s", journal)
// // Handle error.
// } else {
// mappedID = specs[0].Id
// }
// var mapping message.MappingFunc = ...
// var mappedID pc.ShardID
//
// var resolution, err = svc.Resolver.Resolve(consumer.ResolveArgs{
// ShardID: specs[0].Id,
// ...
// })
// if journal, _, err := mapping(key); err != nil {
// // Handle error.
// } else if specs := resolver.ShardsWithSource(journal); len(specs) == 0 {
// err = fmt.Errorf("no ShardSpec is consuming mapped journal %s", journal)
// // Handle error.
// } else {
// mappedID = specs[0].Id
// }
//
// var resolution, err = svc.Resolver.Resolve(consumer.ResolveArgs{
// ShardID: specs[0].Id,
// ...
// })
func (r *Resolver) ShardsWithSource(journal pb.Journal) []*pc.ShardSpec {
r.state.KS.Mu.RLock()
var specs = r.journals[journal]
Expand Down
46 changes: 28 additions & 18 deletions consumer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ import (
type Service struct {
// Application served by the Service.
App Application
// Authorizer of peer-to-peer requests.
// Consumer applications may want to use pc.NewAuthShardClient() to build
// clients with self-signed Authorizations using the Loopback connection.
Authorizer pb.Authorizer
// Verifier of requests.
// Custom consumer application APIs should secure themselves by verifying
// authorizations using this Verifier. See pc.NewVerifiedShardServer() as a model.
Verifier pb.Verifier
// Resolver of Service shards.
Resolver *Resolver
// Distributed allocator state of the service.
Expand All @@ -47,21 +55,23 @@ type Service struct {
// ShardAPI holds function delegates which power the ShardServer API.
// They're exposed to allow consumer applications to wrap or alter their behavior.
ShardAPI struct {
Stat func(context.Context, *Service, *pc.StatRequest) (*pc.StatResponse, error)
List func(context.Context, *Service, *pc.ListRequest) (*pc.ListResponse, error)
Apply func(context.Context, *Service, *pc.ApplyRequest) (*pc.ApplyResponse, error)
GetHints func(context.Context, *Service, *pc.GetHintsRequest) (*pc.GetHintsResponse, error)
Unassign func(context.Context, *Service, *pc.UnassignRequest) (*pc.UnassignResponse, error)
Stat func(context.Context, pb.Claims, *Service, *pc.StatRequest) (*pc.StatResponse, error)
List func(context.Context, pb.Claims, *Service, *pc.ListRequest) (*pc.ListResponse, error)
Apply func(context.Context, pb.Claims, *Service, *pc.ApplyRequest) (*pc.ApplyResponse, error)
GetHints func(context.Context, pb.Claims, *Service, *pc.GetHintsRequest) (*pc.GetHintsResponse, error)
Unassign func(context.Context, pb.Claims, *Service, *pc.UnassignRequest) (*pc.UnassignResponse, error)
}

// stoppingCh is closed when the Service is in the process of shutting down.
stoppingCh chan struct{}
}

// NewService constructs a new Service of the Application, driven by allocator.State.
func NewService(app Application, state *allocator.State, rjc pb.RoutedJournalClient, lo *grpc.ClientConn, etcd *clientv3.Client) *Service {
func NewService(app Application, authorizer pb.Authorizer, verifier pb.Verifier, state *allocator.State, rjc pb.RoutedJournalClient, lo *grpc.ClientConn, etcd *clientv3.Client) *Service {
var svc = &Service{
App: app,
Authorizer: authorizer,
Verifier: verifier,
State: state,
Loopback: lo,
Journals: rjc,
Expand Down Expand Up @@ -137,29 +147,29 @@ func addTrace(ctx context.Context, format string, args ...interface{}) {
}

// Stat calls its ShardAPI delegate.
func (svc *Service) Stat(ctx context.Context, req *pc.StatRequest) (*pc.StatResponse, error) {
return svc.ShardAPI.Stat(ctx, svc, req)
func (svc *Service) Stat(ctx context.Context, claims pb.Claims, req *pc.StatRequest) (*pc.StatResponse, error) {
return svc.ShardAPI.Stat(ctx, claims, svc, req)
}

// List calls its ShardAPI delegate.
func (svc *Service) List(ctx context.Context, req *pc.ListRequest) (*pc.ListResponse, error) {
return svc.ShardAPI.List(ctx, svc, req)
func (svc *Service) List(ctx context.Context, claims pb.Claims, req *pc.ListRequest) (*pc.ListResponse, error) {
return svc.ShardAPI.List(ctx, claims, svc, req)
}

// Apply calls its ShardAPI delegate.
func (svc *Service) Apply(ctx context.Context, req *pc.ApplyRequest) (*pc.ApplyResponse, error) {
return svc.ShardAPI.Apply(ctx, svc, req)
func (svc *Service) Apply(ctx context.Context, claims pb.Claims, req *pc.ApplyRequest) (*pc.ApplyResponse, error) {
return svc.ShardAPI.Apply(ctx, claims, svc, req)
}

// GetHints calls its ShardAPI delegate.
func (svc *Service) GetHints(ctx context.Context, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error) {
return svc.ShardAPI.GetHints(ctx, svc, req)
func (svc *Service) GetHints(ctx context.Context, claims pb.Claims, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error) {
return svc.ShardAPI.GetHints(ctx, claims, svc, req)
}

// Unassign calls its ShardAPI delegate.
func (svc *Service) Unassign(ctx context.Context, req *pc.UnassignRequest) (*pc.UnassignResponse, error) {
return svc.ShardAPI.Unassign(ctx, svc, req)
func (svc *Service) Unassign(ctx context.Context, claims pb.Claims, req *pc.UnassignRequest) (*pc.UnassignResponse, error) {
return svc.ShardAPI.Unassign(ctx, claims, svc, req)
}

// Service implements the ShardServer interface.
var _ pc.ShardServer = (*Service)(nil)
// Service implements the AuthShardServer interface.
var _ pc.AuthShardServer = (*Service)(nil)
Loading

0 comments on commit aaa6cd8

Please sign in to comment.