Skip to content

Commit cf1239e

Browse files
authored
test(Cache proxy): simulate network delay in chunk benchmarking (#11478)
This is helpful for comparing different proxy uploading situations
1 parent ee88212 commit cf1239e

File tree

1 file changed

+35
-2
lines changed

1 file changed

+35
-2
lines changed

enterprise/server/byte_stream_server_proxy/byte_stream_server_proxy_test.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package byte_stream_server_proxy
33
import (
44
"bytes"
55
"context"
6+
"flag"
67
"fmt"
78
"io"
89
"strconv"
@@ -49,6 +50,11 @@ import (
4950
gstatus "google.golang.org/grpc/status"
5051
)
5152

53+
var (
54+
benchRTT = flag.Duration("bench_rtt", 8*time.Millisecond, "Simulated network RTT for benchmark remote RPCs (e.g. FindMissingBlobs, SpliceBlob)")
55+
benchUploadDelay = flag.Duration("bench_upload_delay", 16*time.Millisecond, "Simulated time to upload 1MB to the remote cache")
56+
)
57+
5258
type remoteReadExpectation int
5359

5460
const (
@@ -480,7 +486,6 @@ func TestWrite(t *testing.T) {
480486

481487
require.Equal(t, int32(1), requestCounter.Load())
482488
}
483-
484489
}
485490

486491
// Run all tests for both bazel 5.0.0 (which introduced compression) and
@@ -1198,6 +1203,31 @@ func TestWriteChunkedFallbackBelowThreshold(t *testing.T) {
11981203
require.Equal(t, originalData, downloadedData)
11991204
}
12001205

1206+
func networkLatencyUnaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
1207+
time.Sleep(*benchRTT)
1208+
return invoker(ctx, method, req, reply, cc, opts...)
1209+
}
1210+
1211+
type delayedRecvClientStream struct {
1212+
grpc.ClientStream
1213+
}
1214+
1215+
// RecvMsg delays the server response. Each chunk is uploaded as a separate
1216+
// ByteStream/Write stream, so this fires once per chunk at CloseAndRecv(),
1217+
// adding a fixed delay per chunk regardless of internals.
1218+
func (s *delayedRecvClientStream) RecvMsg(m interface{}) error {
1219+
time.Sleep(*benchUploadDelay)
1220+
return s.ClientStream.RecvMsg(m)
1221+
}
1222+
1223+
func networkSimStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
1224+
cs, err := streamer(ctx, desc, cc, method, opts...)
1225+
if err != nil {
1226+
return nil, err
1227+
}
1228+
return &delayedRecvClientStream{ClientStream: cs}, nil
1229+
}
1230+
12011231
func setupChunkedBenchmarkEnv(b *testing.B) (bspb.ByteStreamClient, context.Context) {
12021232
*log.LogLevel = "error"
12031233
log.Configure()
@@ -1260,7 +1290,10 @@ func setupChunkedBenchmarkEnv(b *testing.B) (bspb.ByteStreamClient, context.Cont
12601290
bspb.RegisterByteStreamServer(remoteGRPC, remoteBSS)
12611291
repb.RegisterContentAddressableStorageServer(remoteGRPC, remoteCAS)
12621292
go remoteRun()
1263-
remoteConn, err := testenv.LocalGRPCConn(ctx, remoteLis)
1293+
remoteConn, err := testenv.LocalGRPCConn(ctx, remoteLis,
1294+
grpc.WithChainUnaryInterceptor(networkLatencyUnaryInterceptor),
1295+
grpc.WithChainStreamInterceptor(networkSimStreamInterceptor),
1296+
)
12641297
require.NoError(b, err)
12651298
b.Cleanup(func() { remoteConn.Close() })
12661299
bsClient := bspb.NewByteStreamClient(remoteConn)

0 commit comments

Comments
 (0)