Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
71edb38
fix: make FlushPolicy${Min,Max}FlushSizeFlushPolicy constructors private
BenWhitehead Jul 31, 2025
d890480
chore: centralize all protobuf formatting methods in StorageV2ProtoUtils
BenWhitehead Jul 31, 2025
2c8c81f
chore: rename ResumableSessionFailureScenario to UploadFailureScenario
BenWhitehead Jul 31, 2025
72134ff
test: update TestBench to be graceful to an instance running outside …
BenWhitehead Jul 31, 2025
fc552a4
test: consolidate declarations of grpc-status-details-bin to a single…
BenWhitehead Jul 31, 2025
75d4cfd
chore: add BidiUploadState
BenWhitehead Jul 31, 2025
365bce5
chore: add BidiUploadStreamingStream
BenWhitehead Jul 31, 2025
6d24a64
chore: add BidiAppendableUnbufferedWritableByteChannel
BenWhitehead Aug 4, 2025
f262a2c
chore: make MinFlushBufferedWritableByteChannel capable of being non-…
BenWhitehead Aug 4, 2025
fb34cb2
chore: new bidi appendable channel bootstrappable
BenWhitehead Aug 4, 2025
acff0cd
chore: remove old appendable upload implementation
BenWhitehead Aug 4, 2025
9a21428
test: refactor existing appendable upload tests to work with new impl…
BenWhitehead Aug 4, 2025
dd946d1
chore: remove obsolete internal hasher option
BenWhitehead Aug 5, 2025
0257fe0
docs: note FlushPolicy.MaxFlushSizeFlushPolicy has a better option
BenWhitehead Aug 5, 2025
62846d4
docs: update BlobAppendableUpload.AppendableUploadWriteableByteChanne…
BenWhitehead Aug 6, 2025
fc83080
feat: add StorageNonBlockingChannelUtils
BenWhitehead Aug 6, 2025
352fde7
fix: fix otel span lifetime for BlobAppendableUpload
BenWhitehead Aug 8, 2025
b20cee5
chore: update some retry/takeover tests and initialization after revi…
BenWhitehead Aug 7, 2025
9ba1625
chore: make BlobAppendableUpload.AppendableUploadWriteableByteChannel…
BenWhitehead Aug 12, 2025
2db2592
chore: update bidi grpc calls to request 1 message initially instead …
BenWhitehead Aug 18, 2025
934e1a9
chore: add comment to BidiAppendableUnbufferedWritableByteChannel#wri…
BenWhitehead Aug 18, 2025
7c919dc
chore: add comment to BidiUploadStreamingStream class
BenWhitehead Aug 18, 2025
70b6b78
chore: make maxRedirectsAllowed a configurable parameter of BlobAppen…
BenWhitehead Aug 18, 2025
1f2708b
chore: make DefaultBufferedWritableByteChannel capable of being non-b…
BenWhitehead Aug 18, 2025
1a1f879
fix: update BlobAppendableUploadConfig and FlushPolicy.MinFlushSizeFl…
BenWhitehead Aug 19, 2025
ca7aa88
chore: update BidiUploadStreamingStream to avoid enqueuing multiple e…
BenWhitehead Aug 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,27 @@
<method>com.google.cloud.storage.BucketInfo$Builder setGoogleManagedEncryptionEnforcementConfig(com.google.cloud.storage.BucketInfo$GoogleManagedEncryptionEnforcementConfig)</method>
</difference>

<!-- make beta api constructors private, they still retain their factory methods. -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/storage/FlushPolicy$MinFlushSizeFlushPolicy</className>
<method>FlushPolicy$MinFlushSizeFlushPolicy(int)</method>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/storage/FlushPolicy$MinFlushSizeFlushPolicy</className>
<method>FlushPolicy$MinFlushSizeFlushPolicy(int)</method>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/storage/FlushPolicy$MaxFlushSizeFlushPolicy</className>
<method>FlushPolicy$MaxFlushSizeFlushPolicy(int)</method>
</difference>
<!-- this method is already on the parent interface, and is on an @InternalExtensionOnly interface -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/storage/BlobAppendableUpload$AppendableUploadWriteableByteChannel</className>
<method>int write(java.nio.ByteBuffer)</method>
</difference>

</differences>
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.cloud.BaseServiceException;
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A non-blocking {@link UnbufferedWritableByteChannel} that feeds bytes into a
 * {@link BidiUploadStreamingStream} for a gRPC bidi appendable upload.
 *
 * <p>{@link #write(ByteBuffer[], int, int)} enqueues as many bytes as the stream currently has
 * capacity for and returns immediately — possibly having consumed zero bytes. {@link #close()}
 * blocks until the stream's result future resolves.
 *
 * <p>NOTE(review): only {@code nextWriteShouldFinalize} is volatile; the other mutable fields
 * assume a single writer thread — confirm with callers.
 */
final class BidiAppendableUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {

  // underlying bidi gRPC stream that data is appended to
  private final BidiUploadStreamingStream stream;
  // slices incoming ByteBuffers into ChunkSegments suitable for the stream
  private final ChunkSegmenter chunkSegmenter;

  // true until close() completes (or is attempted); guards write after close
  private boolean open;
  // byte offset within the object at which the next append will land
  private long writeOffset;
  // when true, the last segment of the next write (or close()) finalizes the object
  private volatile boolean nextWriteShouldFinalize;
  // lets close() issue an explicit flush for an upload that never had a write
  private boolean writeCalledAtLeastOnce;

  /** If write throws an error, don't attempt to finalize things when {@link #close()} is called. */
  private boolean writeThrewError;

  /**
   * @param writeOffset starting offset; non-zero when taking over an existing appendable object
   */
  BidiAppendableUnbufferedWritableByteChannel(
      BidiUploadStreamingStream stream, ChunkSegmenter chunkSegmenter, long writeOffset) {
    this.stream = stream;
    this.chunkSegmenter = chunkSegmenter;
    this.open = true;
    this.writeOffset = writeOffset;
    this.nextWriteShouldFinalize = false;
    this.writeThrewError = false;
  }

  /**
   * Non-blocking write; returns the number of bytes actually enqueued, which may be less than
   * requested (including zero) when the stream has no available capacity.
   */
  @Override
  public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
    return internalWrite(srcs, srcsOffset, srcsLength);
  }

  /**
   * Writes ALL remaining bytes of {@code srcs} (spinning over the non-blocking
   * {@link #internalWrite}) and then closes the channel.
   */
  @Override
  public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
    long totalRemaining = Buffers.totalRemaining(srcs, offset, length);
    // internalWrite is non-blocking, but close is blocking.
    // loop here to ensure all the bytes we need flushed are enqueued before we transition to
    // trying to close.
    // NOTE(review): if the stream never frees capacity this loop busy-spins; internalWrite
    // surfaces stream failure via awaitResultFuture, which is presumably what bounds it — confirm.
    long written = 0;
    do {
      written += internalWrite(srcs, offset, length);
    } while (written < totalRemaining);
    close();
    return written;
  }

  @Override
  public boolean isOpen() {
    return open;
  }

  /**
   * Blocking close. Enqueues the terminal message (finalize or plain close depending on
   * {@link #nextWriteShouldFinalize}), awaits the stream's result future, and always sends the
   * half-close and marks the channel closed, even on failure.
   */
  @Override
  public void close() throws IOException {
    if (!open) {
      return;
    }
    try {
      // a previous write already surfaced the failure; don't try to finalize on a broken stream
      if (writeThrewError) {
        return;
      }

      // zero-byte upload: make sure at least one flush reaches the stream
      if (!writeCalledAtLeastOnce) {
        stream.flush();
      }
      if (nextWriteShouldFinalize) {
        // spin until the finalize message can be enqueued (returns false while at capacity)
        //noinspection StatementWithEmptyBody
        while (!stream.finishWrite(writeOffset)) {}
      } else {
        // spin until the close message can be enqueued (returns false while at capacity)
        //noinspection StatementWithEmptyBody
        while (!stream.closeStream(writeOffset)) {}
      }

      awaitResultFuture();
    } finally {
      stream.sendClose();
      open = false;
    }
  }

  /** Arms finalization: the last segment of the next write (or {@link #close()}) will finalize. */
  public void nextWriteShouldFinalize() {
    this.nextWriteShouldFinalize = true;
  }

  /**
   * Core non-blocking write: segments {@code srcs} and appends as many segments as the stream
   * accepts, advancing {@link #writeOffset}. Rewinds {@code srcs} to account for any bytes the
   * segmenter consumed but the stream rejected, so callers can retry them.
   *
   * @return number of bytes successfully enqueued (possibly 0)
   * @throws ClosedChannelException if the channel is closed
   * @throws IOException if the stream's result future has already failed
   */
  private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
    if (!open) {
      throw new ClosedChannelException();
    }
    // error early. if the result future is already failed, await it to throw the error
    if (stream.getResultFuture().isDone()) {
      awaitResultFuture();
      return 0;
    }
    writeCalledAtLeastOnce = true;

    // nothing can be enqueued right now; report zero progress rather than blocking
    long availableCapacity = stream.availableCapacity();
    if (availableCapacity <= 0) {
      return 0;
    }
    // snapshot of srcs so positions can be rewound if not everything is accepted
    RewindableContent rewindableContent = RewindableContent.of(srcs, srcsOffset, srcsLength);
    long totalBufferRemaining = rewindableContent.getLength();

    ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, true);
    if (data.length == 0) {
      return 0;
    }
    // we consumed some bytes from srcs, flag our content as dirty since we aren't writing
    // those bytes to implicitly flag as dirty.
    rewindableContent.flagDirty();

    long bytesConsumed = 0;
    for (int i = 0, len = data.length, lastIdx = len - 1; i < len; i++) {
      ChunkSegment datum = data[i];
      int size = datum.getB().size();
      boolean appended;
      if (i < lastIdx) {
        // middle segment: plain append, no flush
        appended = stream.append(datum);
      } else if (i == lastIdx && nextWriteShouldFinalize) {
        // last segment of the final write: append and finalize the object
        appended = stream.appendAndFinalize(datum);
      } else {
        // last segment of a regular write: append and flush
        appended = stream.appendAndFlush(datum);
      }
      if (appended) {
        bytesConsumed += size;
        writeOffset += size;
      } else {
        // if we weren't able to trigger a flush by reaching the end of the array and calling
        // appendAndFlush, explicitly call flush here so that some progress can be made.
        // we prefer appendAndFlush so a separate message is not needed, but an extra message
        // in order to make progress and free buffer space is better than ending up in a live-lock.
        stream.flush();
        break;
      }
    }

    // rewind srcs so unconsumed-but-segmented bytes are visible to the caller again
    if (bytesConsumed != totalBufferRemaining) {
      rewindableContent.rewindTo(bytesConsumed);
    }

    return bytesConsumed;
  }

  /**
   * Blocks on the stream's result future, translating failures to {@link IOException} and setting
   * {@link #writeThrewError} so {@link #close()} skips finalization.
   */
  private void awaitResultFuture() throws IOException {
    try {
      // NOTE(review): 10_717 ms is a magic timeout — presumably offset from rounder timeouts
      // elsewhere to disambiguate in traces; confirm and consider naming it as a constant.
      stream.getResultFuture().get(10_717, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // re-assert interrupt status per convention, then surface as an InterruptedIOException
      Thread.currentThread().interrupt();
      InterruptedIOException ioe = new InterruptedIOException();
      ioe.initCause(e);
      writeThrewError = true;
      throw ioe;
    } catch (ExecutionException e) {
      BaseServiceException coalesce = StorageException.coalesce(e.getCause());
      String message = coalesce.getMessage();
      String ioExceptionMessage = message;
      // if the failure is an upload scenario we detect client side, its message will be
      // verbose. To avoid duplication, select the first line only for the io exception
      int firstNewLineIndex = message != null ? message.indexOf('\n') : -1;
      if (firstNewLineIndex > -1) {
        ioExceptionMessage = message.substring(0, firstNewLineIndex);
      }
      IOException ioException = new IOException(ioExceptionMessage, coalesce);
      // ioException.addSuppressed(new AsyncStorageTaskException());
      writeThrewError = true;
      throw ioException;
    } catch (TimeoutException e) {
      writeThrewError = true;
      throw new IOException(e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
GrpcStorageImpl grpc = (GrpcStorageImpl) s;
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
BidiWriteObjectRequest req = grpc.getBidiWriteObjectRequest(info, opts);
BidiWriteObjectRequest req =
grpc.getBidiWriteObjectRequest(info, opts, false);

ApiFuture<BidiResumableWrite> startResumableWrite =
grpc.startResumableWrite(grpcCallContext, req, opts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,73 +94,3 @@ static BidiResumableWrite identity(BidiResumableWrite w) {
return w;
}
}

/**
 * {@link BidiWriteObjectRequestBuilderFactory} backed by a single {@link BidiWriteObjectRequest}
 * for an appendable upload.
 *
 * <p>Instances are immutable; equality and hash are delegated to the wrapped request.
 */
final class BidiAppendableWrite implements BidiWriteObjectRequestBuilderFactory {

  private final BidiWriteObjectRequest req;

  /** Creates a new-upload write: the request's {@code WriteObjectSpec.appendable} is forced on. */
  public BidiAppendableWrite(BidiWriteObjectRequest req) {
    this(req, false);
  }

  /**
   * @param takeOver when {@code true} the request is used verbatim (takeover of an existing
   *     appendable object); when {@code false} the {@code WriteObjectSpec} is rebuilt with
   *     {@code appendable = true}
   */
  public BidiAppendableWrite(BidiWriteObjectRequest req, boolean takeOver) {
    if (takeOver) {
      this.req = req;
    } else {
      req =
          req.toBuilder()
              .setWriteObjectSpec(req.getWriteObjectSpec().toBuilder().setAppendable(true).build())
              .build();
      this.req = req;
    }
  }

  /** Returns the wrapped request. */
  public BidiWriteObjectRequest getReq() {
    return req;
  }

  @Override
  public BidiWriteObjectRequest.Builder newBuilder() {
    return req.toBuilder();
  }

  /**
   * Resolves the target bucket from whichever spec the request carries, or {@code null} if
   * neither is present.
   */
  @Override
  public @Nullable String bucketName() {
    if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) {
      return req.getWriteObjectSpec().getResource().getBucket();
    } else if (req.hasAppendObjectSpec()) {
      return req.getAppendObjectSpec().getBucket();
    }
    return null;
  }

  @Override
  public String toString() {
    return "BidiAppendableWrite{" + "req=" + fmtProto(req) + '}';
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof BidiAppendableWrite)) {
      return false;
    }
    // renamed from "BidiAppendableWrite" — a local must not shadow the class name
    BidiAppendableWrite that = (BidiAppendableWrite) o;
    return Objects.equals(req, that.req);
  }

  @Override
  public int hashCode() {
    return Objects.hash(req);
  }

  /**
   * Helper function which is more specific than {@link Function#identity()}. Constraining the input
   * and output to be exactly {@link BidiAppendableWrite}.
   */
  static BidiAppendableWrite identity(BidiAppendableWrite w) {
    return w;
  }
}
Loading
Loading