From 258112621e4530ccd3fc06f87d142db9e5a24311 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 28 May 2025 08:29:00 -0700 Subject: [PATCH 1/8] Short-circuit replication set if 422 is returned Signed-off-by: Justin Jung --- pkg/ring/replication_set.go | 9 +++++++++ pkg/ring/replication_set_test.go | 15 +++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 745c742f990..4c0012a84a9 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -3,9 +3,12 @@ package ring import ( "context" "fmt" + "net/http" "sort" "time" + "github.com/weaveworks/common/httpgrpc" + "github.com/cortexproject/cortex/pkg/querier/partialdata" ) @@ -80,6 +83,12 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults return nil, res.err } + if httpRes, ok := httpgrpc.HTTPResponseFromError(res.err); ok { + if httpRes.Code == http.StatusUnprocessableEntity { + return nil, res.err + } + } + // force one of the delayed requests to start if delay > 0 && r.MaxUnavailableZones == 0 { forceStart <- struct{}{} diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index d72e4cd5257..5efa875d53f 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -3,11 +3,13 @@ package ring import ( "context" "errors" + "net/http" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/querier/partialdata" @@ -213,6 +215,19 @@ func TestReplicationSet_Do(t *testing.T) { expectedError: errZoneFailure, queryPartialData: true, }, + { + name: "with partial data enabled, should fail on instances returning 422 in one zone", + instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) { + if ing.Zone == "zone1" { + return 
nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "breached limit") + } + return 1, nil + }, + maxUnavailableZones: 1, + expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, "breached limit"), + queryPartialData: true, + }, { name: "max unavailable zones = 1, should succeed on instances failing in 1 out of 3 zones (6 instances)", instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone3"}}, From 28d0fd604193b56a1343b5799365ea2d2e18ab6a Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 28 May 2025 08:29:31 -0700 Subject: [PATCH 2/8] Do not return partial data if at least half of fleet is down Signed-off-by: Justin Jung --- pkg/ring/replication_set_tracker.go | 8 ++++++-- pkg/ring/replication_set_tracker_test.go | 12 ++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index a0f594b442c..d06fb16129f 100644 --- a/pkg/ring/replication_set_tracker.go +++ b/pkg/ring/replication_set_tracker.go @@ -1,6 +1,8 @@ package ring -import "fmt" +import ( + "fmt" +) type replicationSetResultTracker interface { // Signals an instance has done the execution, either successful (no error) @@ -160,7 +162,9 @@ func (t *zoneAwareResultTracker) failed() bool { func (t *zoneAwareResultTracker) failedCompletely() bool { failedZones := len(t.failuresByZone) - return failedZones == t.zoneCount + allZonesFailed := failedZones == t.zoneCount + atLeastHalfOfFleetFailed := len(t.errors) >= t.numInstances/2 + return allZonesFailed || (t.failed() && atLeastHalfOfFleetFailed) } func (t *zoneAwareResultTracker) getResults() []interface{} { diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index ed782739c77..f41db46c685 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -467,6 +467,18 @@ func TestZoneAwareResultTracker(t 
*testing.T) { assert.True(t, tracker.failedCompletely()) }, }, + "failedCompletely() should return true if failed() is true and half of the fleet are unavailable": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 1, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + tracker.done(&instance1, nil, errors.New("test")) // Zone-a + tracker.done(&instance2, nil, errors.New("test")) // Zone-a + tracker.done(&instance3, nil, errors.New("test")) // Zone-b + + assert.True(t, tracker.failed()) + assert.True(t, tracker.failedCompletely()) + }, + }, "finished() should return true only if all instances are done": { instances: []InstanceDesc{instance1, instance2}, maxUnavailableZones: 1, From be403486ee684172d1427554126002ab2b2bc758 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 28 May 2025 08:45:43 -0700 Subject: [PATCH 3/8] Add user and request info in partial data log Signed-off-by: Justin Jung --- pkg/distributor/query.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 8de7630e755..4f7bf3377ad 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -22,6 +22,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/grpcutil" "github.com/cortexproject/cortex/pkg/util/limiter" + util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -330,7 +331,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri reqStats.AddFetchedSamples(uint64(resp.SamplesCount())) if partialdata.IsPartialDataError(err) { - level.Warn(d.log).Log("msg", "returning partial data", "err", err.Error()) + level.Warn(util_log.WithContext(ctx, d.log)).Log("msg", "returning partial data", "err", err.Error()) d.ingesterPartialDataQueries.Inc() return resp, err } From 
e8379dae97aa9e682c9ac87abf683668a9684b53 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 28 May 2025 08:49:31 -0700 Subject: [PATCH 4/8] No need to return all errors from ingesters to user Signed-off-by: Justin Jung --- pkg/distributor/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 4f7bf3377ad..9645a65672f 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -333,7 +333,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri if partialdata.IsPartialDataError(err) { level.Warn(util_log.WithContext(ctx, d.log)).Log("msg", "returning partial data", "err", err.Error()) d.ingesterPartialDataQueries.Inc() - return resp, err + return resp, partialdata.ErrPartialData } return resp, nil From 373395c7e90c7812eeb1fb621ec0c43fef340833 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 28 May 2025 09:32:05 -0700 Subject: [PATCH 5/8] Fix test Signed-off-by: Justin Jung --- pkg/ring/replication_set_test.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index 5efa875d53f..b85e8384e57 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -198,12 +198,17 @@ func TestReplicationSet_Do(t *testing.T) { expectedError: errZoneFailure, }, { - name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (3 instances)", - instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}}, - f: failingFunctionOnZones("zone1", "zone2"), + name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (6 instances)", + instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", 
Zone: "zone3"}, {Addr: "10.0.0.4", Zone: "zone1"}, {Addr: "10.0.0.5", Zone: "zone2"}, {Addr: "10.0.0.6", Zone: "zone3"}}, + f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) { + if ing.Addr == "10.0.0.1" || ing.Addr == "10.0.0.2" { + return nil, errZoneFailure + } + return 1, nil + }, maxUnavailableZones: 1, queryPartialData: true, - want: []interface{}{1}, + want: []interface{}{1, 1, 1, 1}, expectedError: partialdata.ErrPartialData, errStrContains: []string{"10.0.0.1", "10.0.0.2", "zone failed"}, }, @@ -216,10 +221,10 @@ func TestReplicationSet_Do(t *testing.T) { queryPartialData: true, }, { - name: "with partial data enabled, should fail on instances returning 422 in one zone", + name: "with partial data enabled, should fail on instances returning 422 in two zones", instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) { - if ing.Zone == "zone1" { + if ing.Zone == "zone1" || ing.Zone == "zone2" { return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "breached limit") } return 1, nil @@ -281,7 +286,7 @@ func TestReplicationSet_Do(t *testing.T) { } got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.queryPartialData, tt.f) if tt.expectedError != nil { - assert.ErrorIs(t, err, tt.expectedError) + assert.ErrorContains(t, err, tt.expectedError.Error()) for _, str := range tt.errStrContains { assert.ErrorContains(t, err, str) } From ef8473f4790d4fbd29553ce9c3ff71f5abcfdf36 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Wed, 28 May 2025 14:07:24 -0700 Subject: [PATCH 6/8] Add zone info in the log Signed-off-by: Justin Jung --- pkg/ring/replication_set_tracker.go | 2 +- pkg/ring/replication_set_tracker_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index d06fb16129f..bc7401240d7 100644 --- a/pkg/ring/replication_set_tracker.go +++ 
b/pkg/ring/replication_set_tracker.go @@ -123,7 +123,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err != nil { t.failuresByZone[instance.Zone]++ - t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err)) + t.errors = append(t.errors, fmt.Errorf("(%s, %s) %w", instance.GetAddr(), instance.GetZone(), err)) } else { if _, ok := t.resultsPerZone[instance.Zone]; !ok { // If it is the first result in the zone, then total number of instances diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index f41db46c685..e5ee5c9de16 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -495,11 +495,11 @@ func TestZoneAwareResultTracker(t *testing.T) { maxUnavailableZones: 1, run: func(t *testing.T, tracker *zoneAwareResultTracker) { tracker.done(&instance1, nil, errors.New("test1")) - err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1")) + err1 := fmt.Errorf("(%s, %s) %w", instance1.GetAddr(), instance1.GetZone(), errors.New("test1")) assert.ElementsMatch(t, []error{err1}, tracker.getErrors()) tracker.done(&instance2, nil, errors.New("test2")) - err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2")) + err2 := fmt.Errorf("(%s, %s) %w", instance2.GetAddr(), instance2.GetZone(), errors.New("test2")) assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors()) }, }, From 5a690fa1394b53b20407b3ec7bf97955db85df41 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 29 May 2025 08:47:12 -0700 Subject: [PATCH 7/8] Change condition to check validation.LimitError Signed-off-by: Justin Jung --- pkg/ring/replication_set.go | 10 +++------- pkg/ring/replication_set_test.go | 13 ++++++------- pkg/util/validation/limits.go | 5 +++++ pkg/util/validation/limits_test.go | 6 ++++++ 4 files changed, 20
insertions(+), 14 deletions(-) diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 4c0012a84a9..31e4dc016fa 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -3,13 +3,11 @@ package ring import ( "context" "fmt" - "net/http" "sort" "time" - "github.com/weaveworks/common/httpgrpc" - "github.com/cortexproject/cortex/pkg/querier/partialdata" + "github.com/cortexproject/cortex/pkg/util/validation" ) // ReplicationSet describes the instances to talk to for a given key, and how @@ -83,10 +81,8 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults return nil, res.err } - if httpRes, ok := httpgrpc.HTTPResponseFromError(res.err); ok { - if httpRes.Code == http.StatusUnprocessableEntity { - return nil, res.err - } + if validation.IsLimitError(res.err) { + return nil, res.err } // force one of the delayed requests to start diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index b85e8384e57..401ec7d4094 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -3,16 +3,15 @@ package ring import ( "context" "errors" - "net/http" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/weaveworks/common/httpgrpc" "go.uber.org/atomic" "github.com/cortexproject/cortex/pkg/querier/partialdata" + "github.com/cortexproject/cortex/pkg/util/validation" ) func TestReplicationSet_GetAddresses(t *testing.T) { @@ -221,16 +220,16 @@ func TestReplicationSet_Do(t *testing.T) { queryPartialData: true, }, { - name: "with partial data enabled, should fail on instances returning 422 in two zones", - instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + name: "with partial data enabled, should fail on instances returning 422", + instances: []InstanceDesc{{Addr: "1", Zone: "zone1"}, {Addr: "2", Zone: "zone2"}, {Addr: "3", Zone: "zone3"}, {Addr: "4", Zone: "zone1"}, {Addr: "5", Zone: 
"zone2"}, {Addr: "6", Zone: "zone3"}}, f: func(ctx context.Context, ing *InstanceDesc) (interface{}, error) { - if ing.Zone == "zone1" || ing.Zone == "zone2" { - return nil, httpgrpc.Errorf(http.StatusUnprocessableEntity, "breached limit") + if ing.Addr == "1" || ing.Addr == "2" { + return nil, validation.LimitError("limit breached") } return 1, nil }, maxUnavailableZones: 1, - expectedError: httpgrpc.Errorf(http.StatusUnprocessableEntity, "breached limit"), + expectedError: validation.LimitError("limit breached"), queryPartialData: true, }, { diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 8d46235ebe6..6419dc6ba89 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -50,6 +50,11 @@ func (e LimitError) Error() string { return string(e) } +func IsLimitError(e error) bool { + var limitError LimitError + return errors.As(e, &limitError) +} + type DisabledRuleGroup struct { Namespace string `yaml:"namespace" doc:"nocli|description=namespace in which the rule group belongs"` Name string `yaml:"name" doc:"nocli|description=name of the rule group"` diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index 414cb3e8d45..aec6e010651 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -2,6 +2,7 @@ package validation import ( "encoding/json" + "fmt" "reflect" "regexp" "strings" @@ -885,3 +886,8 @@ func TestLimitsPerLabelSetsForSeries(t *testing.T) { }) } } + +func TestIsLimitsError(t *testing.T) { + assert.False(t, IsLimitError(fmt.Errorf("test error"))) + assert.True(t, IsLimitError(LimitError("test error"))) +} From a20f985dd42c72c2a4e7349da8437933989192f0 Mon Sep 17 00:00:00 2001 From: Justin Jung Date: Thu, 29 May 2025 12:31:44 -0700 Subject: [PATCH 8/8] Nit Signed-off-by: Justin Jung --- pkg/util/validation/limits_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/validation/limits_test.go 
b/pkg/util/validation/limits_test.go index aec6e010651..df7760e3829 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -887,7 +887,7 @@ func TestLimitsPerLabelSetsForSeries(t *testing.T) { } } -func TestIsLimitsError(t *testing.T) { +func TestIsLimitError(t *testing.T) { assert.False(t, IsLimitError(fmt.Errorf("test error"))) assert.True(t, IsLimitError(LimitError("test error"))) }