-
Notifications
You must be signed in to change notification settings - Fork 4.5k
GCS client library migration in Java SDK - part 3 #37900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
9e70407
eb1be77
67c42da
2b1e93a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,6 +23,8 @@ | |||||||||||||
|
|
||||||||||||||
| import com.google.api.gax.paging.Page; | ||||||||||||||
| import com.google.auto.value.AutoValue; | ||||||||||||||
| import com.google.cloud.ReadChannel; | ||||||||||||||
| import com.google.cloud.WriteChannel; | ||||||||||||||
| import com.google.cloud.storage.Blob; | ||||||||||||||
| import com.google.cloud.storage.BlobId; | ||||||||||||||
| import com.google.cloud.storage.BlobInfo; | ||||||||||||||
|
|
@@ -33,6 +35,8 @@ | |||||||||||||
| import com.google.cloud.storage.Storage.BlobField; | ||||||||||||||
| import com.google.cloud.storage.Storage.BlobGetOption; | ||||||||||||||
| import com.google.cloud.storage.Storage.BlobListOption; | ||||||||||||||
| import com.google.cloud.storage.Storage.BlobSourceOption; | ||||||||||||||
| import com.google.cloud.storage.Storage.BlobWriteOption; | ||||||||||||||
| import com.google.cloud.storage.Storage.BucketField; | ||||||||||||||
| import com.google.cloud.storage.Storage.BucketGetOption; | ||||||||||||||
| import com.google.cloud.storage.Storage.CopyRequest; | ||||||||||||||
|
|
@@ -42,12 +46,17 @@ | |||||||||||||
| import com.google.cloud.storage.StorageOptions; | ||||||||||||||
| import java.io.FileNotFoundException; | ||||||||||||||
| import java.io.IOException; | ||||||||||||||
| import java.nio.ByteBuffer; | ||||||||||||||
| import java.nio.channels.SeekableByteChannel; | ||||||||||||||
| import java.nio.channels.WritableByteChannel; | ||||||||||||||
| import java.nio.file.AccessDeniedException; | ||||||||||||||
| import java.nio.file.FileAlreadyExistsException; | ||||||||||||||
| import java.util.ArrayList; | ||||||||||||||
| import java.util.Arrays; | ||||||||||||||
| import java.util.List; | ||||||||||||||
| import java.util.regex.Pattern; | ||||||||||||||
| import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; | ||||||||||||||
| import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; | ||||||||||||||
| import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; | ||||||||||||||
| import org.apache.beam.sdk.options.DefaultValueFactory; | ||||||||||||||
| import org.apache.beam.sdk.options.PipelineOptions; | ||||||||||||||
|
|
@@ -70,6 +79,8 @@ public GcsUtilV2 create(PipelineOptions options) { | |||||||||||||
|
|
||||||||||||||
| private Storage storage; | ||||||||||||||
|
|
||||||||||||||
| private final @Nullable Integer uploadBufferSizeBytes; | ||||||||||||||
|
|
||||||||||||||
| /** Maximum number of items to retrieve per Objects.List request. */ | ||||||||||||||
| private static final long MAX_LIST_BLOBS_PER_CALL = 1024; | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -85,13 +96,14 @@ public GcsUtilV2 create(PipelineOptions options) { | |||||||||||||
| GcsUtilV2(PipelineOptions options) { | ||||||||||||||
| String projectId = options.as(GcpOptions.class).getProject(); | ||||||||||||||
| storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService(); | ||||||||||||||
| uploadBufferSizeBytes = options.as(GcsOptions.class).getGcsUploadBufferSizeBytes(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @SuppressWarnings({ | ||||||||||||||
| "nullness" // For Creating AccessDeniedException FileNotFoundException, and | ||||||||||||||
| // FileAlreadyExistsException with null. | ||||||||||||||
| }) | ||||||||||||||
| private IOException translateStorageException(GcsPath gcsPath, StorageException e) { | ||||||||||||||
| private static IOException translateStorageException(GcsPath gcsPath, StorageException e) { | ||||||||||||||
| switch (e.getCode()) { | ||||||||||||||
| case 403: | ||||||||||||||
| return new AccessDeniedException(gcsPath.toString(), null, e.getMessage()); | ||||||||||||||
|
|
@@ -481,4 +493,141 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException { | |||||||||||||
| throw translateStorageException(bucketInfo.getName(), null, e); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** A bridge that allows a GCS ReadChannel to behave as a SeekableByteChannel. */ | ||||||||||||||
| private static class GcsSeekableByteChannel implements SeekableByteChannel { | ||||||||||||||
| private final ReadChannel reader; | ||||||||||||||
| private final long size; | ||||||||||||||
| private long position = 0; | ||||||||||||||
|
|
||||||||||||||
| GcsSeekableByteChannel(ReadChannel reader, long size) { | ||||||||||||||
| this.reader = reader; | ||||||||||||||
| this.size = size; | ||||||||||||||
| this.position = 0; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public int read(ByteBuffer dst) throws IOException { | ||||||||||||||
| int count = reader.read(dst); | ||||||||||||||
| if (count > 0) { | ||||||||||||||
| this.position += count; | ||||||||||||||
| } | ||||||||||||||
| return count; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public SeekableByteChannel position(long newPosition) throws IOException { | ||||||||||||||
| checkArgument(newPosition >= 0, "Position must be non-negative: %s", newPosition); | ||||||||||||||
| reader.seek(newPosition); | ||||||||||||||
| this.position = newPosition; | ||||||||||||||
| return this; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public long position() throws IOException { | ||||||||||||||
| return this.position; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public long size() throws IOException { | ||||||||||||||
| return size; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public SeekableByteChannel truncate(long size) throws IOException { | ||||||||||||||
| throw new UnsupportedOperationException( | ||||||||||||||
| "GcsSeekableByteChannels are read-only and cannot be truncated."); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public int write(ByteBuffer src) throws IOException { | ||||||||||||||
| throw new UnsupportedOperationException( | ||||||||||||||
| "GcsSeekableByteChannel are read-only and does not support writing."); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public boolean isOpen() { | ||||||||||||||
| return reader.isOpen(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public void close() throws IOException { | ||||||||||||||
| if (isOpen()) { | ||||||||||||||
| reader.close(); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions) | ||||||||||||||
| throws IOException { | ||||||||||||||
| Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE)); | ||||||||||||||
| return new GcsSeekableByteChannel( | ||||||||||||||
| blob.getStorage().reader(blob.getBlobId(), sourceOptions), blob.getSize()); | ||||||||||||||
|
Comment on lines
+564
to
+565
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would change this to the following to avoid unnecessary ByteBuffer allocations.
Suggested change
|
||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| /** A bridge that allows a GCS WriteChannel to behave as a WritableByteChannel. */ | ||||||||||||||
| private static class GcsWritableByteChannel implements WritableByteChannel { | ||||||||||||||
| private final WriteChannel writer; | ||||||||||||||
| private final GcsPath gcsPath; | ||||||||||||||
|
|
||||||||||||||
| GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) { | ||||||||||||||
| this.writer = writer; | ||||||||||||||
| this.gcsPath = gcsPath; | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public int write(ByteBuffer src) throws IOException { | ||||||||||||||
| try { | ||||||||||||||
| return writer.write(src); | ||||||||||||||
| } catch (StorageException e) { | ||||||||||||||
| throw translateStorageException(gcsPath, e); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public boolean isOpen() { | ||||||||||||||
| return writer.isOpen(); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| @Override | ||||||||||||||
| public void close() throws IOException { | ||||||||||||||
| writer.close(); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| public WritableByteChannel create( | ||||||||||||||
| GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption... writeOptions) | ||||||||||||||
| throws IOException { | ||||||||||||||
| try { | ||||||||||||||
| // Define the metadata for the new object | ||||||||||||||
| BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(), path.getObject()); | ||||||||||||||
| String type = options.getContentType(); | ||||||||||||||
| if (type != null) { | ||||||||||||||
| builder.setContentType(type); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| BlobInfo blobInfo = builder.build(); | ||||||||||||||
|
|
||||||||||||||
| List<BlobWriteOption> writeOptionList = new ArrayList<>(Arrays.asList(writeOptions)); | ||||||||||||||
| if (options.getExpectFileToNotExist()) { | ||||||||||||||
| writeOptionList.add(BlobWriteOption.doesNotExist()); | ||||||||||||||
| } | ||||||||||||||
|
Comment on lines
+612
to
+614
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be flushed out more. If there isn't a precondition[1] present when the writer is created, some internal rpcs won't be able to be automatically retried. And example of the other branch can be seen in this code sample for create from array https://github.com/googleapis/java-storage/blob/ba5daed0c1d306f821cc26549142ff0bcfb80cbb/samples/snippets/src/main/java/com/example/storage/object/UploadObjectFromMemory.java#L53-L64 |
||||||||||||||
| // Open a WriteChannel from the storage service | ||||||||||||||
| WriteChannel writer = | ||||||||||||||
| storage.writer(blobInfo, writeOptionList.toArray(new BlobWriteOption[0])); | ||||||||||||||
| Integer uploadBufferSizeBytes = | ||||||||||||||
| options.getUploadBufferSizeBytes() != null | ||||||||||||||
| ? options.getUploadBufferSizeBytes() | ||||||||||||||
| : this.uploadBufferSizeBytes; | ||||||||||||||
| if (uploadBufferSizeBytes != null) { | ||||||||||||||
| writer.setChunkSize(uploadBufferSizeBytes); | ||||||||||||||
| } | ||||||||||||||
|
|
||||||||||||||
| // Return the bridge wrapper | ||||||||||||||
| return new GcsWritableByteChannel(writer, path); | ||||||||||||||
|
|
||||||||||||||
| } catch (StorageException e) { | ||||||||||||||
| throw translateStorageException(path, e); | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
| } | ||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After making the channel non-blocking, you'll probably want to add the following change if you need to always fill the provided buffer as much as possible: