diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 85b04f3868cd..fe0d13217b62 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -160,6 +160,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private Integer pageSize = null; private CloseableGroup closeables = null; private Set endpoints; + private Supplier> mutationHeaders = Map::of; public RESTSessionCatalog() { this( @@ -203,6 +204,11 @@ public void initialize(String name, Map unresolved) { // build the final configuration and set up the catalog's auth Map mergedProps = config.merge(props); + // Enable Idempotency-Key header for mutation endpoints if the server advertises support + if (config.idempotencyKeyLifetime() != null) { + this.mutationHeaders = RESTUtil::idempotencyHeaders; + } + if (config.endpoints().isEmpty()) { this.endpoints = PropertyUtil.propertyAsBoolean( @@ -307,7 +313,8 @@ public boolean dropTable(SessionContext context, TableIdentifier identifier) { AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .delete(paths.table(identifier), null, Map.of(), ErrorHandlers.tableErrorHandler()); + .delete( + paths.table(identifier), null, mutationHeaders, ErrorHandlers.tableErrorHandler()); return true; } catch (NoSuchTableException e) { return false; @@ -327,7 +334,7 @@ public boolean purgeTable(SessionContext context, TableIdentifier identifier) { paths.table(identifier), ImmutableMap.of("purgeRequested", "true"), null, - Map.of(), + mutationHeaders, ErrorHandlers.tableErrorHandler()); return true; } catch (NoSuchTableException e) { @@ -348,7 +355,7 @@ public void renameTable(SessionContext context, TableIdentifier from, TableIdent AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .post(paths.rename(), request, null, Map.of(), ErrorHandlers.tableErrorHandler()); + .post(paths.rename(), request, null, mutationHeaders, ErrorHandlers.tableErrorHandler()); } @Override @@ -455,6 +462,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { tableClient, paths.table(finalIdentifier), Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), tableMetadata, endpoints); @@ -523,7 +531,7 @@ public Table registerTable( paths.register(ident.namespace()), request, LoadTableResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.tableErrorHandler()); Map tableConf = response.config(); @@ -534,6 +542,7 @@ public Table registerTable( tableClient, paths.table(ident), Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), response.tableMetadata(), endpoints); @@ -559,7 +568,7 @@ public void createNamespace( paths.namespaces(), request, CreateNamespaceResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.namespaceErrorHandler()); } @@ -645,7 +654,11 @@ public boolean dropNamespace(SessionContext context, Namespace ns) { AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .delete(paths.namespace(ns), null, Map.of(), ErrorHandlers.dropNamespaceErrorHandler()); + .delete( + paths.namespace(ns), + null, + mutationHeaders, + ErrorHandlers.dropNamespaceErrorHandler()); return true; } catch (NoSuchNamespaceException e) { return false; @@ -669,7 +682,7 @@ public boolean updateNamespaceMetadata( paths.namespaceProperties(ns), request, UpdateNamespacePropertiesResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.namespaceErrorHandler()); return !response.updated().isEmpty(); @@ -782,7 +795,7 @@ public Table create() { paths.tables(ident.namespace()), request, LoadTableResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.tableErrorHandler()); Map tableConf = response.config(); @@ -793,6 +806,7 @@ public Table create() { tableClient, paths.table(ident), Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), response.tableMetadata(), endpoints); @@ -820,6 +834,7 @@ public Transaction createTransaction() { tableClient, paths.table(ident), Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), RESTTableOperations.UpdateType.CREATE, createChanges(meta), @@ -883,6 +898,7 @@ public Transaction replaceTransaction() { tableClient, paths.table(ident), Map::of, + mutationHeaders, tableFileIO(context, tableConf, response.credentials()), RESTTableOperations.UpdateType.REPLACE, changes.build(), @@ -932,7 +948,7 @@ private LoadTableResponse stageCreate() { paths.tables(ident.namespace()), request, LoadTableResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.tableErrorHandler()); } } @@ -1019,7 +1035,10 @@ private FileIO tableFileIO( * * @param restClient the REST client to use for communicating with the catalog server * @param path the REST path for the table - * @param headers a supplier for additional HTTP headers to include in requests + * @param readHeaders a supplier for additional HTTP headers to include in read requests + * (GET/HEAD) + * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation + * requests (POST/DELETE) * @param fileIO the FileIO implementation for reading and writing table metadata and data files * @param current the current table metadata * @param supportedEndpoints the set of supported REST endpoints @@ -1028,11 +1047,13 @@ private FileIO tableFileIO( protected RESTTableOperations newTableOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaderSupplier, FileIO fileIO, TableMetadata current, Set supportedEndpoints) { - return new RESTTableOperations(restClient, path, headers, fileIO, current, supportedEndpoints); + return new RESTTableOperations( + restClient, path, readHeaders, mutationHeaderSupplier, fileIO, current, supportedEndpoints); } /** @@ -1044,7 +1065,10 @@ protected RESTTableOperations newTableOps( * * @param restClient the REST client to use for communicating with the catalog server * @param path the REST path for the table - * @param headers a supplier for additional HTTP headers to include in requests + * @param readHeaders a supplier for additional HTTP headers to include in read requests + * (GET/HEAD) + * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation + * requests (POST/DELETE) * @param fileIO the FileIO implementation for reading and writing table metadata and data files * @param updateType the {@link RESTTableOperations.UpdateType} being performed * @param createChanges the list of metadata updates to apply during table creation or replacement @@ -1055,14 +1079,23 @@ protected RESTTableOperations newTableOps( protected RESTTableOperations newTableOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaderSupplier, FileIO fileIO, RESTTableOperations.UpdateType updateType, List createChanges, TableMetadata current, Set supportedEndpoints) { return new RESTTableOperations( - restClient, path, headers, fileIO, updateType, createChanges, current, supportedEndpoints); + restClient, + path, + readHeaders, + mutationHeaderSupplier, + fileIO, + updateType, + createChanges, + current, + supportedEndpoints); } /** @@ -1073,7 +1106,10 @@ protected RESTTableOperations newTableOps( * * @param restClient the REST client to use for communicating with the catalog server * @param path the REST path for the view - * @param headers a supplier for additional HTTP headers to include in requests + * @param readHeaders a supplier for additional HTTP headers to include in read requests + * (GET/HEAD) + * @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation + * requests (POST/DELETE) * @param current the current view metadata * @param supportedEndpoints the set of supported REST endpoints * @return a new RESTViewOperations instance @@ -1081,10 +1117,12 @@ protected RESTTableOperations newTableOps( protected RESTViewOperations newViewOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaderSupplier, ViewMetadata current, Set supportedEndpoints) { - return new RESTViewOperations(restClient, path, headers, current, supportedEndpoints); + return new RESTViewOperations( + restClient, path, readHeaders, mutationHeaderSupplier, current, supportedEndpoints); } private static ConfigResponse fetchConfig( @@ -1148,7 +1186,7 @@ public void commitTransaction(SessionContext context, List commits) paths.commitTransaction(), new CommitTransactionRequest(tableChanges), null, - Map.of(), + mutationHeaders, ErrorHandlers.tableCommitHandler()); } @@ -1235,6 +1273,7 @@ public View loadView(SessionContext context, TableIdentifier identifier) { client.withAuthSession(tableSession), paths.view(identifier), Map::of, + mutationHeaders, metadata, endpoints); @@ -1255,7 +1294,7 @@ public boolean dropView(SessionContext context, TableIdentifier identifier) { AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .delete(paths.view(identifier), null, Map.of(), ErrorHandlers.viewErrorHandler()); + .delete(paths.view(identifier), null, mutationHeaders, ErrorHandlers.viewErrorHandler()); return true; } catch (NoSuchViewException e) { return false; @@ -1274,7 +1313,7 @@ public void renameView(SessionContext context, TableIdentifier from, TableIdenti AuthSession contextualSession = authManager.contextualSession(context, catalogAuth); client .withAuthSession(contextualSession) - .post(paths.renameView(), request, null, Map.of(), ErrorHandlers.viewErrorHandler()); + .post(paths.renameView(), request, null, mutationHeaders, ErrorHandlers.viewErrorHandler()); } private class RESTViewBuilder implements ViewBuilder { @@ -1404,7 +1443,7 @@ public View create() { paths.views(identifier.namespace()), request, LoadViewResponse.class, - Map.of(), + mutationHeaders, ErrorHandlers.viewErrorHandler()); Map tableConf = response.config(); @@ -1414,6 +1453,7 @@ public View create() { client.withAuthSession(tableSession), paths.view(identifier), Map::of, + mutationHeaders, response.metadata(), endpoints); @@ -1505,6 +1545,7 @@ private View replace(LoadViewResponse response) { client.withAuthSession(tableSession), paths.view(identifier), Map::of, + mutationHeaders, metadata, endpoints); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index 38dabc8ae568..d2a6ab618ca8 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -55,7 +55,8 @@ enum UpdateType { private final RESTClient client; private final String path; - private final Supplier> headers; + private final Supplier> readHeaders; + private final Supplier> mutationHeaders; private final FileIO io; private final List createChanges; private final TableMetadata replaceBase; @@ -70,7 +71,16 @@ enum UpdateType { FileIO io, TableMetadata current, Set endpoints) { - this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current, endpoints); + this( + client, + path, + headers, + headers, + io, + UpdateType.SIMPLE, + Lists.newArrayList(), + current, + endpoints); } RESTTableOperations( @@ -82,9 +92,43 @@ enum UpdateType { List createChanges, TableMetadata current, Set endpoints) { + this(client, path, headers, headers, io, updateType, createChanges, current, endpoints); + } + + RESTTableOperations( + RESTClient client, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + FileIO io, + TableMetadata current, + Set endpoints) { + this( + client, + path, + readHeaders, + mutationHeaders, + io, + UpdateType.SIMPLE, + Lists.newArrayList(), + current, + endpoints); + } + + RESTTableOperations( + RESTClient client, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + FileIO io, + UpdateType updateType, + List createChanges, + TableMetadata current, + Set endpoints) { this.client = client; this.path = path; - this.headers = headers; + this.readHeaders = readHeaders; + this.mutationHeaders = mutationHeaders; this.io = io; this.updateType = updateType; this.createChanges = createChanges; @@ -106,7 +150,7 @@ public TableMetadata current() { public TableMetadata refresh() { Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE); return updateCurrentMetadata( - client.get(path, LoadTableResponse.class, headers, ErrorHandlers.tableErrorHandler())); + client.get(path, LoadTableResponse.class, readHeaders, ErrorHandlers.tableErrorHandler())); } @Override @@ -159,7 +203,7 @@ public void commit(TableMetadata base, TableMetadata metadata) { // TODO: ensure that the HTTP client lib passes HTTP client errors to the error handler LoadTableResponse response; try { - response = client.post(path, request, LoadTableResponse.class, headers, errorHandler); + response = client.post(path, request, LoadTableResponse.class, mutationHeaders, errorHandler); } catch (CommitStateUnknownException e) { // Lightweight reconciliation for snapshot-add-only updates on transient unknown commit state if (updateType == UpdateType.SIMPLE && reconcileOnSimpleUpdate(updates, e)) { diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java index 6ca04e74d9f5..ec02a9dc8459 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTUtil.java @@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.UUIDUtil; public class RESTUtil { private static final char NAMESPACE_SEPARATOR = '\u001f'; @@ -49,6 +50,8 @@ public class RESTUtil { */ @Deprecated public static final Splitter NAMESPACE_SPLITTER = Splitter.on(NAMESPACE_SEPARATOR); + public static final String IDEMPOTENCY_KEY_HEADER = "Idempotency-Key"; + private RESTUtil() {} public static String stripTrailingSlash(String path) { @@ -271,4 +274,12 @@ public static String resolveEndpoint(String catalogUri, String endpointPath) { public static Map configHeaders(Map properties) { return RESTUtil.extractPrefixMap(properties, "header."); } + + /** + * Returns a single-use headers map containing a freshly generated idempotency key. The key is a + * UUIDv7 string suitable for use in the Idempotency-Key header. + */ + public static Map idempotencyHeaders() { + return ImmutableMap.of(IDEMPOTENCY_KEY_HEADER, UUIDUtil.generateUuidV7().toString()); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java index 466a8e66899b..0018e2f91903 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTViewOperations.java @@ -32,7 +32,8 @@ class RESTViewOperations implements ViewOperations { private final RESTClient client; private final String path; - private final Supplier> headers; + private final Supplier> readHeaders; + private final Supplier> mutationHeaders; private final Set endpoints; private ViewMetadata current; @@ -42,10 +43,21 @@ class RESTViewOperations implements ViewOperations { Supplier> headers, ViewMetadata current, Set endpoints) { + this(client, path, headers, headers, current, endpoints); + } + + RESTViewOperations( + RESTClient client, + String path, + Supplier> readHeaders, + Supplier> mutationHeaders, + ViewMetadata current, + Set endpoints) { Preconditions.checkArgument(null != current, "Invalid view metadata: null"); this.client = client; this.path = path; - this.headers = headers; + this.readHeaders = readHeaders; + this.mutationHeaders = mutationHeaders; this.current = current; this.endpoints = endpoints; } @@ -59,7 +71,7 @@ public ViewMetadata current() { public ViewMetadata refresh() { Endpoint.check(endpoints, Endpoint.V1_LOAD_VIEW); return updateCurrentMetadata( - client.get(path, LoadViewResponse.class, headers, ErrorHandlers.viewErrorHandler())); + client.get(path, LoadViewResponse.class, readHeaders, ErrorHandlers.viewErrorHandler())); } @Override @@ -74,7 +86,11 @@ public void commit(ViewMetadata base, ViewMetadata metadata) { LoadViewResponse response = client.post( - path, request, LoadViewResponse.class, headers, ErrorHandlers.viewCommitHandler()); + path, + request, + LoadViewResponse.class, + mutationHeaders, + ErrorHandlers.viewCommitHandler()); updateCurrentMetadata(response); } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index efe76e2bf060..385893ea7130 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -37,6 +37,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.file.Path; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -50,6 +51,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.http.HttpHeaders; import org.apache.iceberg.BaseTable; @@ -167,6 +169,7 @@ public T execute( assertThat(request.headers().entries()).containsAll(contextHeaders.entries()); } } + Object body = roundTripSerialize(request.body(), "request"); HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); T response = super.execute(req, responseType, errorHandler, responseHeaders); @@ -3170,13 +3173,14 @@ class CustomRESTSessionCatalog extends RESTSessionCatalog { protected RESTTableOperations newTableOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaders, FileIO fileIO, TableMetadata current, Set supportedEndpoints) { RESTTableOperations ops = new CustomRESTTableOperations( - restClient, path, headers, fileIO, current, supportedEndpoints); + restClient, path, mutationHeaders, fileIO, current, supportedEndpoints); RESTTableOperations spy = Mockito.spy(ops); capturedOps.set(spy); return spy; @@ -3186,7 +3190,8 @@ protected RESTTableOperations newTableOps( protected RESTTableOperations newTableOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaders, FileIO fileIO, RESTTableOperations.UpdateType updateType, List createChanges, @@ -3196,7 +3201,7 @@ protected RESTTableOperations newTableOps( new CustomRESTTableOperations( restClient, path, - headers, + mutationHeaders, fileIO, updateType, createChanges, @@ -3261,6 +3266,85 @@ protected RESTTableOperations newTableOps( } } + @Test + public void testClientAutoSendsIdempotencyWhenServerAdvertises() { + ConfigResponse cfgWithIdem = + ConfigResponse.builder() + .withIdempotencyKeyLifetime("PT30M") + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())) + .build(); + + RESTCatalog local = createCatalogWithIdempAdapter(cfgWithIdem, true); + + Namespace ns = Namespace.of("ns_cfg_idem"); + TableIdentifier ident = TableIdentifier.of(ns, "t_cfg_idem"); + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + local.createNamespace(ns, ImmutableMap.of()); + local.createTable(ident, schema); + assertThat(local.tableExists(ident)).isTrue(); + local.dropTable(ident); + } + + @Test + public void testClientDoesNotSendIdempotencyWhenServerNotAdvertising() { + ConfigResponse cfgNoIdem = + ConfigResponse.builder() + .withEndpoints( + Arrays.stream(Route.values()) + .map(r -> Endpoint.create(r.method().name(), r.resourcePath())) + .collect(Collectors.toList())) + .build(); + + RESTCatalog local = createCatalogWithIdempAdapter(cfgNoIdem, false); + + Namespace ns = Namespace.of("ns_cfg_no_idem"); + TableIdentifier ident = TableIdentifier.of(ns, "t_cfg_no_idem"); + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + local.createNamespace(ns, ImmutableMap.of()); + local.createTable(ident, schema); + assertThat(local.tableExists(ident)).isTrue(); + local.dropTable(ident); + } + + private RESTCatalog createCatalogWithIdempAdapter(ConfigResponse cfg, boolean expectOnMutations) { + RESTCatalogAdapter adapter = + Mockito.spy( + new RESTCatalogAdapter(backendCatalog) { + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + if (ResourcePaths.config().equals(request.path())) { + return castResponse(responseType, cfg); + } + + boolean isMutation = + request.method() == HTTPMethod.POST || request.method() == HTTPMethod.DELETE; + boolean hasIdemp = request.headers().contains(RESTUtil.IDEMPOTENCY_KEY_HEADER); + + if (isMutation) { + assertThat(hasIdemp) + .as("Idempotency-Key presence on mutations did not match expectation") + .isEqualTo(expectOnMutations); + } else { + assertThat(hasIdemp).as("Idempotency-Key must NOT be sent on reads").isFalse(); + } + + return super.execute(request, responseType, errorHandler, responseHeaders); + } + }); + + RESTCatalog local = + new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); + local.initialize("test", ImmutableMap.of()); + return local; + } + private RESTCatalog catalog(RESTCatalogAdapter adapter) { RESTCatalog catalog = new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter); diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java index 6b39907098f9..8bfe26b18cda 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTViewCatalog.java @@ -350,11 +350,13 @@ class CustomRESTSessionCatalog extends RESTSessionCatalog { protected RESTViewOperations newViewOps( RESTClient restClient, String path, - Supplier> headers, + Supplier> readHeaders, + Supplier> mutationHeaders, ViewMetadata current, Set supportedEndpoints) { RESTViewOperations ops = - new CustomRESTViewOperations(restClient, path, headers, current, supportedEndpoints); + new CustomRESTViewOperations( + restClient, path, mutationHeaders, current, supportedEndpoints); RESTViewOperations spy = Mockito.spy(ops); capturedOps.set(spy); return spy;