diff --git a/VERSION b/VERSION index 6e75c01de5..9d40912649 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v0.21 +v0.22 diff --git a/proxy/healthy_endpoints_test.go b/proxy/healthy_endpoints_test.go index 0e75e65cda..b2b833ca84 100644 --- a/proxy/healthy_endpoints_test.go +++ b/proxy/healthy_endpoints_test.go @@ -22,14 +22,14 @@ const ( period = 100 * time.Millisecond ) -func defaultEndpointRegistry() *routing.EndpointRegistry { - return routing.NewEndpointRegistry(routing.RegistryOptions{ - PassiveHealthCheckEnabled: true, - StatsResetPeriod: period, - MinRequests: 2, - MaxHealthCheckDropProbability: 0.95, - MinHealthCheckDropProbability: 0.01, - }) +func defaultPassiveHealthCheck() *routing.PassiveHealthCheck { + return &routing.PassiveHealthCheck{ + Period: period, + MinRequests: 2, + MaxDropProbability: 0.95, + MinDropProbability: 0.01, + MaxUnhealthyEndpointsRatio: 1.0, + } } func sendGetRequest(t *testing.T, ps *httptest.Server, consistentHashKey int) *http.Response { @@ -62,14 +62,10 @@ func fireVegeta(t *testing.T, ps *httptest.Server, freq int, per time.Duration, func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.Server) { m := &metricstest.MockMetrics{} - endpointRegistry := defaultEndpointRegistry() + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{PassiveHealthCheck: defaultPassiveHealthCheck()}) proxyParams := Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, - Metrics: m, - PassiveHealthCheck: &PassiveHealthCheck{ - MaxUnhealthyEndpointsRatio: 1.0, - }, + EndpointRegistry: endpointRegistry, + Metrics: m, } return m, setupProxyWithCustomProxyParams(t, doc, proxyParams) @@ -78,12 +74,8 @@ func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.S func setupProxyWithCustomEndpointRegisty(t *testing.T, doc string, endpointRegistry *routing.EndpointRegistry) (*metricstest.MockMetrics, *httptest.Server) { m := &metricstest.MockMetrics{} proxyParams := Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, - Metrics: m, - PassiveHealthCheck: &PassiveHealthCheck{ - MaxUnhealthyEndpointsRatio: 1.0, - }, + EndpointRegistry: endpointRegistry, + Metrics: m, } return m, setupProxyWithCustomProxyParams(t, doc, proxyParams) @@ -151,15 +143,18 @@ func TestPHCWithoutRequests(t *testing.T) { func TestPHCForSingleHealthyEndpoint(t *testing.T) { servicesString := setupServices(t, 1, 0) - endpointRegistry := defaultEndpointRegistry() + endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ + PassiveHealthCheck: &routing.PassiveHealthCheck{ + MaxUnhealthyEndpointsRatio: 1.0, + MaxDropProbability: 0.95, + MinRequests: 2, + Period: period, + }, + }) doc := fmt.Sprintf(`* -> %s`, servicesString) tp, err := newTestProxyWithParams(doc, Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, - PassiveHealthCheck: &PassiveHealthCheck{ - MaxUnhealthyEndpointsRatio: 1.0, - }, + EndpointRegistry: endpointRegistry, }) if err != nil { t.Fatal(err) @@ -227,11 +222,13 @@ func TestPHC(t *testing.T) { t.Run("consistentHash", func(t *testing.T) { consistantHashCustomEndpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ - PassiveHealthCheckEnabled: true, - StatsResetPeriod: 1 * time.Second, - MinRequests: 1, // with 2 test case fails on github actions - MaxHealthCheckDropProbability: 0.95, - MinHealthCheckDropProbability: 0.01, + PassiveHealthCheck: &routing.PassiveHealthCheck{ + Period: 1 * time.Second, + MinRequests: 1, // with 2 test case fails on github actions + MaxDropProbability: 0.95, + MinDropProbability: 0.01, + MaxUnhealthyEndpointsRatio: 1.0, + }, }) mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> `, servicesString), consistantHashCustomEndpointRegistry) @@ -245,11 +242,13 @@ func TestPHC(t *testing.T) { t.Run("consistent hash with balance factor", func(t *testing.T) { consistantHashCustomEndpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ - PassiveHealthCheckEnabled: true, - StatsResetPeriod: 1 * time.Second, - MinRequests: 1, // with 2 test case fails on github actions - MaxHealthCheckDropProbability: 0.95, - MinHealthCheckDropProbability: 0.01, + PassiveHealthCheck: &routing.PassiveHealthCheck{ + Period: 1 * time.Second, + MinRequests: 1, // with 2 test case fails on github actions + MaxDropProbability: 0.95, + MinDropProbability: 0.01, + MaxUnhealthyEndpointsRatio: 1.0, + }, }) mockMetrics, ps := setupProxyWithCustomEndpointRegisty(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> `, servicesString), consistantHashCustomEndpointRegistry) @@ -301,14 +300,11 @@ func TestPHCMaxUnhealthyEndpointsRatioParam(t *testing.T) { servicesString := setupServices(t, healthy, unhealthy) mockMetrics := &metricstest.MockMetrics{} - endpointRegistry := defaultEndpointRegistry() + passiveHealthCheck := defaultPassiveHealthCheck() + passiveHealthCheck.MaxUnhealthyEndpointsRatio = maxUnhealthyEndpointsRatio proxyParams := Params{ - EnablePassiveHealthCheck: true, - EndpointRegistry: endpointRegistry, - Metrics: mockMetrics, - PassiveHealthCheck: &PassiveHealthCheck{ - MaxUnhealthyEndpointsRatio: maxUnhealthyEndpointsRatio, - }, + EndpointRegistry: routing.NewEndpointRegistry(routing.RegistryOptions{PassiveHealthCheck: passiveHealthCheck}), + Metrics: mockMetrics, } ps := setupProxyWithCustomProxyParams(t, fmt.Sprintf(`* -> backendTimeout("20ms") -> `, servicesString), proxyParams) diff --git a/proxy/proxy.go b/proxy/proxy.go index 5c4fe9faca..e38c7d12d6 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -146,34 +146,12 @@ type OpenTracingParams struct { ExcludeTags []string } -type PassiveHealthCheck struct { - // The period of time after which the endpointregistry begins to calculate endpoints statistics - // from scratch - Period time.Duration - - // The minimum number of total requests that should be sent to an endpoint in a single period to - // potentially opt out the endpoint from the list of healthy endpoints - MinRequests int64 - - // The minimal ratio of failed requests in a single period to potentially opt out the endpoint - // from the list of healthy endpoints. This ratio is equal to the minimal non-zero probability of - // dropping endpoint out from load balancing for every specific request - MinDropProbability float64 - - // The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request - MaxDropProbability float64 - - // MaxUnhealthyEndpointsRatio is the maximum ratio of unhealthy endpoints in the list of all endpoints PHC will check - // in case of all endpoints are unhealthy - MaxUnhealthyEndpointsRatio float64 -} - -func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, error) { +func InitPassiveHealthChecker(o map[string]string) (*routing.PassiveHealthCheck, error) { if len(o) == 0 { - return false, &PassiveHealthCheck{}, nil + return nil, nil } - result := &PassiveHealthCheck{ + result := &routing.PassiveHealthCheck{ MinDropProbability: 0.0, MaxUnhealthyEndpointsRatio: 1.0, } @@ -190,28 +168,28 @@ func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, e case "period": period, err := time.ParseDuration(value) if err != nil { - return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value) + return nil, fmt.Errorf("passive health check: invalid period value: %s", value) } if period < 0 { - return false, nil, fmt.Errorf("passive health check: invalid period value: %s", value) + return nil, fmt.Errorf("passive health check: invalid period value: %s", value) } result.Period = period case "min-requests": minRequests, err := strconv.Atoi(value) if err != nil { - return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) + return nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) } if minRequests < 0 { - return false, nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) + return nil, fmt.Errorf("passive health check: invalid minRequests value: %s", value) } result.MinRequests = int64(minRequests) case "max-drop-probability": maxDropProbability, err := strconv.ParseFloat(value, 64) if err != nil { - return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) + return nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) } if maxDropProbability < 0 || maxDropProbability > 1 { - return false, nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) + return nil, fmt.Errorf("passive health check: invalid maxDropProbability value: %s", value) } result.MaxDropProbability = maxDropProbability @@ -219,33 +197,33 @@ func InitPassiveHealthChecker(o map[string]string) (bool, *PassiveHealthCheck, e case "min-drop-probability": minDropProbability, err := strconv.ParseFloat(value, 64) if err != nil { - return false, nil, fmt.Errorf("passive health check: invalid minDropProbability value: %s", value) + return nil, fmt.Errorf("passive health check: invalid minDropProbability value: %s", value) } if minDropProbability < 0 || minDropProbability > 1 { - return false, nil, fmt.Errorf("passive health check: invalid minDropProbability value: %s", value) + return nil, fmt.Errorf("passive health check: invalid minDropProbability value: %s", value) } result.MinDropProbability = minDropProbability case "max-unhealthy-endpoints-ratio": maxUnhealthyEndpointsRatio, err := strconv.ParseFloat(value, 64) if err != nil { - return false, nil, fmt.Errorf("passive health check: invalid maxUnhealthyEndpointsRatio value: %q", value) + return nil, fmt.Errorf("passive health check: invalid maxUnhealthyEndpointsRatio value: %q", value) } if maxUnhealthyEndpointsRatio < 0 || maxUnhealthyEndpointsRatio > 1 { - return false, nil, fmt.Errorf("passive health check: invalid maxUnhealthyEndpointsRatio value: %q", value) + return nil, fmt.Errorf("passive health check: invalid maxUnhealthyEndpointsRatio value: %q", value) } result.MaxUnhealthyEndpointsRatio = maxUnhealthyEndpointsRatio default: - return false, nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value) + return nil, fmt.Errorf("passive health check: invalid parameter: key=%s,value=%s", key, value) } } if len(requiredParams) != 0 { - return false, nil, fmt.Errorf("passive health check: missing required parameters %+v", maps.Keys(requiredParams)) + return nil, fmt.Errorf("passive health check: missing required parameters %+v", maps.Keys(requiredParams)) } if result.MinDropProbability >= result.MaxDropProbability { - return false, nil, fmt.Errorf("passive health check: minDropProbability should be less than maxDropProbability") + return nil, fmt.Errorf("passive health check: minDropProbability should be less than maxDropProbability") } - return true, result, nil + return result, nil } // Proxy initialization options. @@ -354,12 +332,6 @@ type Params struct { // and returns some metadata about endpoint. Information about the metadata // returned from the registry could be found in routing.Metrics interface. EndpointRegistry *routing.EndpointRegistry - - // EnablePassiveHealthCheck enables the healthy endpoints checker - EnablePassiveHealthCheck bool - - // PassiveHealthCheck defines the parameters for the healthy endpoints checker. - PassiveHealthCheck *PassiveHealthCheck } type ( @@ -838,10 +810,10 @@ func WithParams(p Params) *Proxy { hostname := os.Getenv("HOSTNAME") var healthyEndpointsChooser *healthyEndpoints - if p.EnablePassiveHealthCheck { + if p.EndpointRegistry.GetPassiveHealthCheck() != nil { healthyEndpointsChooser = &healthyEndpoints{ rnd: rand.New(loadbalancer.NewLockedSource()), - maxUnhealthyEndpointsRatio: p.PassiveHealthCheck.MaxUnhealthyEndpointsRatio, + maxUnhealthyEndpointsRatio: p.EndpointRegistry.GetPassiveHealthCheck().MaxUnhealthyEndpointsRatio, } } return &Proxy{ diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go index 21bc5e0c03..86e25c6036 100644 --- a/proxy/proxy_test.go +++ b/proxy/proxy_test.go @@ -2290,16 +2290,14 @@ func BenchmarkAccessLogEnable(b *testing.B) { benchmarkAccessLog(b, "enableAcces func TestInitPassiveHealthChecker(t *testing.T) { for i, ti := range []struct { - inputArg map[string]string - expectedEnabled bool - expectedParams *PassiveHealthCheck - expectedError error + inputArg map[string]string + expectedParams *routing.PassiveHealthCheck + expectedError error }{ { - inputArg: map[string]string{}, - expectedEnabled: false, - expectedParams: nil, - expectedError: nil, + inputArg: map[string]string{}, + expectedParams: nil, + expectedError: nil, }, { inputArg: map[string]string{ @@ -2309,9 +2307,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid period value: somethingInvalid"), }, { inputArg: map[string]string{ @@ -2321,8 +2318,7 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: true, - expectedParams: &PassiveHealthCheck{ + expectedParams: &routing.PassiveHealthCheck{ Period: 1 * time.Minute, MinRequests: 10, MaxDropProbability: 0.9, @@ -2339,9 +2335,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid period value: -1m"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid period value: -1m"), }, { inputArg: map[string]string{ @@ -2351,9 +2346,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minRequests value: somethingInvalid"), }, { inputArg: map[string]string{ @@ -2363,9 +2357,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minRequests value: -10"), }, { inputArg: map[string]string{ @@ -2375,9 +2368,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: somethingInvalid"), }, { inputArg: map[string]string{ @@ -2387,9 +2379,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: -0.1"), }, { inputArg: map[string]string{ @@ -2399,9 +2390,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid maxDropProbability value: 3.1415"), }, { inputArg: map[string]string{ @@ -2411,9 +2401,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "somethingInvalid", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid minDropProbability value: somethingInvalid"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minDropProbability value: somethingInvalid"), }, { inputArg: map[string]string{ @@ -2423,9 +2412,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "-0.1", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid minDropProbability value: -0.1"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minDropProbability value: -0.1"), }, { inputArg: map[string]string{ @@ -2435,9 +2423,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "3.1415", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid minDropProbability value: 3.1415"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid minDropProbability value: 3.1415"), }, { inputArg: map[string]string{ @@ -2447,9 +2434,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.9", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: minDropProbability should be less than maxDropProbability"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: minDropProbability should be less than maxDropProbability"), }, { inputArg: map[string]string{ @@ -2459,9 +2445,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "-0.1", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf(`passive health check: invalid maxUnhealthyEndpointsRatio value: "-0.1"`), + expectedParams: nil, + expectedError: fmt.Errorf(`passive health check: invalid maxUnhealthyEndpointsRatio value: "-0.1"`), }, { inputArg: map[string]string{ @@ -2471,9 +2456,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "3.1415", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf(`passive health check: invalid maxUnhealthyEndpointsRatio value: "3.1415"`), + expectedParams: nil, + expectedError: fmt.Errorf(`passive health check: invalid maxUnhealthyEndpointsRatio value: "3.1415"`), }, { inputArg: map[string]string{ @@ -2483,9 +2467,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "somethingInvalid", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf(`passive health check: invalid maxUnhealthyEndpointsRatio value: "somethingInvalid"`), + expectedParams: nil, + expectedError: fmt.Errorf(`passive health check: invalid maxUnhealthyEndpointsRatio value: "somethingInvalid"`), }, { inputArg: map[string]string{ @@ -2496,9 +2479,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "max-unhealthy-endpoints-ratio": "0.3", "non-existing": "non-existing", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: invalid parameter: key=non-existing,value=non-existing"), }, { inputArg: map[string]string{ @@ -2508,9 +2490,8 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: false, - expectedParams: nil, - expectedError: fmt.Errorf("passive health check: missing required parameters [max-drop-probability]"), + expectedParams: nil, + expectedError: fmt.Errorf("passive health check: missing required parameters [max-drop-probability]"), }, { inputArg: map[string]string{ @@ -2520,8 +2501,7 @@ func TestInitPassiveHealthChecker(t *testing.T) { /* using default min-drop-probability */ "max-unhealthy-endpoints-ratio": "0.3", }, - expectedEnabled: true, - expectedParams: &PassiveHealthCheck{ + expectedParams: &routing.PassiveHealthCheck{ Period: 1 * time.Minute, MinRequests: 10, MaxDropProbability: 0.9, @@ -2538,8 +2518,7 @@ func TestInitPassiveHealthChecker(t *testing.T) { "min-drop-probability": "0.05", /* using default max-unhealthy-endpoints-ratio */ }, - expectedEnabled: true, - expectedParams: &PassiveHealthCheck{ + expectedParams: &routing.PassiveHealthCheck{ Period: 1 * time.Minute, MinRequests: 10, MaxDropProbability: 0.9, @@ -2550,12 +2529,9 @@ func TestInitPassiveHealthChecker(t *testing.T) { }, } { t.Run(fmt.Sprintf("case_%d", i), func(t *testing.T) { - enabled, params, err := InitPassiveHealthChecker(ti.inputArg) - assert.Equal(t, ti.expectedEnabled, enabled) + params, err := InitPassiveHealthChecker(ti.inputArg) assert.Equal(t, ti.expectedError, err) - if enabled { - assert.Equal(t, ti.expectedParams, params) - } + assert.Equal(t, ti.expectedParams, params) }) } } diff --git a/routing/endpointregistry.go b/routing/endpointregistry.go index 4609d7bc96..e6ba8426dc 100644 --- a/routing/endpointregistry.go +++ b/routing/endpointregistry.go @@ -95,11 +95,8 @@ func newEntry() *entry { } type EndpointRegistry struct { - lastSeenTimeout time.Duration - statsResetPeriod time.Duration - minRequests int64 - minHealthCheckDropProbability float64 - maxHealthCheckDropProbability float64 + lastSeenTimeout time.Duration + passiveHealthCheck *PassiveHealthCheck quit chan struct{} @@ -110,12 +107,8 @@ type EndpointRegistry struct { var _ PostProcessor = &EndpointRegistry{} type RegistryOptions struct { - LastSeenTimeout time.Duration - PassiveHealthCheckEnabled bool - StatsResetPeriod time.Duration - MinRequests int64 - MinHealthCheckDropProbability float64 - MaxHealthCheckDropProbability float64 + LastSeenTimeout time.Duration + PassiveHealthCheck *PassiveHealthCheck } func (r *EndpointRegistry) Do(routes []*Route) []*Route { @@ -155,7 +148,7 @@ func (r *EndpointRegistry) Do(routes []*Route) []*Route { } func (r *EndpointRegistry) updateStats() { - ticker := time.NewTicker(r.statsResetPeriod) + ticker := time.NewTicker(r.passiveHealthCheck.Period) for { r.data.Range(func(key, value any) bool { @@ -169,11 +162,11 @@ func (r *EndpointRegistry) updateStats() { newDropProbability := 0.0 failed := e.totalFailedRoundTrips[curSlot].Load() requests := e.totalRequests[curSlot].Load() - if requests > r.minRequests { + if requests > r.passiveHealthCheck.MinRequests { failedRoundTripsRatio := float64(failed) / float64(requests) - if failedRoundTripsRatio > r.minHealthCheckDropProbability { + if failedRoundTripsRatio > r.passiveHealthCheck.MinDropProbability { log.Infof("Passive health check: marking %q as unhealthy due to failed round trips ratio: %0.2f", key, failedRoundTripsRatio) - newDropProbability = min(failedRoundTripsRatio, r.maxHealthCheckDropProbability) + newDropProbability = min(failedRoundTripsRatio, r.passiveHealthCheck.MaxDropProbability) } } e.healthCheckDropProbability.Store(newDropProbability) @@ -200,18 +193,15 @@ func NewEndpointRegistry(o RegistryOptions) *EndpointRegistry { } registry := &EndpointRegistry{ - lastSeenTimeout: o.LastSeenTimeout, - statsResetPeriod: o.StatsResetPeriod, - minRequests: o.MinRequests, - minHealthCheckDropProbability: o.MinHealthCheckDropProbability, - maxHealthCheckDropProbability: o.MaxHealthCheckDropProbability, + lastSeenTimeout: o.LastSeenTimeout, + passiveHealthCheck: o.PassiveHealthCheck, quit: make(chan struct{}), now: time.Now, data: sync.Map{}, } - if o.PassiveHealthCheckEnabled { + if o.PassiveHealthCheck != nil { go registry.updateStats() } @@ -227,6 +217,10 @@ func (r *EndpointRegistry) GetMetrics(hostPort string) Metrics { return e.(*entry) } +func (r *EndpointRegistry) GetPassiveHealthCheck() *PassiveHealthCheck { + return r.passiveHealthCheck +} + func (r *EndpointRegistry) allMetrics() map[string]Metrics { result := make(map[string]Metrics) r.data.Range(func(k, v any) bool { diff --git a/routing/routing.go b/routing/routing.go index 068f1c98e1..4104246817 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -46,6 +46,28 @@ func (o MatchingOptions) ignoreTrailingSlash() bool { return o&IgnoreTrailingSlash > 0 } +type PassiveHealthCheck struct { + // The period of time after which the endpointregistry begins to calculate endpoints statistics + // from scratch + Period time.Duration + + // The minimum number of total requests that should be sent to an endpoint in a single period to + // potentially opt out the endpoint from the list of healthy endpoints + MinRequests int64 + + // The minimal ratio of failed requests in a single period to potentially opt out the endpoint + // from the list of healthy endpoints. This ratio is equal to the minimal non-zero probability of + // dropping endpoint out from load balancing for every specific request + MinDropProbability float64 + + // The maximum probability of unhealthy endpoint to be dropped out from load balancing for every specific request + MaxDropProbability float64 + + // MaxUnhealthyEndpointsRatio is the maximum ratio of unhealthy endpoints in the list of all endpoints PHC will check + // in case of all endpoints are unhealthy + MaxUnhealthyEndpointsRatio float64 +} + // DataClient instances provide data sources for // route definitions. type DataClient interface { diff --git a/skipper.go b/skipper.go index 19e24f027f..9146b1d21c 100644 --- a/skipper.go +++ b/skipper.go @@ -1984,18 +1984,14 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { }) defer schedulerRegistry.Close() - passiveHealthCheckEnabled, passiveHealthCheck, err := proxy.InitPassiveHealthChecker(o.PassiveHealthCheck) + passiveHealthCheck, err := proxy.InitPassiveHealthChecker(o.PassiveHealthCheck) if err != nil { return err } // create a routing engine endpointRegistry := routing.NewEndpointRegistry(routing.RegistryOptions{ - PassiveHealthCheckEnabled: passiveHealthCheckEnabled, - StatsResetPeriod: passiveHealthCheck.Period, - MinRequests: passiveHealthCheck.MinRequests, - MinHealthCheckDropProbability: passiveHealthCheck.MinDropProbability, - MaxHealthCheckDropProbability: passiveHealthCheck.MaxDropProbability, + PassiveHealthCheck: passiveHealthCheck, }) ro := routing.Options{ FilterRegistry: o.filterRegistry(), @@ -2084,8 +2080,6 @@ func run(o Options, sig chan os.Signal, idleConnsCH chan struct{}) error { CustomHttpRoundTripperWrap: o.CustomHttpRoundTripperWrap, RateLimiters: ratelimitRegistry, EndpointRegistry: endpointRegistry, - EnablePassiveHealthCheck: passiveHealthCheckEnabled, - PassiveHealthCheck: passiveHealthCheck, } if o.EnableBreakers || len(o.BreakerSettings) > 0 {