Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
* [FEATURE] Querier: Add timeout classification to classify query timeouts as 4XX (user error) or 5XX (system error) based on phase timing. When enabled, queries that spend most of their time in PromQL evaluation return `422 Unprocessable Entity` instead of `503 Service Unavailable`. #7374
* [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442
* [ENHANCEMENT] Parquet Converter: Add `parquet-converter.max-block-label-names` limit to skip conversion of TSDB blocks with too many label names. #7524
* [ENHANCEMENT] Upgrade prometheus alertmanager version to v0.32.1. #7462
* [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4468,6 +4468,12 @@ query_rejection:
# CLI flag: -parquet-converter.sort-columns
[parquet_converter_sort_columns: <list of string> | default = []]

# [Experimental] Maximum number of distinct label names allowed in a TSDB block
# for parquet conversion. If exceeded, the converter writes a no-convert marker.
# 0 to disable.
# CLI flag: -parquet-converter.max-block-label-names
[parquet_converter_max_block_label_names: <int> | default = 0]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,4 @@ Currently experimental features are:
- Ingester: Active Series Tracker
- Per-tenant `active_series_trackers` configuration in runtime config overrides
- Counts active series matching PromQL label matchers and exposes `cortex_ingester_active_series_per_tracker` metric
- Parquet Converter: `-parquet-converter.max-block-label-names` (int) - If enabled, adds a no-convert mark and skips blocks with too many labels.
142 changes: 142 additions & 0 deletions integration/parquet_converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//go:build integration

package integration

import (
"context"
"fmt"
"math/rand"
"path/filepath"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/log"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)

func TestParquetConverter_NoConvertMarkWithTooManyLabels(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-target": "all,parquet-converter",
"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
// compactor
"-compactor.cleanup-interval": "1s",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
// Enable vertical sharding.
"-frontend.query-vertical-shard-size": "3",
"-frontend.max-cache-freshness": "1m",
// enable experimental promQL funcs
"-querier.enable-promql-experimental-functions": "true",
// parquet-converter
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-parquet-converter.conversion-interval": "1s",
"-parquet-converter.enabled": "true",
"-parquet-converter.max-block-label-names": "1",
// Querier
"-querier.enable-parquet-queryable": "true",
// Enable cache for parquet labels and chunks
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

ctx := context.Background()
rnd := rand.New(rand.NewSource(time.Now().Unix()))
dir := filepath.Join(s.SharedDir(), "data")
lbls := []labels.Labels{
labels.FromStrings("__name__", "test_series_a", "job", "test"),
}

numSamples := 60
scrapeInterval := time.Minute
now := time.Now()
start := now.Add(-time.Hour * 24)
end := now.Add(-time.Hour)

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples,
start.UnixMilli(),
end.UnixMilli(),
scrapeInterval.Milliseconds(), 10,
)
require.NoError(t, err)

err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

// Wait for the converter to write the no-convert marker
cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
noConvertMarkerPath := fmt.Sprintf("%s/parquet-no-convert-mark.json", id.String())
found := false
err := bkt.Iter(ctx, "", func(name string) error {
if name == noConvertMarkerPath {
found = true
}
return nil
}, objstore.WithRecursiveIter())
require.NoError(t, err)
return found
})

// confirm the conversion did not happen (check both paths)
blockID := id.String()
markerPaths := []string{
fmt.Sprintf("%s/parquet-converter-mark.json", blockID),
fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", blockID),
}
for _, markerPath := range markerPaths {
exists, err := bkt.Exists(ctx, markerPath)
require.NoError(t, err)
require.False(t, exists, "converter mark should not exist at %s", markerPath)
}
}
42 changes: 42 additions & 0 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,21 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
continue
}

maxBlockLabelNames := c.limits.ParquetConverterMaxBlockLabelNames(userID)

// If the threshold is enabled, check for no-convert mark
if maxBlockLabelNames > 0 {

noConvertMark, err := cortex_parquet.ReadNoConvertMark(ctx, b.ULID, uBucket, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to read parquet no-convert marker", "block", b.ULID.String(), "err", err)
continue
}
if cortex_parquet.ValidNoConvertMarkVersion(noConvertMark.Version) {
continue
}
}

if err := os.RemoveAll(c.compactRootDir()); err != nil {
level.Error(logger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err)
if c.checkConvertError(userID, err) {
Expand Down Expand Up @@ -425,6 +440,33 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
continue
}

if maxBlockLabelNames > 0 {
labelNames, err := tsdbBlock.LabelNames(ctx)
if err != nil {
_ = tsdbBlock.Close()
level.Error(logger).Log("msg", "failed to get label names", "block", b.ULID.String(), "err", err)
if c.checkConvertError(userID, err) {
return err
}
continue
}
labelNamesCount := len(labelNames)
if labelNamesCount > maxBlockLabelNames {
if err := cortex_parquet.WriteNoConvertMark(ctx, b.ULID, uBucket, labelNamesCount, maxBlockLabelNames); err != nil {
_ = tsdbBlock.Close()
level.Error(logger).Log("msg", "failed to write parquet no-convert marker", "block", b.ULID.String(), "err", err)
if c.checkConvertError(userID, err) {
return err
}
continue
}
level.Debug(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames)
c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc()
_ = tsdbBlock.Close()
continue
}
}

level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)
start := time.Now()

Expand Down
125 changes: 125 additions & 0 deletions pkg/parquetconverter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,128 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) {
// It should be 0 since the block was already converted
assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))
}

func TestConverter_WriteNoConvertMarkForBlockWithTooManyLabels(t *testing.T) {
cfg := prepareConfig()
user := "user"
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
dir := t.TempDir()

cfg.Ring.InstanceID = "parquet-converter-1"
cfg.Ring.InstanceAddr = "1.2.3.4"
cfg.Ring.KVStore.Mock = ringStore
bucketClient, err := filesystem.NewBucket(t.TempDir())
require.NoError(t, err)
userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.ParquetConverterEnabled = true
limits.ParquetConverterMaxBlockLabelNames = 1

c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil)

ctx := context.Background()

lbls := labels.FromStrings("__name__", "test", "job", "foo")

// Create a block
rnd := rand.New(rand.NewSource(time.Now().Unix()))

// 2h blocks are skipped by ShouldConvertBlockToParquet
blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, 4*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
require.NoError(t, err)

// Upload the block to the bucket
blockDir := fmt.Sprintf("%s/%s", dir, blockID.String())
b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), c)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck

// Start the converter
err = c.convertUser(ctx, logger, c.ring, user)
require.NoError(t, err)

// Verify the marker was written correctly
readNoConvertMark, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger)
require.NoError(t, err)
require.True(t, parquet.ValidNoConvertMarkVersion(readNoConvertMark.Version))
require.Equal(t, parquet.NoConvertReasonTooManyLabels, readNoConvertMark.Reason)
require.Equal(t, 1, readNoConvertMark.Threshold)
require.Equal(t, 2, readNoConvertMark.LabelNamesCount)

// Confirm conversion did not happen
assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))
}

func TestConverter_SkipBlockWhenNoConvertMarkAlreadyExists(t *testing.T) {
cfg := prepareConfig()
user := "user"
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
dir := t.TempDir()

cfg.Ring.InstanceID = "parquet-converter-1"
cfg.Ring.InstanceAddr = "1.2.3.4"
cfg.Ring.KVStore.Mock = ringStore
bucketClient, err := filesystem.NewBucket(t.TempDir())
require.NoError(t, err)
userBucket := bucket.NewPrefixedBucketClient(bucketClient, user)
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.ParquetConverterEnabled = true
limits.ParquetConverterMaxBlockLabelNames = 1

c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil)

ctx := context.Background()

lbls := labels.FromStrings("__name__", "test", "job", "foo")
rnd := rand.New(rand.NewSource(time.Now().Unix()))

// 2h blocks are skipped by ShouldConvertBlockToParquet
blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0,
4*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
require.NoError(t, err)

blockDir := fmt.Sprintf("%s/%s", dir, blockID.String())
b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
require.NoError(t, err)
err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
require.NoError(t, err)

markerV1 := parquet.NoConvertMark{
Version: parquet.CurrentNoConvertMarkVersion,
Reason: parquet.NoConvertReasonTooManyLabels,
LabelNamesCount: 2,
Threshold: 1,
}
markerBytes, err := json.Marshal(markerV1)
require.NoError(t, err)
markerPath := path.Join(blockID.String(), parquet.NoConvertMarkerFileName)
err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes))
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), c)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck

// start converter
err = c.convertUser(ctx, logger, c.ring, user)
require.NoError(t, err)

// confirm conversion was skipped
assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))

markerAfter, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger)
require.NoError(t, err)
require.True(t, parquet.ValidNoConvertMarkVersion(markerAfter.Version))
require.Equal(t, parquet.NoConvertReasonTooManyLabels, markerAfter.Reason)
require.Equal(t, 1, markerAfter.Threshold)
require.Equal(t, 2, markerAfter.LabelNamesCount)
}
5 changes: 5 additions & 0 deletions pkg/parquetconverter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
type metrics struct {
convertedBlocks *prometheus.CounterVec
convertBlockFailures *prometheus.CounterVec
skippedBlocks *prometheus.CounterVec
convertBlockDuration *prometheus.GaugeVec
convertParquetBlockDelay prometheus.Histogram
ownedUsers prometheus.Gauge
Expand All @@ -23,6 +24,10 @@ func newMetrics(reg prometheus.Registerer) *metrics {
Name: "cortex_parquet_converter_block_convert_failures_total",
Help: "Total number of failed block conversions per user.",
}, []string{"user"}),
skippedBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_parquet_converter_blocks_skipped_total",
Help: "Total number of blocks skipped during parquet conversion per user and reason.",
}, []string{"user", "reason"}),
convertBlockDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_parquet_converter_convert_block_duration_seconds",
Help: "Time taken to for the latest block conversion for the user.",
Expand Down
Loading
Loading