Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
* [ENHANCEMENT] Index Cache: Multi level cache adds config `max_backfill_items` to cap max items to backfill per async operation. #5686
* [ENHANCEMENT] Query Frontend: Log number of split queries in `query stats` log. #5703
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717


## 1.16.0 2023-11-20

Expand Down
6 changes: 4 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,8 +466,10 @@ func shardByUser(userID string) uint32 {
func shardByAllLabels(userID string, labels []cortexpb.LabelAdapter) uint32 {
h := shardByUser(userID)
for _, label := range labels {
h = ingester_client.HashAdd32(h, label.Name)
h = ingester_client.HashAdd32(h, label.Value)
if len(label.Value) > 0 {
h = ingester_client.HashAdd32(h, label.Name)
h = ingester_client.HashAdd32(h, label.Value)
}
}
return h
}
Expand Down
89 changes: 88 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,7 @@ type prepConfig struct {
replicationFactor int
enableTracker bool
errFail error
tokens [][]uint32
}

func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []*prometheus.Registry, *ring.Ring) {
Expand All @@ -2417,14 +2418,20 @@ func prepare(tb testing.TB, cfg prepConfig) ([]*Distributor, []*mockIngester, []
ingesterDescs := map[string]ring.InstanceDesc{}
ingestersByAddr := map[string]*mockIngester{}
for i := range ingesters {
var tokens []uint32
if len(cfg.tokens) > i {
tokens = cfg.tokens[i]
} else {
tokens = []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}
}
addr := fmt.Sprintf("%d", i)
ingesterDescs[addr] = ring.InstanceDesc{
Addr: addr,
Zone: "",
State: ring.ACTIVE,
Timestamp: time.Now().Unix(),
RegisteredTimestamp: time.Now().Add(-2 * time.Hour).Unix(),
Tokens: []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)},
Tokens: tokens,
}
ingestersByAddr[addr] = ingesters[i]
}
Expand Down Expand Up @@ -3303,6 +3310,86 @@ func TestDistributor_Push_Relabel(t *testing.T) {
}
}

func TestDistributor_Push_EmptyLabel(t *testing.T) {
t.Parallel()
ctx := user.InjectOrgID(context.Background(), "pushEmptyLabel")
type testcase struct {
name string
inputSeries []labels.Labels
expectedSeries labels.Labels
}

cases := []testcase{
{
name: "with empty label",
inputSeries: []labels.Labels{
{ //Token 1106054332 without filtering
{Name: "__name__", Value: "foo"},
{Name: "empty", Value: ""},
},
{ //Token 3827924124 without filtering
{Name: "__name__", Value: "foo"},
{Name: "changHash", Value: ""},
},
},
expectedSeries: labels.Labels{
//Token 1797290973
{Name: "__name__", Value: "foo"},
},
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
var err error
var limits validation.Limits
flagext.DefaultValues(&limits)

token := [][]uint32{
{1},
{2},
{3},
{1106054333},
{5},
{6},
{7},
{8},
{9},
{3827924125},
}

ds, ingesters, _, _ := prepare(t, prepConfig{
numIngesters: 10,
happyIngesters: 10,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
replicationFactor: 1,
shuffleShardSize: 10,
tokens: token,
})

// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
ingesterWithSeries := 0
for i := range ingesters {
timeseries := ingesters[i].series()
if len(timeseries) > 0 {
ingesterWithSeries++
}
}
assert.Equal(t, 1, ingesterWithSeries)
})
}
}

func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing.T) {
t.Parallel()
metricRelabelConfigs := []*relabel.Config{
Expand Down