Skip to content
This repository was archived by the owner on Jun 19, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 85 additions & 3 deletions pkg/receive/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -118,6 +119,87 @@ func (e *Endpoint) unmarshal(data []byte) error {
return nil
}

// ShardSize represents a shard size that can be either an absolute integer count
// or a percentage of available shards. Percentages are only supported for the
// rendezvous algorithm.
type ShardSize struct {
Value int // absolute count (used when IsPercent=false)
Percent float64 // 0.0-1.0 (used when IsPercent=true)
IsPercent bool
}

// IsZero returns true if neither an absolute value nor a percentage is set.
func (s ShardSize) IsZero() bool {
if s.IsPercent {
return s.Percent == 0
}
return s.Value == 0
}

// ResolveCount resolves the shard size to an absolute count given a total.
// For percentages, returns max(1, int(total * percent)).
// For absolute values, returns the Value directly.
func (s ShardSize) ResolveCount(total int) int {
if s.IsPercent {
return int(math.Max(1, float64(total)*s.Percent))
}
return s.Value
}

// String returns a human-readable representation of the shard size.
func (s ShardSize) String() string {
if s.IsPercent {
return fmt.Sprintf("%.0f%%", s.Percent*100)
}
return fmt.Sprintf("%d", s.Value)
}

// UnmarshalJSON supports both integer (e.g. 6) and percentage string (e.g. "50%") formats.
func (s *ShardSize) UnmarshalJSON(data []byte) error {
// Try integer first.
var intVal int
if err := json.Unmarshal(data, &intVal); err == nil {
s.Value = intVal
s.Percent = 0
s.IsPercent = false
return nil
}

// Try string (percentage format).
var strVal string
if err := json.Unmarshal(data, &strVal); err != nil {
return fmt.Errorf("shard_size must be an integer or a percentage string (e.g. \"50%%\"), got: %s", string(data))
}

strVal = strings.TrimSpace(strVal)
if !strings.HasSuffix(strVal, "%") {
return fmt.Errorf("shard_size string must end with '%%', got: %q", strVal)
}

numStr := strings.TrimSuffix(strVal, "%")
var pct float64
if _, err := fmt.Sscanf(numStr, "%f", &pct); err != nil {
return fmt.Errorf("invalid percentage value in shard_size: %q", strVal)
}

if pct < 0 || pct > 100 {
return fmt.Errorf("shard_size percentage must be between 0 and 100, got: %s", strVal)
}

s.Value = 0
s.Percent = pct / 100.0
s.IsPercent = true
return nil
}

// MarshalJSON serializes the shard size back to JSON.
func (s ShardSize) MarshalJSON() ([]byte, error) {
if s.IsPercent {
return json.Marshal(fmt.Sprintf("%.0f%%", s.Percent*100))
}
return json.Marshal(s.Value)
}

// HashringConfig represents the configuration for a hashring
// a receive node knows about.
type HashringConfig struct {
Expand All @@ -132,14 +214,14 @@ type HashringConfig struct {
}

type ShuffleShardingOverrideConfig struct {
ShardSize int `json:"shard_size"`
ShardSize ShardSize `json:"shard_size"`
Tenants []string `json:"tenants,omitempty"`
TenantMatcherType tenantMatcher `json:"tenant_matcher_type,omitempty"`
}

type ShuffleShardingConfig struct {
ShardSize int `json:"shard_size"`
CacheSize int `json:"cache_size"`
ShardSize ShardSize `json:"shard_size"`
CacheSize int `json:"cache_size"`
// ZoneAwarenessDisabled disables zone awareness. We still try to spread the load
// across the available zones, but we don't try to balance the shards across zones.
ZoneAwarenessDisabled bool `json:"zone_awareness_disabled"`
Expand Down
167 changes: 167 additions & 0 deletions pkg/receive/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"

"github.com/pkg/errors"
"github.com/stretchr/testify/require"

"github.com/efficientgo/core/testutil"
)
Expand Down Expand Up @@ -123,3 +124,169 @@ func TestUnmarshalEndpointSlice(t *testing.T) {
})
}
}

func TestShardSizeUnmarshalJSON(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
input string
expected ShardSize
expectErr bool
}{
{
name: "integer value",
input: `6`,
expected: ShardSize{Value: 6},
},
{
name: "zero integer",
input: `0`,
expected: ShardSize{Value: 0},
},
{
name: "percentage string",
input: `"50%"`,
expected: ShardSize{Percent: 0.5, IsPercent: true},
},
{
name: "zero percentage",
input: `"0%"`,
expected: ShardSize{Percent: 0, IsPercent: true},
},
{
name: "100 percentage",
input: `"100%"`,
expected: ShardSize{Percent: 1.0, IsPercent: true},
},
{
name: "25 percentage",
input: `"25%"`,
expected: ShardSize{Percent: 0.25, IsPercent: true},
},
{
name: "invalid string without percent",
input: `"50"`,
expectErr: true,
},
{
name: "negative percentage",
input: `"-10%"`,
expectErr: true,
},
{
name: "over 100 percentage",
input: `"150%"`,
expectErr: true,
},
{
name: "invalid type",
input: `true`,
expectErr: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
var s ShardSize
err := json.Unmarshal([]byte(tc.input), &s)
if tc.expectErr {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.expected, s)
})
}
}

func TestShardSizeMarshalJSON(t *testing.T) {
t.Parallel()

for _, tc := range []struct {
name string
input ShardSize
expected string
}{
{
name: "integer value",
input: ShardSize{Value: 6},
expected: `6`,
},
{
name: "zero value",
input: ShardSize{},
expected: `0`,
},
{
name: "percentage",
input: ShardSize{Percent: 0.5, IsPercent: true},
expected: `"50%"`,
},
{
name: "100 percentage",
input: ShardSize{Percent: 1.0, IsPercent: true},
expected: `"100%"`,
},
} {
t.Run(tc.name, func(t *testing.T) {
data, err := json.Marshal(tc.input)
require.NoError(t, err)
require.Equal(t, tc.expected, string(data))
})
}
}

func TestShardSizeIsZero(t *testing.T) {
t.Parallel()

require.True(t, ShardSize{}.IsZero())
require.True(t, ShardSize{Value: 0}.IsZero())
require.True(t, ShardSize{Percent: 0, IsPercent: true}.IsZero())
require.False(t, ShardSize{Value: 1}.IsZero())
require.False(t, ShardSize{Percent: 0.5, IsPercent: true}.IsZero())
}

func TestShardSizeResolveCount(t *testing.T) {
t.Parallel()

// Absolute value: returns Value directly regardless of total.
require.Equal(t, 6, ShardSize{Value: 6}.ResolveCount(100))
require.Equal(t, 6, ShardSize{Value: 6}.ResolveCount(4))

// Percentage: max(1, total * pct).
require.Equal(t, 2, ShardSize{Percent: 0.5, IsPercent: true}.ResolveCount(4))
require.Equal(t, 1, ShardSize{Percent: 0.25, IsPercent: true}.ResolveCount(4))
require.Equal(t, 4, ShardSize{Percent: 1.0, IsPercent: true}.ResolveCount(4))
// Very small percentage still returns at least 1.
require.Equal(t, 1, ShardSize{Percent: 0.01, IsPercent: true}.ResolveCount(4))
}

func TestShardSizeRoundTripJSON(t *testing.T) {
t.Parallel()

// Test that ShardSize round-trips through full config JSON parsing.
cfgJSON := `[{
"hashring": "test",
"endpoints": [{"address": "node1"}],
"shuffle_sharding_config": {
"shard_size": "50%",
"overrides": [
{"shard_size": 6, "tenants": ["t1"]},
{"shard_size": "25%", "tenants": ["t2"]}
]
}
}]`

configs, err := ParseConfig([]byte(cfgJSON))
require.NoError(t, err)
require.Len(t, configs, 1)

ssc := configs[0].ShuffleShardingConfig
require.True(t, ssc.ShardSize.IsPercent)
require.InDelta(t, 0.5, ssc.ShardSize.Percent, 0.001)

require.Len(t, ssc.Overrides, 2)
require.False(t, ssc.Overrides[0].ShardSize.IsPercent)
require.Equal(t, 6, ssc.Overrides[0].ShardSize.Value)
require.True(t, ssc.Overrides[1].ShardSize.IsPercent)
require.InDelta(t, 0.25, ssc.Overrides[1].ShardSize.Percent, 0.001)
}
Loading
Loading