Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

endpointsharding: Skip nil child pickers while updating ClientConn state #7584

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,18 @@ func (es *endpointSharding) updateState() {
defer es.mu.Unlock()

children := es.children.Load()
pickerCount := 0
childStates := make([]ChildState, 0, children.Len())

for _, child := range children.Values() {
bw := child.(*balancerWrapper)
childState := bw.childState
childStates = append(childStates, childState)
childPicker := childState.State.Picker
if childPicker == nil {
continue
}
pickerCount++
switch childState.State.ConnectivityState {
case connectivity.Ready:
readyPickers = append(readyPickers, childPicker)
Expand All @@ -210,6 +215,15 @@ func (es *endpointSharding) updateState() {
}
}

// If there are no pickers available yet, return a queuing picker.
if pickerCount == 0 {
es.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
})
Comment on lines +220 to +223
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no children have produced a picker, why are we doing so here?

I think the safest behavior here is to either do what you said before (and go through the exercise to make "initial state == connecting" an API guarantee), or to only have this endpointsharding LB policy produce pickers in sync with its children: i.e. whenever any child updates its picker, this policy updates its combined picker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't send a picker here, the unit test added in the PR indicates that the channel remains in IDLE mode.

return
}

// Construct the round robin picker based off the aggregated state. Whatever
// the aggregated state, use the pickers present that are currently in that
// state only.
Expand Down
69 changes: 69 additions & 0 deletions balancer/endpointsharding/endpointsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,25 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils/roundrobin"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
)

const defaultShortTestTimeout = 100 * time.Millisecond

type s struct {
grpctest.Tester
}
Expand Down Expand Up @@ -157,3 +163,66 @@ func (s) TestEndpointShardingBasic(t *testing.T) {
t.Fatalf("error in expected round robin: %v", err)
}
}

// TestEndpointShardingStuckConnecting verifies that the endpointsharding policy
// handles child policies that haven't given a picker update correctly and
// doesn't panic.
func (s) TestEndpointShardingStuckConnecting(t *testing.T) {
childPolicyName := t.Name()
stub.Register(childPolicyName, stub.BalancerFuncs{
UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error {
t.Logf("Not sending any picker update for resolver state: %v", ccs)
return nil
},
})
childLbJSON := json.RawMessage(fmt.Sprintf(`[{%q: {}}]`, childPolicyName))
childLBConfig, parseErr := ParseConfig(childLbJSON)
if parseErr != nil {
t.Fatalf("Failed to parse child LB config: %v", parseErr)
}
backend1 := stubserver.StartTestService(t, nil)
defer backend1.Stop()

mr := manual.NewBuilderWithScheme("e2e-test")
defer mr.Close()

petiolePolicyName := t.Name() + "-petiole"
stub.Register(petiolePolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = NewBalancer(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, state balancer.ClientConnState) error {
return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: childLBConfig,
ResolverState: state.ResolverState,
})
},
})
json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, petiolePolicyName)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json)
mr.InitialState(resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backend1.Address}}},
},
ServiceConfig: sc,
})

cc, err := grpc.NewClient(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("Failed to dial: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
defer cancel()
client := testgrpc.NewTestServiceClient(cc)

// Even though the child LB policy hasn't given a picker update, it is
// assumed to be in CONNECTING.
if _, err := client.EmptyCall(ctx, &testgrpc.Empty{}); status.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
}

if got, want := cc.GetState(), connectivity.Connecting; got != want {
t.Errorf("ClientConn.GetState() = %v, want = %v", got, want)
}
}