77 "fmt"
88 "io"
99 "strconv"
10+ "time"
1011
1112 "github.com/buildbuddy-io/buildbuddy/enterprise/server/remote_crypter"
1213 "github.com/buildbuddy-io/buildbuddy/enterprise/server/util/proxy_util"
@@ -26,6 +27,7 @@ import (
2627 "github.com/buildbuddy-io/buildbuddy/server/util/status"
2728 "github.com/buildbuddy-io/buildbuddy/server/util/tracing"
2829 "github.com/prometheus/client_golang/prometheus"
30+ "go.opentelemetry.io/otel/attribute"
2931 "golang.org/x/sync/errgroup"
3032
3133 repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
@@ -402,6 +404,8 @@ type byteStreamMetrics struct {
402404 chunksDeduped int
403405 chunkBytesTotal int64
404406 chunkBytesDeduped int64
407+ chunkingDuration time.Duration
408+ remoteDuration time.Duration
405409}
406410
407411func recordReadMetrics (bsm byteStreamMetrics , cacheStatus string ) {
@@ -486,6 +490,8 @@ func (s *ByteStreamServerProxy) Write(stream bspb.ByteStream_WriteServer) error
486490 chunksDeduped : result .chunksDeduped ,
487491 chunkBytesTotal : result .chunkBytesTotal ,
488492 chunkBytesDeduped : result .chunkBytesDeduped ,
493+ chunkingDuration : result .chunkingDuration ,
494+ remoteDuration : result .remoteDuration ,
489495 })
490496 return nil
491497 }
@@ -665,6 +671,10 @@ func recordWriteMetrics(bsm byteStreamMetrics) {
665671 if bsm .chunkBytesDeduped > 0 {
666672 metrics .ByteStreamChunkedWriteDedupedChunkBytes .With (chunkedLabels ).Add (float64 (bsm .chunkBytesDeduped ))
667673 }
674+ totalDuration := bsm .chunkingDuration + bsm .remoteDuration
675+ metrics .ByteStreamChunkedWriteDurationUsec .With (chunkedLabels ).Observe (float64 (totalDuration .Microseconds ()))
676+ metrics .ByteStreamChunkedWriteChunkingDurationUsec .With (chunkedLabels ).Observe (float64 (bsm .chunkingDuration .Microseconds ()))
677+ metrics .ByteStreamChunkedWriteRemoteDurationUsec .With (chunkedLabels ).Observe (float64 (bsm .remoteDuration .Microseconds ()))
668678 }
669679}
670680
@@ -699,9 +709,14 @@ type writeChunkedResult struct {
699709 chunksDeduped int
700710 chunkBytesTotal int64
701711 chunkBytesDeduped int64
712+ chunkingDuration time.Duration
713+ remoteDuration time.Duration
702714}
703715
704716func (s * ByteStreamServerProxy ) writeChunked (ctx context.Context , stream bspb.ByteStream_WriteServer ) (writeChunkedResult , error ) {
717+ ctx , spn := tracing .StartSpan (ctx )
718+ defer spn .End ()
719+
705720 firstReq , err := stream .Recv ()
706721 if err != nil {
707722 return writeChunkedResult {}, status .WrapErrorf (err , "receive first request" )
@@ -717,6 +732,13 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
717732 return writeChunkedResult {firstReq : firstReq }, status .UnimplementedError ("blob too small for chunking" )
718733 }
719734
735+ if spn .IsRecording () {
736+ spn .SetAttributes (
737+ attribute .Int64 ("blob_size" , blobSize ),
738+ attribute .String ("compressor" , rn .GetCompressor ().String ()),
739+ )
740+ }
741+
720742 ctx , err = prefix .AttachUserPrefixToContext (ctx , s .authenticator )
721743 if err != nil {
722744 return writeChunkedResult {}, err
@@ -737,6 +759,9 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
737759 // We write chunks to local first, then use FindMissingBlobs + upload for remote.
738760 // Chunks are stored and read compressed with ZSTD.
739761 chunkWriteFn := func (chunkData []byte ) error {
762+ chunkCtx , chunkSpn := tracing .StartNamedSpan (ctx , "chunkWriteFn" )
763+ defer chunkSpn .End ()
764+
740765 chunkDigest , err := digest .Compute (bytes .NewReader (chunkData ), digestFunction )
741766 if err != nil {
742767 return status .InternalErrorf ("computing chunked digest for Write: %s" , err )
@@ -745,13 +770,26 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
745770
746771 chunkRN := digest .NewCASResourceName (chunkDigest , instanceName , digestFunction )
747772 chunkRN .SetCompressor (repb .Compressor_ZSTD )
773+
774+ _ , compressSpn := tracing .StartNamedSpan (chunkCtx , "CompressZstd" )
748775 compressedData := compression .CompressZstd (compressBuf , chunkData )
749- stream := & rawWriteStream {
750- ctx : ctx ,
776+ compressSpn .End ()
777+
778+ if chunkSpn .IsRecording () {
779+ chunkSpn .SetAttributes (
780+ attribute .Int ("chunk_size" , len (chunkData )),
781+ attribute .Int ("compressed_size" , len (compressedData )),
782+ )
783+ }
784+
785+ localWriteCtx , localWriteSpn := tracing .StartNamedSpan (chunkCtx , "localChunkWrite" )
786+ defer localWriteSpn .End ()
787+ rawStream := & rawWriteStream {
788+ ctx : localWriteCtx ,
751789 resourceName : chunkRN .NewUploadString (),
752790 data : compressedData ,
753791 }
754- if err := s .local .Write (stream ); err != nil {
792+ if err := s .local .Write (rawStream ); err != nil {
755793 return status .InternalErrorf ("writing chunk %s to local: %s" , chunkRN .DownloadString (), err )
756794 }
757795 return nil
@@ -774,18 +812,26 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
774812 chunkInput = decompressor
775813 }
776814
815+ chunkingStart := time .Now ()
777816 req := firstReq
778817 bytesReceived := int64 (0 )
779818 for {
780- if _ , err := chunkInput .Write (req .GetData ()); err != nil {
819+ // Backpressure: chunkInputWrite / streamRecv (compression/local-write is bottleneck).
820+ // Forward pressure: streamRecv / chunkInputWrite (client send is bottleneck).
821+ _ , pipeWriteSpn := tracing .StartNamedSpan (ctx , "chunkInputWrite" )
822+ _ , err := chunkInput .Write (req .GetData ())
823+ pipeWriteSpn .End ()
824+ if err != nil {
781825 return writeChunkedResult {}, status .InternalErrorf ("writing data to chunker: %s" , err )
782826 }
783827 bytesReceived += int64 (len (req .GetData ()))
784828 if req .GetFinishWrite () {
785829 break
786830 }
787831
832+ _ , recvSpn := tracing .StartNamedSpan (ctx , "streamRecv" )
788833 req , err = stream .Recv ()
834+ recvSpn .End ()
789835 if err == io .EOF {
790836 return writeChunkedResult {}, status .InvalidArgumentErrorf ("received EOF before FinishWrite; stream cannot be recovered" )
791837 }
@@ -795,25 +841,40 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
795841 }
796842
797843 if decompressor != nil {
798- if err := decompressor .Close (); err != nil {
844+ _ , closeSpn := tracing .StartNamedSpan (ctx , "closeDecompressor" )
845+ err := decompressor .Close ()
846+ closeSpn .End ()
847+ if err != nil {
799848 return writeChunkedResult {}, status .InternalErrorf ("closing decompressor: %s" , err )
800849 }
801850 }
802851
803852 // Close blocks until all chunk writes complete, ensuring chunkDigests is fully populated.
804- if err := chunker .Close (); err != nil {
853+ _ , closeChunkerSpn := tracing .StartNamedSpan (ctx , "closeChunker" )
854+ err = chunker .Close ()
855+ closeChunkerSpn .End ()
856+ if err != nil {
805857 return writeChunkedResult {}, status .InternalErrorf ("closing chunker: %s" , err )
806858 }
859+ chunkingDuration := time .Since (chunkingStart )
807860
808861 var chunkBytesTotal int64
809862 for _ , d := range chunkDigests {
810863 chunkBytesTotal += d .GetSizeBytes ()
811864 }
812865
813866 result := writeChunkedResult {
814- blobBytes : blobSize ,
815- chunksTotal : len (chunkDigests ),
816- chunkBytesTotal : chunkBytesTotal ,
867+ blobBytes : blobSize ,
868+ chunksTotal : len (chunkDigests ),
869+ chunkBytesTotal : chunkBytesTotal ,
870+ chunkingDuration : chunkingDuration ,
871+ }
872+
873+ if spn .IsRecording () {
874+ spn .SetAttributes (
875+ attribute .Int ("chunks_total" , result .chunksTotal ),
876+ attribute .Int64 ("chunk_bytes_total" , result .chunkBytesTotal ),
877+ )
817878 }
818879
819880 // If there's only 1 chunk, the chunking threshold is misconfigured.
@@ -829,7 +890,11 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
829890 DigestFunction : digestFunction ,
830891 }
831892
832- missingBlobs , err := s .remoteCAS .FindMissingBlobs (ctx , manifest .ToFindMissingBlobsRequest ())
893+ remoteStart := time .Now ()
894+
895+ fmbCtx , fmbSpn := tracing .StartNamedSpan (ctx , "remote.FindMissingBlobs" )
896+ missingBlobs , err := s .remoteCAS .FindMissingBlobs (fmbCtx , manifest .ToFindMissingBlobsRequest ())
897+ fmbSpn .End ()
833898 if err != nil {
834899 return writeChunkedResult {}, status .InternalErrorf ("finding missing blobs on remote: %s" , err )
835900 }
@@ -852,14 +917,23 @@ func (s *ByteStreamServerProxy) writeChunked(ctx context.Context, stream bspb.By
852917 }
853918 }
854919
855- if _ , err := s .remoteCAS .SpliceBlob (ctx , manifest .ToSpliceBlobRequest ()); err != nil {
920+ spliceCtx , spliceSpn := tracing .StartNamedSpan (ctx , "remote.SpliceBlob" )
921+ _ , err = s .remoteCAS .SpliceBlob (spliceCtx , manifest .ToSpliceBlobRequest ())
922+ spliceSpn .End ()
923+ if err != nil {
856924 return writeChunkedResult {}, status .InternalErrorf ("splice blob on remote: %s" , err )
857925 }
858926
927+ result .remoteDuration = time .Since (remoteStart )
859928 return result , stream .SendAndClose (& bspb.WriteResponse {CommittedSize : bytesReceived })
860929}
861930
862931func (s * ByteStreamServerProxy ) uploadMissingChunks (ctx context.Context , missingDigests []* repb.Digest , instanceName string , digestFunction repb.DigestFunction_Value ) error {
932+ ctx , spn := tracing .StartSpan (ctx )
933+ defer spn .End ()
934+ if spn .IsRecording () {
935+ spn .SetAttributes (attribute .Int ("missing_chunks" , len (missingDigests )))
936+ }
863937 g , gCtx := errgroup .WithContext (ctx )
864938 g .SetLimit (* chunkUploadConcurrency )
865939 for _ , chunkDigest := range missingDigests {
0 commit comments