Skip to content

Commit 7de7e0f

Browse files
luluz66 and tyler-french
authored and committed
add a gcs eviction graph for metadata server (#11424)
buildbuddy-io/buildbuddy-internal#6723
1 parent b41542e commit 7de7e0f

File tree

3 files changed

+278
-166
lines changed

3 files changed

+278
-166
lines changed

enterprise/server/byte_stream_server_proxy/BUILD

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ go_library(
2121
"//server/util/authutil",
2222
"//server/util/bytebufferpool",
2323
"//server/util/compression",
24-
"//server/util/lib/set",
2524
"//server/util/log",
2625
"//server/util/prefix",
2726
"//server/util/status",

enterprise/server/byte_stream_server_proxy/byte_stream_server_proxy.go

Lines changed: 80 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/buildbuddy-io/buildbuddy/server/util/authutil"
2121
"github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool"
2222
"github.com/buildbuddy-io/buildbuddy/server/util/compression"
23-
"github.com/buildbuddy-io/buildbuddy/server/util/lib/set"
2423
"github.com/buildbuddy-io/buildbuddy/server/util/log"
2524
"github.com/buildbuddy-io/buildbuddy/server/util/prefix"
2625
"github.com/buildbuddy-io/buildbuddy/server/util/status"
@@ -33,7 +32,7 @@ import (
3332
)
3433

3534
var (
36-
chunkUploadConcurrency = flag.Int("cache_proxy.chunk_upload_concurrency", 8, "Maximum number of concurrent chunk uploads when uploading missing chunks to remote cache.")
35+
chunkUploadConcurrency = flag.Int("cache_proxy.chunk_upload_concurrency", 2, "Maximum number of concurrent chunk BatchUpdateBlobsRequest uploads in flight when uploading missing chunks to remote cache.")
3736
)
3837

3938
type ByteStreamServerProxy struct {
@@ -733,9 +732,18 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
733732
compressBuf := s.compressBufPool.Get(chunking.MaxChunkSizeBytes())
734733
defer s.compressBufPool.Put(compressBuf)
735734

736-
// chunkWriteFn is called on each new chunk once it's available through the chunker's pipe.
737-
// We write chunks to local first, then use FindMissingBlobs + upload for remote.
738-
// Chunks are stored and read compressed with ZSTD.
735+
// Up to 2 batch upload RPCs in flight at a time. We build a batch
736+
// while previous ones are sending. Each batch is <=2MB compressed.
737+
uploadGroup, uCtx := errgroup.WithContext(ctx)
738+
uploadGroup.SetLimit(*chunkUploadConcurrency)
739+
pendingBatch := &repb.BatchUpdateBlobsRequest{
740+
InstanceName: instanceName,
741+
DigestFunction: digestFunction,
742+
}
743+
var pendingBatchSize int64
744+
var chunksDeduped int
745+
var chunkBytesDeduped int64
746+
739747
chunkWriteFn := func(chunkData []byte) error {
740748
chunkDigest, err := digest.Compute(bytes.NewReader(chunkData), digestFunction)
741749
if err != nil {
@@ -746,15 +754,59 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
746754
chunkRN := digest.NewCASResourceName(chunkDigest, instanceName, digestFunction)
747755
chunkRN.SetCompressor(repb.Compressor_ZSTD)
748756
compressedData := compression.CompressZstd(compressBuf, chunkData)
749-
stream := &rawWriteStream{
757+
758+
eg, gCtx := errgroup.WithContext(ctx)
759+
eg.Go(func() error {
760+
fmbResp, err := s.remoteCAS.FindMissingBlobs(gCtx, &repb.FindMissingBlobsRequest{
761+
InstanceName: instanceName,
762+
DigestFunction: digestFunction,
763+
BlobDigests: []*repb.Digest{chunkDigest},
764+
})
765+
if err != nil {
766+
return status.InternalErrorf("finding missing blob on remote for chunk %s: %s", chunkRN.DownloadString(), err)
767+
}
768+
if len(fmbResp.GetMissingBlobDigests()) == 0 {
769+
chunksDeduped++
770+
chunkBytesDeduped += chunkDigest.GetSizeBytes()
771+
return nil
772+
}
773+
774+
additionalSize := int64(len(compressedData))
775+
776+
// If this chunk won't fit, fire off the current batch.
777+
// uploadGroup.Go blocks when 2 RPCs are in flight.
778+
if pendingBatchSize+additionalSize > cachetools.BatchUploadLimitBytes && len(pendingBatch.Requests) > 0 {
779+
batch := pendingBatch
780+
pendingBatch = &repb.BatchUpdateBlobsRequest{
781+
InstanceName: instanceName,
782+
DigestFunction: digestFunction,
783+
}
784+
pendingBatchSize = 0
785+
uploadGroup.Go(func() error {
786+
return s.sendBatchUpload(uCtx, batch)
787+
})
788+
}
789+
790+
dataCopy := make([]byte, len(compressedData))
791+
copy(dataCopy, compressedData)
792+
pendingBatch.Requests = append(pendingBatch.Requests, &repb.BatchUpdateBlobsRequest_Request{
793+
Digest: chunkDigest,
794+
Data: dataCopy,
795+
Compressor: repb.Compressor_ZSTD,
796+
})
797+
pendingBatchSize += additionalSize
798+
return nil
799+
})
800+
801+
localStream := &rawWriteStream{
750802
ctx: ctx,
751803
resourceName: chunkRN.NewUploadString(),
752804
data: compressedData,
753805
}
754-
if err := s.local.Write(stream); err != nil {
806+
if err := s.local.Write(localStream); err != nil {
755807
return status.InternalErrorf("writing chunk %s to local: %s", chunkRN.DownloadString(), err)
756808
}
757-
return nil
809+
return eg.Wait()
758810
}
759811

760812
chunker, err := chunking.NewChunker(ctx, int(chunking.AvgChunkSizeBytes()), chunkWriteFn)
@@ -800,20 +852,30 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
800852
}
801853
}
802854

803-
// Close blocks until all chunk writes complete, ensuring chunkDigests is fully populated.
804855
if err := chunker.Close(); err != nil {
805856
return writeChunkedResult{}, status.InternalErrorf("closing chunker: %s", err)
806857
}
807858

859+
if len(pendingBatch.Requests) > 0 {
860+
uploadGroup.Go(func() error {
861+
return s.sendBatchUpload(uCtx, pendingBatch)
862+
})
863+
}
864+
if err := uploadGroup.Wait(); err != nil {
865+
return writeChunkedResult{}, status.InternalErrorf("uploading chunks to remote: %s", err)
866+
}
867+
808868
var chunkBytesTotal int64
809869
for _, d := range chunkDigests {
810870
chunkBytesTotal += d.GetSizeBytes()
811871
}
812872

813873
result := writeChunkedResult{
814-
blobBytes: blobSize,
815-
chunksTotal: len(chunkDigests),
816-
chunkBytesTotal: chunkBytesTotal,
874+
blobBytes: blobSize,
875+
chunksTotal: len(chunkDigests),
876+
chunksDeduped: chunksDeduped,
877+
chunkBytesTotal: chunkBytesTotal,
878+
chunkBytesDeduped: chunkBytesDeduped,
817879
}
818880

819881
// If there's only 1 chunk, the chunking threshold is misconfigured.
@@ -829,63 +891,22 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
829891
DigestFunction: digestFunction,
830892
}
831893

832-
missingBlobs, err := s.remoteCAS.FindMissingBlobs(ctx, manifest.ToFindMissingBlobsRequest())
833-
if err != nil {
834-
return writeChunkedResult{}, status.InternalErrorf("finding missing blobs on remote: %s", err)
835-
}
836-
missingDigests := missingBlobs.GetMissingBlobDigests()
837-
missingSet := make(set.Set[string], len(missingDigests))
838-
for _, d := range missingDigests {
839-
missingSet.Add(d.GetHash())
840-
}
841-
842-
// Deduped chunks are ones that already exist on remote.
843-
for _, d := range chunkDigests {
844-
if !missingSet.Contains(d.GetHash()) {
845-
result.chunksDeduped++
846-
result.chunkBytesDeduped += d.GetSizeBytes()
847-
}
848-
}
849-
if len(missingDigests) > 0 {
850-
if err := s.uploadMissingChunks(ctx, missingDigests, instanceName, digestFunction); err != nil {
851-
return writeChunkedResult{}, status.InternalErrorf("uploading missing chunks to remote: %s", err)
852-
}
853-
}
854-
855894
if _, err := s.remoteCAS.SpliceBlob(ctx, manifest.ToSpliceBlobRequest()); err != nil {
856895
return writeChunkedResult{}, status.InternalErrorf("splice blob on remote: %s", err)
857896
}
858897

859898
return result, stream.SendAndClose(&bspb.WriteResponse{CommittedSize: bytesReceived})
860899
}
861900

862-
func (s *ByteStreamServerProxy) uploadMissingChunks(ctx context.Context, missingDigests []*repb.Digest, instanceName string, digestFunction repb.DigestFunction_Value) error {
863-
g, gCtx := errgroup.WithContext(ctx)
864-
g.SetLimit(*chunkUploadConcurrency)
865-
for _, chunkDigest := range missingDigests {
866-
chunkRN := digest.NewCASResourceName(chunkDigest, instanceName, digestFunction)
867-
g.Go(func() error {
868-
return s.uploadChunk(gCtx, chunkRN)
869-
})
870-
}
871-
return g.Wait()
872-
}
873-
874-
func (s *ByteStreamServerProxy) uploadChunk(ctx context.Context, rn *digest.CASResourceName) error {
875-
rnCopy, err := digest.CASResourceNameFromProto(rn.ToProto())
901+
func (s *ByteStreamServerProxy) sendBatchUpload(ctx context.Context, req *repb.BatchUpdateBlobsRequest) error {
902+
rsp, err := s.remoteCAS.BatchUpdateBlobs(ctx, req)
876903
if err != nil {
877904
return err
878905
}
879-
rnCopy.SetCompressor(repb.Compressor_ZSTD)
880-
reader, err := s.localCache.Reader(ctx, rnCopy.ToProto(), 0, 0)
881-
if err != nil {
882-
return status.InternalErrorf("reading chunk %s from local cache: %s", rn.DownloadString(), err)
883-
}
884-
defer reader.Close()
885-
886-
_, _, err = cachetools.UploadFromReaderWithCompression(ctx, s.remote, rnCopy, reader, repb.Compressor_ZSTD)
887-
if err != nil {
888-
return status.InternalErrorf("uploading chunk %s to remote: %s", rn.DownloadString(), err)
906+
for _, r := range rsp.GetResponses() {
907+
if r.GetStatus().GetCode() != 0 {
908+
return status.InternalErrorf("batch upload chunk %s: %s", r.GetDigest().GetHash(), r.GetStatus().GetMessage())
909+
}
889910
}
890911
return nil
891912
}

0 commit comments

Comments
 (0)