Skip to content

Commit 67c42da

Browse files
committed
Store the gcs path into GcsWritableByteChannel.
1 parent eb1be77 commit 67c42da

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public GcsUtilV2 create(PipelineOptions options) {
103103
"nullness" // For Creating AccessDeniedException FileNotFoundException, and
104104
// FileAlreadyExistsException with null.
105105
})
106-
private IOException translateStorageException(GcsPath gcsPath, StorageException e) {
106+
private static IOException translateStorageException(GcsPath gcsPath, StorageException e) {
107107
switch (e.getCode()) {
108108
case 403:
109109
return new AccessDeniedException(gcsPath.toString(), null, e.getMessage());
@@ -568,18 +568,19 @@ public SeekableByteChannel open(GcsPath path, BlobSourceOption... sourceOptions)
568568
/** A bridge that allows a GCS WriteChannel to behave as a WritableByteChannel. */
569569
private static class GcsWritableByteChannel implements WritableByteChannel {
570570
private final WriteChannel writer;
571+
private final GcsPath gcsPath;
571572

572-
GcsWritableByteChannel(WriteChannel writer) {
573+
GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) {
573574
this.writer = writer;
575+
this.gcsPath = gcsPath;
574576
}
575577

576578
@Override
577579
public int write(ByteBuffer src) throws IOException {
578580
try {
579581
return writer.write(src);
580582
} catch (StorageException e) {
581-
// In a real implementation, you'd use your translateStorageException here
582-
throw new IOException(e);
583+
throw translateStorageException(gcsPath, e);
583584
}
584585
}
585586

@@ -623,7 +624,7 @@ public WritableByteChannel create(
623624
}
624625

625626
// Return the bridge wrapper
626-
return new GcsWritableByteChannel(writer);
627+
return new GcsWritableByteChannel(writer, path);
627628

628629
} catch (StorageException e) {
629630
throw translateStorageException(path, e);

sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilParameterizedIT.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,11 +659,13 @@ public void testWriteAndRead() throws IOException {
659659
try {
660660
createTestBucketHelper(bucketName, false);
661661

662+
// Write content to a GCS file
662663
CreateOptions options = CreateOptions.builder().setExpectFileToNotExist(true).build();
663664
try (WritableByteChannel writer = gcsUtil.create(targetPath, options)) {
664665
writer.write(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
665666
}
666667

668+
// Read content into a buffer
667669
StringBuilder readContent = new StringBuilder();
668670
try (ReadableByteChannel reader = gcsUtil.open(targetPath)) {
669671
ByteBuffer buffer = ByteBuffer.allocate(1024);
@@ -673,6 +675,8 @@ public void testWriteAndRead() throws IOException {
673675
buffer.clear();
674676
}
675677
}
678+
679+
// Verify content
676680
assertEquals(content, readContent.toString());
677681
} finally {
678682
tearDownTestBucketHelper(bucketName);

0 commit comments

Comments
 (0)