From 9fb4d69ab832bf02f36e6aa30c6bcb7d57987b22 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 18 Sep 2025 00:07:57 -0700 Subject: [PATCH 1/2] fix remote read snappy input regression due to empty request body Signed-off-by: yeya24 --- integration/query_frontend_test.go | 31 ++++++--------------- pkg/frontend/transport/handler.go | 8 ++++-- pkg/frontend/transport/handler_test.go | 38 ++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 26 deletions(-) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index b77bfa64756..7a68b64a691 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -38,7 +38,6 @@ import ( type queryFrontendTestConfig struct { testMissingMetricName bool querySchedulerEnabled bool - queryStatsEnabled bool remoteReadEnabled bool testSubQueryStepSize bool setup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) @@ -61,7 +60,6 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) { func TestQueryFrontendWithBlocksStorageViaFlagsAndQueryStatsEnabled(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { flags = BlocksStorageFlags() @@ -92,7 +90,6 @@ func TestQueryFrontendWithBlocksStorageViaFlagsAndWithQuerySchedulerAndQueryStat runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: true, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { flags = BlocksStorageFlags() @@ -168,7 +165,6 @@ func TestQueryFrontendWithVerticalSharding(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: false, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) @@ -188,7 +184,6 @@ func TestQueryFrontendWithVerticalShardingQueryScheduler(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: true, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) @@ -208,7 +203,6 @@ func TestQueryFrontendProtobufCodec(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: true, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) @@ -228,7 +222,6 @@ func TestQuerierToQueryFrontendCompression(t *testing.T) { runQueryFrontendTest(t, queryFrontendTestConfig{ testMissingMetricName: false, querySchedulerEnabled: true, - queryStatsEnabled: true, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) @@ -294,7 +287,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { "-querier.split-queries-by-interval": "24h", "-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range "-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), - "-frontend.query-stats-enabled": strconv.FormatBool(cfg.queryStatsEnabled), + "-frontend.query-stats-enabled": "true", // Always enable query stats to capture regressions }) // Start the query-scheduler if enabled. @@ -382,7 +375,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { } // No need to repeat the test on Server-Timing header for each user. - if userID == 0 && cfg.queryStatsEnabled { + if userID == 0 { res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now(), map[string]string{}) require.NoError(t, err) require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0]) @@ -433,15 +426,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { wg.Wait() - extra := float64(2) + extra := float64(3) // Always include query stats test if cfg.testMissingMetricName { extra++ } - if cfg.queryStatsEnabled { - extra++ - } - if cfg.remoteReadEnabled { extra++ } @@ -458,15 +447,11 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_request_duration_seconds"}, e2e.WithMetricCount)) require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Greater(numUsers*numQueriesPerUser), []string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount)) - // Ensure query stats metrics are tracked only when enabled. - if cfg.queryStatsEnabled { - require.NoError(t, queryFrontend.WaitSumMetricsWithOptions( - e2e.Greater(0), - []string{"cortex_query_seconds_total"}, - e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")))) - } else { - require.NoError(t, queryFrontend.WaitRemovedMetric("cortex_query_seconds_total")) - } + // Ensure query stats metrics are always tracked to capture regressions. + require.NoError(t, queryFrontend.WaitSumMetricsWithOptions( + e2e.Greater(0), + []string{"cortex_query_seconds_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "user", "user-1")))) // Ensure no service-specific metrics prefix is used by the wrong service. assertServiceMetricsPrefixes(t, Distributor, distributor) diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 9aab95281ec..f35bab20ff1 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -285,7 +285,8 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // We parse form here so that we can use buf as body, in order to // prevent https://github.com/cortexproject/cortex/issues/5201. // Exclude remote read here as we don't have to buffer its body. - if !strings.Contains(r.URL.Path, "api/v1/read") { + isRemoteRead := strings.Contains(r.URL.Path, "api/v1/read") + if !isRemoteRead { if err := r.ParseForm(); err != nil { statusCode := http.StatusBadRequest if util.IsRequestBodyTooLarge(err) { @@ -300,8 +301,9 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { r.Body = io.NopCloser(&buf) } - // Log request - if f.cfg.QueryStatsEnabled { + // Log request if the request is not remote read. + // We need to parse remote read proto to be properly log it so skip it. + if f.cfg.QueryStatsEnabled && !isRemoteRead { queryString = f.parseRequestQueryString(r, buf) f.logQueryRequest(r, queryString, source) } diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index ad40594d8c5..b6c90a31fcc 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -841,3 +841,41 @@ func TestHandlerMetricsCleanup(t *testing.T) { "cortex_query_samples_scanned_total", "cortex_query_peak_samples", "cortex_query_fetched_chunks_bytes_total", "cortex_query_fetched_data_bytes_total", "cortex_rejected_queries_total", "cortex_slow_queries_total")) } + +func TestHandler_RemoteReadRequest_DoesNotParseQueryString(t *testing.T) { + // Create a mock round tripper that captures the request + var capturedRequest *http.Request + roundTripper := roundTripperFunc(func(req *http.Request) (*http.Response, error) { + capturedRequest = req + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("{}")), + }, nil + }) + + // Use a larger MaxBodySize to avoid the "request body too large" error + handler := NewHandler(HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 10 * 1024 * 1024}, tenantfederation.Config{}, roundTripper, log.NewNopLogger(), nil) + handlerWithAuth := middleware.Merge(middleware.AuthenticateUser).Wrap(handler) + + // Create a remote read request with a body that would be corrupted by parseRequestQueryString + originalBody := "snappy-compressed-data" + req := httptest.NewRequest("POST", "http://fake/api/v1/read", strings.NewReader(originalBody)) + req.Header.Set("X-Scope-OrgId", "user-1") + req.Header.Set("Content-Type", "application/x-protobuf") + req.Header.Set("Content-Encoding", "snappy") + + resp := httptest.NewRecorder() + handlerWithAuth.ServeHTTP(resp, req) + + // Verify the request was successful + require.Equal(t, http.StatusOK, resp.Code) + + // Verify that the original request body was preserved and not corrupted + require.NotNil(t, capturedRequest) + bodyBytes, err := io.ReadAll(capturedRequest.Body) + require.NoError(t, err) + require.Equal(t, originalBody, string(bodyBytes)) + + // Verify that the request body is still readable (not replaced with empty buffer) + require.NotEmpty(t, string(bodyBytes)) +} From d1eaf33c795818d33dce3abe4055c0838cd96206 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Thu, 18 Sep 2025 09:01:08 -0700 Subject: [PATCH 2/2] changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 61eee8a099e..553c6733e9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,6 +96,7 @@ * [BUGFIX] Compactor: Delete the prefix `blocks_meta` from the metadata fetcher metrics. #6832 * [BUGFIX] Store Gateway: Avoid race condition by deduplicating entries in bucket stores user scan. #6863 * [BUGFIX] Runtime-config: Change to check tenant limit validation when loading runtime config only for `all`, `distributor`, `querier`, and `ruler` targets. #6880 +* [BUGFIX] Frontend: Fix remote read snappy input due to request string logging when query stats enabled. #7025 ## 1.19.0 2025-02-27