Skip to content

Commit aaeb1b9

Browse files
committed
support remote cache chunking using FastCDC 2020
1 parent 7b532fc commit aaeb1b9

32 files changed

+2553
-13
lines changed

src/main/java/com/google/devtools/build/lib/remote/BUILD

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package(
88
filegroup(
99
name = "srcs",
1010
srcs = glob(["*"]) + [
11+
"//src/main/java/com/google/devtools/build/lib/remote/chunking:srcs",
1112
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker:srcs",
1213
"//src/main/java/com/google/devtools/build/lib/remote/common:srcs",
1314
"//src/main/java/com/google/devtools/build/lib/remote/disk:srcs",
@@ -28,7 +29,9 @@ java_library(
2829
srcs = glob(
2930
["*.java"],
3031
exclude = [
32+
"ChunkingConfig.java",
3133
"ExecutionStatusException.java",
34+
"FastCDCChunker.java",
3235
"ReferenceCountedChannel.java",
3336
"ChannelConnectionWithServerCapabilitiesFactory.java",
3437
"RemoteRetrier.java",
@@ -53,6 +56,7 @@ java_library(
5356
":Retrier",
5457
":abstract_action_input_prefetcher",
5558
":lease_service",
59+
"//src/main/java/com/google/devtools/build/lib/concurrent:task_deduplicator",
5660
":remote_important_output_handler",
5761
":remote_output_checker",
5862
":scrubber",
@@ -97,6 +101,7 @@ java_library(
97101
"//src/main/java/com/google/devtools/build/lib/exec/local",
98102
"//src/main/java/com/google/devtools/build/lib/packages/semantics",
99103
"//src/main/java/com/google/devtools/build/lib/profiler",
104+
"//src/main/java/com/google/devtools/build/lib/remote/chunking",
100105
"//src/main/java/com/google/devtools/build/lib/remote/circuitbreaker",
101106
"//src/main/java/com/google/devtools/build/lib/remote/common",
102107
"//src/main/java/com/google/devtools/build/lib/remote/common:bulk_transfer_exception",
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2026 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.devtools.build.lib.remote;
16+
17+
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
18+
19+
import build.bazel.remote.execution.v2.Digest;
20+
import build.bazel.remote.execution.v2.SplitBlobResponse;
21+
import com.google.common.flogger.GoogleLogger;
22+
import com.google.common.util.concurrent.ListenableFuture;
23+
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
24+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
25+
import io.grpc.StatusRuntimeException;
26+
import java.io.IOException;
27+
import java.io.OutputStream;
28+
import java.util.List;
29+
30+
/**
31+
* Downloads blobs by sequentially fetching chunks via the SplitBlob API.
32+
*/
33+
public class ChunkedBlobDownloader {
34+
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
35+
36+
private final GrpcCacheClient grpcCacheClient;
37+
private final CombinedCache combinedCache;
38+
39+
public ChunkedBlobDownloader(GrpcCacheClient grpcCacheClient, CombinedCache combinedCache) {
40+
this.grpcCacheClient = grpcCacheClient;
41+
this.combinedCache = combinedCache;
42+
}
43+
44+
/**
45+
* Downloads a blob using chunked download via the SplitBlob API. This should be
46+
* called with virtual threads.
47+
*/
48+
public void downloadChunked(
49+
RemoteActionExecutionContext context, Digest blobDigest, OutputStream out)
50+
throws CacheNotFoundException, IOException, InterruptedException {
51+
List<Digest> chunkDigests;
52+
try {
53+
chunkDigests = getChunkDigests(context, blobDigest);
54+
} catch (IOException | StatusRuntimeException e) {
55+
logger.atWarning().withCause(e).log(
56+
"SplitBlob failed for %s, falling back to regular download", blobDigest.getHash());
57+
throw new CacheNotFoundException(blobDigest);
58+
}
59+
downloadAndReassembleChunks(context, chunkDigests, out);
60+
}
61+
62+
private List<Digest> getChunkDigests(
63+
RemoteActionExecutionContext context, Digest blobDigest)
64+
throws IOException, InterruptedException {
65+
ListenableFuture<SplitBlobResponse> splitResponseFuture =
66+
grpcCacheClient.splitBlob(context, blobDigest);
67+
if (splitResponseFuture == null) {
68+
throw new CacheNotFoundException(blobDigest);
69+
}
70+
List<Digest> chunkDigests = getFromFuture(splitResponseFuture).getChunkDigestsList();
71+
if (chunkDigests.isEmpty() && blobDigest.getSizeBytes() > 0) {
72+
throw new CacheNotFoundException(blobDigest);
73+
}
74+
return chunkDigests;
75+
}
76+
77+
private void downloadAndReassembleChunks(
78+
RemoteActionExecutionContext context, List<Digest> chunkDigests, OutputStream out)
79+
throws IOException, InterruptedException {
80+
for (Digest chunkDigest : chunkDigests) {
81+
getFromFuture(combinedCache.downloadBlob(context, chunkDigest, out));
82+
}
83+
}
84+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright 2026 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.devtools.build.lib.remote;
16+
17+
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
18+
19+
import build.bazel.remote.execution.v2.Digest;
20+
import com.google.common.collect.ImmutableSet;
21+
import com.google.common.io.ByteStreams;
22+
import com.google.devtools.build.lib.remote.chunking.ChunkingConfig;
23+
import com.google.devtools.build.lib.remote.chunking.FastCDCChunker;
24+
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
25+
import com.google.devtools.build.lib.remote.util.DigestUtil;
26+
import com.google.devtools.build.lib.vfs.Path;
27+
import com.google.protobuf.ByteString;
28+
import java.io.IOException;
29+
import java.io.InputStream;
30+
import java.util.HashSet;
31+
import java.util.List;
32+
import java.util.Set;
33+
34+
/**
35+
* Uploads blobs in chunks using Content-Defined Chunking with FastCDC 2020.
36+
*
37+
* <p>
38+
* Upload flow for blobs above threshold:
39+
*
40+
* <ol>
41+
* <li>Chunk file with FastCDC
42+
* <li>Call findMissingDigests on chunk digests
43+
* <li>Upload only missing chunks
44+
* <li>Call SpliceBlob to register the blob as the concatenation of chunks
45+
* </ol>
46+
*/
47+
public class ChunkedBlobUploader {
48+
49+
private final GrpcCacheClient grpcCacheClient;
50+
private final CombinedCache combinedCache;
51+
private final FastCDCChunker chunker;
52+
private final long chunkingThreshold;
53+
54+
public ChunkedBlobUploader(
55+
GrpcCacheClient grpcCacheClient,
56+
CombinedCache combinedCache,
57+
ChunkingConfig config,
58+
DigestUtil digestUtil) {
59+
this.grpcCacheClient = grpcCacheClient;
60+
this.combinedCache = combinedCache;
61+
this.chunker = new FastCDCChunker(config, digestUtil);
62+
this.chunkingThreshold = config.chunkingThreshold();
63+
}
64+
65+
public long getChunkingThreshold() {
66+
return chunkingThreshold;
67+
}
68+
69+
public void uploadChunked(RemoteActionExecutionContext context, Digest blobDigest, Path file)
70+
throws IOException, InterruptedException {
71+
List<Digest> chunkDigests;
72+
try (InputStream input = file.getInputStream()) {
73+
chunkDigests = chunker.chunkToDigests(input);
74+
}
75+
if (chunkDigests.isEmpty()) {
76+
return;
77+
}
78+
79+
ImmutableSet<Digest> missingDigests = getFromFuture(grpcCacheClient.findMissingDigests(context, chunkDigests));
80+
uploadMissingChunks(context, missingDigests, chunkDigests, file);
81+
getFromFuture(grpcCacheClient.spliceBlob(context, blobDigest, chunkDigests));
82+
}
83+
84+
private void uploadMissingChunks(
85+
RemoteActionExecutionContext context,
86+
ImmutableSet<Digest> missingDigests,
87+
List<Digest> chunkDigests,
88+
Path file)
89+
throws IOException, InterruptedException {
90+
if (missingDigests.isEmpty()) {
91+
return;
92+
}
93+
94+
Set<Digest> uploaded = new HashSet<>();
95+
try (InputStream input = file.getInputStream()) {
96+
for (Digest chunkDigest : chunkDigests) {
97+
if (missingDigests.contains(chunkDigest) && uploaded.add(chunkDigest)) {
98+
ByteString.Output out = ByteString.newOutput((int) chunkDigest.getSizeBytes());
99+
ByteStreams.limit(input, chunkDigest.getSizeBytes()).transferTo(out);
100+
getFromFuture(combinedCache.uploadBlob(context, chunkDigest, out.toByteString()));
101+
} else {
102+
input.skipNBytes(chunkDigest.getSizeBytes());
103+
}
104+
}
105+
}
106+
}
107+
}

src/main/java/com/google/devtools/build/lib/remote/CombinedCache.java

Lines changed: 66 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@
3232
import com.google.common.flogger.GoogleLogger;
3333
import com.google.common.util.concurrent.Futures;
3434
import com.google.common.util.concurrent.ListenableFuture;
35+
import com.google.common.util.concurrent.ListeningExecutorService;
36+
import com.google.common.util.concurrent.MoreExecutors;
3537
import com.google.devtools.build.lib.concurrent.ThreadSafety;
3638
import com.google.devtools.build.lib.exec.SpawnCheckingCacheEvent;
3739
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
40+
import com.google.devtools.build.lib.remote.chunking.ChunkingConfig;
3841
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
3942
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
4043
import com.google.devtools.build.lib.remote.common.OutputDigestMismatchException;
@@ -64,6 +67,7 @@
6467
import java.util.List;
6568
import java.util.Set;
6669
import java.util.concurrent.CountDownLatch;
70+
import java.util.concurrent.Executors;
6771
import java.util.concurrent.atomic.AtomicLong;
6872
import java.util.regex.Matcher;
6973
import java.util.regex.Pattern;
@@ -93,9 +97,17 @@ public class CombinedCache extends AbstractReferenceCounted {
9397
private final CountDownLatch closeCountDownLatch = new CountDownLatch(1);
9498
protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();
9599

100+
@SuppressWarnings("AllowVirtualThreads")
101+
private final ListeningExecutorService virtualThreadExecutor =
102+
MoreExecutors.listeningDecorator(
103+
Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("combined-cache-", 0).factory()));
104+
96105
@Nullable protected final RemoteCacheClient remoteCacheClient;
97106
@Nullable protected final DiskCacheClient diskCacheClient;
98107
@Nullable protected final String symlinkTemplate;
108+
@Nullable private final ChunkingConfig chunkingConfig;
109+
@Nullable private final ChunkedBlobDownloader chunkedDownloader;
110+
@Nullable private final ChunkedBlobUploader chunkedUploader;
99111
protected final DigestUtil digestUtil;
100112

101113
public CombinedCache(
@@ -110,6 +122,18 @@ public CombinedCache(
110122
this.diskCacheClient = diskCacheClient;
111123
this.symlinkTemplate = symlinkTemplate;
112124
this.digestUtil = digestUtil;
125+
126+
if (remoteCacheClient instanceof GrpcCacheClient grpcClient
127+
&& grpcClient.getChunkingConfig() != null) {
128+
ChunkingConfig config = grpcClient.getChunkingConfig();
129+
this.chunkingConfig = config;
130+
this.chunkedDownloader = new ChunkedBlobDownloader(grpcClient, this);
131+
this.chunkedUploader = new ChunkedBlobUploader(grpcClient, this, config, digestUtil);
132+
} else {
133+
this.chunkingConfig = null;
134+
this.chunkedDownloader = null;
135+
this.chunkedUploader = null;
136+
}
113137
}
114138

115139
public CacheCapabilities getRemoteCacheCapabilities() throws IOException {
@@ -130,6 +154,11 @@ public ServerCapabilities getRemoteServerCapabilities() throws IOException {
130154
return remoteCacheClient.getServerCapabilities();
131155
}
132156

157+
/**
 * Returns the chunking configuration, or {@code null} when chunking is not set up. The
 * constructor only sets it when the remote cache client is a {@code GrpcCacheClient} that
 * reports a non-null chunking config.
 */
@Nullable
158+
public ChunkingConfig getChunkingConfig() {
159+
return chunkingConfig;
160+
}
161+
133162
/**
134163
* Class to keep track of which cache (disk or remote) a given [cached] ActionResult comes from.
135164
*/
@@ -315,13 +344,21 @@ protected ListenableFuture<Void> uploadFile(
315344

316345
ListenableFuture<Void> remoteCacheFuture = Futures.immediateVoidFuture();
317346
if (remoteCacheClient != null && context.getWriteCachePolicy().allowRemoteCache()) {
318-
Completable upload =
319-
casUploadCache.execute(
320-
digest,
321-
RxFutures.toCompletable(
322-
() -> remoteCacheClient.uploadFile(context, digest, file), directExecutor()),
323-
force);
324-
remoteCacheFuture = RxFutures.toListenableFuture(upload);
347+
if (chunkedUploader != null
348+
&& digest.getSizeBytes() > chunkingConfig.chunkingThreshold()) {
349+
remoteCacheFuture = virtualThreadExecutor.submit(() -> {
350+
chunkedUploader.uploadChunked(context, digest, file);
351+
return null;
352+
});
353+
} else {
354+
Completable upload =
355+
casUploadCache.execute(
356+
digest,
357+
RxFutures.toCompletable(
358+
() -> remoteCacheClient.uploadFile(context, digest, file), directExecutor()),
359+
force);
360+
remoteCacheFuture = RxFutures.toListenableFuture(upload);
361+
}
325362
}
326363

327364
return Futures.whenAllSucceed(diskCacheFuture, remoteCacheFuture)
@@ -416,7 +453,7 @@ private ListenableFuture<Void> downloadBlob(
416453
directExecutor());
417454
}
418455

419-
private ListenableFuture<Void> downloadBlob(
456+
ListenableFuture<Void> downloadBlob(
420457
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
421458
ListenableFuture<Void> future = immediateFailedFuture(new CacheNotFoundException(digest));
422459

@@ -440,6 +477,27 @@ private ListenableFuture<Void> downloadBlobFromRemote(
440477
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
441478
checkState(remoteCacheClient != null && context.getReadCachePolicy().allowRemoteCache());
442479

480+
if (chunkedDownloader != null
481+
&& digest.getSizeBytes() > chunkingConfig.chunkingThreshold()) {
482+
ListenableFuture<Void> chunkedDownloadFuture =
483+
virtualThreadExecutor.submit(() -> {
484+
chunkedDownloader.downloadChunked(context, digest, out);
485+
return null;
486+
});
487+
return Futures.catchingAsync(
488+
chunkedDownloadFuture,
489+
CacheNotFoundException.class,
490+
(e) -> regularDownloadBlobFromRemote(context, digest, out),
491+
directExecutor());
492+
}
493+
494+
return regularDownloadBlobFromRemote(context, digest, out);
495+
}
496+
497+
private ListenableFuture<Void> regularDownloadBlobFromRemote(
498+
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
499+
checkState(remoteCacheClient != null && context.getReadCachePolicy().allowRemoteCache());
500+
443501
if (diskCacheClient != null && context.getWriteCachePolicy().allowDiskCache()) {
444502
Path tempPath = diskCacheClient.getTempPath();
445503
LazyFileOutputStream tempOut = new LazyFileOutputStream(tempPath);

0 commit comments

Comments
 (0)