From 4394e9019ba830dd6a46af9ad3c7b1ddbaeeb365 Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Mon, 18 Mar 2024 17:46:54 -0700 Subject: [PATCH 01/12] throw io exception on close if hadoopstreams interrupted --- .../org/apache/iceberg/hadoop/HadoopStreams.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java index 44023326a0cf..ad1902fcdc1e 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java @@ -185,8 +185,20 @@ public void flush() throws IOException { @Override public void close() throws IOException { - stream.close(); - this.closed = true; + try { + stream.close(); + // {@link org.apache.hadoop.fs.s3a.S3ABlockOutputStream#close()} calls {@link + // org.apache.hadoop.fs.s3a.S3ABlockOutputStream#putObject()} + // which doesn't throw an exception when interrupted. Need to handle interrupts here to make + // sure file is uploaded on close. + if (Thread.interrupted() + && "S3ABlockOutputStream" + .equals(stream.getWrappedStream().getClass().getSimpleName())) { + throw new IOException("s3a putObject interrupted"); + } + } finally { + this.closed = true; + } } @SuppressWarnings("checkstyle:NoFinalizer") From 91163ae6daf26f90e00483a0ea0c5d90905c831c Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Mon, 18 Mar 2024 18:17:37 -0700 Subject: [PATCH 02/12] edit exception message --- core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java index ad1902fcdc1e..2e0ac5ffe163 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java @@ -194,7 +194,7 @@ public void close() throws IOException { if (Thread.interrupted() && "S3ABlockOutputStream" .equals(stream.getWrappedStream().getClass().getSimpleName())) { - throw new IOException("s3a putObject interrupted"); + throw new IOException("object upload failed as it was interrupted during close"); } } finally { this.closed = true; From 32b620f9b99043cfb0f7c716a44c408c31077f50 Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Tue, 19 Mar 2024 11:51:53 -0700 Subject: [PATCH 03/12] use fully qualified name instead --- .../java/org/apache/iceberg/hadoop/HadoopStreams.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java index 2e0ac5ffe163..21da759b4363 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java @@ -189,11 +189,12 @@ public void close() throws IOException { stream.close(); // {@link org.apache.hadoop.fs.s3a.S3ABlockOutputStream#close()} calls {@link // org.apache.hadoop.fs.s3a.S3ABlockOutputStream#putObject()} - // which doesn't throw an exception when interrupted. Need to handle interrupts here to make - // sure file is uploaded on close. + // which doesn't throw an exception when interrupted. Need to check the interrupted flag to + // detect failed object upload + // and propagate the error up. if (Thread.interrupted() - && "S3ABlockOutputStream" - .equals(stream.getWrappedStream().getClass().getSimpleName())) { + && "org.apache.hadoop.fs.s3a.S3ABlockOutputStream" + .equals(stream.getWrappedStream().getClass().getName())) { throw new IOException("object upload failed as it was interrupted during close"); } } finally { From c1c022a78e50f79a95a8a515ddff1dfc1acc915c Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Wed, 20 Mar 2024 13:44:31 -0700 Subject: [PATCH 04/12] add unit test --- .../apache/iceberg/hadoop/HadoopStreams.java | 4 +- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 36 +++++++++++++++ .../iceberg/hadoop/HadoopStreamsTest.java | 44 +++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java create mode 100644 core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java index 21da759b4363..92a4ba05a573 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java @@ -189,8 +189,8 @@ public void close() throws IOException { stream.close(); // {@link org.apache.hadoop.fs.s3a.S3ABlockOutputStream#close()} calls {@link // org.apache.hadoop.fs.s3a.S3ABlockOutputStream#putObject()} - // which doesn't throw an exception when interrupted. Need to check the interrupted flag to - // detect failed object upload + // which doesn't throw an exception when interrupted. + // Need to check the interrupted flag to detect failed object upload // and propagate the error up. if (Thread.interrupted() && "org.apache.hadoop.fs.s3a.S3ABlockOutputStream" diff --git a/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java new file mode 100644 index 000000000000..1914219fddbc --- /dev/null +++ b/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; +import java.io.OutputStream; + +/** mock class for testing hadoop s3a writer */ +public class S3ABlockOutputStream extends OutputStream { + + @Override + public void write(int b) throws IOException { + throw new IOException("mocked clas, do not use"); + } + + @Override + public void close() throws IOException { + super.close(); + } +} diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java new file mode 100644 index 000000000000..a1edeaf06b24 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.hadoop; + +import static org.junit.jupiter.api.Assertions.*; + +import java.io.IOException; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.s3a.S3ABlockOutputStream; +import org.apache.iceberg.io.PositionOutputStream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class HadoopStreamsTest { + + @Test + void closeShouldThrowIOExceptionOnInterrupted() throws IOException { + + FSDataOutputStream fsDataOutputStream = + new FSDataOutputStream(new S3ABlockOutputStream(), null); + PositionOutputStream wrap = HadoopStreams.wrap(fsDataOutputStream); + + // mock interrupt in S3ABlockOutputStream#putObject + Thread.currentThread().interrupt(); + + Assertions.assertThrowsExactly(IOException.class, wrap::close); + } +} From 93fbc9a449e6062dcc29176f56209cb002135d08 Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Wed, 20 Mar 2024 13:47:25 -0700 Subject: [PATCH 05/12] update exception message --- core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java index 92a4ba05a573..34c11e876bf1 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java @@ -195,7 +195,7 @@ public void close() throws IOException { if (Thread.interrupted() && "org.apache.hadoop.fs.s3a.S3ABlockOutputStream" .equals(stream.getWrappedStream().getClass().getName())) { - throw new IOException("object upload failed as it was interrupted during close"); + throw new IOException("Failed to close stream as object failed to upload."); } } finally { this.closed = true; From bb0252fc9e63082936c3eff913ee639b5eafbdcc Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Wed, 20 Mar 2024 14:44:40 -0700 Subject: [PATCH 06/12] fix imports --- .../test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java index a1edeaf06b24..feb4b0cf87d1 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java @@ -18,8 +18,6 @@ */ package org.apache.iceberg.hadoop; -import static org.junit.jupiter.api.Assertions.*; - import java.io.IOException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.s3a.S3ABlockOutputStream; From 0eafe8209082adb011b9bcb0f3808a19966bce01 Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Thu, 21 Mar 2024 12:21:46 -0700 Subject: [PATCH 07/12] review comments refactor --- .../apache/iceberg/hadoop/HadoopStreams.java | 25 ++++++++----------- ...treamsTest.java => TestHadoopStreams.java} | 7 +++--- 2 files changed, 15 insertions(+), 17 deletions(-) rename core/src/test/java/org/apache/iceberg/hadoop/{HadoopStreamsTest.java => TestHadoopStreams.java} (87%) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java index 34c11e876bf1..33ae4dc41b45 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java @@ -185,20 +185,17 @@ public void flush() throws IOException { @Override public void close() throws IOException { - try { - stream.close(); - // {@link org.apache.hadoop.fs.s3a.S3ABlockOutputStream#close()} calls {@link - // org.apache.hadoop.fs.s3a.S3ABlockOutputStream#putObject()} - // which doesn't throw an exception when interrupted. - // Need to check the interrupted flag to detect failed object upload - // and propagate the error up. - if (Thread.interrupted() - && "org.apache.hadoop.fs.s3a.S3ABlockOutputStream" - .equals(stream.getWrappedStream().getClass().getName())) { - throw new IOException("Failed to close stream as object failed to upload."); - } - } finally { - this.closed = true; + stream.close(); + this.closed = true; + // {@link org.apache.hadoop.fs.s3a.S3ABlockOutputStream#close()} calls {@link + // org.apache.hadoop.fs.s3a.S3ABlockOutputStream#putObject()} + // which doesn't throw an exception when interrupted. + // Need to check the interrupted flag to detect failed object upload + // and propagate the error up. + if (Thread.interrupted() + && "org.apache.hadoop.fs.s3a.S3ABlockOutputStream" + .equals(stream.getWrappedStream().getClass().getName())) { + throw new IOException("Failed to close stream as object failed to upload."); } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java similarity index 87% rename from core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java rename to core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java index feb4b0cf87d1..bfd3275153ec 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopStreamsTest.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java @@ -22,10 +22,9 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.s3a.S3ABlockOutputStream; import org.apache.iceberg.io.PositionOutputStream; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -class HadoopStreamsTest { +class TestHadoopStreams { @Test void closeShouldThrowIOExceptionOnInterrupted() throws IOException { @@ -37,6 +36,8 @@ void closeShouldThrowIOExceptionOnInterrupted() throws IOException { // mock interrupt in S3ABlockOutputStream#putObject Thread.currentThread().interrupt(); - Assertions.assertThrowsExactly(IOException.class, wrap::close); + org.assertj.core.api.Assertions.assertThatThrownBy(wrap::close) + .isInstanceOf(IOException.class) + .hasMessage("Failed to close stream as object failed to upload."); } } From 478ac3d095a98f427d64c359f273027da6483801 Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Thu, 21 Mar 2024 12:22:39 -0700 Subject: [PATCH 08/12] fix import --- .../test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java index bfd3275153ec..8270243e6a20 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.s3a.S3ABlockOutputStream; import org.apache.iceberg.io.PositionOutputStream; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; class TestHadoopStreams { @@ -36,7 +37,7 @@ void closeShouldThrowIOExceptionOnInterrupted() throws IOException { // mock interrupt in S3ABlockOutputStream#putObject Thread.currentThread().interrupt(); - org.assertj.core.api.Assertions.assertThatThrownBy(wrap::close) + Assertions.assertThatThrownBy(wrap::close) .isInstanceOf(IOException.class) .hasMessage("Failed to close stream as object failed to upload."); } From 87dd6e7e0bab38acc3a563e7fe1d61cc827b8b8f Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Fri, 22 Mar 2024 06:10:08 -0700 Subject: [PATCH 09/12] address review comments --- .../main/java/org/apache/iceberg/hadoop/HadoopStreams.java | 3 ++- .../java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java | 2 +- .../java/org/apache/iceberg/hadoop/TestHadoopStreams.java | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java index 33ae4dc41b45..f9b43b684666 100644 --- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java +++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java @@ -195,7 +195,8 @@ public void close() throws IOException { if (Thread.interrupted() && "org.apache.hadoop.fs.s3a.S3ABlockOutputStream" .equals(stream.getWrappedStream().getClass().getName())) { - throw new IOException("Failed to close stream as object failed to upload."); + throw new IOException( + "S3ABlockOutputStream failed to upload object after stream was closed"); } } diff --git a/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 1914219fddbc..6f35c8ed5b6a 100644 --- a/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -26,7 +26,7 @@ public class S3ABlockOutputStream extends OutputStream { @Override public void write(int b) throws IOException { - throw new IOException("mocked clas, do not use"); + throw new IOException("mocked class, do not use"); } @Override diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java index 8270243e6a20..24c678be13a2 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java @@ -28,7 +28,7 @@ class TestHadoopStreams { @Test - void closeShouldThrowIOExceptionOnInterrupted() throws IOException { + void closeShouldThrowIOExceptionWhenInterrupted() throws IOException { FSDataOutputStream fsDataOutputStream = new FSDataOutputStream(new S3ABlockOutputStream(), null); @@ -39,6 +39,6 @@ void closeShouldThrowIOExceptionOnInterrupted() throws IOException { Assertions.assertThatThrownBy(wrap::close) .isInstanceOf(IOException.class) - .hasMessage("Failed to close stream as object failed to upload."); + .hasMessage("S3ABlockOutputStream failed to upload object after stream was closed"); } } From d57baa8fdc11b982908bf20430cb699930031d68 Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Mon, 25 Mar 2024 10:40:04 -0700 Subject: [PATCH 10/12] update test to mock upload interrupt from another thread --- .../hadoop/fs/s3a/S3ABlockOutputStream.java | 31 +++++++++++++++++++ .../iceberg/hadoop/TestHadoopStreams.java | 26 +++++++++++++--- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java index 6f35c8ed5b6a..63bddf7b79c2 100644 --- a/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java +++ b/core/src/test/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java @@ -20,9 +20,19 @@ import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** mock class for testing hadoop s3a writer */ public class S3ABlockOutputStream extends OutputStream { + public ExecutorService mockCloseService; + public Future mockUploadOnClose; + + public S3ABlockOutputStream() { + mockCloseService = Executors.newSingleThreadExecutor(); + } @Override public void write(int b) throws IOException { @@ -31,6 +41,27 @@ public void write(int b) throws IOException { @Override public void close() throws IOException { + try { + mockUploadOnClose = + mockCloseService.submit( + () -> { + try { + Thread.sleep(30 * 1000); + } catch (InterruptedException e) { + // ignore + } + }); + mockUploadOnClose.get(); + } catch (CancellationException | InterruptedException e) { + // mock interrupt in S3ABlockOutputStream#putObject + Thread.currentThread().interrupt(); + } catch (Exception e) { + throw new RuntimeException(e); + } super.close(); } + + public void interruptClose() { + mockUploadOnClose.cancel(true); + } } diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java index 24c678be13a2..9655fbcb4c08 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java @@ -19,26 +19,42 @@ package org.apache.iceberg.hadoop; import java.io.IOException; +import java.util.concurrent.Executors; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.s3a.S3ABlockOutputStream; import org.apache.iceberg.io.PositionOutputStream; import org.assertj.core.api.Assertions; +import org.junit.Assert; import org.junit.jupiter.api.Test; class TestHadoopStreams { @Test - void closeShouldThrowIOExceptionWhenInterrupted() throws IOException { + void closeShouldThrowIOExceptionWhenInterrupted() throws Exception { - FSDataOutputStream fsDataOutputStream = - new FSDataOutputStream(new S3ABlockOutputStream(), null); + long startTime = System.currentTimeMillis(); + S3ABlockOutputStream s3ABlockOutputStream = new S3ABlockOutputStream(); + FSDataOutputStream fsDataOutputStream = new FSDataOutputStream(s3ABlockOutputStream, null); PositionOutputStream wrap = HadoopStreams.wrap(fsDataOutputStream); - // mock interrupt in S3ABlockOutputStream#putObject - Thread.currentThread().interrupt(); + // interrupt mock upload on close after a delay + Executors.newSingleThreadExecutor() + .execute( + () -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + s3ABlockOutputStream.interruptClose(); + }); Assertions.assertThatThrownBy(wrap::close) .isInstanceOf(IOException.class) .hasMessage("S3ABlockOutputStream failed to upload object after stream was closed"); + + long endTime = System.currentTimeMillis(); + long closeDuration = endTime - startTime; + Assert.assertTrue(closeDuration < 30 * 1000 && closeDuration > 1000); } } From 7e24aa9b03ca3366f12b472977a43c42501606d8 Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Mon, 25 Mar 2024 13:16:42 -0700 Subject: [PATCH 11/12] remove runtime assert for test --- .../java/org/apache/iceberg/hadoop/TestHadoopStreams.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java index 9655fbcb4c08..68d570f7c705 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.s3a.S3ABlockOutputStream; import org.apache.iceberg.io.PositionOutputStream; import org.assertj.core.api.Assertions; -import org.junit.Assert; import org.junit.jupiter.api.Test; class TestHadoopStreams { @@ -32,7 +31,6 @@ class TestHadoopStreams { @Test void closeShouldThrowIOExceptionWhenInterrupted() throws Exception { - long startTime = System.currentTimeMillis(); S3ABlockOutputStream s3ABlockOutputStream = new S3ABlockOutputStream(); FSDataOutputStream fsDataOutputStream = new FSDataOutputStream(s3ABlockOutputStream, null); PositionOutputStream wrap = HadoopStreams.wrap(fsDataOutputStream); @@ -52,9 +50,5 @@ void closeShouldThrowIOExceptionWhenInterrupted() throws Exception { Assertions.assertThatThrownBy(wrap::close) .isInstanceOf(IOException.class) .hasMessage("S3ABlockOutputStream failed to upload object after stream was closed"); - - long endTime = System.currentTimeMillis(); - long closeDuration = endTime - startTime; - Assert.assertTrue(closeDuration < 30 * 1000 && closeDuration > 1000); } } From e601204a564b97b60e75515502699ea959efdca4 Mon Sep 17 00:00:00 2001 From: Abid Mohammed Date: Fri, 29 Mar 2024 05:54:01 -0700 Subject: [PATCH 12/12] retrigger checks --- .../test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java index 68d570f7c705..09b478e4a6c1 100644 --- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java +++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopStreams.java @@ -34,7 +34,6 @@ void closeShouldThrowIOExceptionWhenInterrupted() throws Exception { S3ABlockOutputStream s3ABlockOutputStream = new S3ABlockOutputStream(); FSDataOutputStream fsDataOutputStream = new FSDataOutputStream(s3ABlockOutputStream, null); PositionOutputStream wrap = HadoopStreams.wrap(fsDataOutputStream); - // interrupt mock upload on close after a delay Executors.newSingleThreadExecutor() .execute(