diff --git a/go.mod b/go.mod index 60528b76fda..9813fa612dc 100644 --- a/go.mod +++ b/go.mod @@ -82,7 +82,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/parquet-go/parquet-go v0.25.0 - github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 + github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0 github.com/prometheus/procfs v0.15.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.3 diff --git a/go.sum b/go.sum index 1ee8edc2ddd..46725149ccb 100644 --- a/go.sum +++ b/go.sum @@ -1573,8 +1573,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 h1:AogORrmarkYfUOI7/lqOhz9atYmLZo69vPQ/SFkPSxE= -github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A= +github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0 h1:XCSo9v3if0v0G+aAO/hSUr/Ck9KJXcUPzDFt1dJnAV8= +github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A= github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg= github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 62496a6811f..1de6df37909 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -5,8 +5,10 @@ import ( "flag" "fmt" "hash/fnv" + "math/rand" "os" "path/filepath" + "sort" "strings" "time" @@ -47,6 +49,8 @@ var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) type Config struct { MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` ConversionInterval time.Duration `yaml:"conversion_interval"` + MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` + FileBufferEnabled bool `yaml:"file_buffer_enabled"` DataDir string `yaml:"data_dir"` @@ -78,6 +82,8 @@ type Converter struct { blockRanges []int64 fetcherMetrics *block.FetcherMetrics + + baseConverterOptions []convert.ConvertOption } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -85,7 +91,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.") f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.") + f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.") f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.") + f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.") } func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) { @@ -106,6 +114,11 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex blockRanges: blockRanges, fetcherMetrics: block.NewFetcherMetrics(registerer, nil, nil), bkt: bkt, + baseConverterOptions: []convert.ConvertOption{ + convert.WithSortBy(labels.MetricName), + convert.WithColDuration(time.Hour * 8), + convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup), + }, } c.Service = services.NewBasicService(c.starting, c.running, c.stopping) @@ -163,6 +176,10 @@ func (c *Converter) running(ctx context.Context) error { continue } ownedUsers := map[string]struct{}{} + rand.Shuffle(len(users), func(i, j int) { + users[i], users[j] = users[j], users[i] + }) + for _, userID := range users { if !c.limits.ParquetConverterEnabled(userID) { continue @@ -293,11 +310,20 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin return errors.Wrap(err, "error creating block fetcher") } - blocks, _, err := fetcher.Fetch(ctx) + blks, _, err := fetcher.Fetch(ctx) if err != nil { return errors.Wrapf(err, "failed to fetch blocks for user %s", userID) } + blocks := make([]*metadata.Meta, 0, len(blks)) + for _, blk := range blks { + blocks = append(blocks, blk) + } + + sort.Slice(blocks, func(i, j int) bool { + return blocks[i].MinTime > blocks[j].MinTime + }) + for _, b := range blocks { ok, err := c.ownBlock(ring, b.ULID.String()) if err != nil { @@ -345,22 +371,27 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin } level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir) + + converterOpts := append(c.baseConverterOptions, convert.WithName(b.ULID.String())) + + if c.cfg.FileBufferEnabled { + converterOpts = append(converterOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*"))) + } + _, err = convert.ConvertTSDBBlock( ctx, uBucket, tsdbBlock.MinTime(), tsdbBlock.MaxTime(), []convert.Convertible{tsdbBlock}, - convert.WithSortBy(labels.MetricName), - convert.WithColDuration(time.Hour*8), - convert.WithName(b.ULID.String()), - convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")), + converterOpts..., ) _ = tsdbBlock.Close() if err != nil { level.Error(logger).Log("msg", "Error converting block", "err", err) + continue } err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket) diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index fce24c62d51..6f60d6b6909 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -5,6 +5,8 @@ import ( "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/parquet-go/parquet-go" "github.com/pkg/errors" "github.com/prometheus-community/parquet-common/schema" "github.com/prometheus-community/parquet-common/search" @@ -16,14 +18,17 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/thanos/pkg/strutil" + "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" + util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/multierror" "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/validation" ) type parquetQueryableFallbackMetrics struct { @@ -59,12 +64,15 @@ type parquetQueryableWithFallback struct { // metrics metrics *parquetQueryableFallbackMetrics + + limits *validation.Overrides + logger log.Logger } func NewParquetQueryable( config Config, storageCfg cortex_tsdb.BlocksStorageConfig, - limits BlocksStoreLimits, + limits *validation.Overrides, blockStorageQueryable *BlocksStoreQueryable, logger log.Logger, reg prometheus.Registerer, @@ -93,18 +101,29 @@ func NewParquetQueryable( } userBkt := bucket.NewUserBucketClient(userID, bucketClient, limits) - shards := make([]*parquet_storage.ParquetShard, 0, len(blocks)) - - for _, block := range blocks { - // we always only have 1 shard - shard 0 - shard, err := parquet_storage.OpenParquetShard(ctx, userBkt, block.ID.String(), 0) - if err != nil { - return nil, err - } - shards = append(shards, shard) + shards := make([]*parquet_storage.ParquetShard, len(blocks)) + errGroup := &errgroup.Group{} + + for i, block := range blocks { + errGroup.Go(func() error { + // we always only have 1 shard - shard 0 + shard, err := parquet_storage.OpenParquetShard(ctx, + userBkt, + block.ID.String(), + 0, + parquet_storage.WithFileOptions( + parquet.SkipMagicBytes(true), + parquet.ReadBufferSize(100*1024), + parquet.SkipBloomFilters(true), + ), + parquet_storage.WithOptimisticReader(true), + ) + shards[i] = shard + return err + }) } - return shards, nil + return shards, errGroup.Wait() }) p := &parquetQueryableWithFallback{ @@ -115,6 +134,8 @@ func NewParquetQueryable( subservicesWatcher: services.NewFailureWatcher(), finder: blockStorageQueryable.finder, metrics: newParquetQueryableFallbackMetrics(reg), + limits: limits, + logger: logger, } p.Service = services.NewBasicService(p.starting, p.running, p.stopping) @@ -164,6 +185,8 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie blocksStoreQuerier: bsq, finder: p.finder, metrics: p.metrics, + limits: p.limits, + logger: p.logger, }, nil } @@ -181,6 +204,9 @@ type parquetQuerierWithFallback struct { // metrics metrics *parquetQueryableFallbackMetrics + + limits *validation.Overrides + logger log.Logger } func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { @@ -275,6 +301,18 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor } func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + userID, err := tenant.TenantID(ctx) + if err != nil { + storage.ErrSeriesSet(err) + } + + if q.limits.QueryVerticalShardSize(userID) > 1 { + uLogger := util_log.WithUserID(userID, q.logger) + level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage") + + return q.blocksStoreQuerier.Select(ctx, sortSeries, hints, matchers...) + } + mint, maxt, limit := q.minT, q.maxT, 0 if hints != nil { @@ -288,6 +326,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool serieSets := []storage.SeriesSet{} + // Lets sort the series to merge + if len(parquet) > 0 && len(remaining) > 0 { + sortSeries = true + } + if len(parquet) > 0 { serieSets = append(serieSets, q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, hints, matchers...)) } diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 17768e4eaa7..cd6aa02f874 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -19,6 +19,8 @@ import ( "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/storage/parquet" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/validation" ) func TestParquetQueryableFallbackLogic(t *testing.T) { @@ -27,33 +29,79 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { minT := int64(10) maxT := int64(20) - stores := &blocksStoreSetMock{mockedResponses: []interface{}{ - map[BlocksStoreClient][]ulid.ULID{ - &storeGatewayClientMock{remoteAddr: "1.1.1.1", - mockedSeriesResponses: []*storepb.SeriesResponse{ - mockSeriesResponse(labels.Labels{{Name: labels.MetricName, Value: "fromSg"}}, []cortexpb.Sample{{Value: 1, TimestampMs: minT}, {Value: 2, TimestampMs: minT + 1}}, nil, nil), - mockHintsResponse(block1, block2), - }, - mockedLabelNamesResponse: &storepb.LabelNamesResponse{ - Names: namesFromSeries(labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})), - Warnings: []string{}, - Hints: mockNamesHints(block1, block2), - }, - mockedLabelValuesResponse: &storepb.LabelValuesResponse{ - Values: valuesFromSeries(labels.MetricName, labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})), - Warnings: []string{}, - Hints: mockValuesHints(block1, block2), - }, - }: {block1, block2}}, - }, + createStore := func() *blocksStoreSetMock { + return &blocksStoreSetMock{mockedResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", + mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{{Name: labels.MetricName, Value: "fromSg"}}, []cortexpb.Sample{{Value: 1, TimestampMs: minT}, {Value: 2, TimestampMs: minT + 1}}, nil, nil), + mockHintsResponse(block1, block2), + }, + mockedLabelNamesResponse: &storepb.LabelNamesResponse{ + Names: namesFromSeries(labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})), + Warnings: []string{}, + Hints: mockNamesHints(block1, block2), + }, + mockedLabelValuesResponse: &storepb.LabelValuesResponse{ + Values: valuesFromSeries(labels.MetricName, labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})), + Warnings: []string{}, + Hints: mockValuesHints(block1, block2), + }, + }: {block1, block2}}, + }, + } } matchers := []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "fromSg"), } ctx := user.InjectOrgID(context.Background(), "user-1") + + t.Run("should fallback when vertical sharding is enabled", func(t *testing.T) { + finder := &blocksFinderMock{} + stores := createStore() + + q := &blocksStoreQuerier{ + minT: minT, + maxT: maxT, + finder: finder, + stores: stores, + consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), + logger: log.NewNopLogger(), + metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), + limits: &blocksStoreLimitsMock{}, + + storeGatewayConsistencyCheckMaxAttempts: 3, + } + + mParquetQuerier := &mockParquetQuerier{} + pq := &parquetQuerierWithFallback{ + minT: minT, + maxT: maxT, + finder: finder, + blocksStoreQuerier: q, + parquetQuerier: mParquetQuerier, + metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 4), + logger: log.NewNopLogger(), + } + + finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ + &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, + }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) + + t.Run("select", func(t *testing.T) { + ss := pq.Select(ctx, true, nil, matchers...) + require.NoError(t, ss.Err()) + require.Len(t, stores.queriedBlocks, 2) + require.Len(t, mParquetQuerier.queriedBlocks, 0) + }) + }) + t.Run("should fallback all blocks", func(t *testing.T) { finder := &blocksFinderMock{} + stores := createStore() q := &blocksStoreQuerier{ minT: minT, @@ -76,6 +124,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { blocksStoreQuerier: q, parquetQuerier: mParquetQuerier, metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ @@ -111,6 +161,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { t.Run("should fallback partial blocks", func(t *testing.T) { finder := &blocksFinderMock{} + stores := createStore() q := &blocksStoreQuerier{ minT: minT, @@ -133,6 +184,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { blocksStoreQuerier: q, parquetQuerier: mParquetQuerier, metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ @@ -174,6 +227,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { t.Run("should query only parquet blocks when possible", func(t *testing.T) { finder := &blocksFinderMock{} + stores := createStore() q := &blocksStoreQuerier{ minT: minT, @@ -196,6 +250,8 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { blocksStoreQuerier: q, parquetQuerier: mParquetQuerier, metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), + limits: defaultOverrides(t, 0), + logger: log.NewNopLogger(), } finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ @@ -237,6 +293,16 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } +func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Overrides { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + limits.QueryVerticalShardSize = queryVerticalShardSize + + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + return overrides +} + type mockParquetQuerier struct { queriedBlocks []*bucketindex.Block } diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index d4862646a69..e87aee883f6 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -23,9 +23,9 @@ import ( "strings" "time" - "github.com/efficientgo/core/errors" "github.com/hashicorp/go-multierror" "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" @@ -302,10 +302,13 @@ func sortedPostings(ctx context.Context, indexr tsdb.IndexReader, compare func(a } func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { + type chkBytesOrError struct { + chkBytes [][]byte + err error + } type chunkSeriesPromise struct { - s storage.ChunkSeries - chunkBytesChan chan [][]byte - err error + s storage.ChunkSeries + c chan chkBytesOrError } c := make(chan chunkSeriesPromise, rr.concurrency) @@ -318,8 +321,8 @@ func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { it := s.Iterator(nil) promise := chunkSeriesPromise{ - s: s, - chunkBytesChan: make(chan [][]byte, 1), + s: s, + c: make(chan chkBytesOrError, 1), } select { @@ -329,8 +332,7 @@ func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { } go func() { chkBytes, err := rr.encoder.Encode(it) - promise.err = err - promise.chunkBytesChan <- chkBytes + promise.c <- chkBytesOrError{chkBytes: chkBytes, err: err} }() i++ } @@ -345,9 +347,12 @@ func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { for promise := range c { j++ - if promise.err != nil { - return i, promise.err + + chkBytesOrErr := <-promise.c + if err := chkBytesOrErr.err; err != nil { + return 0, fmt.Errorf("unable encode chunks: %w", err) } + chkBytes := chkBytesOrErr.chkBytes rr.rowBuilder.Reset() lblsIdxs = lblsIdxs[:0] @@ -361,7 +366,6 @@ func (rr *TsdbRowReader) ReadRows(buf []parquet.Row) (int, error) { rr.rowBuilder.Add(colIndex.ColumnIndex, parquet.ValueOf(schema.EncodeIntSlice(lblsIdxs))) - chkBytes := <-promise.chunkBytesChan // skip series that have no chunks in the requested time if allChunksEmpty(chkBytes) { continue diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go index e182bf3c980..af60bdb182a 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go @@ -18,9 +18,9 @@ import ( "fmt" "io" - "github.com/efficientgo/core/errors" "github.com/hashicorp/go-multierror" "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" "github.com/prometheus/prometheus/util/zeropool" "github.com/thanos-io/objstore" "golang.org/x/sync/errgroup" diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go b/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go index 6b6556a4c3f..cde6ca69bd6 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/encoder.go @@ -95,11 +95,12 @@ func (e *PrometheusParquetChunksEncoder) Encode(it chunks.Iterator) ([][]byte, e chkIdx := e.schema.DataColumIdx(t) reEncodedChunksAppenders[chkIdx][chunkenc.EncXOR].Append(t, v) - if t < reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1].MinTime { - reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1].MinTime = t + chunk := reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1] + if t < chunk.MinTime { + chunk.MinTime = t } - if t > reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1].MaxTime { - reEncodedChunks[chkIdx][chunkenc.EncXOR][len(reEncodedChunks[chkIdx][chunkenc.EncXOR])-1].MaxTime = t + if t > chunk.MaxTime { + chunk.MaxTime = t } } case chunkenc.EncFloatHistogram: @@ -124,11 +125,12 @@ func (e *PrometheusParquetChunksEncoder) Encode(it chunks.Iterator) ([][]byte, e reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].Chunk = newC } - if t < reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].MinTime { - reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].MinTime = t + chunk := reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1] + if t < chunk.MinTime { + chunk.MinTime = t } - if t > reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].MaxTime { - reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncFloatHistogram])-1].MaxTime = t + if t > chunk.MaxTime { + chunk.MaxTime = t } } case chunkenc.EncHistogram: @@ -153,11 +155,12 @@ func (e *PrometheusParquetChunksEncoder) Encode(it chunks.Iterator) ([][]byte, e reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].Chunk = newC } - if t < reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].MinTime { - reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].MinTime = t + chunk := reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1] + if t < chunk.MinTime { + chunk.MinTime = t } - if t > reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].MaxTime { - reEncodedChunks[chkIdx][chunkenc.EncHistogram][len(reEncodedChunks[chkIdx][chunkenc.EncHistogram])-1].MaxTime = t + if t > chunk.MaxTime { + chunk.MaxTime = t } } default: @@ -200,7 +203,9 @@ func NewPrometheusParquetChunksDecoder(pool chunkenc.Pool) *PrometheusParquetChu } func (e *PrometheusParquetChunksDecoder) Decode(data []byte, mint, maxt int64) ([]chunks.Meta, error) { - result := make([]chunks.Meta, 0, len(data)) + // We usually have only 1 chunk per column as the chunks got re-encoded. Lets create a slice with capacity of 5 + // just in case of re-encoding. + result := make([]chunks.Meta, 0, 5) b := bytes.NewBuffer(data) diff --git a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go index 2ed8b423e89..b06db0d46ef 100644 --- a/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go +++ b/vendor/github.com/prometheus-community/parquet-common/schema/schema_builder.go @@ -17,9 +17,8 @@ import ( "fmt" "strconv" - "github.com/efficientgo/core/errors" - "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" ) type Builder struct { @@ -164,13 +163,14 @@ func (s *TSDBSchema) LabelsProjection() (*TSDBProjection, error) { g[c[0]] = lc.Node } return &TSDBProjection{ - Schema: WithCompression(parquet.NewSchema("labels-projection", g)), + Schema: WithCompression(parquet.NewSchema("labels-projection", g)), + ExtraOptions: []parquet.WriterOption{parquet.SkipPageBounds(ColIndexes)}, }, nil } func (s *TSDBSchema) ChunksProjection() (*TSDBProjection, error) { g := make(parquet.Group) - skipPageBoundsOpts := make([]parquet.WriterOption, 0, len(s.DataColsIndexes)) + writeOptions := make([]parquet.WriterOption, 0, len(s.DataColsIndexes)) for _, c := range s.Schema.Columns() { if ok := IsDataColumn(c[0]); !ok { @@ -181,11 +181,11 @@ func (s *TSDBSchema) ChunksProjection() (*TSDBProjection, error) { return nil, fmt.Errorf("column %v not found", c) } g[c[0]] = lc.Node - skipPageBoundsOpts = append(skipPageBoundsOpts, parquet.SkipPageBounds(c...)) + writeOptions = append(writeOptions, parquet.SkipPageBounds(c...)) } return &TSDBProjection{ Schema: WithCompression(parquet.NewSchema("chunk-projection", g)), - ExtraOptions: skipPageBoundsOpts, + ExtraOptions: writeOptions, }, nil } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go index 4c60c568491..03493be6dbc 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/constraint.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/constraint.go @@ -21,6 +21,7 @@ import ( "sort" "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus-community/parquet-common/schema" @@ -214,7 +215,10 @@ func (ec *equalConstraint) filter(ctx context.Context, rg parquet.RowGroup, prim return nil, nil } - pgs := ec.f.GetPages(ctx, cc) + pgs, err := ec.f.GetPages(ctx, cc) + if err != nil { + return nil, errors.Wrap(err, "failed to get pages") + } defer func() { _ = pgs.Close() }() oidx, err := cc.OffsetIndex() @@ -333,6 +337,10 @@ func (ec *equalConstraint) matches(v parquet.Value) bool { } func (ec *equalConstraint) skipByBloomfilter(cc parquet.ColumnChunk) (bool, error) { + if !ec.f.BloomFiltersLoaded { + return false, nil + } + bf := cc.BloomFilter() if bf == nil { return false, nil @@ -376,7 +384,11 @@ func (rc *regexConstraint) filter(ctx context.Context, rg parquet.RowGroup, prim } cc := rg.ColumnChunks()[col.ColumnIndex] - pgs := rc.f.GetPages(ctx, cc) + pgs, err := rc.f.GetPages(ctx, cc) + if err != nil { + return nil, errors.Wrap(err, "failed to get pages") + } + defer func() { _ = pgs.Close() }() oidx, err := cc.OffsetIndex() diff --git a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go index d64f54b4d70..f12b5f615d7 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/materialize.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/materialize.go @@ -22,8 +22,8 @@ import ( "sort" "sync" - "github.com/efficientgo/core/errors" "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" prom_storage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" @@ -191,7 +191,10 @@ func (m *Materializer) MaterializeAllLabelValues(ctx context.Context, name strin return []string{}, nil } cc := labelsRg.ColumnChunks()[cIdx.ColumnIndex] - pages := m.b.LabelsFile().GetPages(ctx, cc) + pages, err := m.b.LabelsFile().GetPages(ctx, cc) + if err != nil { + return nil, errors.Wrap(err, "failed to get pages") + } p, err := pages.ReadPage() if err != nil { return []string{}, errors.Wrap(err, "failed to read page") @@ -222,8 +225,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R return nil, errors.Wrap(err, "materializer failed to decode column index") } for _, idx := range idxs { - v := make([]parquet.Value, 0, len(colsIdxs)) - colsMap[idx] = &v + colsMap[idx] = &[]parquet.Value{} } } @@ -237,7 +239,7 @@ func (m *Materializer) materializeAllLabels(ctx context.Context, rgi int, rr []R if err != nil { return errors.Wrap(err, "failed to materialize labels values") } - *v = append(*v, values...) + *v = values return nil }) } @@ -346,9 +348,12 @@ func (m *Materializer) materializeColumn(ctx context.Context, file *storage.Parq for _, p := range pageRanges { errGroup.Go(func() error { - pgs := file.GetPages(ctx, cc) + pgs, err := file.GetPages(ctx, cc, p.pages...) + if err != nil { + return errors.Wrap(err, "failed to get pages") + } defer func() { _ = pgs.Close() }() - err := pgs.SeekToRow(p.rows[0].from) + err = pgs.SeekToRow(p.rows[0].from) if err != nil { return errors.Wrap(err, "could not seek to row") } diff --git a/vendor/github.com/prometheus-community/parquet-common/search/parquet_queriable.go b/vendor/github.com/prometheus-community/parquet-common/search/parquet_queriable.go index 96cb17ca47c..dd556469afc 100644 --- a/vendor/github.com/prometheus-community/parquet-common/search/parquet_queriable.go +++ b/vendor/github.com/prometheus-community/parquet-common/search/parquet_queriable.go @@ -17,10 +17,12 @@ import ( "context" "runtime" "sort" + "sync" "github.com/prometheus/prometheus/model/labels" prom_storage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" + "golang.org/x/sync/errgroup" "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus-community/parquet-common/schema" @@ -105,15 +107,20 @@ func (p parquetQuerier) LabelValues(ctx context.Context, name string, hints *pro limit = int64(hints.Limit) } - resNameValues := [][]string{} + resNameValues := make([][]string, len(shards)) + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(p.opts.concurrency) - for _, s := range shards { - r, err := s.LabelValues(ctx, name, matchers) - if err != nil { - return nil, nil, err - } + for i, s := range shards { + errGroup.Go(func() error { + r, err := s.LabelValues(ctx, name, limit, matchers) + resNameValues[i] = r + return err + }) + } - resNameValues = append(resNameValues, r...) + if err := errGroup.Wait(); err != nil { + return nil, nil, err } return util.MergeUnsortedSlices(int(limit), resNameValues...), nil, nil @@ -131,15 +138,20 @@ func (p parquetQuerier) LabelNames(ctx context.Context, hints *prom_storage.Labe limit = int64(hints.Limit) } - resNameSets := [][]string{} + resNameSets := make([][]string, len(shards)) + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(p.opts.concurrency) - for _, s := range shards { - r, err := s.LabelNames(ctx, matchers) - if err != nil { - return nil, nil, err - } + for i, s := range shards { + errGroup.Go(func() error { + r, err := s.LabelNames(ctx, limit, matchers) + resNameSets[i] = r + return err + }) + } - resNameSets = append(resNameSets, r...) + if err := errGroup.Wait(); err != nil { + return nil, nil, err } return util.MergeUnsortedSlices(int(limit), resNameSets...), nil, nil @@ -161,14 +173,21 @@ func (p parquetQuerier) Select(ctx context.Context, sorted bool, sp *prom_storag minT, maxT = sp.Start, sp.End } skipChunks := sp != nil && sp.Func == "series" + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(p.opts.concurrency) for i, shard := range shards { - ss, err := shard.Query(ctx, sorted, minT, maxT, skipChunks, matchers) - if err != nil { - return prom_storage.ErrSeriesSet(err) - } - seriesSet[i] = ss + errGroup.Go(func() error { + ss, err := shard.Query(ctx, sorted, minT, maxT, skipChunks, matchers) + seriesSet[i] = ss + return err + }) } + + if err := errGroup.Wait(); err != nil { + return prom_storage.ErrSeriesSet(err) + } + ss := convert.NewMergeChunkSeriesSet(seriesSet, labels.Compare, prom_storage.NewConcatenatingChunkSeriesMerger()) return convert.NewSeriesSetFromChunkSeriesSet(ss, skipChunks) @@ -191,8 +210,9 @@ func (p parquetQuerier) queryableShards(ctx context.Context, mint, maxt int64) ( } type queryableShard struct { - shard *storage.ParquetShard - m *Materializer + shard *storage.ParquetShard + m *Materializer + concurrency int } func newQueryableShard(opts *queryableOpts, block *storage.ParquetShard, d *schema.PrometheusParquetChunksDecoder) (*queryableShard, error) { @@ -206,8 +226,9 @@ func newQueryableShard(opts *queryableOpts, block *storage.ParquetShard, d *sche } return &queryableShard{ - shard: block, - m: m, + shard: block, + m: m, + concurrency: opts.concurrency, }, nil } @@ -221,17 +242,36 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64 return nil, err } + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + results := make([]prom_storage.ChunkSeries, 0, 1024) + rMtx := sync.Mutex{} + for i, group := range b.shard.LabelsFile().RowGroups() { - rr, err := Filter(ctx, group, cs...) - if err != nil { - return nil, err - } - series, err := b.m.Materialize(ctx, i, mint, maxt, skipChunks, rr) - if err != nil { - return nil, err - } - results = append(results, series...) + errGroup.Go(func() error { + rr, err := Filter(ctx, group, append([]Constraint{}, cs...)...) + if err != nil { + return err + } + + if len(rr) == 0 { + return nil + } + + series, err := b.m.Materialize(ctx, i, mint, maxt, skipChunks, rr) + if err != nil { + return err + } + rMtx.Lock() + results = append(results, series...) + rMtx.Unlock() + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err } if sorted { @@ -240,9 +280,9 @@ func (b queryableShard) Query(ctx context.Context, sorted bool, mint, maxt int64 return convert.NewChunksSeriesSet(results), nil } -func (b queryableShard) LabelNames(ctx context.Context, matchers []*labels.Matcher) ([][]string, error) { +func (b queryableShard) LabelNames(ctx context.Context, limit int64, matchers []*labels.Matcher) ([]string, error) { if len(matchers) == 0 { - return [][]string{b.m.MaterializeAllLabelNames()}, nil + return b.m.MaterializeAllLabelNames(), nil } cs, err := MatchersToConstraint(matchers...) if err != nil { @@ -253,25 +293,36 @@ func (b queryableShard) LabelNames(ctx context.Context, matchers []*labels.Match return nil, err } + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + for i, group := range b.shard.LabelsFile().RowGroups() { - rr, err := Filter(ctx, group, cs...) - if err != nil { - return nil, err - } - series, err := b.m.MaterializeLabelNames(ctx, i, rr) - if err != nil { - return nil, err - } - results[i] = series + errGroup.Go(func() error { + rr, err := Filter(ctx, group, append([]Constraint{}, cs...)...) + if err != nil { + return err + } + series, err := b.m.MaterializeLabelNames(ctx, i, rr) + if err != nil { + return err + } + results[i] = series + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err } - return results, nil + return util.MergeUnsortedSlices(int(limit), results...), nil } -func (b queryableShard) LabelValues(ctx context.Context, name string, matchers []*labels.Matcher) ([][]string, error) { +func (b queryableShard) LabelValues(ctx context.Context, name string, limit int64, matchers []*labels.Matcher) ([]string, error) { if len(matchers) == 0 { - return b.allLabelValues(ctx, name) + return b.allLabelValues(ctx, name, limit) } cs, err := MatchersToConstraint(matchers...) if err != nil { @@ -282,33 +333,55 @@ func (b queryableShard) LabelValues(ctx context.Context, name string, matchers [ return nil, err } + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + for i, group := range b.shard.LabelsFile().RowGroups() { - rr, err := Filter(ctx, group, cs...) - if err != nil { - return nil, err - } - series, err := b.m.MaterializeLabelValues(ctx, name, i, rr) - if err != nil { - return nil, err - } - results[i] = series + errGroup.Go(func() error { + rr, err := Filter(ctx, group, append([]Constraint{}, cs...)...) + if err != nil { + return err + } + series, err := b.m.MaterializeLabelValues(ctx, name, i, rr) + if err != nil { + return err + } + results[i] = series + return nil + }) } - return results, nil + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return util.MergeUnsortedSlices(int(limit), results...), nil } -func (b queryableShard) allLabelValues(ctx context.Context, name string) ([][]string, error) { +func (b queryableShard) allLabelValues(ctx context.Context, name string, limit int64) ([]string, error) { + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + for i := range b.shard.LabelsFile().RowGroups() { - series, err := b.m.MaterializeAllLabelValues(ctx, name, i) - if err != nil { - return nil, err - } - results[i] = series + errGroup.Go(func() error { + series, err := b.m.MaterializeAllLabelValues(ctx, name, i) + if err != nil { + return err + } + results[i] = series + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err } - return results, nil + return util.MergeUnsortedSlices(int(limit), results...), nil } type byLabels []prom_storage.ChunkSeries diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go b/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go index 0f022ca8007..8f345fa0e10 100644 --- a/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go +++ b/vendor/github.com/prometheus-community/parquet-common/storage/bucket_read_at.go @@ -59,3 +59,30 @@ func (b *bReadAt) ReadAt(p []byte, off int64) (n int, err error) { } return } + +type optimisticReaderAt struct { + r io.ReaderAt + b []byte + offset int64 +} + +func (b optimisticReaderAt) ReadAt(p []byte, off int64) (n int, err error) { + if off >= b.offset && off < b.offset+int64(len(b.b)) { + diff := off - b.offset + n := copy(p, b.b[diff:]) + return n, nil + } + + return b.r.ReadAt(p, off) +} + +func newOptimisticReaderAt(r io.ReaderAt, minOffset, maxOffset int64) io.ReaderAt { + if minOffset < maxOffset { + b := make([]byte, maxOffset-minOffset) + n, err := r.ReadAt(b, minOffset) + if err == nil { + return &optimisticReaderAt{r: r, b: b[:n], offset: minOffset} + } + } + return r +} diff --git a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go index 628ebc7579d..3fd0cee19ba 100644 --- a/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go +++ b/vendor/github.com/prometheus-community/parquet-common/storage/parquet_shard.go @@ -19,29 +19,82 @@ import ( "github.com/parquet-go/parquet-go" "github.com/thanos-io/objstore" + "golang.org/x/sync/errgroup" "github.com/prometheus-community/parquet-common/schema" ) +var DefaultShardOptions = shardOptions{ + optimisticReader: true, +} + +type shardOptions struct { + fileOptions []parquet.FileOption + optimisticReader bool +} + type ParquetFile struct { *parquet.File ReadAtWithContext + BloomFiltersLoaded bool + + optimisticReader bool +} + +type ShardOption func(*shardOptions) + +func WithFileOptions(fileOptions ...parquet.FileOption) ShardOption { + return func(opts *shardOptions) { + opts.fileOptions = append(opts.fileOptions, fileOptions...) + } +} + +func WithOptimisticReader(optimisticReader bool) ShardOption { + return func(opts *shardOptions) { + opts.optimisticReader = optimisticReader + } } -func (f *ParquetFile) GetPages(ctx context.Context, cc parquet.ColumnChunk) *parquet.FilePages { +func (f *ParquetFile) GetPages(ctx context.Context, cc parquet.ColumnChunk, pagesToRead ...int) (*parquet.FilePages, error) { colChunk := cc.(*parquet.FileColumnChunk) - pages := colChunk.PagesFrom(f.WithContext(ctx)) - return pages + reader := f.WithContext(ctx) + + if len(pagesToRead) > 0 && f.optimisticReader { + offset, err := cc.OffsetIndex() + if err != nil { + return nil, err + } + minOffset := offset.Offset(pagesToRead[0]) + maxOffset := offset.Offset(pagesToRead[len(pagesToRead)-1]) + offset.CompressedPageSize(pagesToRead[len(pagesToRead)-1]) + reader = newOptimisticReaderAt(reader, minOffset, maxOffset) + } + + pages := colChunk.PagesFrom(reader) + return pages, nil } -func OpenFile(r ReadAtWithContext, size int64, options ...parquet.FileOption) (*ParquetFile, error) { - file, err := parquet.OpenFile(r, size, options...) +func OpenFile(r ReadAtWithContext, size int64, opts ...ShardOption) (*ParquetFile, error) { + cfg := DefaultShardOptions + + for _, opt := range opts { + opt(&cfg) + } + + c, err := parquet.NewFileConfig(cfg.fileOptions...) + if err != nil { + return nil, err + } + + file, err := parquet.OpenFile(r, size, cfg.fileOptions...) if err != nil { return nil, err } + return &ParquetFile{ - File: file, - ReadAtWithContext: r, + File: file, + ReadAtWithContext: r, + BloomFiltersLoaded: !c.SkipBloomFilters, + optimisticReader: cfg.optimisticReader, }, nil } @@ -53,26 +106,36 @@ type ParquetShard struct { // OpenParquetShard opens the sharded parquet block, // using the options param. -func OpenParquetShard(ctx context.Context, bkt objstore.Bucket, name string, shard int, options ...parquet.FileOption) (*ParquetShard, error) { +func OpenParquetShard(ctx context.Context, bkt objstore.Bucket, name string, shard int, opts ...ShardOption) (*ParquetShard, error) { labelsFileName := schema.LabelsPfileNameForShard(name, shard) chunksFileName := schema.ChunksPfileNameForShard(name, shard) - labelsAttr, err := bkt.Attributes(ctx, labelsFileName) - if err != nil { - return nil, err - } - labelsFile, err := OpenFile(NewBucketReadAt(ctx, labelsFileName, bkt), labelsAttr.Size, options...) - if err != nil { - return nil, err - } - chunksFileAttr, err := bkt.Attributes(ctx, chunksFileName) - if err != nil { - return nil, err - } - chunksFile, err := OpenFile(NewBucketReadAt(ctx, chunksFileName, bkt), chunksFileAttr.Size, options...) - if err != nil { + errGroup := errgroup.Group{} + + var labelsFile, chunksFile *ParquetFile + + errGroup.Go(func() error { + labelsAttr, err := bkt.Attributes(ctx, labelsFileName) + if err != nil { + return err + } + labelsFile, err = OpenFile(NewBucketReadAt(ctx, labelsFileName, bkt), labelsAttr.Size, opts...) + return err + }) + + errGroup.Go(func() error { + chunksFileAttr, err := bkt.Attributes(ctx, chunksFileName) + if err != nil { + return err + } + chunksFile, err = OpenFile(NewBucketReadAt(ctx, chunksFileName, bkt), chunksFileAttr.Size, opts...) + return err + }) + + if err := errGroup.Wait(); err != nil { return nil, err } + return &ParquetShard{ labelsFile: labelsFile, chunksFile: chunksFile, diff --git a/vendor/modules.txt b/vendor/modules.txt index 622998041b4..bde378aaa69 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -837,7 +837,7 @@ github.com/pkg/errors # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 +# github.com/prometheus-community/parquet-common v0.0.0-20250528231323-eec9c3c020f0 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/schema