From d2d010ab55be497eb91929357168b7fbc5400047 Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Mon, 18 May 2026 15:05:31 +0530 Subject: [PATCH 1/9] Add parquet no-convert marker and read/write logic Signed-off-by: Siddarth Gundu --- pkg/storage/parquet/no_convert_marker.go | 72 ++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 pkg/storage/parquet/no_convert_marker.go diff --git a/pkg/storage/parquet/no_convert_marker.go b/pkg/storage/parquet/no_convert_marker.go new file mode 100644 index 00000000000..e6a801d5430 --- /dev/null +++ b/pkg/storage/parquet/no_convert_marker.go @@ -0,0 +1,72 @@ +package parquet + +import ( + "bytes" + "context" + "encoding/json" + "io" + "path" + + "github.com/efficientgo/core/errors" + "github.com/go-kit/log" + "github.com/oklog/ulid/v2" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/runutil" + + "github.com/cortexproject/cortex/pkg/storage/bucket" +) + +const ( + NoConvertMarkerFileName = "parquet-no-convert-mark.json" + + CurrentNoConvertMarkVersion = NoConvertMarkVersion1 + NoConvertMarkVersion1 = 1 +) + +type NoConvertMark struct { + Version int `json:"version"` + Reason string `json:"reason"` + LabelNamesCount int `json:"label_names_count"` + Threshold int `json:"threshold"` +} + +func ReadNoConvertMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*NoConvertMark, error) { + markerPath := path.Join(id.String(), NoConvertMarkerFileName) + reader, err := userBkt.WithExpectedErrs(bucket.IsOneOfTheExpectedErrors(userBkt.IsAccessDeniedErr, userBkt.IsObjNotFoundErr)).Get(ctx, markerPath) + if err != nil { + if userBkt.IsObjNotFoundErr(err) || userBkt.IsAccessDeniedErr(err) { + return &NoConvertMark{}, nil + } + + return &NoConvertMark{}, err + } + defer runutil.CloseWithLogOnErr(logger, reader, "close parquet no-convert marker file reader") + + markerContent, err := io.ReadAll(reader) + if err != nil { + return &NoConvertMark{}, errors.Wrapf(err, "read file: %s", NoConvertMarkerFileName) + } + + marker := NoConvertMark{} + err = json.Unmarshal(markerContent, &marker) + return &marker, err +} + +func WriteNoConvertMark(ctx context.Context, id ulid.ULID, userBkt objstore.Bucket, labelNamesCount int, maxBlockLabelNames int) error { + noConvertMarker := NoConvertMark{ + Version: CurrentNoConvertMarkVersion, + Reason: "too_many_labels", + LabelNamesCount: labelNamesCount, + Threshold: maxBlockLabelNames, + } + noConvertMarkerPath := path.Join(id.String(), NoConvertMarkerFileName) + b, err := json.Marshal(noConvertMarker) + if err != nil { + return err + } + return userBkt.Upload(ctx, noConvertMarkerPath, bytes.NewReader(b)) +} + +func ValidNoConvertMarkVersion(version int) bool { + return version == NoConvertMarkVersion1 +} From 7d9ef9f06fea6cf5ca9dafea944c5f75ac10f26d Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Mon, 18 May 2026 15:12:17 +0530 Subject: [PATCH 2/9] parquetconverter: skip blocks with too many label names - Add max-block-label-names limit, blocks exceeding it get a no-convert marker instead of being converted. Signed-off-by: Siddarth Gundu --- pkg/parquetconverter/converter.go | 41 ++++++++ pkg/parquetconverter/converter_test.go | 125 +++++++++++++++++++++++++ pkg/util/validation/limits.go | 13 ++- 3 files changed, 176 insertions(+), 3 deletions(-) diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index a5ed6ce0c05..5b6e5e58297 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -396,6 +396,21 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin continue } + maxBlockLabelNames := c.limits.ParquetConverterMaxBlockLabelNames(userID) + + // If the threshold is enabled, check for no-convert mark + if maxBlockLabelNames > 0 { + + noConvertMark, err := cortex_parquet.ReadNoConvertMark(ctx, b.ULID, uBucket, logger) + if err != nil { + level.Error(logger).Log("msg", "failed to read parquet no-convert marker", "block", b.ULID.String(), "err", err) + continue + } + if cortex_parquet.ValidNoConvertMarkVersion(noConvertMark.Version) { + continue + } + } + if err := os.RemoveAll(c.compactRootDir()); err != nil { level.Error(logger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err) if c.checkConvertError(userID, err) { @@ -425,6 +440,32 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin continue } + if maxBlockLabelNames > 0 { + labelNames, err := tsdbBlock.LabelNames(ctx) + if err != nil { + _ = tsdbBlock.Close() + level.Error(logger).Log("msg", "failed to get label names", "block", b.ULID.String(), "err", err) + if c.checkConvertError(userID, err) { + return err + } + continue + } + labelNamesCount := len(labelNames) + if labelNamesCount > maxBlockLabelNames { + if err := cortex_parquet.WriteNoConvertMark(ctx, b.ULID, uBucket, labelNamesCount, maxBlockLabelNames); err != nil { + _ = tsdbBlock.Close() + level.Error(logger).Log("msg", "failed to write parquet no-convert marker", "block", b.ULID.String(), "err", err) + if c.checkConvertError(userID, err) { + return err + } + continue + } + level.Info(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames) + _ = tsdbBlock.Close() + continue + } + } + level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir) start := time.Now() diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index bdcf46b3d36..03760a136ab 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -488,3 +488,128 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) { // It should be 0 since the block was already converted assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) } + +func TestConverter_WriteNoConvertMarkForBlockWithTooManyLabels(t *testing.T) { + cfg := prepareConfig() + user := "user" + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + dir := t.TempDir() + + cfg.Ring.InstanceID = "parquet-converter-1" + cfg.Ring.InstanceAddr = "1.2.3.4" + cfg.Ring.KVStore.Mock = ringStore + bucketClient, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + userBucket := bucket.NewPrefixedBucketClient(bucketClient, user) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + limits.ParquetConverterMaxBlockLabelNames = 1 + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil) + + ctx := context.Background() + + lbls := labels.FromStrings("__name__", "test", "job", "foo") + + // Create a block + rnd := rand.New(rand.NewSource(time.Now().Unix())) + + // 2h blocks are skipped by ShouldConvertBlockToParquet + blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, 4*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + + // Upload the block to the bucket + blockDir := fmt.Sprintf("%s/%s", dir, blockID.String()) + b, err := tsdb.OpenBlock(nil, blockDir, nil, nil) + require.NoError(t, err) + err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), c) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, c) + + // Start the converter + err = c.convertUser(ctx, logger, c.ring, user) + require.NoError(t, err) + + // Verify the marker was written correctly + readNoConvertMark, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger) + require.NoError(t, err) + require.True(t, parquet.ValidNoConvertMarkVersion(readNoConvertMark.Version)) + require.Equal(t, "too_many_labels", readNoConvertMark.Reason) + require.Equal(t, 1, readNoConvertMark.Threshold) + require.Equal(t, 2, readNoConvertMark.LabelNamesCount) + + // Confirm conversion did not happen + assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) +} + +func TestConverter_SkipBlockWhenNoConvertMarkAlreadyExists(t *testing.T) { + cfg := prepareConfig() + user := "user" + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + dir := t.TempDir() + + cfg.Ring.InstanceID = "parquet-converter-1" + cfg.Ring.InstanceAddr = "1.2.3.4" + cfg.Ring.KVStore.Mock = ringStore + bucketClient, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + userBucket := bucket.NewPrefixedBucketClient(bucketClient, user) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + limits.ParquetConverterMaxBlockLabelNames = 1 + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil) + + ctx := context.Background() + + lbls := labels.FromStrings("__name__", "test", "job", "foo") + rnd := rand.New(rand.NewSource(time.Now().Unix())) + + // 2h blocks are skipped by ShouldConvertBlockToParquet + blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, + 4*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + + blockDir := fmt.Sprintf("%s/%s", dir, blockID.String()) + b, err := tsdb.OpenBlock(nil, blockDir, nil, nil) + require.NoError(t, err) + err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc) + require.NoError(t, err) + + markerV1 := parquet.NoConvertMark{ + Version: parquet.CurrentNoConvertMarkVersion, + Reason: "too_many_labels", + LabelNamesCount: 2, + Threshold: 1, + } + markerBytes, err := json.Marshal(markerV1) + require.NoError(t, err) + markerPath := path.Join(blockID.String(), parquet.NoConvertMarkerFileName) + err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes)) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), c) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, c) + + // start converter + err = c.convertUser(ctx, logger, c.ring, user) + require.NoError(t, err) + + // confirm conversion was skipped + assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) + + markerAfter, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger) + require.NoError(t, err) + require.True(t, parquet.ValidNoConvertMarkVersion(markerAfter.Version)) + require.Equal(t, "too_many_labels", markerAfter.Reason) + require.Equal(t, 1, markerAfter.Threshold) + require.Equal(t, 2, markerAfter.LabelNamesCount) +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 019a5adc3ed..fc37e4a4054 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -241,9 +241,10 @@ type Limits struct { CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"` // Parquet converter - ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"` - ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"` - ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"` + ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"` + ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"` + ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"` + ParquetConverterMaxBlockLabelNames int `yaml:"parquet_converter_max_block_label_names" json:"parquet_converter_max_block_label_names"` // This config doesn't have a CLI flag registered here because they're registered in // their own original config struct. S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."` @@ -369,6 +370,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.") f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.") f.Var((*flagext.StringSlice)(&l.ParquetConverterSortColumns), "parquet-converter.sort-columns", "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.") + f.IntVar(&l.ParquetConverterMaxBlockLabelNames, "parquet-converter.max-block-label-names", 0, "[Experimental] Maximum number of distinct label names allowed in a TSDB block for parquet conversion. If exceeded, the converter writes a no-convert marker. 0 to disable.") // Parquet Queryable enforced limits. f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.") @@ -1029,6 +1031,11 @@ func (o *Overrides) ParquetConverterSortColumns(userID string) []string { return o.GetOverridesForUser(userID).ParquetConverterSortColumns } +// ParquetConverterMaxBlockLabelNames returns the maximum number of distinct label names allowed in a TSDB block for parquet conversion. +func (o *Overrides) ParquetConverterMaxBlockLabelNames(userID string) int { + return o.GetOverridesForUser(userID).ParquetConverterMaxBlockLabelNames +} + // ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage. func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int { return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount From df4b725e21b7b5d794455215196b00551dc8f80d Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Mon, 18 May 2026 15:26:27 +0530 Subject: [PATCH 3/9] Update config docs for max-block-label-names Signed-off-by: Siddarth Gundu --- docs/configuration/config-file-reference.md | 6 ++++++ schemas/cortex-config-schema.json | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a605dd798ba..1a67655824c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4468,6 +4468,12 @@ query_rejection: # CLI flag: -parquet-converter.sort-columns [parquet_converter_sort_columns: | default = []] +# [Experimental] Maximum number of distinct label names allowed in a TSDB block +# for parquet conversion. If exceeded, the converter writes a no-convert marker. +# 0 to disable. +# CLI flag: -parquet-converter.max-block-label-names +[parquet_converter_max_block_label_names: | default = 0] + # S3 server-side encryption type. Required to enable server-side encryption # overrides for a specific tenant. If not set, the default S3 client settings # are used. diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 51aee2c0f59..9081ddaa683 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5415,6 +5415,12 @@ "type": "boolean", "x-cli-flag": "parquet-converter.enabled" }, + "parquet_converter_max_block_label_names": { + "default": 0, + "description": "[Experimental] Maximum number of distinct label names allowed in a TSDB block for parquet conversion. If exceeded, the converter writes a no-convert marker. 0 to disable.", + "type": "number", + "x-cli-flag": "parquet-converter.max-block-label-names" + }, "parquet_converter_sort_columns": { "default": [], "description": "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.", From 185bae977e71d22f0789feec54c816fe56bcd9fb Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Mon, 18 May 2026 15:28:13 +0530 Subject: [PATCH 4/9] integration: Add test for parquet no-convert marker Signed-off-by: Siddarth Gundu --- integration/parquet_converter_test.go | 142 ++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 integration/parquet_converter_test.go diff --git a/integration/parquet_converter_test.go b/integration/parquet_converter_test.go new file mode 100644 index 00000000000..4a10d3f1e0f --- /dev/null +++ b/integration/parquet_converter_test.go @@ -0,0 +1,142 @@ +//go:build integration + +package integration + +import ( + "context" + "fmt" + "math/rand" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/integration/e2e" + e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/log" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" +) + +func TestParquetConverter_NoConvertMarkWithTooManyLabels(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := e2edb.NewConsulWithName("consul") + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + flags := mergeFlags( + baseFlags, + map[string]string{ + "-target": "all,parquet-converter", + "-blocks-storage.tsdb.block-ranges-period": "1m,24h", + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", + "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + // compactor + "-compactor.cleanup-interval": "1s", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // Enable vertical sharding. + "-frontend.query-vertical-shard-size": "3", + "-frontend.max-cache-freshness": "1m", + // enable experimental promQL funcs + "-querier.enable-promql-experimental-functions": "true", + // parquet-converter + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.enabled": "true", + "-parquet-converter.max-block-label-names": "1", + // Querier + "-querier.enable-parquet-queryable": "true", + // Enable cache for parquet labels and chunks + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + lbls := []labels.Labels{ + labels.FromStrings("__name__", "test_series_a", "job", "test"), + } + + numSamples := 60 + scrapeInterval := time.Minute + now := time.Now() + start := now.Add(-time.Hour * 24) + end := now.Add(-time.Hour) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, + start.UnixMilli(), + end.UnixMilli(), + scrapeInterval.Milliseconds(), 10, + ) + require.NoError(t, err) + + err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Wait for the converter to write the no-convert marker + cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { + noConvertMarkerPath := fmt.Sprintf("%s/parquet-no-convert-mark.json", id.String()) + found := false + err := bkt.Iter(ctx, "", func(name string) error { + if name == noConvertMarkerPath { + found = true + } + return nil + }, objstore.WithRecursiveIter()) + require.NoError(t, err) + return found + }) + + // confirm the conversion did not happen (check both paths) + blockID := id.String() + markerPaths := []string{ + fmt.Sprintf("%s/parquet-converter-mark.json", blockID), + fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", blockID), + } + for _, markerPath := range markerPaths { + exists, err := bkt.Exists(ctx, markerPath) + require.NoError(t, err) + require.False(t, exists, "converter mark should not exist at %s", markerPath) + } +} From 119074c2566efdf90e4f73c5801e494f05849069 Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Mon, 18 May 2026 16:28:38 +0530 Subject: [PATCH 5/9] Update changelog with parquet-converter.max-block-label-names limit Signed-off-by: Siddarth Gundu --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f86f69e383..6e869bf2b81 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 * [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442 +* [ENHANCEMENT] Parquet Converter: Add `parquet-converter.max-block-label-names` limit to skip conversion of TSDB blocks with too many label names. # * [ENHANCEMENT] Upgrade prometheus alertmanager version to v0.32.1. #7462 * [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489 * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 From b7f083adce8652c610d1298a610e6c428ddf0224 Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Mon, 18 May 2026 16:40:31 +0530 Subject: [PATCH 6/9] chore: Add //nolint:errcheck to fix lint errors and update Changelog correctly Signed-off-by: Siddarth Gundu --- CHANGELOG.md | 2 +- pkg/parquetconverter/converter_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e869bf2b81..8050b78ad6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ * [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385 * [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374 * [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442 -* [ENHANCEMENT] Parquet Converter: Add `parquet-converter.max-block-label-names` limit to skip conversion of TSDB blocks with too many label names. # +* [ENHANCEMENT] Parquet Converter: Add `parquet-converter.max-block-label-names` limit to skip conversion of TSDB blocks with too many label names. #7524 * [ENHANCEMENT] Upgrade prometheus alertmanager version to v0.32.1. #7462 * [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489 * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index 03760a136ab..c6b79f8a12f 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -529,7 +529,7 @@ func TestConverter_WriteNoConvertMarkForBlockWithTooManyLabels(t *testing.T) { err = services.StartAndAwaitRunning(context.Background(), c) require.NoError(t, err) - defer services.StopAndAwaitTerminated(ctx, c) + defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck // Start the converter err = c.convertUser(ctx, logger, c.ring, user) @@ -597,7 +597,7 @@ func TestConverter_SkipBlockWhenNoConvertMarkAlreadyExists(t *testing.T) { err = services.StartAndAwaitRunning(context.Background(), c) require.NoError(t, err) - defer services.StopAndAwaitTerminated(ctx, c) + defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck // start converter err = c.convertUser(ctx, logger, c.ring, user) From 414ebecef23f735ab9ac4dda7c089f7d59d0bfd6 Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Mon, 18 May 2026 17:33:13 +0530 Subject: [PATCH 7/9] fix: Add missing parquet_converter_max_block_label_names to exporter test Signed-off-by: Siddarth Gundu --- pkg/util/validation/exporter_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 6067ed96067..1414d3f2bb5 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -101,6 +101,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="out_of_order_results_cache_ttl",user="tenant-a"} 0 cortex_overrides{limit_name="out_of_order_time_window",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_converter_enabled",user="tenant-a"} 0 + cortex_overrides{limit_name="parquet_converter_max_block_label_names",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_converter_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_max_fetched_chunk_bytes",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_max_fetched_data_bytes",user="tenant-a"} 0 From 06314317fbc956b57a5a56ee4227fe56ebbf7d0a Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Tue, 19 May 2026 17:07:47 +0530 Subject: [PATCH 8/9] Add skippedBlocks metric and change skip log to Debug - Add a new cortex_parquet_converter_blocks_skipped_total counter with user and reason labels - Extract "too_many_labels" to a constant to avoid string duplication Signed-off-by: Siddarth Gundu --- pkg/parquetconverter/converter.go | 3 ++- pkg/parquetconverter/converter_test.go | 6 +++--- pkg/parquetconverter/metrics.go | 5 +++++ pkg/storage/parquet/no_convert_marker.go | 4 +++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 5b6e5e58297..07e60a00410 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -460,7 +460,8 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin } continue } - level.Info(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames) + level.Debug(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames) + c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc() _ = tsdbBlock.Close() continue } diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index c6b79f8a12f..bb588f3df62 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -539,7 +539,7 @@ func TestConverter_WriteNoConvertMarkForBlockWithTooManyLabels(t *testing.T) { readNoConvertMark, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger) require.NoError(t, err) require.True(t, parquet.ValidNoConvertMarkVersion(readNoConvertMark.Version)) - require.Equal(t, "too_many_labels", readNoConvertMark.Reason) + require.Equal(t, parquet.NoConvertReasonTooManyLabels, readNoConvertMark.Reason) require.Equal(t, 1, readNoConvertMark.Threshold) require.Equal(t, 2, readNoConvertMark.LabelNamesCount) @@ -585,7 +585,7 @@ func TestConverter_SkipBlockWhenNoConvertMarkAlreadyExists(t *testing.T) { markerV1 := parquet.NoConvertMark{ Version: parquet.CurrentNoConvertMarkVersion, - Reason: "too_many_labels", + Reason: parquet.NoConvertReasonTooManyLabels, LabelNamesCount: 2, Threshold: 1, } @@ -609,7 +609,7 @@ func TestConverter_SkipBlockWhenNoConvertMarkAlreadyExists(t *testing.T) { markerAfter, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger) require.NoError(t, err) require.True(t, parquet.ValidNoConvertMarkVersion(markerAfter.Version)) - require.Equal(t, "too_many_labels", markerAfter.Reason) + require.Equal(t, parquet.NoConvertReasonTooManyLabels, markerAfter.Reason) require.Equal(t, 1, markerAfter.Threshold) require.Equal(t, 2, markerAfter.LabelNamesCount) } diff --git a/pkg/parquetconverter/metrics.go b/pkg/parquetconverter/metrics.go index 2b3e80b0cfd..dcbfaf4a20f 100644 --- a/pkg/parquetconverter/metrics.go +++ b/pkg/parquetconverter/metrics.go @@ -8,6 +8,7 @@ import ( type metrics struct { convertedBlocks *prometheus.CounterVec convertBlockFailures *prometheus.CounterVec + skippedBlocks *prometheus.CounterVec convertBlockDuration *prometheus.GaugeVec convertParquetBlockDelay prometheus.Histogram ownedUsers prometheus.Gauge @@ -23,6 +24,10 @@ func newMetrics(reg prometheus.Registerer) *metrics { Name: "cortex_parquet_converter_block_convert_failures_total", Help: "Total number of failed block conversions per user.", }, []string{"user"}), + skippedBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_converter_blocks_skipped_total", + Help: "Total number of blocks skipped during parquet conversion per user and reason.", + }, []string{"user", "reason"}), convertBlockDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_parquet_converter_convert_block_duration_seconds", Help: "Time taken to for the latest block conversion for the user.", diff --git a/pkg/storage/parquet/no_convert_marker.go b/pkg/storage/parquet/no_convert_marker.go index e6a801d5430..c5d7d10e0cf 100644 --- a/pkg/storage/parquet/no_convert_marker.go +++ b/pkg/storage/parquet/no_convert_marker.go @@ -21,6 +21,8 @@ const ( CurrentNoConvertMarkVersion = NoConvertMarkVersion1 NoConvertMarkVersion1 = 1 + + NoConvertReasonTooManyLabels = "too_many_labels" ) type NoConvertMark struct { @@ -55,7 +57,7 @@ func ReadNoConvertMark(ctx context.Context, id ulid.ULID, userBkt objstore.Instr func WriteNoConvertMark(ctx context.Context, id ulid.ULID, userBkt objstore.Bucket, labelNamesCount int, maxBlockLabelNames int) error { noConvertMarker := NoConvertMark{ Version: CurrentNoConvertMarkVersion, - Reason: "too_many_labels", + Reason: NoConvertReasonTooManyLabels, LabelNamesCount: labelNamesCount, Threshold: maxBlockLabelNames, } From e63715f195c505e0bf037e8318bdb34f0c7cdbe6 Mon Sep 17 00:00:00 2001 From: Siddarth Gundu Date: Tue, 19 May 2026 17:39:25 +0530 Subject: [PATCH 9/9] docs: Add max-block-label-names to experimental features list Signed-off-by: Siddarth Gundu --- docs/configuration/v1-guarantees.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index e52c65a8a5b..fb79456cb06 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -133,3 +133,4 @@ Currently experimental features are: - Ingester: Active Series Tracker - Per-tenant `active_series_trackers` configuration in runtime config overrides - Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric +- Parquet Converter: `-parquet-converter.max-block-label-names` (int) - If enabled, adds a no-convert mark and skips blocks with too many labels.