Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func Middlewares(
}
return false
}
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer)

Comment thread
afhassan marked this conversation as resolved.
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, splitter(cfg.SplitQueriesByInterval), limits, prometheusCodec, cacheExtractor, shouldCache, registerer)
if err != nil {
return nil, nil, err
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/querier/tripperware/queryrange/results_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/partialdata"
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -141,15 +142,21 @@ func (PrometheusResponseExtractor) ResponseWithoutStats(resp tripperware.Respons
// CacheSplitter generates cache keys. This is a useful interface for downstream
// consumers who wish to implement their own strategies.
type CacheSplitter interface {
GenerateCacheKey(userID string, r tripperware.Request) string
GenerateCacheKey(ctx context.Context, userID string, r tripperware.Request) string
}

// constSplitter is a utility for using a constant split interval when determining cache keys
type constSplitter time.Duration
// splitter is a utility for using split interval when determining cache keys
type splitter time.Duration

// GenerateCacheKey generates a cache key based on the userID, Request and interval.
func (t constSplitter) GenerateCacheKey(userID string, r tripperware.Request) string {
currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond)
func (t splitter) GenerateCacheKey(ctx context.Context, userID string, r tripperware.Request) string {
stats := querier_stats.FromContext(ctx)
interval := stats.LoadSplitInterval()
if interval == 0 {
interval = time.Duration(t)
}

currentInterval := r.GetStart() / int64(interval/time.Millisecond)
return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}

Expand Down Expand Up @@ -232,8 +239,12 @@ func (s resultsCache) Do(ctx context.Context, r tripperware.Request) (tripperwar
return s.next.Do(ctx, r)
}

key := s.splitter.GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

var (
key = s.splitter.GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), r)
extents []tripperware.Extent
response tripperware.Response
)
Expand Down
27 changes: 16 additions & 11 deletions pkg/querier/tripperware/queryrange/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestStatsCacheQuerySamples(t *testing.T) {
rcm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
constSplitter(day),
splitter(day),
mockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
Expand Down Expand Up @@ -1258,7 +1258,7 @@ func TestResultsCache(t *testing.T) {
rcm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
constSplitter(day),
splitter(day),
mockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
Expand Down Expand Up @@ -1299,7 +1299,7 @@ func TestResultsCacheRecent(t *testing.T) {
rcm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
constSplitter(day),
splitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusResponseExtractor{},
Expand Down Expand Up @@ -1364,7 +1364,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
rcm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
constSplitter(day),
splitter(day),
fakeLimits,
PrometheusCodec,
PrometheusResponseExtractor{},
Expand All @@ -1381,7 +1381,7 @@ func TestResultsCacheMaxFreshness(t *testing.T) {
req := parsedRequest.WithStartEnd(int64(modelNow)-(50*1e3), int64(modelNow)-(10*1e3))

// fill cache
key := constSplitter(day).GenerateCacheKey("1", req)
key := splitter(day).GenerateCacheKey(ctx, "1", req)
rc.(*resultsCache).put(ctx, key, []tripperware.Extent{mkExtent(int64(modelNow)-(600*1e3), int64(modelNow))})

resp, err := rc.Do(ctx, req)
Expand All @@ -1401,7 +1401,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
rm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
constSplitter(day),
splitter(day),
mockLimits{},
PrometheusCodec,
PrometheusResponseExtractor{},
Expand Down Expand Up @@ -1438,7 +1438,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
require.False(t, hit)
}

func TestConstSplitter_generateCacheKey(t *testing.T) {
func TestSplitter_generateCacheKey(t *testing.T) {
t.Parallel()

tests := []struct {
Expand All @@ -1460,7 +1460,10 @@ func TestConstSplitter_generateCacheKey(t *testing.T) {
tt := tt
t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) {
t.Parallel()
if got := constSplitter(tt.interval).GenerateCacheKey("fake", tt.r); got != tt.want {
ctx := user.InjectOrgID(context.Background(), "1")
got := splitter(tt.interval).GenerateCacheKey(ctx, "fake", tt.r)

if got != tt.want {
t.Errorf("generateKey() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -1513,7 +1516,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
rcm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
constSplitter(day),
splitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusResponseExtractor{},
Expand Down Expand Up @@ -1545,7 +1548,7 @@ func TestResultsCacheFillCompatibility(t *testing.T) {
rcm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
cfg,
constSplitter(day),
splitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusResponseExtractor{},
Expand All @@ -1563,7 +1566,9 @@ func TestResultsCacheFillCompatibility(t *testing.T) {
// Check cache and make sure we write response in old format even though the response is new format.
tenantIDs, err := tenant.TenantIDs(ctx)
require.NoError(t, err)
cacheKey := cache.HashKey(constSplitter(day).GenerateCacheKey(tenant.JoinTenantIDs(tenantIDs), parsedRequest))
key := splitter(day).GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), parsedRequest)

cacheKey := cache.HashKey(key)
found, bufs, _ := c.Fetch(ctx, []string{cacheKey})
require.Equal(t, []string{cacheKey}, found)
require.Len(t, bufs, 1)
Expand Down