@@ -20,7 +20,6 @@ import (
2020 "github.com/buildbuddy-io/buildbuddy/server/util/authutil"
2121 "github.com/buildbuddy-io/buildbuddy/server/util/bytebufferpool"
2222 "github.com/buildbuddy-io/buildbuddy/server/util/compression"
23- "github.com/buildbuddy-io/buildbuddy/server/util/lib/set"
2423 "github.com/buildbuddy-io/buildbuddy/server/util/log"
2524 "github.com/buildbuddy-io/buildbuddy/server/util/prefix"
2625 "github.com/buildbuddy-io/buildbuddy/server/util/status"
@@ -30,10 +29,12 @@ import (
3029
3130 repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
3231 bspb "google.golang.org/genproto/googleapis/bytestream"
32+ "google.golang.org/grpc/codes"
33+ gstatus "google.golang.org/grpc/status"
3334)
3435
3536var (
36- chunkUploadConcurrency = flag .Int ("cache_proxy.chunk_upload_concurrency" , 8 , "Maximum number of concurrent chunk uploads when uploading missing chunks to remote cache." )
37+ chunkUploadConcurrency = flag .Int ("cache_proxy.chunk_upload_concurrency" , 2 , "Maximum number of concurrent chunk BatchUpdateBlobsRequest uploads in flight when uploading missing chunks to remote cache." )
3738)
3839
3940type ByteStreamServerProxy struct {
@@ -733,33 +734,84 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
733734 compressBuf := s .compressBufPool .Get (chunking .MaxChunkSizeBytes ())
734735 defer s .compressBufPool .Put (compressBuf )
735736
736- // chunkWriteFn is called on each new chunk once it's available through the chunker's pipe.
737- // We write chunks to local first, then use FindMissingBlobs + upload for remote.
738- // Chunks are stored and read compressed with ZSTD.
737+ uploadGroup , uCtx := errgroup .WithContext (ctx )
738+ uploadGroup .SetLimit (* chunkUploadConcurrency )
739+ pendingBatch := & repb.BatchUpdateBlobsRequest {
740+ InstanceName : instanceName ,
741+ DigestFunction : digestFunction ,
742+ }
743+ var pendingBatchSize int64
744+ var chunksDeduped int
745+ var chunkBytesDeduped int64
746+
739747 chunkWriteFn := func (chunkData []byte ) error {
740748 chunkDigest , err := digest .Compute (bytes .NewReader (chunkData ), digestFunction )
741749 if err != nil {
742- return status .InternalErrorf ( "computing chunked digest for Write: %s" , err )
750+ return status .WrapError ( err , "computing chunk digest" )
743751 }
744752 chunkDigests = append (chunkDigests , chunkDigest )
745753
746754 chunkRN := digest .NewCASResourceName (chunkDigest , instanceName , digestFunction )
747755 chunkRN .SetCompressor (repb .Compressor_ZSTD )
748756 compressedData := compression .CompressZstd (compressBuf , chunkData )
749- stream := & rawWriteStream {
757+
758+ eg , gCtx := errgroup .WithContext (ctx )
759+ eg .Go (func () error {
760+ fmbResp , err := s .remoteCAS .FindMissingBlobs (gCtx , & repb.FindMissingBlobsRequest {
761+ InstanceName : instanceName ,
762+ DigestFunction : digestFunction ,
763+ BlobDigests : []* repb.Digest {chunkDigest },
764+ })
765+ if err != nil {
766+ return status .WrapErrorf (err , "finding missing blob on remote for chunk %s" , chunkRN .DownloadString ())
767+ }
768+ if len (fmbResp .GetMissingBlobDigests ()) == 0 {
769+ chunksDeduped ++
770+ chunkBytesDeduped += chunkDigest .GetSizeBytes ()
771+ return nil
772+ }
773+
774+ additionalSize := int64 (len (compressedData ))
775+
776+ // If this chunk won't fit, fire off the current batch.
777+ 	// uploadGroup.Go blocks once *chunkUploadConcurrency BatchUpdateBlobs RPCs are in flight.
778+ if pendingBatchSize + additionalSize > cachetools .BatchUploadLimitBytes && len (pendingBatch .Requests ) > 0 {
779+ batch := pendingBatch
780+ pendingBatch = & repb.BatchUpdateBlobsRequest {
781+ InstanceName : instanceName ,
782+ DigestFunction : digestFunction ,
783+ }
784+ pendingBatchSize = 0
785+ uploadGroup .Go (func () error {
786+ return s .sendBatchUpload (uCtx , batch )
787+ })
788+ }
789+
790+ dataCopy := make ([]byte , len (compressedData ))
791+ copy (dataCopy , compressedData )
792+ pendingBatch .Requests = append (pendingBatch .Requests , & repb.BatchUpdateBlobsRequest_Request {
793+ Digest : chunkDigest ,
794+ Data : dataCopy ,
795+ Compressor : repb .Compressor_ZSTD ,
796+ })
797+ pendingBatchSize += additionalSize
798+ return nil
799+ })
800+
801+ localStream := & rawWriteStream {
750802 ctx : ctx ,
751803 resourceName : chunkRN .NewUploadString (),
752804 data : compressedData ,
753805 }
754- if err := s .local .Write (stream ); err != nil {
755- return status .InternalErrorf ( "writing chunk %s to local: %s " , chunkRN .DownloadString (), err )
806+ if err := s .local .Write (localStream ); err != nil {
807+ return status .WrapErrorf ( err , "writing chunk %s to local" , chunkRN .DownloadString ())
756808 }
757- return nil
809+ return eg . Wait ()
758810 }
759811
760812 chunker , err := chunking .NewChunker (ctx , int (chunking .AvgChunkSizeBytes ()), chunkWriteFn )
761813 if err != nil {
762- return writeChunkedResult {}, status .InternalErrorf ( "creating chunker: %s" , err )
814+ return writeChunkedResult {}, status .WrapError ( err , "creating chunker" )
763815 }
764816 defer chunker .Close ()
765817
@@ -800,20 +852,30 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
800852 }
801853 }
802854
803- // Close blocks until all chunk writes complete, ensuring chunkDigests is fully populated.
804855 if err := chunker .Close (); err != nil {
805856 return writeChunkedResult {}, status .InternalErrorf ("closing chunker: %s" , err )
806857 }
807858
859+ if len (pendingBatch .Requests ) > 0 {
860+ uploadGroup .Go (func () error {
861+ return s .sendBatchUpload (uCtx , pendingBatch )
862+ })
863+ }
864+ if err := uploadGroup .Wait (); err != nil {
865+ return writeChunkedResult {}, status .WrapError (err , "uploading chunks to remote" )
866+ }
867+
808868 var chunkBytesTotal int64
809869 for _ , d := range chunkDigests {
810870 chunkBytesTotal += d .GetSizeBytes ()
811871 }
812872
813873 result := writeChunkedResult {
814- blobBytes : blobSize ,
815- chunksTotal : len (chunkDigests ),
816- chunkBytesTotal : chunkBytesTotal ,
874+ blobBytes : blobSize ,
875+ chunksTotal : len (chunkDigests ),
876+ chunksDeduped : chunksDeduped ,
877+ chunkBytesTotal : chunkBytesTotal ,
878+ chunkBytesDeduped : chunkBytesDeduped ,
817879 }
818880
819881 // If there's only 1 chunk, the chunking threshold is misconfigured.
@@ -829,63 +891,22 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
829891 DigestFunction : digestFunction ,
830892 }
831893
832- missingBlobs , err := s .remoteCAS .FindMissingBlobs (ctx , manifest .ToFindMissingBlobsRequest ())
833- if err != nil {
834- return writeChunkedResult {}, status .InternalErrorf ("finding missing blobs on remote: %s" , err )
835- }
836- missingDigests := missingBlobs .GetMissingBlobDigests ()
837- missingSet := make (set.Set [string ], len (missingDigests ))
838- for _ , d := range missingDigests {
839- missingSet .Add (d .GetHash ())
840- }
841-
842- // Deduped chunks are ones that already exist on remote.
843- for _ , d := range chunkDigests {
844- if ! missingSet .Contains (d .GetHash ()) {
845- result .chunksDeduped ++
846- result .chunkBytesDeduped += d .GetSizeBytes ()
847- }
848- }
849- if len (missingDigests ) > 0 {
850- if err := s .uploadMissingChunks (ctx , missingDigests , instanceName , digestFunction ); err != nil {
851- return writeChunkedResult {}, status .InternalErrorf ("uploading missing chunks to remote: %s" , err )
852- }
853- }
854-
855894 if _ , err := s .remoteCAS .SpliceBlob (ctx , manifest .ToSpliceBlobRequest ()); err != nil {
856- return writeChunkedResult {}, status .InternalErrorf ( "splice blob on remote: %s" , err )
895+ return writeChunkedResult {}, status .WrapError ( err , "splice blob on remote" )
857896 }
858897
859898 return result , stream .SendAndClose (& bspb.WriteResponse {CommittedSize : bytesReceived })
860899}
861900
862- func (s * ByteStreamServerProxy ) uploadMissingChunks (ctx context.Context , missingDigests []* repb.Digest , instanceName string , digestFunction repb.DigestFunction_Value ) error {
863- g , gCtx := errgroup .WithContext (ctx )
864- g .SetLimit (* chunkUploadConcurrency )
865- for _ , chunkDigest := range missingDigests {
866- chunkRN := digest .NewCASResourceName (chunkDigest , instanceName , digestFunction )
867- g .Go (func () error {
868- return s .uploadChunk (gCtx , chunkRN )
869- })
870- }
871- return g .Wait ()
872- }
873-
874- func (s * ByteStreamServerProxy ) uploadChunk (ctx context.Context , rn * digest.CASResourceName ) error {
875- rnCopy , err := digest .CASResourceNameFromProto (rn .ToProto ())
901+ func (s * ByteStreamServerProxy ) sendBatchUpload (ctx context.Context , req * repb.BatchUpdateBlobsRequest ) error {
902+ rsp , err := s .remoteCAS .BatchUpdateBlobs (ctx , req )
876903 if err != nil {
877904 return err
878905 }
879- rnCopy .SetCompressor (repb .Compressor_ZSTD )
880- reader , err := s .localCache .Reader (ctx , rnCopy .ToProto (), 0 , 0 )
881- if err != nil {
882- return status .InternalErrorf ("reading chunk %s from local cache: %s" , rn .DownloadString (), err )
883- }
884- defer reader .Close ()
885-
886- _ , _ , err = cachetools .UploadFromReaderWithCompression (ctx , s .remote , rnCopy , reader , repb .Compressor_ZSTD )
887- if err != nil {
888- return status .InternalErrorf ("uploading chunk %s to remote: %s" , rn .DownloadString (), err )
906+ for _ , r := range rsp .GetResponses () {
907+ if c := r .GetStatus ().GetCode (); c != 0 {
908+ return gstatus .Errorf (codes .Code (c ), "batch upload chunk %s: %s" , r .GetDigest ().GetHash (), r .GetStatus ().GetMessage ())
909+ }
889910 }
890911 return nil
891912}
0 commit comments