Skip to content

Commit

Permalink
Skip nil pickers instead of assuming CONNECTING
Browse files Browse the repository at this point in the history
  • Loading branch information
arjan-bal committed Sep 30, 2024
1 parent 235e14e commit 368ae08
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 13 deletions.
22 changes: 15 additions & 7 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,7 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
bal = child.(*balancerWrapper)
} else {
bal = &balancerWrapper{
childState: ChildState{
Endpoint: endpoint,
State: balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
},
},
childState: ChildState{Endpoint: endpoint},
ClientConn: es.cc,
es: es,
}
Expand Down Expand Up @@ -196,13 +190,18 @@ func (es *endpointSharding) updateState() {
defer es.mu.Unlock()

children := es.children.Load()
pickerCount := 0
childStates := make([]ChildState, 0, children.Len())

for _, child := range children.Values() {
bw := child.(*balancerWrapper)
childState := bw.childState
childStates = append(childStates, childState)
childPicker := childState.State.Picker
if childPicker == nil {
continue
}
pickerCount++
switch childState.State.ConnectivityState {
case connectivity.Ready:
readyPickers = append(readyPickers, childPicker)
Expand All @@ -216,6 +215,15 @@ func (es *endpointSharding) updateState() {
}
}

// If there are no pickers available yet, return a queuing picker.
if pickerCount == 0 {
es.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
})
return
}

// Construct the round robin picker based off the aggregated state. Whatever
// the aggregated state, use the pickers present that are currently in that
// state only.
Expand Down
20 changes: 14 additions & 6 deletions balancer/endpointsharding/endpointsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,33 @@ func (s) TestEndpointShardingStuckConnecting(t *testing.T) {
},
})
childLbJSON := json.RawMessage(fmt.Sprintf(`[{%q: {}}]`, childPolicyName))
var parseErr error
childLBConfig, parseErr = ParseConfig(childLbJSON)
childLBConfig, parseErr := ParseConfig(childLbJSON)
if parseErr != nil {
t.Fatalf("Failed to parse child LB config: %v", parseErr)
}
backend1 := stubserver.StartTestService(t, nil)
defer backend1.Stop()
backend2 := stubserver.StartTestService(t, nil)
defer backend2.Stop()

mr := manual.NewBuilderWithScheme("e2e-test")
defer mr.Close()

json := `{"loadBalancingConfig": [{"fake_petiole":{}}]}`
petiolePolicyName := t.Name() + "-petiole"
stub.Register(petiolePolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.Data = NewBalancer(bd.ClientConn, bd.BuildOptions)
},
UpdateClientConnState: func(bd *stub.BalancerData, state balancer.ClientConnState) error {
return bd.Data.(balancer.Balancer).UpdateClientConnState(balancer.ClientConnState{
BalancerConfig: childLBConfig,
ResolverState: state.ResolverState,
})
},
})
json := fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, petiolePolicyName)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(json)
mr.InitialState(resolver.State{
Endpoints: []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: backend1.Address}}},
{Addresses: []resolver.Address{{Addr: backend2.Address}}},
},
ServiceConfig: sc,
})
Expand Down

0 comments on commit 368ae08

Please sign in to comment.