perf(CDC): upload local/remote at the same time #11425
Conversation
Pull request overview
This PR updates the chunked write path in ByteStreamServerProxy to upload each chunk to local cache and remote cache concurrently as chunks are produced, using a per-chunk FindMissingBlobs call to skip remote uploads for chunks already present. This is intended to simplify the flow and avoid re-opening/reading chunk data from local cache when uploading to remote.
Changes:
- Perform a per-chunk parallel local write + remote check (FindMissingBlobs + conditional upload) instead of "write all locally, then batch FMB + upload missing."
- Remove the configurable missing-chunk upload concurrency flag and the uploadMissingChunks/uploadChunk helpers.
- Compute dedupe metrics during chunk processing rather than after a batch FindMissingBlobs response.
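The per-chunk flow can be sketched with the standard library alone, using maps as stand-ins for the local and remote caches (all names below are illustrative, not the proxy's real API): each chunk is written to the local cache while, concurrently, a FindMissingBlobs-style presence check decides whether the remote upload can be skipped.

```go
package main

import (
	"fmt"
	"sync"
)

// caches is a toy stand-in for the proxy's local and remote caches.
type caches struct {
	mu     sync.Mutex
	local  map[string][]byte
	remote map[string][]byte
}

// processChunk writes the chunk locally and, in parallel, checks remote
// presence (the FMB-style lookup) and uploads only if the chunk is missing.
func (c *caches) processChunk(digest string, data []byte) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // local cache write
		defer wg.Done()
		c.mu.Lock()
		c.local[digest] = data
		c.mu.Unlock()
	}()
	go func() { // remote: presence check + conditional upload
		defer wg.Done()
		c.mu.Lock()
		if _, present := c.remote[digest]; !present {
			c.remote[digest] = data
		}
		c.mu.Unlock()
	}()
	wg.Wait()
}

func main() {
	c := &caches{
		local:  map[string][]byte{},
		remote: map[string][]byte{"c1": []byte("old")}, // c1 already remote: upload skipped
	}
	for _, d := range []string{"c1", "c2"} {
		c.processChunk(d, []byte("data-"+d))
	}
	fmt.Println(string(c.remote["c1"]), len(c.local)) // prints "old 2"
}
```

The dedupe metrics mentioned above fall out naturally here: the conditional-upload branch is exactly where a "chunk already present remotely" counter would be incremented.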
enterprise/server/byte_stream_server_proxy/byte_stream_server_proxy.go (outdated review comment, resolved)
Force-pushed c82a647 to 38bada4
Force-pushed 73a234a to 4ced674
Force-pushed ee8864a to a4a2f63
Having trouble seeing perf gains so going to hold off until I can.
Want to get some metrics before #11425, to see if there are better ways to tune this. This adds tracing and some other duration metrics.
Force-pushed 51cb9ba to 09220b9
Force-pushed 5fb3cce to 53af603
Force-pushed 53af603 to 8d06eca
Force-pushed 8d06eca to faf3f01
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
```go
	DigestFunction: repb.DigestFunction_BLAKE3,
})
require.NoError(t, err)
require.Greater(t, len(splitResp.GetChunkDigests()), *chunkUploadConcurrency)
```
This assertion assumes *chunkUploadConcurrency is smaller than the number of produced chunks; if the test binary is run with -cache_proxy.chunk_upload_concurrency set higher, it will fail unrelated to the batching logic. Set cache_proxy.chunk_upload_concurrency to a fixed value within the test (and/or choose input size based on that value) to avoid flag-dependent failures.
Suggested change:

```diff
-require.Greater(t, len(splitResp.GetChunkDigests()), *chunkUploadConcurrency)
+chunkCount := len(splitResp.GetChunkDigests())
+require.Greater(t, chunkCount, 0)
+if *chunkUploadConcurrency >= chunkCount {
+	t.Skipf("test requires cache_proxy.chunk_upload_concurrency (%d) to be smaller than produced chunk count (%d)", *chunkUploadConcurrency, chunkCount)
+}
+require.Greater(t, chunkCount, *chunkUploadConcurrency)
```
```diff
 poolBuf := s.bufPool.Get(chunking.MaxChunkSizeBytes())
 _, compressSpn := tracing.StartNamedSpan(chunkCtx, "CompressZstd")
-compressedData := compression.CompressZstd(compressBuf, chunkData)
+compressedData := compression.CompressZstd(poolBuf, chunkData)
 compressSpn.End()
```
poolBuf is allocated at MaxChunkSizeBytes(), which equals the max input chunk size. Zstd can slightly expand data, and compression.CompressZstd will allocate a new buffer when dst is too small. In that case, the uploader still retains poolBuf (unused) until upload completion, increasing memory and partially defeating pooling. Consider sizing the buffer to the zstd max-encoded size (or detecting when CompressZstd allocates and returning poolBuf immediately / pooling the actual compressed buffer).
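For reference, zstd's worst-case output size is given by the `ZSTD_COMPRESSBOUND` macro in `zstd.h`; sizing the pooled buffer with this bound rather than `MaxChunkSizeBytes()` would keep `CompressZstd` from allocating a fresh buffer for incompressible chunks (assuming the Go compression wrapper honors a sufficiently large `dst`, which is an assumption here). A direct Go translation of the macro:

```go
package main

import "fmt"

// compressBound mirrors zstd's ZSTD_COMPRESSBOUND(srcSize) macro:
//   srcSize + (srcSize >> 8) + margin
// where margin covers small inputs (< 128 KiB) that may expand slightly.
func compressBound(srcSize int) int {
	margin := 0
	if srcSize < 128<<10 {
		margin = ((128 << 10) - srcSize) >> 11
	}
	return srcSize + srcSize>>8 + margin
}

func main() {
	for _, n := range []int{0, 128 << 10, 1 << 20} {
		fmt.Printf("compressBound(%d) = %d\n", n, compressBound(n))
	}
}
```

Note the bound is always strictly larger than the input for small sizes, which is exactly why a buffer sized to `MaxChunkSizeBytes()` can come up short on incompressible data.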
```go
fmbG     *errgroup.Group
fmbCtx   context.Context
batchG   *errgroup.Group
batchCtx context.Context
```
chunkUploader uses two separate errgroups/contexts (fmbCtx and batchCtx). As a result, an error in batch uploads won’t cancel in-flight / future FindMissingBlobs calls (and vice versa), and flush() may still wait for ongoing uploads even if an FMB error already makes the overall operation fail. Consider using a single shared cancelable context (or wiring cancellation between the two groups) so any error cancels all outstanding work and flush() can return promptly.
```go
sort.Ints(fmbSizes)
require.Equal(t, []int{1, *chunkUploadConcurrency}, fmbSizes)
require.Len(t, fmbDigests, len(uniqueChunks), "expected only unique digests to hit FindMissingBlobs")
```
This test’s expected FindMissingBlobs grouping depends on the global cache_proxy.chunk_upload_concurrency flag value; running tests with a different flag value (or if another test modifies the flag) can make the assertion fail even when the uploader is correct. Set cache_proxy.chunk_upload_concurrency explicitly in the test (and size uniqueChunks accordingly) to keep it deterministic.
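One stdlib-only way to pin the flag inside the test is to save and restore the previous value around the override (a sketch; BuildBuddy's test helpers may already provide an equivalent):

```go
package main

import (
	"flag"
	"fmt"
)

// overrideFlag pins a registered flag to a known value and returns a restore
// func, so the test does not depend on whatever value the binary was started
// with. In a real test you would call restore via t.Cleanup.
func overrideFlag(name, value string) (restore func(), err error) {
	f := flag.Lookup(name)
	if f == nil {
		return nil, fmt.Errorf("flag %q is not registered", name)
	}
	prev := f.Value.String()
	if err := flag.Set(name, value); err != nil {
		return nil, err
	}
	return func() { flag.Set(name, prev) }, nil
}

// Demo flag with the same name as the one the review comments discuss; the
// default of 32 here is illustrative only.
var concurrency = flag.Int("cache_proxy.chunk_upload_concurrency", 32, "demo flag")

func main() {
	restore, err := overrideFlag("cache_proxy.chunk_upload_concurrency", "4")
	if err != nil {
		panic(err)
	}
	fmt.Println(*concurrency) // prints 4
	restore()
	fmt.Println(*concurrency) // prints 32
}
```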
enterprise/server/byte_stream_server_proxy/byte_stream_server_proxy.go (3 outdated review comments, resolved)
Force-pushed faf3f01 to 5efe29e
Force-pushed 5efe29e to fcf47ee
From the task list: https://github.com/buildbuddy-io/buildbuddy-internal/issues/6426
This creates a new uploader. If enabled (via experiment), it does not require the chunk to be stored locally first; instead, the upload runs asynchronously using an FMB call and at most 8 concurrent batch uploads.
This prevents re-opening the file and lets the upload start immediately.
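The "at most 8 batch uploads" bound described above can be sketched as a channel semaphore (illustrative names; the real uploader's structure may differ):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runUploads starts n uploads but never allows more than limit in flight,
// returning the peak observed concurrency for verification.
func runUploads(n, limit int) int64 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var cur, peak atomic.Int64
	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks once `limit` uploads are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { cur.Add(-1); <-sem }() // release the slot
			c := cur.Add(1)
			for { // record the peak concurrency seen
				p := peak.Load()
				if c <= p || peak.CompareAndSwap(p, c) {
					break
				}
			}
			time.Sleep(time.Millisecond) // stand-in for the batch-upload RPC
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(runUploads(100, 8) <= 8) // prints true
}
```

The same bound is what errgroup's `SetLimit` provides; the channel form just makes the backpressure explicit: the producer blocks at `sem <-` instead of buffering unbounded work.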