3232import com .google .common .flogger .GoogleLogger ;
3333import com .google .common .util .concurrent .Futures ;
3434import com .google .common .util .concurrent .ListenableFuture ;
35+ import com .google .common .util .concurrent .ListeningExecutorService ;
36+ import com .google .common .util .concurrent .MoreExecutors ;
3537import com .google .devtools .build .lib .concurrent .ThreadSafety ;
3638import com .google .devtools .build .lib .exec .SpawnCheckingCacheEvent ;
3739import com .google .devtools .build .lib .exec .SpawnProgressEvent ;
40+ import com .google .devtools .build .lib .remote .chunking .ChunkingConfig ;
3841import com .google .devtools .build .lib .remote .common .CacheNotFoundException ;
3942import com .google .devtools .build .lib .remote .common .LazyFileOutputStream ;
4043import com .google .devtools .build .lib .remote .common .OutputDigestMismatchException ;
6467import java .util .List ;
6568import java .util .Set ;
6669import java .util .concurrent .CountDownLatch ;
70+ import java .util .concurrent .Executors ;
6771import java .util .concurrent .atomic .AtomicLong ;
6872import java .util .regex .Matcher ;
6973import java .util .regex .Pattern ;
@@ -93,9 +97,17 @@ public class CombinedCache extends AbstractReferenceCounted {
9397 private final CountDownLatch closeCountDownLatch = new CountDownLatch (1 );
9498 protected final AsyncTaskCache .NoResult <Digest > casUploadCache = AsyncTaskCache .NoResult .create ();
9599
100+ @ SuppressWarnings ("AllowVirtualThreads" )
101+ private final ListeningExecutorService virtualThreadExecutor =
102+ MoreExecutors .listeningDecorator (
103+ Executors .newThreadPerTaskExecutor (Thread .ofVirtual ().name ("combined-cache-" , 0 ).factory ()));
104+
96105 @ Nullable protected final RemoteCacheClient remoteCacheClient ;
97106 @ Nullable protected final DiskCacheClient diskCacheClient ;
98107 @ Nullable protected final String symlinkTemplate ;
108+ @ Nullable private final ChunkingConfig chunkingConfig ;
109+ @ Nullable private final ChunkedBlobDownloader chunkedDownloader ;
110+ @ Nullable private final ChunkedBlobUploader chunkedUploader ;
99111 protected final DigestUtil digestUtil ;
100112
101113 public CombinedCache (
@@ -110,6 +122,18 @@ public CombinedCache(
110122 this .diskCacheClient = diskCacheClient ;
111123 this .symlinkTemplate = symlinkTemplate ;
112124 this .digestUtil = digestUtil ;
125+
126+ if (remoteCacheClient instanceof GrpcCacheClient grpcClient
127+ && grpcClient .getChunkingConfig () != null ) {
128+ ChunkingConfig config = grpcClient .getChunkingConfig ();
129+ this .chunkingConfig = config ;
130+ this .chunkedDownloader = new ChunkedBlobDownloader (grpcClient , this );
131+ this .chunkedUploader = new ChunkedBlobUploader (grpcClient , this , config , digestUtil );
132+ } else {
133+ this .chunkingConfig = null ;
134+ this .chunkedDownloader = null ;
135+ this .chunkedUploader = null ;
136+ }
113137 }
114138
115139 public CacheCapabilities getRemoteCacheCapabilities () throws IOException {
@@ -130,6 +154,11 @@ public ServerCapabilities getRemoteServerCapabilities() throws IOException {
130154 return remoteCacheClient .getServerCapabilities ();
131155 }
132156
157+ @ Nullable
158+ public ChunkingConfig getChunkingConfig () {
159+ return chunkingConfig ;
160+ }
161+
133162 /**
134163 * Class to keep track of which cache (disk or remote) a given [cached] ActionResult comes from.
135164 */
@@ -315,13 +344,21 @@ protected ListenableFuture<Void> uploadFile(
315344
316345 ListenableFuture <Void > remoteCacheFuture = Futures .immediateVoidFuture ();
317346 if (remoteCacheClient != null && context .getWriteCachePolicy ().allowRemoteCache ()) {
318- Completable upload =
319- casUploadCache .execute (
320- digest ,
321- RxFutures .toCompletable (
322- () -> remoteCacheClient .uploadFile (context , digest , file ), directExecutor ()),
323- force );
324- remoteCacheFuture = RxFutures .toListenableFuture (upload );
347+ if (chunkedUploader != null
348+ && digest .getSizeBytes () > chunkingConfig .chunkingThreshold ()) {
349+ remoteCacheFuture = virtualThreadExecutor .submit (() -> {
350+ chunkedUploader .uploadChunked (context , digest , file );
351+ return null ;
352+ });
353+ } else {
354+ Completable upload =
355+ casUploadCache .execute (
356+ digest ,
357+ RxFutures .toCompletable (
358+ () -> remoteCacheClient .uploadFile (context , digest , file ), directExecutor ()),
359+ force );
360+ remoteCacheFuture = RxFutures .toListenableFuture (upload );
361+ }
325362 }
326363
327364 return Futures .whenAllSucceed (diskCacheFuture , remoteCacheFuture )
@@ -416,7 +453,7 @@ private ListenableFuture<Void> downloadBlob(
416453 directExecutor ());
417454 }
418455
419- private ListenableFuture <Void > downloadBlob (
456+ ListenableFuture <Void > downloadBlob (
420457 RemoteActionExecutionContext context , Digest digest , OutputStream out ) {
421458 ListenableFuture <Void > future = immediateFailedFuture (new CacheNotFoundException (digest ));
422459
@@ -440,6 +477,27 @@ private ListenableFuture<Void> downloadBlobFromRemote(
440477 RemoteActionExecutionContext context , Digest digest , OutputStream out ) {
441478 checkState (remoteCacheClient != null && context .getReadCachePolicy ().allowRemoteCache ());
442479
480+ if (chunkedDownloader != null
481+ && digest .getSizeBytes () > chunkingConfig .chunkingThreshold ()) {
482+ ListenableFuture <Void > chunkedDownloadFuture =
483+ virtualThreadExecutor .submit (() -> {
484+ chunkedDownloader .downloadChunked (context , digest , out );
485+ return null ;
486+ });
487+ return Futures .catchingAsync (
488+ chunkedDownloadFuture ,
489+ CacheNotFoundException .class ,
490+ (e ) -> regularDownloadBlobFromRemote (context , digest , out ),
491+ directExecutor ());
492+ }
493+
494+ return regularDownloadBlobFromRemote (context , digest , out );
495+ }
496+
497+ private ListenableFuture <Void > regularDownloadBlobFromRemote (
498+ RemoteActionExecutionContext context , Digest digest , OutputStream out ) {
499+ checkState (remoteCacheClient != null && context .getReadCachePolicy ().allowRemoteCache ());
500+
443501 if (diskCacheClient != null && context .getWriteCachePolicy ().allowDiskCache ()) {
444502 Path tempPath = diskCacheClient .getTempPath ();
445503 LazyFileOutputStream tempOut = new LazyFileOutputStream (tempPath );
0 commit comments