diff --git a/VERSION b/VERSION index 6e75c01de5..9d40912649 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v0.21 +v0.22 diff --git a/proxy/healthy_endpoints_test.go b/proxy/healthy_endpoints_test.go index bc0fdea46a..9d8dde893f 100644 --- a/proxy/healthy_endpoints_test.go +++ b/proxy/healthy_endpoints_test.go @@ -20,10 +20,11 @@ const ( func defaultEndpointRegistry() *routing.EndpointRegistry { return routing.NewEndpointRegistry(routing.RegistryOptions{ - PassiveHealthCheckEnabled: true, - StatsResetPeriod: period, - MinRequests: 10, - MaxHealthCheckDropProbability: 1.0, + PassiveHealthCheck: &routing.PassiveHealthCheck{ + Period: period, + MinRequests: 10, + MaxDropProbability: 1.0, + }, }) } @@ -49,11 +50,8 @@ func sendGetRequests(t *testing.T, ps *httptest.Server) (failed int) { } func setupProxy(t *testing.T, doc string) (*testProxy, *httptest.Server) { - endpointRegistry := defaultEndpointRegistry() - tp, err := newTestProxyWithParams(doc, Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, + EndpointRegistry: defaultEndpointRegistry(), }) require.NoError(t, err) @@ -106,12 +104,10 @@ func TestPHCForSingleHealthyEndpoint(t *testing.T) { w.WriteHeader(http.StatusOK) })) defer service.Close() - endpointRegistry := defaultEndpointRegistry() doc := fmt.Sprintf(`* -> "%s"`, service.URL) tp, err := newTestProxyWithParams(doc, Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, + EndpointRegistry: defaultEndpointRegistry(), }) if err != nil { t.Fatal(err) diff --git a/proxy/proxy.go b/proxy/proxy.go index d2156d8f55..974f03f917 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -145,25 +145,12 @@ type OpenTracingParams struct { ExcludeTags []string } -type PassiveHealthCheck struct { - // The period of time after which the endpointregistry begins to calculate endpoints statistics - // from scratch - Period time.Duration - - // The minimum number of total requests that should be sent to an endpoint in a single period to - // potentially opt out the endpoint from the list of healthy endpoints - MinRequests int64 - - // The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request - MaxDropProbability float64 -} - -func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, error) { +func InitPassiveHealthChecker(o map[string]string) (*routing.PassiveHealthCheck, error) { if len(o) == 0 { - return false, &PassiveHealthCheck{}, nil + return nil, nil } - result := &PassiveHealthCheck{} + result := &routing.PassiveHealthCheck{} keysInitialized := make(map[string]struct{}) for key, value := range o { @@ -171,41 +158,41 @@ func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, e case "period": period, err := time.ParseDuration(value) if err != nil { - return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value) + return nil, fmt.Errorf("passive health check: invalid period value: %s", value) } if period < 0 { - return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value) + return nil, fmt.Errorf("passive health check: invalid period value: %s", value) } result.Period = period case "min-requests": minRequests, err := strconv.Atoi(value) if err != nil { - return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) + return nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) } if minRequests < 0 { - return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) + return nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) } result.MinRequests = int64(minRequests) case "max-drop-probability": maxDropProbability, err := strconv.ParseFloat(value, 64) if err != nil { - return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) + return nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) } if maxDropProbability < 0 || maxDropProbability > 1 { - return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) + return nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) } result.MaxDropProbability = maxDropProbability default: - return false, nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value) + return nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value) } keysInitialized[key] = struct{}{} } if len(keysInitialized) != 3 { - return false, nil, fmt.Errorf("passive health check: missing required parameters") + return nil, fmt.Errorf("passive health check: missing required parameters") } - return true, result, nil + return result, nil } // Proxy initialization options. @@ -310,12 +297,6 @@ type Params struct { // and returns some metadata about endpoint. Information about the metadata // returned from the registry could be found in routing.Metrics interface. EndpointRegistry *routing.EndpointRegistry - - // EnablePassiveHealthCheck enables the healthy endpoints checker - EnablePassiveHealthCheck bool - - // PassiveHealthCheck defines the parameters for the healthy endpoints checker. - PassiveHealthCheck *PassiveHealthCheck } type ( @@ -790,7 +771,7 @@ func WithParams(p Params) *Proxy { hostname := os.Getenv("HOSTNAME") var healthyEndpointsChooser *healthyEndpoints - if p.EnablePassiveHealthCheck { + if p.EndpointRegistry.GetPassiveHealthCheck() != nil { healthyEndpointsChooser = &healthyEndpoints{ rnd: rand.New(loadbalancer.NewLockedSource()), endpointRegistry: p.EndpointRegistry, diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 0ba1fce3eb..c595014581 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -2283,16 +2283,14 @@ func BenchmarkAccessLogEnable(b *testing.B) { benchmarkAccessLog(b, "enableAcces func TestInitPassiveHealthChecker(t *testing.T) { for i, ti := range []struct { - inputArg map[string]string - expectedEnabled bool - expectedParams *PassiveHealthCheck - expectedError error + inputArg map[string]string + expectedParams *routing.PassiveHealthCheck + expectedError error }{ { - inputArg: map[string]string{}, - expectedEnabled: false, - expectedParams: nil, - expectedError: nil, + inputArg: map[string]string{}, + expectedParams: nil, + expectedError: nil, }, { inputArg: map[string]string{ @@ -2300,9 +2298,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "10", "max-drop-probability": "0.9", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"), }, { inputArg: map[string]string{ @@ -2310,8 +2307,7 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "10", "max-drop-probability": "0.9", }, - expectedEnabled: true, - expectedParams: &PassiveHealthCheck{ + expectedParams: &routing.PassiveHealthCheck{ Period: 1 * time.Minute, MinRequests: 10, MaxDropProbability: 0.9, @@ -2324,9 +2320,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "10", "max-drop-probability": "0.9", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid period value: -1m"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid period value: -1m"), }, { inputArg: map[string]string{ @@ -2334,9 +2329,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "somethingInvalid", "max-drop-probability": "0.9", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"), }, { inputArg: map[string]string{ @@ -2344,9 +2338,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "-10", "max-drop-probability": "0.9", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"), }, { inputArg: map[string]string{ @@ -2354,9 +2347,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "10", "max-drop-probability": "somethingInvalid", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"), }, { inputArg: map[string]string{ @@ -2364,9 +2356,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "10", "max-drop-probability": "-0.1", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"), }, { inputArg: map[string]string{ @@ -2374,9 +2365,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "10", "max-drop-probability": "3.1415", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"), }, { inputArg: map[string]string{ @@ -2385,9 +2375,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "max-drop-probability": "0.9", "non-existing": "non-existing", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"), }, { inputArg: map[string]string{ @@ -2395,18 +2384,14 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-requests": "10", /* forgot max-drop-probability */ }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: missing required parameters"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: missing required parameters"), }, } { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - enabled, params, err := InitPassiveHealthChecker(ti.inputArg) - assert.Equal(t, ti.expectedEnabled, enabled) + params, err := InitPassiveHealthChecker(ti.inputArg) assert.Equal(t, ti.expectedError, err) - if enabled { - assert.Equal(t, ti.expectedParams, params) - } + assert.Equal(t, ti.expectedParams, params) }) } } diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index e43cb7f867..68342e865d 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -93,10 +93,8 @@ func newEntry() *entry { } type EndpointRegistry struct { - lastSeenTimeout time.Duration - statsResetPeriod time.Duration - minRequests int64 - maxHealthCheckDropProbability float64 + lastSeenTimeout time.Duration + passiveHealthCheck *PassiveHealthCheck quit chan struct{} @@ -107,11 +105,8 @@ type EndpointRegistry struct { var _ PostProcessor = &EndpointRegistry{} type RegistryOptions struct { - LastSeenTimeout time.Duration - PassiveHealthCheckEnabled bool - StatsResetPeriod time.Duration - MinRequests int64 - MaxHealthCheckDropProbability float64 + LastSeenTimeout time.Duration + PassiveHealthCheck *PassiveHealthCheck } func (r *EndpointRegistry) Do(routes []*Route) []*Route { @@ -151,7 +146,7 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route { } func (r *EndpointRegistry) updateStats() { - ticker := time.NewTicker(r.statsResetPeriod) + ticker := time.NewTicker(r.passiveHealthCheck.Period) for { r.data.Range(func(key, value any) bool { @@ -164,9 +159,9 @@ func (r *EndpointRegistry) updateStats() { failed := e.totalFailedRoundTrips[curSlot].Load() requests := e.totalRequests[curSlot].Load() - if requests > r.minRequests { + if requests > r.passiveHealthCheck.MinRequests { failedRoundTripsRatio := float64(failed) / float64(requests) - e.healthCheckDropProbability.Store(min(failedRoundTripsRatio, r.maxHealthCheckDropProbability)) + e.healthCheckDropProbability.Store(min(failedRoundTripsRatio, r.passiveHealthCheck.MaxDropProbability)) } else { e.healthCheckDropProbability.Store(0.0) } @@ -193,17 +188,15 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { } registry := &EndpointRegistry{ - lastSeenTimeout: o.LastSeenTimeout, - statsResetPeriod: o.StatsResetPeriod, - minRequests: o.MinRequests, - maxHealthCheckDropProbability: o.MaxHealthCheckDropProbability, + lastSeenTimeout: o.LastSeenTimeout, + passiveHealthCheck: o.PassiveHealthCheck, quit: make(chan struct{}), now: time.Now, data: sync.Map{}, } - if o.PassiveHealthCheckEnabled { + if o.PassiveHealthCheck != nil { go registry.updateStats() } @@ -219,6 +212,10 @@ func (r *EndpointRegistry) GetMetrics(hostPort string) Metrics { return e.(*entry) } +func (r *EndpointRegistry) GetPassiveHealthCheck() *PassiveHealthCheck { + return r.passiveHealthCheck +} + func (r *EndpointRegistry) allMetrics() map[string]Metrics { result := make(map[string]Metrics) r.data.Range(func(k, v any) bool { diff --git a/routing/routing.go b/routing/routing.go index efdb746305..a762da115e 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -46,6 +46,19 @@ func (o MatchingOptions) ignoreTrailingSlash() bool { return o&IgnoreTrailingSlash > 0 } +type PassiveHealthCheck struct { + // The period of time after which the endpointregistry begins to calculate endpoints statistics + // from scratch + Period time.Duration + + // The minimum number of total requests that should be sent to an endpoint in a single period to + // potentially opt out the endpoint from the list of healthy endpoints + MinRequests int64 + + // The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request + MaxDropProbability float64 +} + // DataClient instances provide data sources for // route definitions. type DataClient interface { diff --git a/skipper.go b/skipper.go index ef179a5252..5f77a88b2e 100644 --- a/skipper.go +++ b/skipper.go @@ -1934,17 +1934,14 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { }) defer schedulerRegistry.Close() - passiveHealthCheckEnabled, passiveHealthCheck, err := proxy.InitPassiveHealthChecker(o.PassiveHealthCheck) + passiveHealthCheck, err := proxy.InitPassiveHealthChecker(o.PassiveHealthCheck) if err != nil { return err } // create a routing engine endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ - PassiveHealthCheckEnabled: passiveHealthCheckEnabled, - StatsResetPeriod: passiveHealthCheck.Period, - MinRequests: passiveHealthCheck.MinRequests, - MaxHealthCheckDropProbability: passiveHealthCheck.MaxDropProbability, + PassiveHealthCheck: passiveHealthCheck, }) ro := routing.Options{ FilterRegistry: o.filterRegistry(), @@ -2011,8 +2008,6 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { proxyParams := proxy.Params{ Routing: routing, EndpointRegistry: endpointRegistry, - EnablePassiveHealthCheck: passiveHealthCheckEnabled, - PassiveHealthCheck: passiveHealthCheck, Flags: proxyFlags, PriorityRoutes: o.PriorityRoutes, IdleConnectionsPerHost: o.IdleConnectionsPerHost,