Skip to content

Commit af91403

Browse files
authored
add FastCDC chunking function to RPCs (#11299)
Split accepts Unknown and FastCDC, always returns FastCDC. Splice accepts Unknown and FastCDC. No need to overcomplicate and add more chunking functions we don't accept yet. In the future, we can store the unknown and fastcdc separately. For now, a new call to `Splice` will always overwrite for a given blob digest, similar to the AC.
1 parent 505c6f0 commit af91403

File tree

9 files changed

+67
-58
lines changed

9 files changed

+67
-58
lines changed

enterprise/server/byte_stream_server_proxy/byte_stream_server_proxy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
756756
return nil
757757
}
758758

759-
chunker, err := chunking.NewChunker(ctx, int(chunking.MaxChunkSizeBytes()/4), chunkWriteFn)
759+
chunker, err := chunking.NewChunker(ctx, int(chunking.AvgChunkSizeBytes()), chunkWriteFn)
760760
if err != nil {
761761
return writeChunkedResult{}, status.InternalErrorf("creating chunker: %s", err)
762762
}
@@ -818,7 +818,7 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
818818
// If there's only 1 chunk, the chunking threshold is misconfigured.
819819
// It should be set higher than the max chunk size to avoid overlap.
820820
if len(chunkDigests) == 1 {
821-
return result, status.InternalErrorf("chunking produced only 1 chunk; only chunked blobs larger than max_chunk_size_bytes are supported")
821+
return result, status.InternalErrorf("chunking produced only 1 chunk; only chunked blobs larger than 4x avg_chunk_size_bytes are supported")
822822
}
823823

824824
manifest := &chunking.Manifest{

enterprise/server/byte_stream_server_proxy/byte_stream_server_proxy_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -930,7 +930,6 @@ func TestWriteChunked(t *testing.T) {
930930
})
931931
require.NoError(t, openfeature.SetNamedProviderAndWait(t.Name(), testProvider))
932932

933-
flags.Set(t, "cache.max_chunk_size_bytes", 1024*1024)
934933
flags.Set(t, "cache.zstd_transcoding_enabled", true)
935934

936935
ctx := testContext()
@@ -1096,8 +1095,6 @@ func TestWriteChunkedFallbackBelowThreshold(t *testing.T) {
10961095
})
10971096
require.NoError(t, openfeature.SetNamedProviderAndWait(t.Name(), testProvider))
10981097

1099-
// Set threshold higher than our test blob size to trigger fallback
1100-
flags.Set(t, "cache.max_chunk_size_bytes", 10*1024*1024)
11011098
flags.Set(t, "cache.zstd_transcoding_enabled", true)
11021099

11031100
ctx := testContext()
@@ -1151,7 +1148,7 @@ func TestWriteChunkedFallbackBelowThreshold(t *testing.T) {
11511148
ctx, err = prefix.AttachUserPrefixToContext(ctx, proxyEnv.GetAuthenticator())
11521149
require.NoError(t, err)
11531150

1154-
// Create a blob smaller than the threshold (1MB vs 10MB threshold)
1151+
// Create a blob smaller than the threshold (1MB vs 2MB default max)
11551152
_, originalData := testdigest.RandomCASResourceBuf(t, 1*1024*1024)
11561153
blobDigest, err := digest.Compute(bytes.NewReader(originalData), repb.DigestFunction_BLAKE3)
11571154
require.NoError(t, err)
@@ -1233,7 +1230,6 @@ func setupChunkedBenchmarkEnv(b *testing.B) (bspb.ByteStreamClient, context.Cont
12331230
})
12341231
require.NoError(b, openfeature.SetNamedProviderAndWait(b.Name(), testProvider))
12351232

1236-
flags.Set(b, "cache.max_chunk_size_bytes", 1024*1024)
12371233
flags.Set(b, "cache.zstd_transcoding_enabled", true)
12381234

12391235
ctx := testContext()
@@ -1427,7 +1423,6 @@ func BenchmarkReadChunkedFromRemote(b *testing.B) {
14271423
})
14281424
require.NoError(b, openfeature.SetNamedProviderAndWait(b.Name(), testProvider))
14291425

1430-
flags.Set(b, "cache.max_chunk_size_bytes", 1024*1024)
14311426
flags.Set(b, "cache.zstd_transcoding_enabled", true)
14321427

14331428
ctx := testContext()

server/remote_cache/action_cache_server/action_cache_server_test.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -609,16 +609,14 @@ func TestRecordOrigin(t *testing.T) {
609609
}
610610

611611
func TestValidateActionResult_ChunkedOutputFile(t *testing.T) {
612-
flags.Set(t, "cache.max_chunk_size_bytes", 10)
613-
614612
ctx := context.Background()
615613
te := testenv.GetTestEnv(t)
616614
ctx, err := prefix.AttachUserPrefixToContext(ctx, te.GetAuthenticator())
617615
require.NoError(t, err)
618616
cache := te.GetCache()
619617

620-
chunk1RN, chunk1Data := testdigest.RandomCASResourceBuf(t, 100)
621-
chunk2RN, chunk2Data := testdigest.RandomCASResourceBuf(t, 100)
618+
chunk1RN, chunk1Data := testdigest.RandomCASResourceBuf(t, 2*1024*1024)
619+
chunk2RN, chunk2Data := testdigest.RandomCASResourceBuf(t, 2*1024*1024)
622620
require.NoError(t, cache.Set(ctx, chunk1RN, chunk1Data))
623621
require.NoError(t, cache.Set(ctx, chunk2RN, chunk2Data))
624622

server/remote_cache/byte_stream_server/byte_stream_server_test.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -611,8 +611,6 @@ func TestReadChunked(t *testing.T) {
611611
fp, err := experiments.NewFlagProvider(t.Name())
612612
require.NoError(t, err)
613613

614-
flags.Set(t, "cache.max_chunk_size_bytes", 100)
615-
616614
ctx := context.Background()
617615
te := testenv.GetTestEnv(t)
618616
te.SetExperimentFlagProvider(fp)
@@ -623,32 +621,21 @@ func TestReadChunked(t *testing.T) {
623621
clientConn := runByteStreamServer(ctx, t, te)
624622
bsClient := bspb.NewByteStreamClient(clientConn)
625623

626-
chunk1 := []byte("This is the first chunk of data. ")
627-
chunk2 := []byte("This is the second chunk of data. ")
628-
chunk3 := []byte("This is the third and final chunk.")
624+
chunk1RN, chunk1 := testdigest.RandomCASResourceBuf(t, 1024*1024)
625+
chunk2RN, chunk2 := testdigest.RandomCASResourceBuf(t, 1024*1024)
626+
chunk3RN, chunk3 := testdigest.RandomCASResourceBuf(t, 1024*1024)
629627
fullBlob := append(append(chunk1, chunk2...), chunk3...)
630628

631-
chunk1Digest, err := digest.Compute(bytes.NewReader(chunk1), repb.DigestFunction_SHA256)
632-
require.NoError(t, err)
633-
chunk2Digest, err := digest.Compute(bytes.NewReader(chunk2), repb.DigestFunction_SHA256)
634-
require.NoError(t, err)
635-
chunk3Digest, err := digest.Compute(bytes.NewReader(chunk3), repb.DigestFunction_SHA256)
636-
require.NoError(t, err)
637-
638629
blobDigest, err := digest.Compute(bytes.NewReader(fullBlob), repb.DigestFunction_SHA256)
639630
require.NoError(t, err)
640631

641-
chunk1RN := digest.NewCASResourceName(chunk1Digest, "", repb.DigestFunction_SHA256)
642-
chunk2RN := digest.NewCASResourceName(chunk2Digest, "", repb.DigestFunction_SHA256)
643-
chunk3RN := digest.NewCASResourceName(chunk3Digest, "", repb.DigestFunction_SHA256)
644-
645-
require.NoError(t, te.GetCache().Set(ctx, chunk1RN.ToProto(), chunk1))
646-
require.NoError(t, te.GetCache().Set(ctx, chunk2RN.ToProto(), chunk2))
647-
require.NoError(t, te.GetCache().Set(ctx, chunk3RN.ToProto(), chunk3))
632+
require.NoError(t, te.GetCache().Set(ctx, chunk1RN, chunk1))
633+
require.NoError(t, te.GetCache().Set(ctx, chunk2RN, chunk2))
634+
require.NoError(t, te.GetCache().Set(ctx, chunk3RN, chunk3))
648635

649636
manifest := &chunking.Manifest{
650637
BlobDigest: blobDigest,
651-
ChunkDigests: []*repb.Digest{chunk1Digest, chunk2Digest, chunk3Digest},
638+
ChunkDigests: []*repb.Digest{chunk1RN.GetDigest(), chunk2RN.GetDigest(), chunk3RN.GetDigest()},
652639
InstanceName: "",
653640
DigestFunction: repb.DigestFunction_SHA256,
654641
}

server/remote_cache/capabilities_server/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
"//proto:semver_go_proto",
1212
"//server/environment",
1313
"//server/real_environment",
14+
"//server/remote_cache/chunking",
1415
"//server/remote_cache/config",
1516
"//server/remote_cache/digest",
1617
"//server/util/bazel_request",

server/remote_cache/capabilities_server/capabilities_server.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
"github.com/buildbuddy-io/buildbuddy/server/environment"
77
"github.com/buildbuddy-io/buildbuddy/server/real_environment"
8+
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/chunking"
89
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
910
"github.com/buildbuddy-io/buildbuddy/server/util/bazel_request"
1011

@@ -32,6 +33,9 @@ type CapabilitiesServer struct {
3233
}
3334

3435
func Register(env *real_environment.RealEnv) error {
36+
if err := chunking.ValidateConfig(); err != nil {
37+
return err
38+
}
3539
// Register to handle GetCapabilities messages, which tell the client
3640
// that this server supports CAS functionality.
3741
env.SetCapabilitiesServer(NewCapabilitiesServer(
@@ -64,8 +68,10 @@ func (s *CapabilitiesServer) GetCapabilities(ctx context.Context, req *repb.GetC
6468
}
6569
if s.supportCAS {
6670
splitSpliceEnabled := false
71+
chunkingEnabled := false
6772
if efp := s.env.GetExperimentFlagProvider(); efp != nil {
6873
splitSpliceEnabled = efp.Boolean(ctx, "cache.split_splice_enabled", false)
74+
chunkingEnabled = chunking.Enabled(ctx, efp)
6975
}
7076

7177
c.CacheCapabilities = &repb.CacheCapabilities{
@@ -88,6 +94,16 @@ func (s *CapabilitiesServer) GetCapabilities(ctx context.Context, req *repb.GetC
8894
SplitBlobSupport: splitSpliceEnabled,
8995
SpliceBlobSupport: splitSpliceEnabled,
9096
}
97+
98+
if chunkingEnabled {
99+
avgChunkSizeBytes := chunking.AvgChunkSizeBytes()
100+
if avgChunkSizeBytes > 0 {
101+
c.CacheCapabilities.FastCdc_2020Params = &repb.FastCdc2020Params{
102+
AvgChunkSizeBytes: uint64(avgChunkSizeBytes),
103+
Seed: 0,
104+
}
105+
}
106+
}
91107
}
92108
if s.supportRemoteExec {
93109
c.ExecutionCapabilities = &repb.ExecutionCapabilities{

server/remote_cache/chunking/chunking.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/hex"
77
"errors"
88
"flag"
9+
"fmt"
910
"io"
1011
"strconv"
1112
"strings"
@@ -24,16 +25,28 @@ import (
2425

2526
var (
2627
chunkedManifestSalt = flag.String("cache.chunking.ac_key_salt", "", "If set, salt the AC key with this value.")
27-
maxChunkSizeBytes = flag.Int64("cache.max_chunk_size_bytes", 2<<20, "Only blobs larger (non-inclusive) than this threshold will be chunked (default 2MB). This is also the maximum size of a chunk. The average chunk size will be 1/4 of this value, and the minimum will be 1/16 of this value.")
28+
avgChunkSizeBytes = flag.Int64("cache.avg_chunk_size_bytes", 512*1024, "This is the average size of a chunk. Only blobs larger (non-inclusive) than 4x this value will be chunked. The maximum chunk size will be 4x this value, and the minimum will be 1/4 this value (default 512KB).")
2829
)
2930

3031
const (
3132
chunkedManifestPrefix = "_bb_chunked_manifest_v2_/"
3233
chunkOutputFilePrefix = "chunk_"
3334
)
3435

36+
func AvgChunkSizeBytes() int64 {
37+
return *avgChunkSizeBytes
38+
}
39+
3540
func MaxChunkSizeBytes() int64 {
36-
return *maxChunkSizeBytes
41+
return *avgChunkSizeBytes * 4
42+
}
43+
44+
func ValidateConfig() error {
45+
v := *avgChunkSizeBytes
46+
if v < 1024 || v > 1024*1024 {
47+
return fmt.Errorf("cache.avg_chunk_size_bytes must be between 1024 and 1048576, got %d", v)
48+
}
49+
return nil
3750
}
3851

3952
func Enabled(ctx context.Context, efp interfaces.ExperimentFlagProvider) bool {
@@ -176,7 +189,8 @@ type Manifest struct {
176189

177190
func (cm *Manifest) ToSplitBlobResponse() *repb.SplitBlobResponse {
178191
return &repb.SplitBlobResponse{
179-
ChunkDigests: cm.ChunkDigests,
192+
ChunkDigests: cm.ChunkDigests,
193+
ChunkingFunction: repb.ChunkingFunction_FAST_CDC_2020,
180194
}
181195
}
182196

@@ -190,10 +204,11 @@ func (cm *Manifest) ToFindMissingBlobsRequest() *repb.FindMissingBlobsRequest {
190204

191205
func (cm *Manifest) ToSpliceBlobRequest() *repb.SpliceBlobRequest {
192206
return &repb.SpliceBlobRequest{
193-
BlobDigest: cm.BlobDigest,
194-
ChunkDigests: cm.ChunkDigests,
195-
InstanceName: cm.InstanceName,
196-
DigestFunction: cm.DigestFunction,
207+
BlobDigest: cm.BlobDigest,
208+
ChunkDigests: cm.ChunkDigests,
209+
InstanceName: cm.InstanceName,
210+
DigestFunction: cm.DigestFunction,
211+
ChunkingFunction: repb.ChunkingFunction_FAST_CDC_2020,
197212
}
198213
}
199214

server/remote_cache/content_addressable_storage_server/content_addressable_storage_server.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,6 +1167,10 @@ func (s *ContentAddressableStorageServer) SpliceBlob(ctx context.Context, req *r
11671167
return nil, status.UnimplementedErrorf("SpliceBlob RPC is not currently enabled")
11681168
}
11691169

1170+
if cf := req.GetChunkingFunction(); cf != repb.ChunkingFunction_UNKNOWN && cf != repb.ChunkingFunction_FAST_CDC_2020 {
1171+
return nil, status.InvalidArgumentErrorf("unsupported chunking function %v", cf)
1172+
}
1173+
11701174
if req.GetBlobDigest() == nil {
11711175
return nil, status.UnimplementedError("SpliceBlob with no blob_digest is not supported")
11721176
}
@@ -1204,6 +1208,11 @@ func (s *ContentAddressableStorageServer) SplitBlob(ctx context.Context, req *re
12041208
return nil, status.UnimplementedErrorf("SplitBlob RPC is not currently enabled")
12051209
}
12061210

1211+
cf := req.GetChunkingFunction()
1212+
if cf != repb.ChunkingFunction_UNKNOWN && cf != repb.ChunkingFunction_FAST_CDC_2020 {
1213+
return nil, status.InvalidArgumentErrorf("unsupported chunking function %v", cf)
1214+
}
1215+
12071216
if req.GetBlobDigest() == nil {
12081217
return nil, status.InvalidArgumentError("blob_digest is required")
12091218
}

server/remote_cache/content_addressable_storage_server/content_addressable_storage_server_test.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1069,8 +1069,6 @@ func TestFindMissingBlobsWithChunkedBlob(t *testing.T) {
10691069
fp, err := experiments.NewFlagProvider(t.Name())
10701070
require.NoError(t, err)
10711071

1072-
flags.Set(t, "cache.max_chunk_size_bytes", 100)
1073-
10741072
ctx := context.Background()
10751073
te := testenv.GetTestEnv(t)
10761074
te.SetExperimentFlagProvider(fp)
@@ -1082,31 +1080,21 @@ func TestFindMissingBlobsWithChunkedBlob(t *testing.T) {
10821080
casClient := repb.NewContentAddressableStorageClient(clientConn)
10831081
cache := te.GetCache()
10841082

1085-
chunk1 := []byte("This is the first chunk of data. ")
1086-
chunk2 := []byte("This is the second chunk of data. ")
1087-
chunk3 := []byte("This is the third and final chunk.")
1083+
chunk1RN, chunk1 := testdigest.RandomCASResourceBuf(t, 1024*1024)
1084+
chunk2RN, chunk2 := testdigest.RandomCASResourceBuf(t, 1024*1024)
1085+
chunk3RN, chunk3 := testdigest.RandomCASResourceBuf(t, 1024*1024)
10881086
fullBlob := append(append(chunk1, chunk2...), chunk3...)
10891087

10901088
blobDigest, err := digest.Compute(bytes.NewReader(fullBlob), repb.DigestFunction_SHA256)
10911089
require.NoError(t, err)
1092-
chunk1Digest, err := digest.Compute(bytes.NewReader(chunk1), repb.DigestFunction_SHA256)
1093-
require.NoError(t, err)
1094-
chunk2Digest, err := digest.Compute(bytes.NewReader(chunk2), repb.DigestFunction_SHA256)
1095-
require.NoError(t, err)
1096-
chunk3Digest, err := digest.Compute(bytes.NewReader(chunk3), repb.DigestFunction_SHA256)
1097-
require.NoError(t, err)
1098-
1099-
chunk1RN := digest.NewCASResourceName(chunk1Digest, "", repb.DigestFunction_SHA256)
1100-
chunk2RN := digest.NewCASResourceName(chunk2Digest, "", repb.DigestFunction_SHA256)
1101-
chunk3RN := digest.NewCASResourceName(chunk3Digest, "", repb.DigestFunction_SHA256)
11021090

1103-
require.NoError(t, cache.Set(ctx, chunk1RN.ToProto(), chunk1))
1104-
require.NoError(t, cache.Set(ctx, chunk2RN.ToProto(), chunk2))
1105-
require.NoError(t, cache.Set(ctx, chunk3RN.ToProto(), chunk3))
1091+
require.NoError(t, cache.Set(ctx, chunk1RN, chunk1))
1092+
require.NoError(t, cache.Set(ctx, chunk2RN, chunk2))
1093+
require.NoError(t, cache.Set(ctx, chunk3RN, chunk3))
11061094

11071095
manifest := &chunking.Manifest{
11081096
BlobDigest: blobDigest,
1109-
ChunkDigests: []*repb.Digest{chunk1Digest, chunk2Digest, chunk3Digest},
1097+
ChunkDigests: []*repb.Digest{chunk1RN.GetDigest(), chunk2RN.GetDigest(), chunk3RN.GetDigest()},
11101098
InstanceName: "",
11111099
DigestFunction: repb.DigestFunction_SHA256,
11121100
}

0 commit comments

Comments (0)