From 8ddf0ec1fdf44f8dc56cf668cf20614eaa5c8131 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Fri, 14 Feb 2025 15:36:44 -0800 Subject: [PATCH 1/3] add dynamic interval cache splitter Signed-off-by: Ahmed Hassan --- .../queryrange/query_range_middlewares.go | 5 +++++ .../tripperware/queryrange/results_cache.go | 12 ++++++++---- .../queryrange/results_cache_test.go | 15 ++++++++++++--- .../queryrange/split_by_interval.go | 19 +++++++++++++++++++ 4 files changed, 44 insertions(+), 7 deletions(-) diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 1a79b6f6569..48830ac3f08 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -124,7 +124,12 @@ func Middlewares( } return false } + queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer) + if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedDataDurationPerQuery > 0 { + queryCacheMiddleware, cache, err = NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, dynamicIntervalSplitter{cfg, limits, queryAnalyzer, lookbackDelta}, limits, prometheusCodec, cacheExtractor, shouldCache, registerer) + } + if err != nil { return nil, nil, err } diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index d065ac9201b..ed9e4ddbaf7 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -141,16 +141,16 @@ func (PrometheusResponseExtractor) ResponseWithoutStats(resp tripperware.Respons // CacheSplitter generates cache keys. This is a useful interface for downstream // consumers who wish to implement their own strategies. type CacheSplitter interface { - GenerateCacheKey(userID string, r tripperware.Request) string + GenerateCacheKey(userID string, ctx context.Context, r tripperware.Request) (string, error) } // constSplitter is a utility for using a constant split interval when determining cache keys type constSplitter time.Duration // GenerateCacheKey generates a cache key based on the userID, Request and interval. -func (t constSplitter) GenerateCacheKey(userID string, r tripperware.Request) string { +func (t constSplitter) GenerateCacheKey(userID string, ctx context.Context, r tripperware.Request) (string, error) { currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond) - return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval) + return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval), nil } // ShouldCacheFn checks whether the current request should go to cache @@ -232,8 +232,12 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar return s.next.Do(ctx, r) } + key, err := s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), ctx, r) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + var ( - key = s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), r) extents []tripperware.Extent response tripperware.Response ) diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index 1b448f371f0..640206fa95a 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -1381,7 +1381,9 @@ func TestResultsCacheMaxFreshness(t *testing.T) { req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3)) // fill cache - key := constSplitter(day).GenerateCacheKey("1", req) + key, err := constSplitter(day).GenerateCacheKey("1", ctx, req) + require.NoError(t, err) + rc.(*resultsCache).put(ctx, key, []tripperware.Extent{mkExtent(int64(modelNow)-(600*1e3), int64(modelNow))}) resp, err := rc.Do(ctx, req) @@ -1460,7 +1462,11 @@ func TestConstSplitter_generateCacheKey(t *testing.T) { tt := tt t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) { t.Parallel() - if got := constSplitter(tt.interval).GenerateCacheKey("fake", tt.r); got != tt.want { + ctx := user.InjectOrgID(context.Background(), "1") + got, err := constSplitter(tt.interval).GenerateCacheKey("fake", ctx, tt.r) + require.NoError(t, err) + + if got != tt.want { t.Errorf("generateKey() = %v, want %v", got, tt.want) } }) @@ -1563,7 +1569,10 @@ func TestResultsCacheFillCompatibility(t *testing.T) { // Check cache and make sure we write response in old format even though the response is new format. tenantIDs, err := tenant.TenantIDs(ctx) require.NoError(t, err) - cacheKey := cache.HashKey(constSplitter(day).GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), parsedRequest)) + key, err := constSplitter(day).GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), ctx, parsedRequest) + require.NoError(t, err) + + cacheKey := cache.HashKey(key) found, bufs, _ := c.Fetch(ctx, []string{cacheKey}) require.Equal(t, []string{cacheKey}, found) require.Len(t, bufs, 1) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index d7f7b18a88b..b23329cea32 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -2,6 +2,7 @@ package queryrange import ( "context" + "fmt" "net/http" "time" @@ -364,6 +365,24 @@ func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, query return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval, time.Duration(durationFetchedByLookbackDeltaCount) * baseInterval } +// dynamicIntervalSplitter is a utility for using a dynamic split interval when determining cache keys +type dynamicIntervalSplitter struct { + cfg Config + limits tripperware.Limits + queryAnalyzer querysharding.Analyzer + lookbackDelta time.Duration +} + +// GenerateCacheKey generates a cache key based on the userID, Request and interval. +func (c dynamicIntervalSplitter) GenerateCacheKey(userID string, ctx context.Context, r tripperware.Request) (string, error) { + interval, err := dynamicIntervalFn(c.cfg, c.limits, c.queryAnalyzer, c.lookbackDelta)(ctx, r) + if err != nil { + return "", err + } + currentInterval := r.GetStart() / int64(interval/time.Millisecond) + return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval), nil +} + func floorDiv(a, b int64) int64 { if a < 0 && a%b != 0 { return a/b - 1 From b7e97cbb461339ff97a351f85d0bc95b8c14f0dc Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Mon, 17 Feb 2025 14:22:46 -0800 Subject: [PATCH 2/3] use split interval from stats to generate cache key Signed-off-by: Ahmed Hassan --- .../queryrange/query_range_middlewares.go | 6 +---- .../tripperware/queryrange/results_cache.go | 21 ++++++++++----- .../queryrange/results_cache_test.go | 26 ++++++++----------- .../queryrange/split_by_interval.go | 19 -------------- 4 files changed, 26 insertions(+), 46 deletions(-) diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 48830ac3f08..1a76f558e42 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -125,11 +125,7 @@ func Middlewares( return false } - queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer) - if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedDataDurationPerQuery > 0 { - queryCacheMiddleware, cache, err = NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, dynamicIntervalSplitter{cfg, limits, queryAnalyzer, lookbackDelta}, limits, prometheusCodec, cacheExtractor, shouldCache, registerer) - } - + queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, splitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer) if err != nil { return nil, nil, err } diff --git a/pkg/querier/tripperware/queryrange/results_cache.go b/pkg/querier/tripperware/queryrange/results_cache.go index ed9e4ddbaf7..a971ff261cb 100644 --- a/pkg/querier/tripperware/queryrange/results_cache.go +++ b/pkg/querier/tripperware/queryrange/results_cache.go @@ -29,6 +29,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/partialdata" + querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" @@ -141,16 +142,22 @@ func (PrometheusResponseExtractor) ResponseWithoutStats(resp tripperware.Respons // CacheSplitter generates cache keys. This is a useful interface for downstream // consumers who wish to implement their own strategies. type CacheSplitter interface { - GenerateCacheKey(userID string, ctx context.Context, r tripperware.Request) (string, error) + GenerateCacheKey(ctx context.Context, userID string, r tripperware.Request) string } -// constSplitter is a utility for using a constant split interval when determining cache keys -type constSplitter time.Duration +// splitter is a utility for using split interval when determining cache keys +type splitter time.Duration // GenerateCacheKey generates a cache key based on the userID, Request and interval. -func (t constSplitter) GenerateCacheKey(userID string, ctx context.Context, r tripperware.Request) (string, error) { - currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond) - return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval), nil +func (t splitter) GenerateCacheKey(ctx context.Context, userID string, r tripperware.Request) string { + stats := querier_stats.FromContext(ctx) + interval := stats.LoadSplitInterval() + if interval == 0 { + interval = time.Duration(t) + } + + currentInterval := r.GetStart() / int64(interval/time.Millisecond) + return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval) } // ShouldCacheFn checks whether the current request should go to cache @@ -232,7 +239,7 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar return s.next.Do(ctx, r) } - key, err := s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), ctx, r) + key := s.splitter.GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), r) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index 640206fa95a..46422768b87 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -308,7 +308,7 @@ func TestStatsCacheQuerySamples(t *testing.T) { rcm, _, err := NewResultsCacheMiddleware( log.NewNopLogger(), cfg, - constSplitter(day), + splitter(day), mockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, @@ -1258,7 +1258,7 @@ func TestResultsCache(t *testing.T) { rcm, _, err := NewResultsCacheMiddleware( log.NewNopLogger(), cfg, - constSplitter(day), + splitter(day), mockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, @@ -1299,7 +1299,7 @@ func TestResultsCacheRecent(t *testing.T) { rcm, _, err := NewResultsCacheMiddleware( log.NewNopLogger(), cfg, - constSplitter(day), + splitter(day), mockLimits{maxCacheFreshness: 10 * time.Minute}, PrometheusCodec, PrometheusResponseExtractor{}, @@ -1364,7 +1364,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) { rcm, _, err := NewResultsCacheMiddleware( log.NewNopLogger(), cfg, - constSplitter(day), + splitter(day), fakeLimits, PrometheusCodec, PrometheusResponseExtractor{}, @@ -1381,9 +1381,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) { req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3)) // fill cache - key, err := constSplitter(day).GenerateCacheKey("1", ctx, req) - require.NoError(t, err) - + key := splitter(day).GenerateCacheKey(ctx, "1", req) rc.(*resultsCache).put(ctx, key, []tripperware.Extent{mkExtent(int64(modelNow)-(600*1e3), int64(modelNow))}) resp, err := rc.Do(ctx, req) @@ -1403,7 +1401,7 @@ func Test_resultsCache_MissingData(t *testing.T) { rm, _, err := NewResultsCacheMiddleware( log.NewNopLogger(), cfg, - constSplitter(day), + splitter(day), mockLimits{}, PrometheusCodec, PrometheusResponseExtractor{}, @@ -1440,7 +1438,7 @@ func Test_resultsCache_MissingData(t *testing.T) { require.False(t, hit) } -func TestConstSplitter_generateCacheKey(t *testing.T) { +func TestSplitter_generateCacheKey(t *testing.T) { t.Parallel() tests := []struct { @@ -1463,8 +1461,7 @@ func TestConstSplitter_generateCacheKey(t *testing.T) { t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) { t.Parallel() ctx := user.InjectOrgID(context.Background(), "1") - got, err := constSplitter(tt.interval).GenerateCacheKey("fake", ctx, tt.r) - require.NoError(t, err) + got := splitter(tt.interval).GenerateCacheKey(ctx, "fake", tt.r) if got != tt.want { t.Errorf("generateKey() = %v, want %v", got, tt.want) @@ -1519,7 +1516,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) { rcm, _, err := NewResultsCacheMiddleware( log.NewNopLogger(), cfg, - constSplitter(day), + splitter(day), mockLimits{maxCacheFreshness: 10 * time.Minute}, PrometheusCodec, PrometheusResponseExtractor{}, @@ -1551,7 +1548,7 @@ func TestResultsCacheFillCompatibility(t *testing.T) { rcm, _, err := NewResultsCacheMiddleware( log.NewNopLogger(), cfg, - constSplitter(day), + splitter(day), mockLimits{maxCacheFreshness: 10 * time.Minute}, PrometheusCodec, PrometheusResponseExtractor{}, @@ -1569,8 +1566,7 @@ func TestResultsCacheFillCompatibility(t *testing.T) { // Check cache and make sure we write response in old format even though the response is new format. tenantIDs, err := tenant.TenantIDs(ctx) require.NoError(t, err) - key, err := constSplitter(day).GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), ctx, parsedRequest) - require.NoError(t, err) + key := splitter(day).GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), parsedRequest) cacheKey := cache.HashKey(key) found, bufs, _ := c.Fetch(ctx, []string{cacheKey}) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval.go b/pkg/querier/tripperware/queryrange/split_by_interval.go index b23329cea32..d7f7b18a88b 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval.go @@ -2,7 +2,6 @@ package queryrange import ( "context" - "fmt" "net/http" "time" @@ -365,24 +364,6 @@ func analyzeDurationFetchedByQueryExpr(expr parser.Expr, queryStart int64, query return time.Duration(durationFetchedByRangeCount) * baseInterval, time.Duration(durationFetchedBySelectorsCount) * baseInterval, time.Duration(durationFetchedByLookbackDeltaCount) * baseInterval } -// dynamicIntervalSplitter is a utility for using a dynamic split interval when determining cache keys -type dynamicIntervalSplitter struct { - cfg Config - limits tripperware.Limits - queryAnalyzer querysharding.Analyzer - lookbackDelta time.Duration -} - -// GenerateCacheKey generates a cache key based on the userID, Request and interval. -func (c dynamicIntervalSplitter) GenerateCacheKey(userID string, ctx context.Context, r tripperware.Request) (string, error) { - interval, err := dynamicIntervalFn(c.cfg, c.limits, c.queryAnalyzer, c.lookbackDelta)(ctx, r) - if err != nil { - return "", err - } - currentInterval := r.GetStart() / int64(interval/time.Millisecond) - return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval), nil -} - func floorDiv(a, b int64) int64 { if a < 0 && a%b != 0 { return a/b - 1 From dd317446175c8e970a655e76a925f42ed6d07e08 Mon Sep 17 00:00:00 2001 From: Ahmed Hassan Date: Mon, 17 Feb 2025 14:46:37 -0800 Subject: [PATCH 3/3] rerun tests Signed-off-by: Ahmed Hassan