diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java index 3121b6ffb065..752882a9d6c2 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.function.Function; import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -139,7 +140,8 @@ public class IcebergSink SupportsPreWriteTopology, SupportsCommitter, SupportsPreCommitTopology, - SupportsPostCommitTopology { + SupportsPostCommitTopology, + SupportsConcurrentExecutionAttempts { private static final Logger LOG = LoggerFactory.getLogger(IcebergSink.class); private final TableLoader tableLoader; private final Map snapshotProperties; diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java index f0d083060c1d..61a587e7786a 100644 --- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSpeculativeExecutionSupport.java @@ -44,12 +44,16 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.test.junit5.MiniClusterExtension; import org.apache.flink.types.Row; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; import org.apache.iceberg.flink.FlinkConfigOptions; import org.apache.iceberg.flink.TestBase; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; /** @@ -57,6 +61,7 @@ * anything goes wrong unexpectedly. */ @Timeout(value = 60) +@ExtendWith(ParameterizedTestExtension.class) public class TestIcebergSpeculativeExecutionSupport extends TestBase { private static final int NUM_TASK_MANAGERS = 1; private static final int NUM_TASK_SLOTS = 3; @@ -90,6 +95,14 @@ protected TableEnvironment getTableEnv() { return tEnv; } + @Parameter(index = 0) + private boolean useV2Sink; + + @Parameters(name = "useV2Sink = {0}") + public static Object[][] parameters() { + return new Object[][] {{true}, {false}}; + } + @BeforeEach public void before() throws IOException { String warehouse = @@ -114,8 +127,9 @@ public void after() { dropCatalog(CATALOG_NAME, true); } - @Test + @TestTemplate public void testSpeculativeExecution() throws Exception { + tEnv.getConfig().set("table.exec.iceberg.use-v2-sink", String.valueOf(useV2Sink)); Table table = tEnv.sqlQuery(String.format("SELECT * FROM %s.%s", DATABASE_NAME, INPUT_TABLE_NAME)); DataStream slowStream =