From a20aed5ad7f1190aee8e6b963bfcc97d53208dc7 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Tue, 15 Apr 2025 02:20:17 +0000
Subject: [PATCH 1/2] fix query frontend per tenant metrics leak when cleaning up user labels

Signed-off-by: Ben Ye
---
 pkg/frontend/transport/handler.go      |  52 ++++++++---
 pkg/frontend/transport/handler_test.go | 108 +++++++++++++++++++++++++
 2 files changed, 148 insertions(+), 12 deletions(-)

diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go
index c1c6ce05d73..da8a4a1ff49 100644
--- a/pkg/frontend/transport/handler.go
+++ b/pkg/frontend/transport/handler.go
@@ -167,18 +167,7 @@ func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config,
 			[]string{"reason", "source", "user"},
 		)
 
-		h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) {
-			h.querySeconds.DeleteLabelValues(user)
-			h.queryFetchedSeries.DeleteLabelValues(user)
-			h.queryFetchedSamples.DeleteLabelValues(user)
-			h.queryScannedSamples.DeleteLabelValues(user)
-			h.queryPeakSamples.DeleteLabelValues(user)
-			h.queryChunkBytes.DeleteLabelValues(user)
-			h.queryDataBytes.DeleteLabelValues(user)
-			if err := util.DeleteMatchingLabels(h.rejectedQueries, map[string]string{"user": user}); err != nil {
-				level.Warn(log).Log("msg", "failed to remove cortex_rejected_queries_total metric for user", "user", user, "err", err)
-			}
-		})
+		h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(h.cleanupMetricsForInactiveUser)
 		// If cleaner stops or fail, we will simply not clean the metrics for inactive users.
 		_ = h.activeUsers.StartAsync(context.Background())
 	}
@@ -186,6 +175,45 @@ func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config,
 	return h
 }
 
+// cleanupMetricsForInactiveUser removes the series carrying the given user label
+// from every per-user query stats metric and from cortex_rejected_queries_total.
+// Deletion failures are logged at warn level and do not stop the remaining
+// cleanups. It is a no-op when query stats are disabled.
+func (f *Handler) cleanupMetricsForInactiveUser(user string) {
+	if !f.cfg.QueryStatsEnabled {
+		return
+	}
+
+	// Create a map with the user label to match
+	userLabel := map[string]string{"user": user}
+
+	// Clean up all metrics for the user
+	if err := util.DeleteMatchingLabels(f.querySeconds, userLabel); err != nil {
+		level.Warn(f.log).Log("msg", "failed to remove cortex_query_seconds_total metric for user", "user", user, "err", err)
+	}
+	if err := util.DeleteMatchingLabels(f.queryFetchedSeries, userLabel); err != nil {
+		level.Warn(f.log).Log("msg", "failed to remove cortex_query_fetched_series_total metric for user", "user", user, "err", err)
+	}
+	if err := util.DeleteMatchingLabels(f.queryFetchedSamples, userLabel); err != nil {
+		level.Warn(f.log).Log("msg", "failed to remove cortex_query_samples_total metric for user", "user", user, "err", err)
+	}
+	if err := util.DeleteMatchingLabels(f.queryScannedSamples, userLabel); err != nil {
+		level.Warn(f.log).Log("msg", "failed to remove cortex_query_samples_scanned_total metric for user", "user", user, "err", err)
+	}
+	if err := util.DeleteMatchingLabels(f.queryPeakSamples, userLabel); err != nil {
+		level.Warn(f.log).Log("msg", "failed to remove cortex_query_peak_samples metric for user", "user", user, "err", err)
+	}
+	if err := util.DeleteMatchingLabels(f.queryChunkBytes, userLabel); err != nil {
+		level.Warn(f.log).Log("msg", "failed to remove cortex_query_fetched_chunks_bytes_total metric for user", "user", user, "err", err)
+	}
+	if err := util.DeleteMatchingLabels(f.queryDataBytes, userLabel); err != nil {
+		level.Warn(f.log).Log("msg", "failed to remove cortex_query_fetched_data_bytes_total metric for user", "user", user, "err", err)
+	}
+	if err := util.DeleteMatchingLabels(f.rejectedQueries, userLabel); err != nil {
+		level.Warn(f.log).Log("msg", "failed to remove cortex_rejected_queries_total metric for user", "user", user, "err", err)
+	}
+}
+
 func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	var (
 		stats *querier_stats.QueryStats
diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go
index a7b91ad2407..5482732fea8 100644
--- a/pkg/frontend/transport/handler_test.go
+++ b/pkg/frontend/transport/handler_test.go
@@ -637,3 +637,111 @@ func Test_TenantFederation_MaxTenant(t *testing.T) {
 		})
 	}
 }
+
+// TestHandlerMetricsCleanup verifies that cleanupMetricsForInactiveUser removes
+// every per-user series of the cleaned-up user while leaving other users' series untouched.
+func TestHandlerMetricsCleanup(t *testing.T) {
+	reg := prometheus.NewPedanticRegistry()
+	handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, log.NewNopLogger(), reg)
+
+	user1 := "user1"
+	user2 := "user2"
+	source := "api"
+
+	// Simulate activity for user1
+	handler.querySeconds.WithLabelValues(source, user1).Add(1.0)
+	handler.queryFetchedSeries.WithLabelValues(source, user1).Add(100)
+	handler.queryFetchedSamples.WithLabelValues(source, user1).Add(1000)
+	handler.queryScannedSamples.WithLabelValues(source, user1).Add(2000)
+	handler.queryPeakSamples.WithLabelValues(source, user1).Observe(500)
+	handler.queryChunkBytes.WithLabelValues(source, user1).Add(1024)
+	handler.queryDataBytes.WithLabelValues(source, user1).Add(2048)
+	handler.rejectedQueries.WithLabelValues(reasonTooManySamples, source, user1).Add(5)
+
+	// Simulate activity for user2
+	handler.querySeconds.WithLabelValues(source, user2).Add(2.0)
+	handler.queryFetchedSeries.WithLabelValues(source, user2).Add(200)
+	handler.queryFetchedSamples.WithLabelValues(source, user2).Add(2000)
+	handler.queryScannedSamples.WithLabelValues(source, user2).Add(4000)
+	handler.queryPeakSamples.WithLabelValues(source, user2).Observe(1000)
+	handler.queryChunkBytes.WithLabelValues(source, user2).Add(2048)
+	handler.queryDataBytes.WithLabelValues(source, user2).Add(4096)
+	handler.rejectedQueries.WithLabelValues(reasonTooManySamples, source, user2).Add(10)
+
+	// Verify initial state - both users should have metrics
+	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
+		# HELP cortex_query_seconds_total Total amount of wall clock time spend processing queries.
+		# TYPE cortex_query_seconds_total counter
+		cortex_query_seconds_total{source="api",user="user1"} 1
+		cortex_query_seconds_total{source="api",user="user2"} 2
+		# HELP cortex_query_fetched_series_total Number of series fetched to execute a query.
+		# TYPE cortex_query_fetched_series_total counter
+		cortex_query_fetched_series_total{source="api",user="user1"} 100
+		cortex_query_fetched_series_total{source="api",user="user2"} 200
+		# HELP cortex_query_samples_total Number of samples fetched to execute a query.
+		# TYPE cortex_query_samples_total counter
+		cortex_query_samples_total{source="api",user="user1"} 1000
+		cortex_query_samples_total{source="api",user="user2"} 2000
+		# HELP cortex_query_samples_scanned_total Number of samples scanned to execute a query.
+		# TYPE cortex_query_samples_scanned_total counter
+		cortex_query_samples_scanned_total{source="api",user="user1"} 2000
+		cortex_query_samples_scanned_total{source="api",user="user2"} 4000
+		# HELP cortex_query_peak_samples Highest count of samples considered to execute a query.
+		# TYPE cortex_query_peak_samples histogram
+		cortex_query_peak_samples_bucket{source="api",user="user1",le="+Inf"} 1
+		cortex_query_peak_samples_sum{source="api",user="user1"} 500
+		cortex_query_peak_samples_count{source="api",user="user1"} 1
+		cortex_query_peak_samples_bucket{source="api",user="user2",le="+Inf"} 1
+		cortex_query_peak_samples_sum{source="api",user="user2"} 1000
+		cortex_query_peak_samples_count{source="api",user="user2"} 1
+		# HELP cortex_query_fetched_chunks_bytes_total Size of all chunks fetched to execute a query in bytes.
+		# TYPE cortex_query_fetched_chunks_bytes_total counter
+		cortex_query_fetched_chunks_bytes_total{source="api",user="user1"} 1024
+		cortex_query_fetched_chunks_bytes_total{source="api",user="user2"} 2048
+		# HELP cortex_query_fetched_data_bytes_total Size of all data fetched to execute a query in bytes.
+		# TYPE cortex_query_fetched_data_bytes_total counter
+		cortex_query_fetched_data_bytes_total{source="api",user="user1"} 2048
+		cortex_query_fetched_data_bytes_total{source="api",user="user2"} 4096
+		# HELP cortex_rejected_queries_total The total number of queries that were rejected.
+		# TYPE cortex_rejected_queries_total counter
+		cortex_rejected_queries_total{reason="too_many_samples",source="api",user="user1"} 5
+		cortex_rejected_queries_total{reason="too_many_samples",source="api",user="user2"} 10
+	`), "cortex_query_seconds_total", "cortex_query_fetched_series_total", "cortex_query_samples_total",
+		"cortex_query_samples_scanned_total", "cortex_query_peak_samples", "cortex_query_fetched_chunks_bytes_total",
+		"cortex_query_fetched_data_bytes_total", "cortex_rejected_queries_total"))
+
+	// Clean up metrics for user1
+	handler.cleanupMetricsForInactiveUser(user1)
+
+	// Verify final state - only user2 should have metrics
+	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
+		# HELP cortex_query_seconds_total Total amount of wall clock time spend processing queries.
+		# TYPE cortex_query_seconds_total counter
+		cortex_query_seconds_total{source="api",user="user2"} 2
+		# HELP cortex_query_fetched_series_total Number of series fetched to execute a query.
+		# TYPE cortex_query_fetched_series_total counter
+		cortex_query_fetched_series_total{source="api",user="user2"} 200
+		# HELP cortex_query_samples_total Number of samples fetched to execute a query.
+		# TYPE cortex_query_samples_total counter
+		cortex_query_samples_total{source="api",user="user2"} 2000
+		# HELP cortex_query_samples_scanned_total Number of samples scanned to execute a query.
+		# TYPE cortex_query_samples_scanned_total counter
+		cortex_query_samples_scanned_total{source="api",user="user2"} 4000
+		# HELP cortex_query_peak_samples Highest count of samples considered to execute a query.
+		# TYPE cortex_query_peak_samples histogram
+		cortex_query_peak_samples_bucket{source="api",user="user2",le="+Inf"} 1
+		cortex_query_peak_samples_sum{source="api",user="user2"} 1000
+		cortex_query_peak_samples_count{source="api",user="user2"} 1
+		# HELP cortex_query_fetched_chunks_bytes_total Size of all chunks fetched to execute a query in bytes.
+		# TYPE cortex_query_fetched_chunks_bytes_total counter
+		cortex_query_fetched_chunks_bytes_total{source="api",user="user2"} 2048
+		# HELP cortex_query_fetched_data_bytes_total Size of all data fetched to execute a query in bytes.
+		# TYPE cortex_query_fetched_data_bytes_total counter
+		cortex_query_fetched_data_bytes_total{source="api",user="user2"} 4096
+		# HELP cortex_rejected_queries_total The total number of queries that were rejected.
+		# TYPE cortex_rejected_queries_total counter
+		cortex_rejected_queries_total{reason="too_many_samples",source="api",user="user2"} 10
+	`), "cortex_query_seconds_total", "cortex_query_fetched_series_total", "cortex_query_samples_total",
+		"cortex_query_samples_scanned_total", "cortex_query_peak_samples", "cortex_query_fetched_chunks_bytes_total",
+		"cortex_query_fetched_data_bytes_total", "cortex_rejected_queries_total"))
+}

From fe76d3d50ada2f78aa4d3c342a18370910a4e66b Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Tue, 15 Apr 2025 02:28:41 +0000
Subject: [PATCH 2/2] changelog

Signed-off-by: Ben Ye
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 91507d24650..117f77584c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,7 @@
 * [BUGFIX] Ingester: Add check to avoid query 5xx when closing tsdb. #6616
 * [BUGFIX] Querier: Fix panic when marshaling QueryResultRequest. #6601
 * [BUGFIX] Ingester: Avoid resharding for query when restart readonly ingesters. #6642
+* [BUGFIX] Query Frontend: Fix query frontend per `user` metrics clean up. #6698
 
 ## 1.19.0 2025-02-27
 