From d1000cd70e67504efcca82c27797d88e460e1c88 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 2 Dec 2025 15:03:23 -0800 Subject: [PATCH 1/4] =?UTF-8?q?Core:=20REST=20client=20=E2=80=93=20send=20?= =?UTF-8?q?Idempotency-Key=20on=20mutation=20requests=20when=20advertised?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iceberg/rest/RESTSessionCatalog.java | 51 +++++++++++-------- .../org/apache/iceberg/rest/RESTUtil.java | 11 ++++ 2 files changed, 42 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 85b04f3868cd..5b5d6e939f6d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -160,6 +160,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private Integer pageSize = null; private CloseableGroup closeables = null; private Set endpoints; + private Supplier> mutationHeaders = Map::of; public RESTSessionCatalog() { this( @@ -203,6 +204,11 @@ public void initialize(String name, Map unresolved) { // build the final configuration and set up the catalog's auth Map mergedProps = config.merge(props); + // Enable Idempotency-Key header for mutation endpoints if the server advertises support + if (config.idempotencyKeyLifetime() != null) { + this.mutationHeaders = RESTUtil::idempotencyHeaders; + } + if (config.endpoints().isEmpty()) { this.endpoints = PropertyUtil.propertyAsBoolean( @@ -307,7 +313,8 @@ public boolean dropTable(SessionContext context, TableIdentifier identifier) { AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .delete(paths.table(identifier), null, Map.of(), ErrorHandlers.tableErrorHandler()); + .delete( + paths.table(identifier), null, mutationHeaders, ErrorHandlers.tableErrorHandler()); return true; } catch (NoSuchTableException e) { return false; @@ -327,7 +334,7 @@ public boolean purgeTable(SessionContext context, TableIdentifier identifier) { paths.table(identifier), ImmutableMap.of("purgeRequested", "true"), null, - Map.of(), + mutationHeaders, ErrorHandlers.tableErrorHandler()); return true; } catch (NoSuchTableException e) { @@ -348,7 +355,7 @@ public void renameTable(SessionContext context, TableIdentifier from, TableIdent AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .post(paths.rename(), request, null, Map.of(), ErrorHandlers.tableErrorHandler()); + .post(paths.rename(), request, null, mutationHeaders, ErrorHandlers.tableErrorHandler()); } @Override @@ -454,7 +461,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { newTableOps( tableClient, paths.table(finalIdentifier), - Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), tableMetadata, endpoints); @@ -523,7 +530,7 @@ public Table registerTable( paths.register(ident.namespace()), request, LoadTableResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.tableErrorHandler()); Map tableConf = response.config(); @@ -533,7 +540,7 @@ public Table registerTable( newTableOps( tableClient, paths.table(ident), - Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), response.tableMetadata(), endpoints); @@ -559,7 +566,7 @@ public void createNamespace( paths.namespaces(), request, CreateNamespaceResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.namespaceErrorHandler()); } @@ -645,7 +652,11 @@ public boolean dropNamespace(SessionContext context, Namespace ns) { AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .delete(paths.namespace(ns), null, Map.of(), ErrorHandlers.dropNamespaceErrorHandler()); + .delete( + paths.namespace(ns), + null, + mutationHeaders, + ErrorHandlers.dropNamespaceErrorHandler()); return true; } catch (NoSuchNamespaceException e) { return false; @@ -669,7 +680,7 @@ public boolean updateNamespaceMetadata( paths.namespaceProperties(ns), request, UpdateNamespacePropertiesResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.namespaceErrorHandler()); return !response.updated().isEmpty(); @@ -782,7 +793,7 @@ public Table create() { paths.tables(ident.namespace()), request, LoadTableResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.tableErrorHandler()); Map tableConf = response.config(); @@ -819,7 +830,7 @@ public Transaction createTransaction() { newTableOps( tableClient, paths.table(ident), - Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), RESTTableOperations.UpdateType.CREATE, createChanges(meta), @@ -882,7 +893,7 @@ public Transaction replaceTransaction() { newTableOps( tableClient, paths.table(ident), - Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), RESTTableOperations.UpdateType.REPLACE, changes.build(), @@ -932,7 +943,7 @@ private LoadTableResponse stageCreate() { paths.tables(ident.namespace()), request, LoadTableResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.tableErrorHandler()); } } @@ -1148,7 +1159,7 @@ public void commitTransaction(SessionContext context, List commits) paths.commitTransaction(), new CommitTransactionRequest(tableChanges), null, - Map.of(), + mutationHeaders, ErrorHandlers.tableCommitHandler()); } @@ -1234,7 +1245,7 @@ public View loadView(SessionContext context, TableIdentifier identifier) { newViewOps( client.withAuthSession(tableSession), paths.view(identifier), - Map::of, + mutationHeaders, metadata, endpoints); @@ -1255,7 +1266,7 @@ public boolean dropView(SessionContext context, TableIdentifier identifier) { AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .delete(paths.view(identifier), null, Map.of(), ErrorHandlers.viewErrorHandler()); + .delete(paths.view(identifier), null, mutationHeaders, ErrorHandlers.viewErrorHandler()); return true; } catch (NoSuchViewException e) { return false; @@ -1274,7 +1285,7 @@ public void renameView(SessionContext context, TableIdentifier from, TableIdenti AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .post(paths.renameView(), request, null, Map.of(), ErrorHandlers.viewErrorHandler()); + .post(paths.renameView(), request, null, mutationHeaders, ErrorHandlers.viewErrorHandler()); } private class RESTViewBuilder implements ViewBuilder { @@ -1404,7 +1415,7 @@ public View create() { paths.views(identifier.namespace()), request, LoadViewResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.viewErrorHandler()); Map tableConf = response.config(); @@ -1413,7 +1424,7 @@ public View create() { newViewOps( client.withAuthSession(tableSession), paths.view(identifier), - Map::of, + mutationHeaders, response.metadata(), endpoints); @@ -1504,7 +1515,7 @@ private View replace(LoadViewResponse response) { newViewOps( client.withAuthSession(tableSession), paths.view(identifier), - Map::of, + mutationHeaders, metadata, endpoints); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java index 6ca04e74d9f5..ec02a9dc8459 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java @@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.UUIDUtil; public class RESTUtil { private static final char NAMESPACE_SEPARATOR = '\u001f'; @@ -49,6 +50,8 @@ public class RESTUtil { */ @Deprecated public static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_SEPARATOR); + public static final String IDEMPOTENCY_KEY_HEADER = "Idempotency-Key"; + private RESTUtil() {} public static String stripTrailingSlash(String path) { @@ -271,4 +274,12 @@ public static String resolveEndpoint(String catalogUri, String endpointPath) { public static Map configHeaders(Map properties) { return RESTUtil.extractPrefixMap(properties, "header."); } + + /** + * Returns a single-use headers map containing a freshly generated idempotency key. The key is a + * UUIDv7 string suitable for use in the Idempotency-Key header. + */ + public static Map idempotencyHeaders() { + return ImmutableMap.of(IDEMPOTENCY_KEY_HEADER, UUIDUtil.generateUuidV7().toString()); + } } From 131ab74e36b38066bc700ef2080ce958776d1de3 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Tue, 2 Dec 2025 16:35:33 -0800 Subject: [PATCH 2/4] rebase --- .../apache/iceberg/rest/TestRESTCatalog.java | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index efe76e2bf060..8e3ec216755f 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -37,6 +37,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.file.Path; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -50,6 +51,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.http.HttpHeaders; import org.apache.iceberg.BaseTable; @@ -125,6 +127,16 @@ public class TestRESTCatalog extends CatalogTests { private Server httpServer; private RESTCatalogAdapter adapterForRESTServer; + private enum IdempHeaderExpectation { + NONE, + REQUIRE, + FORBID + } + + private static volatile IdempHeaderExpectation idempHeaderExpectation = + IdempHeaderExpectation.NONE; + private static volatile boolean advertiseIdempInConfig = false; + @BeforeEach public void createCatalog() throws Exception { File warehouse = temp.toFile(); @@ -152,6 +164,30 @@ public void createCatalog() throws Exception { adapterForRESTServer = Mockito.spy( new RESTCatalogAdapter(backendCatalog) { + @Override + public T handleRequest( + Route route, + Map vars, + HTTPRequest httpRequest, + Class responseType, + Consumer> responseHeaders) { + if (route == Route.CONFIG) { + ConfigResponse.Builder builder = + ConfigResponse.builder() + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())); + if (advertiseIdempInConfig) { + builder.withIdempotencyKeyLifetime("PT30M"); + } + + return (T) builder.build(); + } + + return super.handleRequest(route, vars, httpRequest, responseType, responseHeaders); + } + @Override public T execute( HTTPRequest request, @@ -161,12 +197,24 @@ public T execute( // this doesn't use a Mockito spy because this is used for catalog tests, which have // different method calls if (!ResourcePaths.tokens().equals(request.path())) { + boolean isMutation = + request.method() == HTTPMethod.POST || request.method() == HTTPMethod.DELETE; + if (isMutation) { + boolean hasIdemp = request.headers().contains(RESTUtil.IDEMPOTENCY_KEY_HEADER); + if (idempHeaderExpectation == IdempHeaderExpectation.REQUIRE) { + assertThat(hasIdemp).isTrue(); + } else if (idempHeaderExpectation == IdempHeaderExpectation.FORBID) { + assertThat(hasIdemp).isFalse(); + } + } + if (ResourcePaths.config().equals(request.path())) { assertThat(request.headers().entries()).containsAll(catalogHeaders.entries()); } else { assertThat(request.headers().entries()).containsAll(contextHeaders.entries()); } } + Object body = roundTripSerialize(request.body(), "request"); HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); T response = super.execute(req, responseType, errorHandler, responseHeaders); @@ -3261,6 +3309,60 @@ protected RESTTableOperations newTableOps( } } + @Test + public void testClientAutoSendsIdempotencyWhenServerAdvertises() { + idempHeaderExpectation = IdempHeaderExpectation.REQUIRE; + advertiseIdempInConfig = true; + RESTCatalog local = null; + try { + local = initCatalog("prod", ImmutableMap.of()); + Namespace ns = Namespace.of("ns_cfg_yes"); + TableIdentifier ident = TableIdentifier.of(ns, "t_cfg_yes"); + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + local.createNamespace(ns, ImmutableMap.of()); + local.createTable(ident, schema); + assertThat(local.tableExists(ident)).isTrue(); + local.dropTable(ident); + } finally { + idempHeaderExpectation = IdempHeaderExpectation.NONE; + advertiseIdempInConfig = false; + if (local != null) { + try { + local.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close RESTCatalog", e); + } + } + } + } + + @Test + public void testClientDoesNotSendIdempotencyWhenServerNotAdvertising() { + idempHeaderExpectation = IdempHeaderExpectation.FORBID; + advertiseIdempInConfig = false; + RESTCatalog local = null; + try { + local = initCatalog("prod", ImmutableMap.of()); + Namespace ns = Namespace.of("ns_cfg_no"); + TableIdentifier ident = TableIdentifier.of(ns, "t_cfg_no"); + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + local.createNamespace(ns, ImmutableMap.of()); + local.createTable(ident, schema); + assertThat(local.tableExists(ident)).isTrue(); + local.dropTable(ident); + } finally { + idempHeaderExpectation = IdempHeaderExpectation.NONE; + advertiseIdempInConfig = false; + if (local != null) { + try { + local.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close RESTCatalog", e); + } + } + } + } + private RESTCatalog catalog(RESTCatalogAdapter adapter) { RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); From ac9680a6dd3bb1bbf5d7ff30167c33dc9df1ee63 Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Wed, 3 Dec 2025 11:03:58 -0800 Subject: [PATCH 3/4] address comments --- .../iceberg/rest/RESTSessionCatalog.java | 17 ++++-- .../iceberg/rest/RESTTableOperations.java | 54 +++++++++++++++++-- .../iceberg/rest/RESTViewOperations.java | 24 +++++++-- 3 files changed, 82 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 5b5d6e939f6d..ba45857f581b 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -803,7 +803,7 @@ public Table create() { newTableOps( tableClient, paths.table(ident), - Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), response.tableMetadata(), endpoints); @@ -1043,7 +1043,8 @@ protected RESTTableOperations newTableOps( FileIO fileIO, TableMetadata current, Set supportedEndpoints) { - return new RESTTableOperations(restClient, path, headers, fileIO, current, supportedEndpoints); + return new RESTTableOperations( + restClient, path, Map::of, headers, fileIO, current, supportedEndpoints); } /** @@ -1073,7 +1074,15 @@ protected RESTTableOperations newTableOps( TableMetadata current, Set supportedEndpoints) { return new RESTTableOperations( - restClient, path, headers, fileIO, updateType, createChanges, current, supportedEndpoints); + restClient, + path, + Map::of, + headers, + fileIO, + updateType, + createChanges, + current, + supportedEndpoints); } /** @@ -1095,7 +1104,7 @@ protected RESTViewOperations newViewOps( Supplier> headers, ViewMetadata current, Set supportedEndpoints) { - return new RESTViewOperations(restClient, path, headers, current, supportedEndpoints); + return new RESTViewOperations(restClient, path, Map::of, headers, current, supportedEndpoints); } private static ConfigResponse fetchConfig( diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index 38dabc8ae568..d2a6ab618ca8 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -55,7 +55,8 @@ enum UpdateType { private final RESTClient client; private final String path; - private final Supplier> headers; + private final Supplier> readHeaders; + private final Supplier> mutationHeaders; private final FileIO io; private final List createChanges; private final TableMetadata replaceBase; @@ -70,7 +71,16 @@ enum UpdateType { FileIO io, TableMetadata current, Set endpoints) { - this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current, endpoints); + this( + client, + path, + headers, + headers, + io, + UpdateType.SIMPLE, + Lists.newArrayList(), + current, + endpoints); } RESTTableOperations( @@ -82,9 +92,43 @@ enum UpdateType { List createChanges, TableMetadata current, Set endpoints) { + this(client, path, headers, headers, io, updateType, createChanges, current, endpoints); + } + + RESTTableOperations( + RESTClient client, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + FileIO io, + TableMetadata current, + Set endpoints) { + this( + client, + path, + readHeaders, + mutationHeaders, + io, + UpdateType.SIMPLE, + Lists.newArrayList(), + current, + endpoints); + } + + RESTTableOperations( + RESTClient client, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + FileIO io, + UpdateType updateType, + List createChanges, + TableMetadata current, + Set endpoints) { this.client = client; this.path = path; - this.headers = headers; + this.readHeaders = readHeaders; + this.mutationHeaders = mutationHeaders; this.io = io; this.updateType = updateType; this.createChanges = createChanges; @@ -106,7 +150,7 @@ public TableMetadata current() { public TableMetadata refresh() { Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE); return updateCurrentMetadata( - client.get(path, LoadTableResponse.class, headers, ErrorHandlers.tableErrorHandler())); + client.get(path, LoadTableResponse.class, readHeaders, ErrorHandlers.tableErrorHandler())); } @Override @@ -159,7 +203,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { // TODO: ensure that the HTTP client lib passes HTTP client errors to the error handler LoadTableResponse response; try { - response = client.post(path, request, LoadTableResponse.class, headers, errorHandler); + response = client.post(path, request, LoadTableResponse.class, mutationHeaders, errorHandler); } catch (CommitStateUnknownException e) { // Lightweight reconciliation for snapshot-add-only updates on transient unknown commit state if (updateType == UpdateType.SIMPLE && reconcileOnSimpleUpdate(updates, e)) { diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java index 466a8e66899b..0018e2f91903 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java @@ -32,7 +32,8 @@ class RESTViewOperations implements ViewOperations { private final RESTClient client; private final String path; - private final Supplier> headers; + private final Supplier> readHeaders; + private final Supplier> mutationHeaders; private final Set endpoints; private ViewMetadata current; @@ -42,10 +43,21 @@ class RESTViewOperations implements ViewOperations { Supplier> headers, ViewMetadata current, Set endpoints) { + this(client, path, headers, headers, current, endpoints); + } + + RESTViewOperations( + RESTClient client, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + ViewMetadata current, + Set endpoints) { Preconditions.checkArgument(null != current, "Invalid view metadata: null"); this.client = client; this.path = path; - this.headers = headers; + this.readHeaders = readHeaders; + this.mutationHeaders = mutationHeaders; this.current = current; this.endpoints = endpoints; } @@ -59,7 +71,7 @@ public ViewMetadata current() { public ViewMetadata refresh() { Endpoint.check(endpoints, Endpoint.V1_LOAD_VIEW); return updateCurrentMetadata( - client.get(path, LoadViewResponse.class, headers, ErrorHandlers.viewErrorHandler())); + client.get(path, LoadViewResponse.class, readHeaders, ErrorHandlers.viewErrorHandler())); } @Override @@ -74,7 +86,11 @@ public void commit(ViewMetadata base, ViewMetadata metadata) { LoadViewResponse response = client.post( - path, request, LoadViewResponse.class, headers, ErrorHandlers.viewCommitHandler()); + path, + request, + LoadViewResponse.class, + mutationHeaders, + ErrorHandlers.viewCommitHandler()); updateCurrentMetadata(response); } From 3ce7a256e320164d36f1fec27cd2d7d32f29490c Mon Sep 17 00:00:00 2001 From: Huaxin Gao Date: Thu, 4 Dec 2025 11:24:40 -0800 Subject: [PATCH 4/4] address comments --- .../iceberg/rest/RESTSessionCatalog.java | 41 ++++- .../apache/iceberg/rest/TestRESTCatalog.java | 172 ++++++++---------- .../iceberg/rest/TestRESTViewCatalog.java | 6 +- 3 files changed, 112 insertions(+), 107 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index ba45857f581b..fe0d13217b62 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -461,6 +461,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { newTableOps( tableClient, paths.table(finalIdentifier), + Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), tableMetadata, @@ -540,6 +541,7 @@ public Table registerTable( newTableOps( tableClient, paths.table(ident), + Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), response.tableMetadata(), @@ -803,6 +805,7 @@ public Table create() { newTableOps( tableClient, paths.table(ident), + Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), response.tableMetadata(), @@ -830,6 +833,7 @@ public Transaction createTransaction() { newTableOps( tableClient, paths.table(ident), + Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), RESTTableOperations.UpdateType.CREATE, @@ -893,6 +897,7 @@ public Transaction replaceTransaction() { newTableOps( tableClient, paths.table(ident), + Map::of, mutationHeaders, tableFileIO(context, tableConf, response.credentials()), RESTTableOperations.UpdateType.REPLACE, @@ -1030,7 +1035,10 @@ private FileIO tableFileIO( * * @param restClient the REST client to use for communicating with the catalog server * @param path the REST path for the table - * @param headers a supplier for additional HTTP headers to include in requests + * @param readHeaders a supplier for additional HTTP headers to include in read requests + * (GET/HEAD) + * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation + * requests (POST/DELETE) * @param fileIO the FileIO implementation for reading and writing table metadata and data files * @param current the current table metadata * @param supportedEndpoints the set of supported REST endpoints @@ -1039,12 +1047,13 @@ private FileIO tableFileIO( protected RESTTableOperations newTableOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaderSupplier, FileIO fileIO, TableMetadata current, Set supportedEndpoints) { return new RESTTableOperations( - restClient, path, Map::of, headers, fileIO, current, supportedEndpoints); + restClient, path, readHeaders, mutationHeaderSupplier, fileIO, current, supportedEndpoints); } /** @@ -1056,7 +1065,10 @@ protected RESTTableOperations newTableOps( * * @param restClient the REST client to use for communicating with the catalog server * @param path the REST path for the table - * @param headers a supplier for additional HTTP headers to include in requests + * @param readHeaders a supplier for additional HTTP headers to include in read requests + * (GET/HEAD) + * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation + * requests (POST/DELETE) * @param fileIO the FileIO implementation for reading and writing table metadata and data files * @param updateType the {@link RESTTableOperations.UpdateType} being performed * @param createChanges the list of metadata updates to apply during table creation or replacement @@ -1067,7 +1079,8 @@ protected RESTTableOperations newTableOps( protected RESTTableOperations newTableOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaderSupplier, FileIO fileIO, RESTTableOperations.UpdateType updateType, List createChanges, @@ -1076,8 +1089,8 @@ protected RESTTableOperations newTableOps( return new RESTTableOperations( restClient, path, - Map::of, - headers, + readHeaders, + mutationHeaderSupplier, fileIO, updateType, createChanges, @@ -1093,7 +1106,10 @@ protected RESTTableOperations newTableOps( * * @param restClient the REST client to use for communicating with the catalog server * @param path the REST path for the view - * @param headers a supplier for additional HTTP headers to include in requests + * @param readHeaders a supplier for additional HTTP headers to include in read requests + * (GET/HEAD) + * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation + * requests (POST/DELETE) * @param current the current view metadata * @param supportedEndpoints the set of supported REST endpoints * @return a new RESTViewOperations instance @@ -1101,10 +1117,12 @@ protected RESTTableOperations newTableOps( protected RESTViewOperations newViewOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaderSupplier, ViewMetadata current, Set supportedEndpoints) { - return new RESTViewOperations(restClient, path, Map::of, headers, current, supportedEndpoints); + return new RESTViewOperations( + restClient, path, readHeaders, mutationHeaderSupplier, current, supportedEndpoints); } private static ConfigResponse fetchConfig( @@ -1254,6 +1272,7 @@ public View loadView(SessionContext context, TableIdentifier identifier) { newViewOps( client.withAuthSession(tableSession), paths.view(identifier), + Map::of, mutationHeaders, metadata, endpoints); @@ -1433,6 +1452,7 @@ public View create() { newViewOps( client.withAuthSession(tableSession), paths.view(identifier), + Map::of, mutationHeaders, response.metadata(), endpoints); @@ -1524,6 +1544,7 @@ private View replace(LoadViewResponse response) { newViewOps( client.withAuthSession(tableSession), paths.view(identifier), + Map::of, mutationHeaders, metadata, endpoints); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 8e3ec216755f..385893ea7130 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -127,16 +127,6 @@ public class TestRESTCatalog extends CatalogTests { private Server httpServer; private RESTCatalogAdapter adapterForRESTServer; - private enum IdempHeaderExpectation { - NONE, - REQUIRE, - FORBID - } - - private static volatile IdempHeaderExpectation idempHeaderExpectation = - IdempHeaderExpectation.NONE; - private static volatile boolean advertiseIdempInConfig = false; - @BeforeEach public void createCatalog() throws Exception { File warehouse = temp.toFile(); @@ -164,30 +154,6 @@ public void createCatalog() throws Exception { adapterForRESTServer = Mockito.spy( new RESTCatalogAdapter(backendCatalog) { - @Override - public T handleRequest( - Route route, - Map vars, - HTTPRequest httpRequest, - Class responseType, - Consumer> responseHeaders) { - if (route == Route.CONFIG) { - ConfigResponse.Builder builder = - ConfigResponse.builder() - .withEndpoints( - Arrays.stream(Route.values()) - .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) - .collect(Collectors.toList())); - if (advertiseIdempInConfig) { - builder.withIdempotencyKeyLifetime("PT30M"); - } - - return (T) builder.build(); - } - - return super.handleRequest(route, vars, httpRequest, responseType, responseHeaders); - } - @Override public T execute( HTTPRequest request, @@ -197,17 +163,6 @@ public T execute( // this doesn't use a Mockito spy because this is used for catalog tests, which have // different method calls if (!ResourcePaths.tokens().equals(request.path())) { - boolean isMutation = - request.method() == HTTPMethod.POST || request.method() == HTTPMethod.DELETE; - if (isMutation) { - boolean hasIdemp = request.headers().contains(RESTUtil.IDEMPOTENCY_KEY_HEADER); - if (idempHeaderExpectation == IdempHeaderExpectation.REQUIRE) { - assertThat(hasIdemp).isTrue(); - } else if (idempHeaderExpectation == IdempHeaderExpectation.FORBID) { - assertThat(hasIdemp).isFalse(); - } - } - if (ResourcePaths.config().equals(request.path())) { assertThat(request.headers().entries()).containsAll(catalogHeaders.entries()); } else { @@ -3218,13 +3173,14 @@ class CustomRESTSessionCatalog extends RESTSessionCatalog { protected RESTTableOperations newTableOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaders, FileIO fileIO, TableMetadata current, Set supportedEndpoints) { RESTTableOperations ops = new CustomRESTTableOperations( - restClient, path, headers, fileIO, current, supportedEndpoints); + restClient, path, mutationHeaders, fileIO, current, supportedEndpoints); RESTTableOperations spy = Mockito.spy(ops); capturedOps.set(spy); return spy; @@ -3234,7 +3190,8 @@ protected RESTTableOperations newTableOps( protected RESTTableOperations newTableOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaders, FileIO fileIO, RESTTableOperations.UpdateType updateType, List createChanges, @@ -3244,7 +3201,7 @@ protected RESTTableOperations newTableOps( new CustomRESTTableOperations( restClient, path, - headers, + mutationHeaders, fileIO, updateType, createChanges, @@ -3311,56 +3268,81 @@ protected RESTTableOperations newTableOps( @Test public void testClientAutoSendsIdempotencyWhenServerAdvertises() { - idempHeaderExpectation = IdempHeaderExpectation.REQUIRE; - advertiseIdempInConfig = true; - RESTCatalog local = null; - try { - local = initCatalog("prod", ImmutableMap.of()); - Namespace ns = Namespace.of("ns_cfg_yes"); - TableIdentifier ident = TableIdentifier.of(ns, "t_cfg_yes"); - Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); - local.createNamespace(ns, ImmutableMap.of()); - local.createTable(ident, schema); - assertThat(local.tableExists(ident)).isTrue(); - local.dropTable(ident); - } finally { - idempHeaderExpectation = IdempHeaderExpectation.NONE; - advertiseIdempInConfig = false; - if (local != null) { - try { - local.close(); - } catch (Exception e) { - throw new RuntimeException("Failed to close RESTCatalog", e); - } - } - } + ConfigResponse cfgWithIdem = + ConfigResponse.builder() + .withIdempotencyKeyLifetime("PT30M") + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())) + .build(); + + RESTCatalog local = createCatalogWithIdempAdapter(cfgWithIdem, true); + + Namespace ns = Namespace.of("ns_cfg_idem"); + TableIdentifier ident = TableIdentifier.of(ns, "t_cfg_idem"); + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + local.createNamespace(ns, ImmutableMap.of()); + local.createTable(ident, schema); + assertThat(local.tableExists(ident)).isTrue(); + local.dropTable(ident); } @Test public void testClientDoesNotSendIdempotencyWhenServerNotAdvertising() { - idempHeaderExpectation = IdempHeaderExpectation.FORBID; - advertiseIdempInConfig = false; - RESTCatalog local = null; - try { - local = initCatalog("prod", ImmutableMap.of()); - Namespace ns = Namespace.of("ns_cfg_no"); - TableIdentifier ident = TableIdentifier.of(ns, "t_cfg_no"); - Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); - local.createNamespace(ns, ImmutableMap.of()); - local.createTable(ident, schema); - assertThat(local.tableExists(ident)).isTrue(); - local.dropTable(ident); - } finally { - idempHeaderExpectation = IdempHeaderExpectation.NONE; - advertiseIdempInConfig = false; - if (local != null) { - try { - local.close(); - } catch (Exception e) { - throw new RuntimeException("Failed to close RESTCatalog", e); - } - } - } + ConfigResponse cfgNoIdem = + ConfigResponse.builder() + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())) + .build(); + + RESTCatalog local = createCatalogWithIdempAdapter(cfgNoIdem, false); + + Namespace ns = Namespace.of("ns_cfg_no_idem"); + TableIdentifier ident = TableIdentifier.of(ns, "t_cfg_no_idem"); + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + local.createNamespace(ns, ImmutableMap.of()); + local.createTable(ident, schema); + assertThat(local.tableExists(ident)).isTrue(); + local.dropTable(ident); + } + + private RESTCatalog createCatalogWithIdempAdapter(ConfigResponse cfg, boolean expectOnMutations) { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + if (ResourcePaths.config().equals(request.path())) { + return castResponse(responseType, cfg); + } + + boolean isMutation = + request.method() == HTTPMethod.POST || request.method() == HTTPMethod.DELETE; + boolean hasIdemp = request.headers().contains(RESTUtil.IDEMPOTENCY_KEY_HEADER); + + if (isMutation) { + assertThat(hasIdemp) + .as("Idempotency-Key presence on mutations did not match expectation") + .isEqualTo(expectOnMutations); + } else { + assertThat(hasIdemp).as("Idempotency-Key must NOT be sent on reads").isFalse(); + } + + return super.execute(request, responseType, errorHandler, responseHeaders); + } + }); + + RESTCatalog local = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + local.initialize("test", ImmutableMap.of()); + return local; } private RESTCatalog catalog(RESTCatalogAdapter adapter) { diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java index 6b39907098f9..8bfe26b18cda 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java @@ -350,11 +350,13 @@ class CustomRESTSessionCatalog extends RESTSessionCatalog { protected RESTViewOperations newViewOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaders, ViewMetadata current, Set supportedEndpoints) { RESTViewOperations ops = - new CustomRESTViewOperations(restClient, path, headers, current, supportedEndpoints); + new CustomRESTViewOperations( + restClient, path, mutationHeaders, current, supportedEndpoints); RESTViewOperations spy = Mockito.spy(ops); capturedOps.set(spy); return spy;