properties) throws IOException {
+ AliyunOSSMock mock =
+ new AliyunOSSMock(
+ properties.getOrDefault(PROP_ROOT_DIR, ROOT_DIR_DEFAULT).toString(),
+ Integer.parseInt(
+ properties.getOrDefault(PROP_HTTP_PORT, PORT_HTTP_PORT_DEFAULT).toString()));
+ mock.start();
+ return mock;
+ }
+
+  /**
+   * Builds a mock whose objects live in a local store rooted at {@code rootDir} and whose HTTP
+   * endpoint is bound to {@code localhost:serverPort}. The server is created but not started.
+   *
+   * @param rootDir directory handed to the local store
+   * @param serverPort TCP port the HTTP server binds to
+   * @throws IOException if the local store or the HTTP server cannot be created
+   */
+  private AliyunOSSMock(String rootDir, int serverPort) throws IOException {
+    this.localStore = new AliyunOSSMockLocalStore(rootDir);
+
+    InetSocketAddress bindAddress = new InetSocketAddress("localhost", serverPort);
+    int backlog = 0;
+    this.httpServer = HttpServer.create(bindAddress, backlog);
+  }
+
+  /** Registers the request handler on the root context and begins accepting connections. */
+  private void start() {
+    HttpHandler rootHandler = new AliyunHttpHandler();
+    httpServer.createContext("/", rootHandler);
+    httpServer.start();
+  }
+
+  /** Shuts the HTTP server down, passing a stop delay of zero seconds. */
+  public void stop() {
+    int stopDelaySeconds = 0;
+    httpServer.stop(stopDelaySeconds);
+  }
+
+  /**
+   * Dispatches every request received by the embedded HTTP server to a bucket- or object-level
+   * operation on the local store.
+   *
+   * <p>The request path is interpreted as {@code /<bucket>[/<object key>]}. Each operation sends
+   * exactly one response; {@link #handle(HttpExchange)} closes the exchange afterwards.
+   */
+  private class AliyunHttpHandler implements HttpHandler {
+
+    /** HTTP-date layout with a two-digit day of month, e.g. {@code Tue, 03 Jun 2008 11:05:30 GMT}. */
+    private final DateTimeFormatter httpDateFormat =
+        DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", java.util.Locale.US);
+
+    /**
+     * Splits the request path into bucket and object key and routes the request.
+     *
+     * @param httpExchange the exchange to serve; always closed before this method returns
+     * @throws IOException if reading the request or writing the response fails
+     */
+    @Override
+    public void handle(HttpExchange httpExchange) throws IOException {
+      try {
+        // getPath() excludes the query string, so only the leading "/" has to be removed.
+        String path = httpExchange.getRequestURI().getPath();
+        String request = path.startsWith("/") ? path.substring(1) : path;
+
+        // Only the first segment names the bucket. Everything after it is the object key, which
+        // may itself contain "/" (e.g. "dir/file"), so it must not be cut at the next separator.
+        int separator = request.indexOf('/');
+        String bucketName = separator < 0 ? request : request.substring(0, separator);
+        String objectName = separator < 0 ? "" : request.substring(separator + 1);
+
+        if (objectName.isEmpty()) {
+          handleBucketRequest(bucketName, httpExchange);
+        } else {
+          handleObjectRequest(bucketName, objectName, httpExchange);
+        }
+      } finally {
+        // Terminates the exchange even when an operation threw before finishing its response.
+        httpExchange.close();
+      }
+    }
+
+    /** Routes a request that addresses a bucket; unsupported methods are answered with 405. */
+    private void handleBucketRequest(String bucketName, HttpExchange httpExchange)
+        throws IOException {
+      switch (httpExchange.getRequestMethod()) {
+        case "PUT":
+          putBucket(bucketName, httpExchange);
+          break;
+        case "DELETE":
+          deleteBucket(bucketName, httpExchange);
+          break;
+        default:
+          handleResponse(httpExchange, 405, "Method Not Allowed", "text/plain");
+      }
+    }
+
+    /** Routes a request that addresses an object; unsupported methods are answered with 405. */
+    private void handleObjectRequest(
+        String bucketName, String objectName, HttpExchange httpExchange) throws IOException {
+      switch (httpExchange.getRequestMethod()) {
+        case "PUT":
+          putObject(bucketName, objectName, httpExchange);
+          break;
+        case "DELETE":
+          deleteObject(bucketName, objectName, httpExchange);
+          break;
+        case "HEAD":
+          getObjectMeta(bucketName, objectName, httpExchange);
+          break;
+        case "GET":
+          getObject(bucketName, objectName, httpExchange);
+          break;
+        default:
+          handleResponse(httpExchange, 405, "Method Not Allowed", "text/plain");
+      }
+    }
+
+    /** Creates the bucket, or answers 409 when a bucket with that name already exists. */
+    private void putBucket(String bucketName, HttpExchange httpExchange) throws IOException {
+      if (localStore.getBucket(bucketName) != null) {
+        String errorMessage =
+            createErrorResponse(
+                OSSErrorCode.BUCKET_ALREADY_EXISTS, bucketName + " already exists.");
+        handleResponse(httpExchange, 409, errorMessage, "application/xml");
+        return;
+      }
+      localStore.createBucket(bucketName);
+      handleResponse(httpExchange, 200, "OK", "application/xml");
+    }
+
+    /**
+     * Deletes the bucket. Answers 404 when it does not exist and 409 when the local store refuses
+     * to delete it.
+     */
+    private void deleteBucket(String bucketName, HttpExchange httpExchange) throws IOException {
+      if (!verifyBucketExistence(bucketName, httpExchange)) {
+        return;
+      }
+      try {
+        localStore.deleteBucket(bucketName);
+      } catch (Exception e) {
+        // NOTE(review): every store failure is reported as "bucket not empty" - confirm that this
+        // is the only reason the local store rejects a delete.
+        String errorMessage =
+            createErrorResponse(
+                OSSErrorCode.BUCKET_NOT_EMPTY, "The bucket you tried to delete is not empty.");
+        handleResponse(httpExchange, 409, errorMessage, "application/xml");
+        // A response has been sent; falling through would try to send a second one.
+        return;
+      }
+      handleResponse(httpExchange, 200, "OK", "application/xml");
+    }
+
+    /**
+     * Stores the request body as the object's content and reports its ETag and modification time.
+     * Answers 404 for an unknown bucket and 500 when the local store fails.
+     */
+    private void putObject(String bucketName, String objectName, HttpExchange httpExchange)
+        throws IOException {
+      if (!verifyBucketExistence(bucketName, httpExchange)) {
+        return;
+      }
+
+      ObjectMetadata metadata;
+      try (InputStream inputStream = httpExchange.getRequestBody()) {
+        metadata =
+            localStore.putObject(
+                bucketName,
+                objectName,
+                inputStream,
+                httpExchange.getRequestHeaders().getFirst("Content-Type"),
+                httpExchange.getRequestHeaders().getFirst("Content-Encoding"),
+                ImmutableMap.of());
+      } catch (Exception e) {
+        handleResponse(httpExchange, 500, "Internal Server Error", "text/plain");
+        return;
+      }
+
+      addObjectHeaders(httpExchange, metadata);
+      handleResponse(httpExchange, 200, "OK", "text/plain");
+    }
+
+    /** Deletes the object from the local store; answers 404 for an unknown bucket. */
+    private void deleteObject(String bucketName, String objectName, HttpExchange httpExchange)
+        throws IOException {
+      if (!verifyBucketExistence(bucketName, httpExchange)) {
+        return;
+      }
+      localStore.deleteObject(bucketName, objectName);
+
+      handleResponse(httpExchange, 200, "OK", "text/plain");
+    }
+
+    /**
+     * Serves a HEAD request: reports ETag, modification time and size of the object without a
+     * body. Answers 404 for an unknown bucket or key.
+     */
+    private void getObjectMeta(String bucketName, String objectName, HttpExchange httpExchange)
+        throws IOException {
+      if (!verifyBucketExistence(bucketName, httpExchange)) {
+        return;
+      }
+
+      ObjectMetadata metadata = verifyObjectExistence(bucketName, objectName);
+      if (metadata == null) {
+        sendNoSuchKey(httpExchange);
+        return;
+      }
+
+      addObjectHeaders(httpExchange, metadata);
+      // A HEAD response has no body, so the object size has to be advertised by hand.
+      httpExchange
+          .getResponseHeaders()
+          .add("Content-Length", Long.toString(metadata.getContentLength()));
+      handleResponse(httpExchange, 200, "OK", "text/plain");
+    }
+
+    /**
+     * Serves a GET request. Without a usable {@code Range} header the whole object is returned
+     * with status 200; with one, the selected bytes are returned with status 206, or 416 when
+     * the range lies outside the object.
+     */
+    private void getObject(String bucketName, String objectName, HttpExchange httpExchange)
+        throws IOException {
+      if (!verifyBucketExistence(bucketName, httpExchange)) {
+        return;
+      }
+
+      ObjectMetadata metadata = verifyObjectExistence(bucketName, objectName);
+      if (metadata == null) {
+        sendNoSuchKey(httpExchange);
+        return;
+      }
+
+      long fileSize = metadata.getContentLength();
+      long[] range = parseRange(httpExchange.getRequestHeaders().getFirst("Range"), fileSize);
+
+      if (range == null) {
+        addContentHeaders(httpExchange, metadata);
+        sendObjectBytes(httpExchange, 200, metadata, 0, fileSize);
+        return;
+      }
+
+      long rangeStart = range[0];
+      long bytesToRead = range[1];
+      if (bytesToRead < 0 || fileSize < rangeStart) {
+        httpExchange.getResponseHeaders().add("Content-Range", "bytes */" + fileSize);
+        // -1 announces "no body"; announcing a length without writing it would stall the client.
+        httpExchange.sendResponseHeaders(416, -1);
+        return;
+      }
+
+      // A start offset equal to the object size still yields an (empty) 206, as it did before.
+      addContentHeaders(httpExchange, metadata);
+      httpExchange
+          .getResponseHeaders()
+          .add(
+              "Content-Range",
+              "bytes " + rangeStart + "-" + (rangeStart + bytesToRead - 1) + "/" + fileSize);
+      sendObjectBytes(httpExchange, 206, metadata, rangeStart, bytesToRead);
+    }
+
+    /**
+     * Parses a single-range {@code Range} header of the form {@code bytes=first-last}, {@code
+     * bytes=first-} or {@code bytes=-suffixLength}.
+     *
+     * @param rangeHeader raw header value, may be null
+     * @param fileSize size of the object in bytes
+     * @return {@code {firstByteOffset, byteCount}}, where a negative count marks an
+     *     unsatisfiable range, or null when the header is absent or cannot be parsed and the
+     *     whole object should be served instead
+     */
+    private long[] parseRange(String rangeHeader, long fileSize) {
+      if (rangeHeader == null) {
+        return null;
+      }
+      String spec = rangeHeader.trim();
+      if (!spec.startsWith("bytes=")) {
+        return null;
+      }
+      spec = spec.substring("bytes=".length());
+      int dash = spec.indexOf('-');
+      if (dash < 0) {
+        return null;
+      }
+      String first = spec.substring(0, dash).trim();
+      String last = spec.substring(dash + 1).trim();
+
+      try {
+        if (first.isEmpty()) {
+          if (last.isEmpty()) {
+            return null;
+          }
+          // Suffix form: the final N bytes, clamped to the object size.
+          long suffixLength = Long.parseLong(last);
+          if (suffixLength < 0) {
+            return null;
+          }
+          long count = Math.min(suffixLength, fileSize);
+          return new long[] {fileSize - count, count};
+        }
+
+        long start = Long.parseLong(first);
+        long end = last.isEmpty() ? Long.MAX_VALUE : Long.parseLong(last);
+        long count = Math.min(fileSize - 1, end) - start + 1;
+        return new long[] {start, count};
+      } catch (NumberFormatException e) {
+        // Not a single numeric range (e.g. several ranges); serve the whole object.
+        return null;
+      }
+    }
+
+    /** Adds the headers shared by whole-object and partial GET responses. */
+    private void addContentHeaders(HttpExchange httpExchange, ObjectMetadata metadata) {
+      httpExchange.getResponseHeaders().add("Accept-Ranges", "bytes");
+      addObjectHeaders(httpExchange, metadata);
+      if (metadata.getContentType() != null) {
+        httpExchange.getResponseHeaders().add("Content-Type", metadata.getContentType());
+      }
+    }
+
+    /** Adds the quoted ETag and the Last-Modified header for the object. */
+    private void addObjectHeaders(HttpExchange httpExchange, ObjectMetadata metadata) {
+      httpExchange.getResponseHeaders().add("ETag", "\"" + metadata.getContentMD5() + "\"");
+      httpExchange
+          .getResponseHeaders()
+          .add("Last-Modified", createDate(metadata.getLastModificationDate()));
+    }
+
+    /**
+     * Sends the status line and {@code length} bytes of the object's data file, starting at
+     * {@code offset}.
+     */
+    private void sendObjectBytes(
+        HttpExchange httpExchange, int status, ObjectMetadata metadata, long offset, long length)
+        throws IOException {
+      if (length == 0) {
+        // A length argument of 0 would select chunked encoding; -1 means "no body".
+        httpExchange.sendResponseHeaders(status, -1);
+        return;
+      }
+
+      httpExchange.sendResponseHeaders(status, length);
+      try (OutputStream outputStream = httpExchange.getResponseBody();
+          FileInputStream fis = new FileInputStream(metadata.getDataFile())) {
+        ByteStreams.skipFully(fis, offset);
+        ByteStreams.copy(new BoundedInputStream(fis, length), outputStream);
+      }
+    }
+
+    /**
+     * Checks that the bucket exists and answers 404 when it does not.
+     *
+     * @return true when the bucket exists; false when a 404 response has already been sent and
+     *     the caller must stop processing the request
+     */
+    private boolean verifyBucketExistence(String bucketName, HttpExchange httpExchange)
+        throws IOException {
+      Bucket bucket = localStore.getBucket(bucketName);
+      if (bucket != null) {
+        return true;
+      }
+      String errorMessage =
+          createErrorResponse(
+              OSSErrorCode.NO_SUCH_BUCKET, "The specified bucket does not exist.");
+      handleResponse(httpExchange, 404, errorMessage, "application/xml");
+      return false;
+    }
+
+    /**
+     * Looks up the object's metadata in the local store.
+     *
+     * @return the metadata, or null when the store returns none or fails to read it
+     */
+    private ObjectMetadata verifyObjectExistence(String bucketName, String fileName) {
+      try {
+        return localStore.getObjectMetadata(bucketName, fileName);
+      } catch (IOException ignored) {
+        // Unreadable metadata is reported to the client the same way as a missing key.
+        return null;
+      }
+    }
+
+    /** Answers 404 with the error document for a missing object key. */
+    private void sendNoSuchKey(HttpExchange httpExchange) throws IOException {
+      String errorMessage =
+          createErrorResponse(OSSErrorCode.NO_SUCH_KEY, "The specify oss key does not exists.");
+      handleResponse(httpExchange, 404, errorMessage, "application/xml");
+    }
+
+    /**
+     * Sends a complete response with the given status, content type and UTF-8 encoded payload.
+     * For HEAD requests only the headers are sent.
+     */
+    private void handleResponse(
+        HttpExchange httpExchange, int responseCode, String responsePayload, String contentType)
+        throws IOException {
+      // The announced length must be the byte count, not the character count.
+      byte[] payload = responsePayload.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      httpExchange.getResponseHeaders().put("Content-Type", Collections.singletonList(contentType));
+
+      boolean headRequest = "HEAD".equals(httpExchange.getRequestMethod());
+      if (headRequest || payload.length == 0) {
+        httpExchange.sendResponseHeaders(responseCode, -1);
+        return;
+      }
+
+      httpExchange.sendResponseHeaders(responseCode, payload.length);
+      try (OutputStream outputStream = httpExchange.getResponseBody()) {
+        outputStream.write(payload);
+      }
+    }
+
+    /** Renders an error document of the form {@code <Error><Code/><Message/></Error>}. */
+    private String createErrorResponse(String errorCode, String message) {
+      StringBuilder builder = new StringBuilder();
+      builder.append("<Error>");
+      builder.append("<Code>").append(escapeXml(errorCode)).append("</Code>");
+      builder.append("<Message>").append(escapeXml(message)).append("</Message>");
+      builder.append("</Error>");
+      return builder.toString();
+    }
+
+    /** Escapes the characters that would otherwise break element content. */
+    private String escapeXml(String text) {
+      return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
+    }
+
+    /** Formats an epoch-millisecond timestamp as an HTTP-date in GMT. */
+    private String createDate(long timestamp) {
+      ZonedDateTime dateTime = java.time.Instant.ofEpochMilli(timestamp).atZone(ZoneId.of("GMT"));
+      return dateTime.format(httpDateFormat);
+    }
+  }
+
+  /**
+   * Reads bytes up to a maximum length, if its count goes above that, it stops.
+   *
+   * <p>This is useful to wrap ServletInputStreams. The ServletInputStream will block if you try to
+   * read content from it that isn't there, because it doesn't know whether the content hasn't
+   * arrived yet or whether the content has finished. So, one of these, initialized with the
+   * Content-length sent in the ServletInputStream's header, will stop it blocking, providing it's
+   * been sent with a correct content length.
+   *
+   * <p>This code is borrowed from `org.apache.commons:commons-io`
+   */
+  public class BoundedInputStream extends FilterInputStream {
+
+    /** The max count of bytes to read; a negative value means unlimited. */
+    private final long maxCount;
+
+    /** The count of bytes read or skipped so far. */
+    private long count;
+
+    /** The value of {@link #count} at the time of the last {@link #mark(int)} call. */
+    private long mark = -1;
+
+    /** Flag if close should be propagated. */
+    private boolean propagateClose = true;
+
+    /**
+     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is
+     * unlimited.
+     *
+     * @param in The wrapped input stream.
+     */
+    public BoundedInputStream(final InputStream in) {
+      this(in, -1);
+    }
+
+    /**
+     * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it
+     * to a certain size.
+     *
+     * @param inputStream The wrapped input stream.
+     * @param maxLength The maximum number of bytes to return; negative for no limit.
+     */
+    public BoundedInputStream(final InputStream inputStream, final long maxLength) {
+      // Some badly designed methods - e.g. the servlet API - overload length
+      // such that "-1" means stream finished
+      super(inputStream);
+      this.maxCount = maxLength;
+    }
+
+    /**
+     * Returns 0 once the limit has been reached, otherwise the delegate's {@code available()}.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public int available() throws IOException {
+      if (isMaxLength()) {
+        onMaxLength(maxCount, count);
+        return 0;
+      }
+      return in.available();
+    }
+
+    /**
+     * Invokes the delegate's {@code close()} method if {@link #isPropagateClose()} is {@code true}.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public void close() throws IOException {
+      if (propagateClose) {
+        in.close();
+      }
+    }
+
+    /**
+     * Gets the count of bytes read.
+     *
+     * @return The count of bytes read.
+     */
+    public long getCount() {
+      return count;
+    }
+
+    /**
+     * Gets the max count of bytes to read.
+     *
+     * @return The max count of bytes to read.
+     */
+    public long getMaxLength() {
+      return maxCount;
+    }
+
+    /** Tests whether a limit is set and has been reached. */
+    private boolean isMaxLength() {
+      return maxCount >= 0 && count >= maxCount;
+    }
+
+    /**
+     * Tests whether the {@link #close()} method should propagate to the underling {@link
+     * InputStream}.
+     *
+     * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of
+     *     the underlying stream or {@code false} if it does not.
+     */
+    public boolean isPropagateClose() {
+      return propagateClose;
+    }
+
+    /**
+     * Sets whether the {@link #close()} method should propagate to the underling {@link
+     * InputStream}.
+     *
+     * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code
+     *     close()} method of the underlying stream or {@code false} if it does not.
+     */
+    public void setPropagateClose(final boolean propagateClose) {
+      this.propagateClose = propagateClose;
+    }
+
+    /**
+     * Invokes the delegate's {@code mark(int)} method and remembers the current byte count.
+     *
+     * @param readlimit read ahead limit
+     */
+    @Override
+    public synchronized void mark(final int readlimit) {
+      in.mark(readlimit);
+      mark = count;
+    }
+
+    /**
+     * Invokes the delegate's {@code markSupported()} method.
+     *
+     * @return true if mark is supported, otherwise false
+     */
+    @Override
+    public boolean markSupported() {
+      return in.markSupported();
+    }
+
+    /**
+     * A caller has caused a request that would cross the {@code maxLength} boundary.
+     *
+     * @param maxLength The max count of bytes to read.
+     * @param bytesRead The count of bytes read.
+     * @throws IOException Subclasses may throw.
+     */
+    protected void onMaxLength(final long maxLength, final long bytesRead) throws IOException {
+      // for subclasses
+    }
+
+    /**
+     * Invokes the delegate's {@code read()} method if the current position is less than the limit.
+     *
+     * @return the byte read or -1 if the end of stream or the limit has been reached.
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public int read() throws IOException {
+      if (isMaxLength()) {
+        onMaxLength(maxCount, count);
+        return -1;
+      }
+      final int result = in.read();
+      // The end-of-stream marker is not a byte and must not consume any of the budget.
+      if (result != -1) {
+        count++;
+      }
+      return result;
+    }
+
+    /**
+     * Invokes {@link #read(byte[], int, int)} for the whole buffer.
+     *
+     * @param b the buffer to read the bytes into
+     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public int read(final byte[] b) throws IOException {
+      return this.read(b, 0, b.length);
+    }
+
+    /**
+     * Invokes the delegate's {@code read(byte[], int, int)} method, asking for no more bytes than
+     * the remaining budget.
+     *
+     * @param b the buffer to read the bytes into
+     * @param off The start offset
+     * @param len The number of bytes to read
+     * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+      if (isMaxLength()) {
+        onMaxLength(maxCount, count);
+        return -1;
+      }
+      // The result never exceeds len, so narrowing it back to int is safe.
+      final long maxRead = maxCount >= 0 ? Math.min(len, maxCount - count) : len;
+      final int bytesRead = in.read(b, off, (int) maxRead);
+
+      if (bytesRead == -1) {
+        return -1;
+      }
+
+      count += bytesRead;
+      return bytesRead;
+    }
+
+    /**
+     * Invokes the delegate's {@code reset()} method and restores the byte count saved by {@link
+     * #mark(int)}.
+     *
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public synchronized void reset() throws IOException {
+      in.reset();
+      count = mark;
+    }
+
+    /**
+     * Invokes the delegate's {@code skip(long)} method, skipping no further than the limit.
+     *
+     * @param n the number of bytes to skip
+     * @return the actual number of bytes skipped
+     * @throws IOException if an I/O error occurs.
+     */
+    @Override
+    public long skip(final long n) throws IOException {
+      final long toSkip = maxCount >= 0 ? Math.min(n, maxCount - count) : n;
+      final long skippedBytes = in.skip(toSkip);
+      count += skippedBytes;
+      return skippedBytes;
+    }
+
+    /**
+     * Invokes the delegate's {@code toString()} method.
+     *
+     * @return the delegate's {@code toString()}
+     */
+    @Override
+    public String toString() {
+      return in.toString();
+    }
+  }
+}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java
deleted file mode 100644
index ea0ef0fe4de3..000000000000
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockApp.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.aliyun.oss.mock;
-
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.Banner;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
-import org.springframework.boot.builder.SpringApplicationBuilder;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.core.convert.converter.Converter;
-import org.springframework.http.MediaType;
-import org.springframework.http.converter.xml.MappingJackson2XmlHttpMessageConverter;
-import org.springframework.util.StringUtils;
-import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
-
-@SuppressWarnings("checkstyle:AnnotationUseStyle")
-@Configuration
-@EnableAutoConfiguration(
- exclude = {SecurityAutoConfiguration.class},
- excludeName = {
- "org.springframework.boot.actuate.autoconfigure.security.servlet.ManagementWebSecurityAutoConfiguration"
- })
-@ComponentScan
-public class AliyunOSSMockApp {
-
- static final String PROP_ROOT_DIR = "root-dir";
-
- static final String PROP_HTTP_PORT = "server.port";
- static final int PORT_HTTP_PORT_DEFAULT = 9393;
-
- static final String PROP_SILENT = "silent";
-
- @Autowired private ConfigurableApplicationContext context;
-
- public static AliyunOSSMockApp start(Map properties, String... args) {
- Map defaults = Maps.newHashMap();
- defaults.put(PROP_HTTP_PORT, PORT_HTTP_PORT_DEFAULT);
-
- Banner.Mode bannerMode = Banner.Mode.CONSOLE;
-
- if (Boolean.parseBoolean(String.valueOf(properties.remove(PROP_SILENT)))) {
- defaults.put("logging.level.root", "WARN");
- bannerMode = Banner.Mode.OFF;
- }
-
- final ConfigurableApplicationContext ctx =
- new SpringApplicationBuilder(AliyunOSSMockApp.class)
- .properties(defaults)
- .properties(properties)
- .bannerMode(bannerMode)
- .run(args);
-
- return ctx.getBean(AliyunOSSMockApp.class);
- }
-
- public void stop() {
- SpringApplication.exit(context, () -> 0);
- }
-
- @Configuration
- static class Config implements WebMvcConfigurer {
-
- @Bean
- Converter rangeConverter() {
- return new RangeConverter();
- }
-
- /**
- * Creates an HttpMessageConverter for XML.
- *
- * @return The configured {@link MappingJackson2XmlHttpMessageConverter}.
- */
- @Bean
- public MappingJackson2XmlHttpMessageConverter getMessageConverter() {
- List mediaTypes = Lists.newArrayList();
- mediaTypes.add(MediaType.APPLICATION_XML);
- mediaTypes.add(MediaType.APPLICATION_FORM_URLENCODED);
- mediaTypes.add(MediaType.APPLICATION_OCTET_STREAM);
-
- final MappingJackson2XmlHttpMessageConverter xmlConverter =
- new MappingJackson2XmlHttpMessageConverter();
- xmlConverter.setSupportedMediaTypes(mediaTypes);
-
- return xmlConverter;
- }
- }
-
- private static class RangeConverter implements Converter {
-
- private static final Pattern REQUESTED_RANGE_PATTERN =
- Pattern.compile("^bytes=((\\d*)-(\\d*))((,\\d*-\\d*)*)");
-
- @Override
- public Range convert(String rangeString) {
- Preconditions.checkNotNull(rangeString, "Range value should not be null.");
-
- final Range range;
-
- // parsing a range specification of format: "bytes=start-end" - multiple ranges not supported
- final Matcher matcher = REQUESTED_RANGE_PATTERN.matcher(rangeString.trim());
- if (matcher.matches()) {
- final String rangeStart = matcher.group(2);
- final String rangeEnd = matcher.group(3);
-
- long start = StringUtils.isEmpty(rangeStart) ? -1L : Long.parseLong(rangeStart);
- long end = StringUtils.isEmpty(rangeEnd) ? Long.MAX_VALUE : Long.parseLong(rangeEnd);
- range = new Range(start, end);
-
- if (matcher.groupCount() == 5 && !"".equals(matcher.group(4))) {
- throw new IllegalArgumentException(
- "Unsupported range specification. Only single range specifications allowed");
- }
- if (range.start() != -1 && range.start() < 0) {
- throw new IllegalArgumentException(
- "Unsupported range specification. A start byte must be supplied");
- }
-
- if (range.end() != -1 && range.end() < range.start()) {
- throw new IllegalArgumentException(
- "Range header is malformed. End byte is smaller than start byte.");
- }
- } else {
- // Per Aliyun OSS behavior, return whole object content for illegal header
- range = new Range(0, Long.MAX_VALUE);
- }
-
- return range;
- }
- }
-}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockExtension.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockExtension.java
index 9aae5b777a8d..d4cb10615094 100644
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockExtension.java
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockExtension.java
@@ -34,7 +34,7 @@ public class AliyunOSSMockExtension implements AliyunOSSExtension {
private final Map properties;
- private AliyunOSSMockApp ossMockApp;
+ private AliyunOSSMock ossMock;
private AliyunOSSMockExtension(Map properties) {
this.properties = properties;
@@ -51,12 +51,16 @@ public String keyPrefix() {
@Override
public void start() {
- ossMockApp = AliyunOSSMockApp.start(properties);
+ try {
+ ossMock = AliyunOSSMock.start(properties);
+ } catch (Exception e) {
+ throw new RuntimeException("Can't start OSS Mock", e);
+ }
}
@Override
public void stop() {
- ossMockApp.stop();
+ ossMock.stop();
}
@Override
@@ -65,12 +69,12 @@ public OSS createOSSClient() {
String.format(
"http://localhost:%s",
properties.getOrDefault(
- AliyunOSSMockApp.PROP_HTTP_PORT, AliyunOSSMockApp.PORT_HTTP_PORT_DEFAULT));
+ AliyunOSSMock.PROP_HTTP_PORT, AliyunOSSMock.PORT_HTTP_PORT_DEFAULT));
return new OSSClientBuilder().build(endpoint, "foo", "bar");
}
private File rootDir() {
- Object rootDir = properties.get(AliyunOSSMockApp.PROP_ROOT_DIR);
+ Object rootDir = properties.get(AliyunOSSMock.PROP_ROOT_DIR);
Preconditions.checkNotNull(rootDir, "Root directory cannot be null");
return new File(rootDir.toString());
}
@@ -103,20 +107,15 @@ public void tearDownBucket(String bucket) {
public static class Builder {
private final Map props = Maps.newHashMap();
- public Builder silent() {
- props.put(AliyunOSSMockApp.PROP_SILENT, true);
- return this;
- }
-
public AliyunOSSExtension build() {
- String rootDir = (String) props.get(AliyunOSSMockApp.PROP_ROOT_DIR);
+ String rootDir = (String) props.get(AliyunOSSMock.PROP_ROOT_DIR);
if (Strings.isNullOrEmpty(rootDir)) {
File dir =
new File(
System.getProperty("java.io.tmpdir"),
"oss-mock-file-store-" + System.currentTimeMillis());
rootDir = dir.getAbsolutePath();
- props.put(AliyunOSSMockApp.PROP_ROOT_DIR, rootDir);
+ props.put(AliyunOSSMock.PROP_ROOT_DIR, rootDir);
}
File root = new File(rootDir);
root.deleteOnExit();
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java
deleted file mode 100644
index 7f7546ec233b..000000000000
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalController.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.aliyun.oss.mock;
-
-import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;
-import static org.springframework.http.HttpStatus.OK;
-import static org.springframework.http.HttpStatus.PARTIAL_CONTENT;
-import static org.springframework.http.HttpStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
-
-import com.aliyun.oss.OSSErrorCode;
-import com.aliyun.oss.model.Bucket;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonRootName;
-import java.io.FileInputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.MediaType;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.ControllerAdvice;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RequestHeader;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.servlet.mvc.method.annotation.ResponseEntityExceptionHandler;
-
-@RestController
-public class AliyunOSSMockLocalController {
- private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSMockLocalController.class);
-
- @Autowired private AliyunOSSMockLocalStore localStore;
-
- private static String filenameFrom(@PathVariable String bucketName, HttpServletRequest request) {
- String requestUri = request.getRequestURI();
- return requestUri.substring(requestUri.indexOf(bucketName) + bucketName.length() + 1);
- }
-
- @RequestMapping(value = "/{bucketName}", method = RequestMethod.PUT, produces = "application/xml")
- public void putBucket(@PathVariable String bucketName) throws IOException {
- if (localStore.getBucket(bucketName) != null) {
- throw new OssException(
- 409, OSSErrorCode.BUCKET_ALREADY_EXISTS, bucketName + " already exists.");
- }
-
- localStore.createBucket(bucketName);
- }
-
- @RequestMapping(
- value = "/{bucketName}",
- method = RequestMethod.DELETE,
- produces = "application/xml")
- public void deleteBucket(@PathVariable String bucketName) throws IOException {
- verifyBucketExistence(bucketName);
-
- localStore.deleteBucket(bucketName);
- }
-
- @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.PUT)
- public ResponseEntity putObject(
- @PathVariable String bucketName, HttpServletRequest request) {
- verifyBucketExistence(bucketName);
- String filename = filenameFrom(bucketName, request);
- try (ServletInputStream inputStream = request.getInputStream()) {
- ObjectMetadata metadata =
- localStore.putObject(
- bucketName,
- filename,
- inputStream,
- request.getContentType(),
- request.getHeader(HttpHeaders.CONTENT_ENCODING),
- ImmutableMap.of());
-
- HttpHeaders responseHeaders = new HttpHeaders();
- responseHeaders.setETag("\"" + metadata.getContentMD5() + "\"");
- responseHeaders.setLastModified(metadata.getLastModificationDate());
-
- return new ResponseEntity<>(responseHeaders, OK);
- } catch (Exception e) {
- LOG.error("Failed to put object - bucket: {} - object: {}", bucketName, filename, e);
- return new ResponseEntity<>(e.getMessage(), INTERNAL_SERVER_ERROR);
- }
- }
-
- @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.DELETE)
- public void deleteObject(@PathVariable String bucketName, HttpServletRequest request) {
- verifyBucketExistence(bucketName);
-
- localStore.deleteObject(bucketName, filenameFrom(bucketName, request));
- }
-
- @RequestMapping(value = "/{bucketName:.+}/**", method = RequestMethod.HEAD)
- public ResponseEntity getObjectMeta(
- @PathVariable String bucketName, HttpServletRequest request) {
- verifyBucketExistence(bucketName);
- ObjectMetadata metadata = verifyObjectExistence(bucketName, filenameFrom(bucketName, request));
-
- HttpHeaders headers = new HttpHeaders();
- headers.setETag("\"" + metadata.getContentMD5() + "\"");
- headers.setLastModified(metadata.getLastModificationDate());
- headers.setContentLength(metadata.getContentLength());
-
- return new ResponseEntity<>(headers, OK);
- }
-
- @SuppressWarnings("checkstyle:AnnotationUseStyle")
- @RequestMapping(
- value = "/{bucketName:.+}/**",
- method = RequestMethod.GET,
- produces = "application/xml")
- public void getObject(
- @PathVariable String bucketName,
- @RequestHeader(value = "Range", required = false) Range range,
- HttpServletRequest request,
- HttpServletResponse response)
- throws IOException {
- verifyBucketExistence(bucketName);
-
- String filename = filenameFrom(bucketName, request);
- ObjectMetadata metadata = verifyObjectExistence(bucketName, filename);
-
- if (range != null) {
- long fileSize = metadata.getContentLength();
- long bytesToRead = Math.min(fileSize - 1, range.end()) - range.start() + 1;
- long skipSize = range.start();
- if (range.start() == -1) {
- bytesToRead = Math.min(fileSize - 1, range.end());
- skipSize = fileSize - range.end();
- }
- if (range.end() == -1) {
- bytesToRead = fileSize - range.start();
- }
- if (bytesToRead < 0 || fileSize < range.start()) {
- response.setStatus(REQUESTED_RANGE_NOT_SATISFIABLE.value());
- response.flushBuffer();
- return;
- }
-
- response.setStatus(PARTIAL_CONTENT.value());
- response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes");
- response.setHeader(
- HttpHeaders.CONTENT_RANGE,
- String.format(
- "bytes %s-%s/%s",
- range.start(), bytesToRead + range.start() + 1, metadata.getContentLength()));
- response.setHeader(HttpHeaders.ETAG, "\"" + metadata.getContentMD5() + "\"");
- response.setDateHeader(HttpHeaders.LAST_MODIFIED, metadata.getLastModificationDate());
- response.setContentType(metadata.getContentType());
- response.setContentLengthLong(bytesToRead);
-
- try (OutputStream outputStream = response.getOutputStream()) {
- try (FileInputStream fis = new FileInputStream(metadata.getDataFile())) {
- fis.skip(skipSize);
- ByteStreams.copy(new BoundedInputStream(fis, bytesToRead), outputStream);
- }
- }
- } else {
- response.setHeader(HttpHeaders.ACCEPT_RANGES, "bytes");
- response.setHeader(HttpHeaders.ETAG, "\"" + metadata.getContentMD5() + "\"");
- response.setDateHeader(HttpHeaders.LAST_MODIFIED, metadata.getLastModificationDate());
- response.setContentType(metadata.getContentType());
- response.setContentLengthLong(metadata.getContentLength());
-
- try (OutputStream outputStream = response.getOutputStream()) {
- try (FileInputStream fis = new FileInputStream(metadata.getDataFile())) {
- ByteStreams.copy(fis, outputStream);
- }
- }
- }
- }
-
- private void verifyBucketExistence(String bucketName) {
- Bucket bucket = localStore.getBucket(bucketName);
- if (bucket == null) {
- throw new OssException(
- 404, OSSErrorCode.NO_SUCH_BUCKET, "The specified bucket does not exist. ");
- }
- }
-
- private ObjectMetadata verifyObjectExistence(String bucketName, String filename) {
- ObjectMetadata objectMetadata = null;
- try {
- objectMetadata = localStore.getObjectMetadata(bucketName, filename);
- } catch (IOException e) {
- LOG.error(
- "Failed to get the object metadata, bucket: {}, object: {}.", bucketName, filename, e);
- }
-
- if (objectMetadata == null) {
- throw new OssException(404, OSSErrorCode.NO_SUCH_KEY, "The specify oss key does not exists.");
- }
-
- return objectMetadata;
- }
-
- @ControllerAdvice
- public static class OSSMockExceptionHandler extends ResponseEntityExceptionHandler {
-
- @ExceptionHandler
- public ResponseEntity handleOSSException(OssException ex) {
- LOG.info("Responding with status {} - {}, {}", ex.status, ex.code, ex.message);
-
- ErrorResponse errorResponse = new ErrorResponse();
- errorResponse.setCode(ex.getCode());
- errorResponse.setMessage(ex.getMessage());
-
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_XML);
-
- return ResponseEntity.status(ex.status).headers(headers).body(errorResponse);
- }
- }
-
- public static class OssException extends RuntimeException {
-
- private final int status;
- private final String code;
- private final String message;
-
- public OssException(final int status, final String code, final String message) {
- super(message);
- this.status = status;
- this.code = code;
- this.message = message;
- }
-
- public String getCode() {
- return code;
- }
-
- @Override
- public String getMessage() {
- return message;
- }
- }
-
- @JsonRootName("Error")
- public static class ErrorResponse {
- @JsonProperty("Code")
- private String code;
-
- @JsonProperty("Message")
- private String message;
-
- public void setCode(String code) {
- this.code = code;
- }
-
- public void setMessage(String message) {
- this.message = message;
- }
- }
-
- /**
- * Reads bytes up to a maximum length, if its count goes above that, it stops.
- *
- * This is useful to wrap ServletInputStreams. The ServletInputStream will block if you try to
- * read content from it that isn't there, because it doesn't know whether the content hasn't
- * arrived yet or whether the content has finished. So, one of these, initialized with the
- * Content-length sent in the ServletInputStream's header, will stop it blocking, providing it's
- * been sent with a correct content length.
- *
- * This code is borrowed from `org.apache.commons:commons-io`
- */
- public class BoundedInputStream extends FilterInputStream {
-
- /** The max count of bytes to read. */
- private final long maxCount;
-
- /** The count of bytes read. */
- private long count;
-
- /** The marked position. */
- private long mark = -1;
-
- /** Flag if close should be propagated. */
- private boolean propagateClose = true;
-
- /**
- * Constructs a new {@link BoundedInputStream} that wraps the given input stream and is
- * unlimited.
- *
- * @param in The wrapped input stream.
- */
- public BoundedInputStream(final InputStream in) {
- this(in, -1);
- }
-
- /**
- * Constructs a new {@link BoundedInputStream} that wraps the given input stream and limits it
- * to a certain size.
- *
- * @param inputStream The wrapped input stream.
- * @param maxLength The maximum number of bytes to return.
- */
- public BoundedInputStream(final InputStream inputStream, final long maxLength) {
- // Some badly designed methods - e.g. the servlet API - overload length
- // such that "-1" means stream finished
- super(inputStream);
- this.maxCount = maxLength;
- }
-
- /** {@inheritDoc} */
- @Override
- public int available() throws IOException {
- if (isMaxLength()) {
- onMaxLength(maxCount, count);
- return 0;
- }
- return in.available();
- }
-
- /**
- * Invokes the delegate's {@code close()} method if {@link #isPropagateClose()} is {@code true}.
- *
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public void close() throws IOException {
- if (propagateClose) {
- in.close();
- }
- }
-
- /**
- * Gets the count of bytes read.
- *
- * @return The count of bytes read.
- * @since 2.12.0
- */
- public long getCount() {
- return count;
- }
-
- /**
- * Gets the max count of bytes to read.
- *
- * @return The max count of bytes to read.
- * @since 2.12.0
- */
- public long getMaxLength() {
- return maxCount;
- }
-
- private boolean isMaxLength() {
- return maxCount >= 0 && count >= maxCount;
- }
-
- /**
- * Tests whether the {@link #close()} method should propagate to the underling {@link
- * InputStream}.
- *
- * @return {@code true} if calling {@link #close()} propagates to the {@code close()} method of
- * the underlying stream or {@code false} if it does not.
- */
- public boolean isPropagateClose() {
- return propagateClose;
- }
-
- /**
- * Sets whether the {@link #close()} method should propagate to the underling {@link
- * InputStream}.
- *
- * @param propagateClose {@code true} if calling {@link #close()} propagates to the {@code
- * close()} method of the underlying stream or {@code false} if it does not.
- */
- public void setPropagateClose(final boolean propagateClose) {
- this.propagateClose = propagateClose;
- }
-
- /**
- * Invokes the delegate's {@code mark(int)} method.
- *
- * @param readlimit read ahead limit
- */
- @Override
- public synchronized void mark(final int readlimit) {
- in.mark(readlimit);
- mark = count;
- }
-
- /**
- * Invokes the delegate's {@code markSupported()} method.
- *
- * @return true if mark is supported, otherwise false
- */
- @Override
- public boolean markSupported() {
- return in.markSupported();
- }
-
- /**
- * A caller has caused a request that would cross the {@code maxLength} boundary.
- *
- * @param maxLength The max count of bytes to read.
- * @param bytesRead The count of bytes read.
- * @throws IOException Subclasses may throw.
- * @since 2.12.0
- */
- protected void onMaxLength(final long maxLength, final long bytesRead) throws IOException {
- // for subclasses
- }
-
- /**
- * Invokes the delegate's {@code read()} method if the current position is less than the limit.
- *
- * @return the byte read or -1 if the end of stream or the limit has been reached.
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public int read() throws IOException {
- if (isMaxLength()) {
- onMaxLength(maxCount, count);
- return -1;
- }
- final int result = in.read();
- count++;
- return result;
- }
-
- /**
- * Invokes the delegate's {@code read(byte[])} method.
- *
- * @param b the buffer to read the bytes into
- * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public int read(final byte[] b) throws IOException {
- return this.read(b, 0, b.length);
- }
-
- /**
- * Invokes the delegate's {@code read(byte[], int, int)} method.
- *
- * @param b the buffer to read the bytes into
- * @param off The start offset
- * @param len The number of bytes to read
- * @return the number of bytes read or -1 if the end of stream or the limit has been reached.
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
- if (isMaxLength()) {
- onMaxLength(maxCount, count);
- return -1;
- }
- final long maxRead = maxCount >= 0 ? Math.min(len, maxCount - count) : len;
- final int bytesRead = in.read(b, off, (int) maxRead);
-
- if (bytesRead == -1) {
- return -1;
- }
-
- count += bytesRead;
- return bytesRead;
- }
-
- /**
- * Invokes the delegate's {@code reset()} method.
- *
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public synchronized void reset() throws IOException {
- in.reset();
- count = mark;
- }
-
- /**
- * Invokes the delegate's {@code skip(long)} method.
- *
- * @param n the number of bytes to skip
- * @return the actual number of bytes skipped
- * @throws IOException if an I/O error occurs.
- */
- @Override
- public long skip(final long n) throws IOException {
- final long toSkip = maxCount >= 0 ? Math.min(n, maxCount - count) : n;
- final long skippedBytes = in.skip(toSkip);
- count += skippedBytes;
- return skippedBytes;
- }
-
- /**
- * Invokes the delegate's {@code toString()} method.
- *
- * @return the delegate's {@code toString()}
- */
- @Override
- public String toString() {
- return in.toString();
- }
- }
-}
diff --git a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java
index f7a4b72e4b97..521b87e31e80 100644
--- a/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java
+++ b/aliyun/src/test/java/org/apache/iceberg/aliyun/oss/mock/AliyunOSSMockLocalStore.java
@@ -46,11 +46,7 @@
import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.http.MediaType;
-import org.springframework.stereotype.Component;
-@Component
public class AliyunOSSMockLocalStore {
private static final Logger LOG = LoggerFactory.getLogger(AliyunOSSMockLocalStore.class);
@@ -61,8 +57,7 @@ public class AliyunOSSMockLocalStore {
private final ObjectMapper objectMapper = new ObjectMapper();
- public AliyunOSSMockLocalStore(
- @Value("${" + AliyunOSSMockApp.PROP_ROOT_DIR + ":}") String rootDir) {
+ public AliyunOSSMockLocalStore(String rootDir) {
Preconditions.checkNotNull(rootDir, "Root directory cannot be null");
this.root = new File(rootDir);
@@ -121,8 +116,7 @@ void deleteBucket(String bucketName) throws IOException {
File dir = new File(root, bucket.getName());
if (Files.walk(dir.toPath()).anyMatch(p -> p.toFile().isFile())) {
- throw new AliyunOSSMockLocalController.OssException(
- 409, OSSErrorCode.BUCKET_NOT_EMPTY, "The bucket you tried to delete is not empty. ");
+ throw new RuntimeException(OSSErrorCode.BUCKET_NOT_EMPTY);
}
try (Stream walk = Files.walk(dir.toPath())) {
@@ -156,7 +150,9 @@ ObjectMetadata putObject(
metadata.setContentLength(dataFile.length());
metadata.setContentMD5(md5sum(dataFile.getAbsolutePath()));
metadata.setContentType(
- contentType != null ? contentType : MediaType.APPLICATION_OCTET_STREAM_VALUE);
+ contentType != null
+ ? contentType
+ : "application/octet"); // MediaType.APPLICATION_OCTET_STREAM_VALUE
metadata.setContentEncoding(contentEncoding);
metadata.setDataFile(dataFile.getAbsolutePath());
metadata.setMetaFile(metaFile.getAbsolutePath());
diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
index 4bb8a78289de..a4d97fa93fa1 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -29,6 +29,14 @@
* @param the concrete Java class of a ContentFile instance.
*/
public interface ContentFile {
+ /**
+ * Returns the path of the manifest which this file is referenced in or null if it was not read
+ * from a manifest.
+ */
+ default String manifestLocation() {
+ return null;
+ }
+
/**
* Returns the ordinal position of the file in a manifest, or null if it was not read from a
* manifest.
@@ -43,9 +51,19 @@ public interface ContentFile {
*/
FileContent content();
- /** Returns fully qualified path to the file, suitable for constructing a Hadoop Path. */
+ /**
+ * Returns fully qualified path to the file, suitable for constructing a Hadoop Path.
+ *
+ * @deprecated since 1.7.0, will be removed in 2.0.0; use {@link #location()} instead.
+ */
+ @Deprecated
CharSequence path();
+ /** Returns the fully qualified path to the file. */
+ default String location() {
+ return path().toString();
+ }
+
/** Returns format of the file. */
FileFormat format();
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 59b329c500c7..02ad0aff3128 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -102,6 +102,7 @@ public interface DataFile extends ContentFile {
int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
+
// NEXT ID TO ASSIGN: 142
static StructType getType(StructType partitionType) {
@@ -126,7 +127,9 @@ static StructType getType(StructType partitionType) {
SORT_ORDER_ID);
}
- /** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */
+ /**
+ * @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
+ */
@Override
default FileContent content() {
return FileContent.DATA;
diff --git a/api/src/main/java/org/apache/iceberg/PartitionSpec.java b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
index 08a1c4f9ecfd..9b74893f1831 100644
--- a/api/src/main/java/org/apache/iceberg/PartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/PartitionSpec.java
@@ -371,6 +371,7 @@ public static class Builder {
new AtomicInteger(unpartitionedLastAssignedId());
// check if there are conflicts between partition and schema field name
private boolean checkConflicts = true;
+ private boolean caseSensitive = true;
private Builder(Schema schema) {
this.schema = schema;
@@ -390,7 +391,8 @@ Builder checkConflicts(boolean check) {
}
private void checkAndAddPartitionName(String name, Integer sourceColumnId) {
- Types.NestedField schemaField = schema.findField(name);
+ Types.NestedField schemaField =
+ this.caseSensitive ? schema.findField(name) : schema.caseInsensitiveFindField(name);
if (checkConflicts) {
if (sourceColumnId != null) {
// for identity transform case we allow conflicts between partition and schema field name
@@ -427,20 +429,31 @@ private void checkForRedundantPartitions(PartitionField field) {
dedupFields.put(dedupKey, field);
}
+ public Builder caseSensitive(boolean sensitive) {
+ this.caseSensitive = sensitive;
+ return this;
+ }
+
public Builder withSpecId(int newSpecId) {
this.specId = newSpecId;
return this;
}
private Types.NestedField findSourceColumn(String sourceName) {
- Types.NestedField sourceColumn = schema.findField(sourceName);
+ Types.NestedField sourceColumn =
+ this.caseSensitive
+ ? schema.findField(sourceName)
+ : schema.caseInsensitiveFindField(sourceName);
Preconditions.checkArgument(
sourceColumn != null, "Cannot find source column: %s", sourceName);
return sourceColumn;
}
Builder identity(String sourceName, String targetName) {
- Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ return identity(findSourceColumn(sourceName), targetName);
+ }
+
+ private Builder identity(Types.NestedField sourceColumn, String targetName) {
checkAndAddPartitionName(targetName, sourceColumn.fieldId());
PartitionField field =
new PartitionField(
@@ -451,12 +464,16 @@ Builder identity(String sourceName, String targetName) {
}
public Builder identity(String sourceName) {
- return identity(sourceName, sourceName);
+ Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ return identity(sourceColumn, schema.findColumnName(sourceColumn.fieldId()));
}
public Builder year(String sourceName, String targetName) {
+ return year(findSourceColumn(sourceName), targetName);
+ }
+
+ private Builder year(Types.NestedField sourceColumn, String targetName) {
checkAndAddPartitionName(targetName);
- Types.NestedField sourceColumn = findSourceColumn(sourceName);
PartitionField field =
new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.year());
checkForRedundantPartitions(field);
@@ -465,12 +482,17 @@ public Builder year(String sourceName, String targetName) {
}
public Builder year(String sourceName) {
- return year(sourceName, sourceName + "_year");
+ Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ String columnName = schema.findColumnName(sourceColumn.fieldId());
+ return year(sourceColumn, columnName + "_year");
}
public Builder month(String sourceName, String targetName) {
+ return month(findSourceColumn(sourceName), targetName);
+ }
+
+ private Builder month(Types.NestedField sourceColumn, String targetName) {
checkAndAddPartitionName(targetName);
- Types.NestedField sourceColumn = findSourceColumn(sourceName);
PartitionField field =
new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.month());
checkForRedundantPartitions(field);
@@ -479,12 +501,17 @@ public Builder month(String sourceName, String targetName) {
}
public Builder month(String sourceName) {
- return month(sourceName, sourceName + "_month");
+ Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ String columnName = schema.findColumnName(sourceColumn.fieldId());
+ return month(sourceColumn, columnName + "_month");
}
public Builder day(String sourceName, String targetName) {
+ return day(findSourceColumn(sourceName), targetName);
+ }
+
+ private Builder day(Types.NestedField sourceColumn, String targetName) {
checkAndAddPartitionName(targetName);
- Types.NestedField sourceColumn = findSourceColumn(sourceName);
PartitionField field =
new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.day());
checkForRedundantPartitions(field);
@@ -493,12 +520,17 @@ public Builder day(String sourceName, String targetName) {
}
public Builder day(String sourceName) {
- return day(sourceName, sourceName + "_day");
+ Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ String columnName = schema.findColumnName(sourceColumn.fieldId());
+ return day(sourceColumn, columnName + "_day");
}
public Builder hour(String sourceName, String targetName) {
+ return hour(findSourceColumn(sourceName), targetName);
+ }
+
+ private Builder hour(Types.NestedField sourceColumn, String targetName) {
checkAndAddPartitionName(targetName);
- Types.NestedField sourceColumn = findSourceColumn(sourceName);
PartitionField field =
new PartitionField(sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.hour());
checkForRedundantPartitions(field);
@@ -507,12 +539,17 @@ public Builder hour(String sourceName, String targetName) {
}
public Builder hour(String sourceName) {
- return hour(sourceName, sourceName + "_hour");
+ Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ String columnName = schema.findColumnName(sourceColumn.fieldId());
+ return hour(sourceColumn, columnName + "_hour");
}
public Builder bucket(String sourceName, int numBuckets, String targetName) {
+ return bucket(findSourceColumn(sourceName), numBuckets, targetName);
+ }
+
+ private Builder bucket(Types.NestedField sourceColumn, int numBuckets, String targetName) {
checkAndAddPartitionName(targetName);
- Types.NestedField sourceColumn = findSourceColumn(sourceName);
fields.add(
new PartitionField(
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.bucket(numBuckets)));
@@ -520,12 +557,17 @@ public Builder bucket(String sourceName, int numBuckets, String targetName) {
}
public Builder bucket(String sourceName, int numBuckets) {
- return bucket(sourceName, numBuckets, sourceName + "_bucket");
+ Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ String columnName = schema.findColumnName(sourceColumn.fieldId());
+ return bucket(sourceColumn, numBuckets, columnName + "_bucket");
}
public Builder truncate(String sourceName, int width, String targetName) {
+ return truncate(findSourceColumn(sourceName), width, targetName);
+ }
+
+ private Builder truncate(Types.NestedField sourceColumn, int width, String targetName) {
checkAndAddPartitionName(targetName);
- Types.NestedField sourceColumn = findSourceColumn(sourceName);
fields.add(
new PartitionField(
sourceColumn.fieldId(), nextFieldId(), targetName, Transforms.truncate(width)));
@@ -533,11 +575,16 @@ public Builder truncate(String sourceName, int width, String targetName) {
}
public Builder truncate(String sourceName, int width) {
- return truncate(sourceName, width, sourceName + "_trunc");
+ Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ String columnName = schema.findColumnName(sourceColumn.fieldId());
+ return truncate(sourceColumn, width, columnName + "_trunc");
}
public Builder alwaysNull(String sourceName, String targetName) {
- Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ return alwaysNull(findSourceColumn(sourceName), targetName);
+ }
+
+ private Builder alwaysNull(Types.NestedField sourceColumn, String targetName) {
checkAndAddPartitionName(
targetName, sourceColumn.fieldId()); // can duplicate a source column name
fields.add(
@@ -547,7 +594,9 @@ public Builder alwaysNull(String sourceName, String targetName) {
}
public Builder alwaysNull(String sourceName) {
- return alwaysNull(sourceName, sourceName + "_null");
+ Types.NestedField sourceColumn = findSourceColumn(sourceName);
+ String columnName = schema.findColumnName(sourceColumn.fieldId());
+ return alwaysNull(sourceColumn, columnName + "_null");
}
// add a partition field with an auto-increment partition field id starting from
diff --git a/api/src/main/java/org/apache/iceberg/RowDelta.java b/api/src/main/java/org/apache/iceberg/RowDelta.java
index 624f6c15d20b..a5e3fa477ba9 100644
--- a/api/src/main/java/org/apache/iceberg/RowDelta.java
+++ b/api/src/main/java/org/apache/iceberg/RowDelta.java
@@ -46,6 +46,17 @@ public interface RowDelta extends SnapshotUpdate {
*/
RowDelta addDeletes(DeleteFile deletes);
+ /**
+ * Removes a rewritten {@link DeleteFile} from the table.
+ *
+ * @param deletes a delete file that can be removed from the table
+ * @return this for method chaining
+ */
+ default RowDelta removeDeletes(DeleteFile deletes) {
+ throw new UnsupportedOperationException(
+ getClass().getName() + " does not implement removeDeletes");
+ }
+
/**
* Set the snapshot ID used in any reads for this operation.
*
diff --git a/api/src/main/java/org/apache/iceberg/Schema.java b/api/src/main/java/org/apache/iceberg/Schema.java
index 7ff712b62790..9bcf691f5a03 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -54,6 +54,8 @@ public class Schema implements Serializable {
private static final Joiner NEWLINE = Joiner.on('\n');
private static final String ALL_COLUMNS = "*";
private static final int DEFAULT_SCHEMA_ID = 0;
+ private static final Map MIN_FORMAT_VERSIONS =
+ ImmutableMap.of(Type.TypeID.TIMESTAMP_NANO, 3);
private final StructType struct;
private final int schemaId;
@@ -573,4 +575,27 @@ private List reassignIds(List columns, TypeUtil.GetID
});
return res.asStructType().fields();
}
+
+ /**
+ * Check the compatibility of the schema with a format version.
+ *
+ * This validates that the schema does not contain types that were released in later format
+ * versions.
+ *
+ * @param schema a Schema
+ * @param formatVersion table format version
+ */
+ public static void checkCompatibility(Schema schema, int formatVersion) {
+ // check the type in each field
+ for (NestedField field : schema.lazyIdToField().values()) {
+ Integer minFormatVersion = MIN_FORMAT_VERSIONS.get(field.type().typeId());
+ Preconditions.checkState(
+ minFormatVersion == null || formatVersion >= minFormatVersion,
+ "Invalid type in v%s schema: %s %s is not supported until v%s",
+ formatVersion,
+ schema.findColumnName(field.fieldId()),
+ field.type(),
+ minFormatVersion);
+ }
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java b/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java
index eeb596d42d5c..a4994d22001d 100644
--- a/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java
+++ b/api/src/main/java/org/apache/iceberg/UpdatePartitionSpec.java
@@ -132,5 +132,5 @@ public interface UpdatePartitionSpec extends PendingUpdate {
default UpdatePartitionSpec addNonDefaultSpec() {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement addNonDefaultSpec()");
- };
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index 2d6ff2679a17..61750d83fc79 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -70,4 +70,22 @@ default RewritePositionDeleteFiles rewritePositionDeletes(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement rewritePositionDeletes");
}
+
+ /** Instantiates an action to compute table stats. */
+ default ComputeTableStats computeTableStats(Table table) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement computeTableStats");
+ }
+
+ /** Instantiates an action to rewrite all absolute paths in table metadata. */
+ default RewriteTablePath rewriteTablePath(Table table) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement rewriteTablePath");
+ }
+
+ /** Instantiates an action to remove dangling delete files from the current snapshot. */
+ default RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " does not implement removeDanglingDeleteFiles");
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/actions/ComputeTableStats.java b/api/src/main/java/org/apache/iceberg/actions/ComputeTableStats.java
new file mode 100644
index 000000000000..04449d591657
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/actions/ComputeTableStats.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.StatisticsFile;
+
+/** An action that collects statistics of an Iceberg table and writes to Puffin files. */
+public interface ComputeTableStats extends Action {
+ /**
+ * Choose the set of columns to collect stats for; by default, all columns are chosen.
+ *
+ * @param columns a set of column names to be analyzed
+ * @return this for method chaining
+ */
+ ComputeTableStats columns(String... columns);
+
+ /**
+ * Choose the table snapshot to compute stats for; by default, the current snapshot is used.
+ *
+ * @param snapshotId long ID of the snapshot for which stats need to be computed
+ * @return this for method chaining
+ */
+ ComputeTableStats snapshot(long snapshotId);
+
+ /** The result of table statistics collection. */
+ interface Result {
+
+ /** Returns statistics file or none if no statistics were collected. */
+ StatisticsFile statisticsFile();
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java b/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
new file mode 100644
index 000000000000..b0ef0d5e35f8
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/actions/RemoveDanglingDeleteFiles.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import org.apache.iceberg.DeleteFile;
+
+/**
+ * An action that removes dangling delete files from the current snapshot. A delete file is dangling
+ * if its deletes no longer apply to any live data files.
+ */
+public interface RemoveDanglingDeleteFiles
+ extends Action {
+
+ /** The result of removing dangling delete files. */
+ interface Result {
+ /** Returns the removed delete files. */
+ Iterable removedDeleteFiles();
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index f6ef40270852..589b9017741e 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -106,6 +106,18 @@ public interface RewriteDataFiles
boolean USE_STARTING_SEQUENCE_NUMBER_DEFAULT = true;
+ /**
+ * Remove dangling delete files from the current snapshot after compaction. A delete file is
+ * considered dangling if it does not apply to any live data files.
+ *
+ * Both equality and position dangling delete files will be removed.
+ *
+ * Defaults to false.
+ */
+ String REMOVE_DANGLING_DELETES = "remove-dangling-deletes";
+
+ boolean REMOVE_DANGLING_DELETES_DEFAULT = false;
+
/**
* Forces the rewrite job order based on the value.
*
@@ -216,6 +228,10 @@ default long rewrittenBytesCount() {
default int failedDataFilesCount() {
return rewriteFailures().stream().mapToInt(FileGroupFailureResult::dataFilesCount).sum();
}
+
+ default int removedDeleteFilesCount() {
+ return 0;
+ }
}
/**
diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
new file mode 100644
index 000000000000..b7aed67396a5
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteTablePath.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+/**
+ * An action that rewrites the table's metadata files to a staging directory, replacing all source
+ * prefixes in absolute paths with a specified target prefix. There are two modes:
+ *
+ * <ul>
+ *   <li>Complete copy: Rewrites all metadata files to the staging directory.
+ *   <li>Incremental copy: Rewrites a subset of metadata files to the staging directory,
+ *       consisting of metadata files added since a specified start version and/or until end
+ *       version. The start/end version is identified by the name of a metadata.json file, and all
+ *       metadata files added before/after these files are marked for rewrite.
+ * </ul>
+ *
+ * <p>This action can be used as the starting point to fully or incrementally copy an Iceberg table
+ * located under the source prefix to the target prefix.
+ *
+ * <p>The action returns the following:
+ *
+ * <ol>
+ *   <li>The name of the latest metadata.json rewritten to staging location. After the files are
+ *       copied, this will be the root of the copied table.
+ *   <li>A list of all files added to the table between startVersion and endVersion, including their
+ *       original and target paths under the target prefix. This list covers both original and
+ *       rewritten files, allowing for copying to the target paths to form the copied table.
+ * </ol>
+ */
+public interface RewriteTablePath extends Action<RewriteTablePath, RewriteTablePath.Result> {
+
+  /**
+   * Configure a source prefix that will be replaced by the specified target prefix in all paths.
+   *
+   * @param sourcePrefix the source prefix to be replaced
+   * @param targetPrefix the target prefix
+   * @return this for method chaining
+   */
+  RewriteTablePath rewriteLocationPrefix(String sourcePrefix, String targetPrefix);
+
+  /**
+   * First metadata version to rewrite, identified by name of a metadata.json file in the table's
+   * metadata log. It is optional; if provided, this action will only rewrite metadata files added
+   * after this version.
+   *
+   * @param startVersion name of a metadata.json file. For example,
+   *     "00001-8893aa9e-f92e-4443-80e7-cfa42238a654.metadata.json".
+   * @return this for method chaining
+   */
+  RewriteTablePath startVersion(String startVersion);
+
+  /**
+   * Last metadata version to rewrite, identified by name of a metadata.json file in the table's
+   * metadata log. It is optional; if provided, this action will only rewrite metadata files added
+   * before this file, including the file itself.
+   *
+   * @param endVersion name of a metadata.json file. For example,
+   *     "00001-8893aa9e-f92e-4443-80e7-cfa42238a654.metadata.json".
+   * @return this for method chaining
+   */
+  RewriteTablePath endVersion(String endVersion);
+
+  /**
+   * Custom staging location. It is optional. By default, staging location is a subdirectory under
+   * table's metadata directory.
+   *
+   * @param stagingLocation the staging location
+   * @return this for method chaining
+   */
+  RewriteTablePath stagingLocation(String stagingLocation);
+
+  /** The action result that contains a summary of the execution. */
+  interface Result {
+    /** Staging location of rewritten files. */
+    String stagingLocation();
+
+    /**
+     * Path to a comma-separated list of source and target paths for all files added to the table
+     * between startVersion and endVersion, including original data files and metadata files
+     * rewritten to staging.
+     */
+    String fileListLocation();
+
+    /** Name of latest metadata file version. */
+    String latestVersion();
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/encryption/KmsClient.java b/api/src/main/java/org/apache/iceberg/encryption/KmsClient.java
index 3ebda7be27f2..87dd2b286b12 100644
--- a/api/src/main/java/org/apache/iceberg/encryption/KmsClient.java
+++ b/api/src/main/java/org/apache/iceberg/encryption/KmsClient.java
@@ -23,7 +23,9 @@
import java.util.Map;
/** A minimum client interface to connect to a key management service (KMS). */
-/** @deprecated the API will be removed in v2.0.0 (replaced with KeyManagementClient interface). */
+/**
+ * @deprecated the API will be removed in v2.0.0 (replaced with KeyManagementClient interface).
+ */
@Deprecated
public interface KmsClient extends Serializable {
diff --git a/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java
new file mode 100644
index 000000000000..bc5da2aee280
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/exceptions/NoSuchIcebergViewException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.exceptions;
+
+import com.google.errorprone.annotations.FormatMethod;
+
+/** Exception raised when a view is found, but it is not an Iceberg view. */
+public class NoSuchIcebergViewException extends NoSuchViewException {
+  /**
+   * Creates the exception, handing the message and its arguments to the parent constructor.
+   *
+   * @param message the message (a format string, as flagged by {@code @FormatMethod})
+   * @param args arguments for the message
+   */
+  @FormatMethod
+  public NoSuchIcebergViewException(String message, Object... args) {
+    super(message, args);
+  }
+
+  /**
+   * Throws a {@link NoSuchIcebergViewException} unless the given condition holds.
+   *
+   * @param test the condition that must be true for the check to pass
+   * @param message the message (a format string, as flagged by {@code @FormatMethod})
+   * @param args arguments for the message
+   * @throws NoSuchIcebergViewException if {@code test} is false
+   */
+  @FormatMethod
+  public static void check(boolean test, String message, Object... args) {
+    if (test) {
+      return;
+    }
+
+    throw new NoSuchIcebergViewException(message, args);
+  }
+}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/BoundLiteralPredicate.java b/api/src/main/java/org/apache/iceberg/expressions/BoundLiteralPredicate.java
index 02dc31c6a6c5..127d46e6a48f 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/BoundLiteralPredicate.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/BoundLiteralPredicate.java
@@ -31,6 +31,7 @@ public class BoundLiteralPredicate extends BoundPredicate {
Type.TypeID.LONG,
Type.TypeID.DATE,
Type.TypeID.TIME,
+ Type.TypeID.TIMESTAMP_NANO,
Type.TypeID.TIMESTAMP);
private static long toLong(Literal> lit) {
diff --git a/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java b/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
index ca51b1944c66..0ff73632b1d6 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.expressions;
+import java.util.Locale;
import org.apache.iceberg.Accessor;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Type;
@@ -82,6 +83,7 @@ public Accessor accessor() {
@Override
public String toString() {
- return String.format("ref(id=%d, accessor-type=%s)", field.fieldId(), accessor.type());
+ return String.format(
+ Locale.ROOT, "ref(id=%d, accessor-type=%s)", field.fieldId(), accessor.type());
}
}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
index bf72e03bc406..82d513ced7dd 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/ExpressionUtil.java
@@ -23,6 +23,7 @@
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
+import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -36,6 +37,7 @@
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
/** Expression utility methods. */
public class ExpressionUtil {
@@ -52,6 +54,12 @@ public class ExpressionUtil {
private static final Pattern TIMESTAMPTZ =
Pattern.compile(
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{1,9})?)?([-+]\\d{2}:\\d{2}|Z)");
+ // Matches a zone-less timestamp string "yyyy-MM-ddTHH:mm" with optional ":ss" and, after the
+ // seconds, an optional fraction of 7 to 9 digits. Because seconds and fraction are both
+ // optional, strings without any fraction match as well.
+ // NOTE(review): the '.' before the fraction is unescaped, so any single character is accepted
+ // in that position; confirm this is intended.
+ private static final Pattern TIMESTAMPNS =
+ Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{7,9})?)?");
+ // Same shape as TIMESTAMPNS, followed by a mandatory offset ("+hh:mm" / "-hh:mm") or 'Z'.
+ private static final Pattern TIMESTAMPTZNS =
+ Pattern.compile(
+ "\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}(:\\d{2}(.\\d{7,9})?)?([-+]\\d{2}:\\d{2}|Z)");
+
static final int LONG_IN_PREDICATE_ABBREVIATION_THRESHOLD = 10;
private static final int LONG_IN_PREDICATE_ABBREVIATION_MIN_GAIN = 5;
@@ -493,8 +501,10 @@ private static List abbreviateValues(List sanitizedValues) {
abbreviatedList.addAll(distinctValues);
abbreviatedList.add(
String.format(
+ Locale.ROOT,
"... (%d values hidden, %d in total)",
- sanitizedValues.size() - distinctValues.size(), sanitizedValues.size()));
+ sanitizedValues.size() - distinctValues.size(),
+ sanitizedValues.size()));
return abbreviatedList;
}
}
@@ -515,6 +525,8 @@ private static String sanitize(Type type, Object value, long now, int today) {
return "(time)";
case TIMESTAMP:
return sanitizeTimestamp((long) value, now);
+      case TIMESTAMP_NANO:
+        // The value is in nanoseconds; nanosToMicros is the only unit conversion required.
+        // Dividing by 1000 first would scale the timestamp down twice.
+        return sanitizeTimestamp(DateTimeUtil.nanosToMicros((long) value), now);
case STRING:
return sanitizeString((CharSequence) value, now, today);
case BOOLEAN:
@@ -536,6 +548,9 @@ private static String sanitize(Literal> literal, long now, int today) {
return sanitizeDate(((Literals.DateLiteral) literal).value(), today);
} else if (literal instanceof Literals.TimestampLiteral) {
return sanitizeTimestamp(((Literals.TimestampLiteral) literal).value(), now);
+ } else if (literal instanceof Literals.TimestampNanoLiteral) {
+ return sanitizeTimestamp(
+ DateTimeUtil.nanosToMicros(((Literals.TimestampNanoLiteral) literal).value()), now);
} else if (literal instanceof Literals.TimeLiteral) {
return "(time)";
} else if (literal instanceof Literals.IntegerLiteral) {
@@ -594,6 +609,12 @@ private static String sanitizeString(CharSequence value, long now, int today) {
if (DATE.matcher(value).matches()) {
Literal date = Literal.of(value).to(Types.DateType.get());
return sanitizeDate(date.value(), today);
+ } else if (TIMESTAMPNS.matcher(value).matches()) {
+ Literal ts = Literal.of(value).to(Types.TimestampNanoType.withoutZone());
+ return sanitizeTimestamp(DateTimeUtil.nanosToMicros(ts.value()), now);
+ } else if (TIMESTAMPTZNS.matcher(value).matches()) {
+ Literal ts = Literal.of(value).to(Types.TimestampNanoType.withZone());
+ return sanitizeTimestamp(DateTimeUtil.nanosToMicros(ts.value()), now);
} else if (TIMESTAMP.matcher(value).matches()) {
Literal ts = Literal.of(value).to(Types.TimestampType.withoutZone());
return sanitizeTimestamp(ts.value(), now);
@@ -615,7 +636,7 @@ private static String sanitizeString(CharSequence value, long now, int today) {
private static String sanitizeSimpleString(CharSequence value) {
// hash the value and return the hash as hex
- return String.format("(hash-%08x)", HASH_FUNC.apply(value));
+ return String.format(Locale.ROOT, "(hash-%08x)", HASH_FUNC.apply(value));
}
private static PartitionSpec identitySpec(Schema schema, int... ids) {
diff --git a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java
index f21a7705968b..deeba664ec07 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java
@@ -309,6 +309,18 @@ public static UnboundTerm transform(String name, Transform, T> transfor
return new UnboundTransform<>(ref(name), transform);
}
+  /**
+   * Create a {@link Literal} from an Object.
+   *
+   * @param value a value
+   * @param <T> Java type of value
+   * @return a Literal for the given value
+   * @throws IllegalArgumentException if the value has no literal implementation
+   */
+  public static <T> Literal<T> lit(T value) {
+    return Literals.from(value);
+  }
+
public static UnboundAggregate count(String name) {
return new UnboundAggregate<>(Operation.COUNT, ref(name));
}
diff --git a/api/src/main/java/org/apache/iceberg/expressions/Literals.java b/api/src/main/java/org/apache/iceberg/expressions/Literals.java
index 79d7190c49df..ee47035b1e72 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/Literals.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/Literals.java
@@ -24,7 +24,6 @@
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
-import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
@@ -40,6 +39,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.NaNUtil;
class Literals {
@@ -299,6 +299,9 @@ public Literal to(Type type) {
return (Literal) new TimeLiteral(value());
case TIMESTAMP:
return (Literal) new TimestampLiteral(value());
+ case TIMESTAMP_NANO:
+ // assume micros and convert to nanos to match the behavior in the timestamp case above
+ return new TimestampLiteral(value()).to(type);
case DATE:
if ((long) Integer.MAX_VALUE < value()) {
return aboveMax();
@@ -437,11 +440,9 @@ public Literal to(Type type) {
case TIMESTAMP:
return (Literal) this;
case DATE:
- return (Literal)
- new DateLiteral(
- (int)
- ChronoUnit.DAYS.between(
- EPOCH_DAY, EPOCH.plus(value(), ChronoUnit.MICROS).toLocalDate()));
+ return (Literal) new DateLiteral(DateTimeUtil.microsToDays(value()));
+ case TIMESTAMP_NANO:
+ return (Literal) new TimestampNanoLiteral(DateTimeUtil.microsToNanos(value()));
default:
}
return null;
@@ -453,6 +454,32 @@ protected Type.TypeID typeId() {
}
}
+  /**
+   * Literal for {@link Type.TypeID#TIMESTAMP_NANO} values, stored as a {@code Long} that is treated
+   * as a nanosecond count by the conversions below.
+   */
+  static class TimestampNanoLiteral extends ComparableLiteral<Long> {
+    TimestampNanoLiteral(Long value) {
+      super(value);
+    }
+
+    /**
+     * Converts this literal to the requested type.
+     *
+     * @param type the target type
+     * @param <T> Java type of the converted literal's value
+     * @return a date literal, a (microsecond) timestamp literal, this literal itself for
+     *     TIMESTAMP_NANO, or null when the target type is not one of those three
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> Literal<T> to(Type type) {
+      switch (type.typeId()) {
+        case DATE:
+          return (Literal<T>) new DateLiteral(DateTimeUtil.nanosToDays(value()));
+        case TIMESTAMP:
+          return (Literal<T>) new TimestampLiteral(DateTimeUtil.nanosToMicros(value()));
+        case TIMESTAMP_NANO:
+          return (Literal<T>) this;
+        default:
+      }
+      // no conversion is defined for any other target type
+      return null;
+    }
+
+    @Override
+    protected Type.TypeID typeId() {
+      return Type.TypeID.TIMESTAMP_NANO;
+    }
+  }
+
static class DecimalLiteral extends ComparableLiteral {
DecimalLiteral(BigDecimal value) {
super(value);
@@ -502,19 +529,21 @@ public Literal to(Type type) {
case TIMESTAMP:
if (((Types.TimestampType) type).shouldAdjustToUTC()) {
- long timestampMicros =
- ChronoUnit.MICROS.between(
- EPOCH, OffsetDateTime.parse(value(), DateTimeFormatter.ISO_DATE_TIME));
+ long timestampMicros = DateTimeUtil.isoTimestamptzToMicros(value().toString());
return (Literal) new TimestampLiteral(timestampMicros);
} else {
- long timestampMicros =
- ChronoUnit.MICROS.between(
- EPOCH,
- LocalDateTime.parse(value(), DateTimeFormatter.ISO_LOCAL_DATE_TIME)
- .atOffset(ZoneOffset.UTC));
+ long timestampMicros = DateTimeUtil.isoTimestampToMicros(value().toString());
return (Literal) new TimestampLiteral(timestampMicros);
}
+ case TIMESTAMP_NANO:
+ if (((Types.TimestampNanoType) type).shouldAdjustToUTC()) {
+ return (Literal)
+ new TimestampNanoLiteral(DateTimeUtil.isoTimestamptzToNanos(value()));
+ } else {
+ return (Literal) new TimestampNanoLiteral(DateTimeUtil.isoTimestampToNanos(value()));
+ }
+
case STRING:
return (Literal) this;
diff --git a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
index 4aee75c447d3..1a5a884f651a 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/StrictMetricsEvaluator.java
@@ -29,9 +29,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.NaNUtil;
@@ -51,7 +49,6 @@
* checks for NaN is necessary in order to not include files that may contain rows that don't match.
*/
public class StrictMetricsEvaluator {
- private final Schema schema;
private final StructType struct;
private final Expression expr;
@@ -60,7 +57,6 @@ public StrictMetricsEvaluator(Schema schema, Expression unbound) {
}
public StrictMetricsEvaluator(Schema schema, Expression unbound, boolean caseSensitive) {
- this.schema = schema;
this.struct = schema.asStruct();
this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
}
@@ -144,8 +140,9 @@ public Boolean isNull(BoundReference ref) {
// no need to check whether the field is required because binding evaluates that case
// if the column has any non-null values, the expression does not match
int id = ref.fieldId();
- Preconditions.checkNotNull(
- struct.field(id), "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (containsNullsOnly(id)) {
return ROWS_MUST_MATCH;
@@ -159,8 +156,9 @@ public Boolean notNull(BoundReference ref) {
// no need to check whether the field is required because binding evaluates that case
// if the column has any null values, the expression does not match
int id = ref.fieldId();
- Preconditions.checkNotNull(
- struct.field(id), "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (nullCounts != null && nullCounts.containsKey(id) && nullCounts.get(id) == 0) {
return ROWS_MUST_MATCH;
@@ -199,15 +197,16 @@ public Boolean notNaN(BoundReference ref) {
public Boolean lt(BoundReference ref, Literal lit) {
// Rows must match when: <----------Min----Max---X------->
Integer id = ref.fieldId();
- Types.NestedField field = struct.field(id);
- Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}
if (upperBounds != null && upperBounds.containsKey(id)) {
- T upper = Conversions.fromByteBuffer(field.type(), upperBounds.get(id));
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
int cmp = lit.comparator().compare(upper, lit.value());
if (cmp < 0) {
@@ -222,15 +221,16 @@ public Boolean lt(BoundReference ref, Literal lit) {
public Boolean ltEq(BoundReference ref, Literal lit) {
// Rows must match when: <----------Min----Max---X------->
Integer id = ref.fieldId();
- Types.NestedField field = struct.field(id);
- Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}
if (upperBounds != null && upperBounds.containsKey(id)) {
- T upper = Conversions.fromByteBuffer(field.type(), upperBounds.get(id));
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
int cmp = lit.comparator().compare(upper, lit.value());
if (cmp <= 0) {
@@ -245,15 +245,16 @@ public Boolean ltEq(BoundReference ref, Literal lit) {
public Boolean gt(BoundReference ref, Literal lit) {
// Rows must match when: <-------X---Min----Max---------->
Integer id = ref.fieldId();
- Types.NestedField field = struct.field(id);
- Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}
if (lowerBounds != null && lowerBounds.containsKey(id)) {
- T lower = Conversions.fromByteBuffer(field.type(), lowerBounds.get(id));
+ T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for more.
@@ -273,15 +274,16 @@ public Boolean gt(BoundReference ref, Literal lit) {
public Boolean gtEq(BoundReference ref, Literal lit) {
// Rows must match when: <-------X---Min----Max---------->
Integer id = ref.fieldId();
- Types.NestedField field = struct.field(id);
- Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
}
if (lowerBounds != null && lowerBounds.containsKey(id)) {
- T lower = Conversions.fromByteBuffer(field.type(), lowerBounds.get(id));
+ T lower = Conversions.fromByteBuffer(ref.type(), lowerBounds.get(id));
if (NaNUtil.isNaN(lower)) {
// NaN indicates unreliable bounds. See the StrictMetricsEvaluator docs for more.
@@ -301,8 +303,9 @@ public Boolean gtEq(BoundReference ref, Literal lit) {
public Boolean eq(BoundReference ref, Literal lit) {
// Rows must match when Min == X == Max
Integer id = ref.fieldId();
- Types.NestedField field = struct.field(id);
- Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
@@ -319,7 +322,7 @@ public Boolean eq(BoundReference ref, Literal lit) {
return ROWS_MIGHT_NOT_MATCH;
}
- T upper = Conversions.fromByteBuffer(field.type(), upperBounds.get(id));
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
cmp = lit.comparator().compare(upper, lit.value());
if (cmp != 0) {
@@ -336,8 +339,9 @@ public Boolean eq(BoundReference ref, Literal lit) {
public Boolean notEq(BoundReference ref, Literal lit) {
// Rows must match when X < Min or Max < X because it is not in the range
Integer id = ref.fieldId();
- Types.NestedField field = struct.field(id);
- Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_MUST_MATCH;
@@ -358,7 +362,7 @@ public Boolean notEq(BoundReference ref, Literal lit) {
}
if (upperBounds != null && upperBounds.containsKey(id)) {
- T upper = Conversions.fromByteBuffer(field.type(), upperBounds.get(id));
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
int cmp = lit.comparator().compare(upper, lit.value());
if (cmp < 0) {
@@ -372,8 +376,9 @@ public Boolean notEq(BoundReference ref, Literal lit) {
@Override
public Boolean in(BoundReference ref, Set literalSet) {
Integer id = ref.fieldId();
- Types.NestedField field = struct.field(id);
- Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (canContainNulls(id) || canContainNaNs(id)) {
return ROWS_MIGHT_NOT_MATCH;
@@ -390,7 +395,7 @@ public Boolean in(BoundReference ref, Set literalSet) {
}
// check if the upper bound is in the set
- T upper = Conversions.fromByteBuffer(field.type(), upperBounds.get(id));
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
if (!literalSet.contains(upper)) {
return ROWS_MIGHT_NOT_MATCH;
}
@@ -411,8 +416,9 @@ public Boolean in(BoundReference ref, Set literalSet) {
@Override
public Boolean notIn(BoundReference ref, Set literalSet) {
Integer id = ref.fieldId();
- Types.NestedField field = struct.field(id);
- Preconditions.checkNotNull(field, "Cannot filter by nested column: %s", schema.findField(id));
+ if (isNestedColumn(id)) {
+ return ROWS_MIGHT_NOT_MATCH;
+ }
if (containsNullsOnly(id) || containsNaNsOnly(id)) {
return ROWS_MUST_MATCH;
@@ -439,7 +445,7 @@ public Boolean notIn(BoundReference ref, Set literalSet) {
}
if (upperBounds != null && upperBounds.containsKey(id)) {
- T upper = Conversions.fromByteBuffer(field.type(), upperBounds.get(id));
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBounds.get(id));
literals =
literals.stream()
.filter(v -> ref.comparator().compare(upper, v) >= 0)
@@ -466,6 +472,10 @@ public Boolean notStartsWith(BoundReference ref, Literal lit) {
return ROWS_MIGHT_NOT_MATCH;
}
+ // Returns true when the evaluator's struct has no field for the given ID, i.e. struct.field(id)
+ // is null. The predicate methods use this to answer ROWS_MIGHT_NOT_MATCH for such columns
+ // instead of failing a precondition check.
+ private boolean isNestedColumn(int id) {
+ return struct.field(id) == null;
+ }
+
private boolean canContainNulls(Integer id) {
return nullCounts == null || (nullCounts.containsKey(id) && nullCounts.get(id) > 0);
}
diff --git a/api/src/main/java/org/apache/iceberg/io/BulkDeletionFailureException.java b/api/src/main/java/org/apache/iceberg/io/BulkDeletionFailureException.java
index 535be5f64ec8..4f89d462fe94 100644
--- a/api/src/main/java/org/apache/iceberg/io/BulkDeletionFailureException.java
+++ b/api/src/main/java/org/apache/iceberg/io/BulkDeletionFailureException.java
@@ -18,11 +18,13 @@
*/
package org.apache.iceberg.io;
+import java.util.Locale;
+
public class BulkDeletionFailureException extends RuntimeException {
private final int numberFailedObjects;
public BulkDeletionFailureException(int numberFailedObjects) {
- super(String.format("Failed to delete %d files", numberFailedObjects));
+ super(String.format(Locale.ROOT, "Failed to delete %d files", numberFailedObjects));
this.numberFailedObjects = numberFailedObjects;
}
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Bucket.java b/api/src/main/java/org/apache/iceberg/transforms/Bucket.java
index 912bcd271725..0e4e782cc110 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Bucket.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Bucket.java
@@ -33,6 +33,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BucketUtil;
+import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.SerializableFunction;
class Bucket implements Transform, Serializable {
@@ -63,6 +64,8 @@ static & SerializableFunction> B get(
case FIXED:
case BINARY:
return (B) new BucketByteBuffer(numBuckets);
+ case TIMESTAMP_NANO:
+ return (B) new BucketTimestampNano(numBuckets);
case UUID:
return (B) new BucketUUID(numBuckets);
default:
@@ -107,6 +110,7 @@ public boolean canTransform(Type type) {
case DATE:
case TIME:
case TIMESTAMP:
+ case TIMESTAMP_NANO:
case STRING:
case BINARY:
case FIXED:
@@ -214,6 +218,20 @@ protected int hash(Long value) {
}
}
+  // In order to bucket TimestampNano the same as Timestamp, convert to micros before hashing.
+  private static class BucketTimestampNano extends Bucket<Long>
+      implements SerializableFunction<Long, Integer> {
+
+    private BucketTimestampNano(int numBuckets) {
+      super(numBuckets);
+    }
+
+    /**
+     * Hashes a nanosecond timestamp by first converting it to microseconds.
+     *
+     * @param nanos the timestamp value in nanoseconds
+     * @return the hash of the equivalent microsecond value
+     */
+    @Override
+    protected int hash(Long nanos) {
+      return BucketUtil.hash(DateTimeUtil.nanosToMicros(nanos));
+    }
+  }
+
private static class BucketString extends Bucket
implements SerializableFunction {
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Dates.java b/api/src/main/java/org/apache/iceberg/transforms/Dates.java
index 3d26b542be7b..88db16797867 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Dates.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Dates.java
@@ -97,6 +97,10 @@ public Type getResultType(Type sourceType) {
return Types.IntegerType.get();
}
+ // Exposes this transform's granularity unit; satisfiesOrderOf reads it from the other
+ // transform when comparing granularities.
+ ChronoUnit granularity() {
+ return granularity;
+ }
+
@Override
public boolean preservesOrder() {
return true;
@@ -109,11 +113,11 @@ public boolean satisfiesOrderOf(Transform, ?> other) {
}
if (other instanceof Dates) {
- // test the granularity, in days. day(ts) => 1 day, months(ts) => 30 days, and day satisfies
- // the order of months
- Dates otherTransform = (Dates) other;
- return granularity.getDuration().toDays()
- <= otherTransform.granularity.getDuration().toDays();
+ return TransformUtil.satisfiesOrderOf(granularity, ((Dates) other).granularity());
+ } else if (other instanceof Timestamps) {
+ return TransformUtil.satisfiesOrderOf(granularity, ((Timestamps) other).granularity());
+ } else if (other instanceof TimeTransform) {
+ return TransformUtil.satisfiesOrderOf(granularity, ((TimeTransform>) other).granularity());
}
return false;
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Days.java b/api/src/main/java/org/apache/iceberg/transforms/Days.java
index f69d5d6110ed..e2b829b86662 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Days.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Days.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.transforms;
import java.io.ObjectStreamException;
+import java.time.temporal.ChronoUnit;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -31,38 +32,19 @@ static Days get() {
}
@Override
- @SuppressWarnings("unchecked")
- protected Transform toEnum(Type type) {
- switch (type.typeId()) {
- case DATE:
- return (Transform) Dates.DAY;
- case TIMESTAMP:
- return (Transform) Timestamps.DAY;
- default:
- throw new IllegalArgumentException("Unsupported type: " + type);
- }
+ protected ChronoUnit granularity() {
+ return ChronoUnit.DAYS;
}
@Override
- public Type getResultType(Type sourceType) {
- return Types.DateType.get();
+ protected Transform toEnum(Type type) {
+ return (Transform)
+ fromSourceType(type, Dates.DAY, Timestamps.MICROS_TO_DAY, Timestamps.NANOS_TO_DAY);
}
@Override
- public boolean satisfiesOrderOf(Transform, ?> other) {
- if (this == other) {
- return true;
- }
-
- if (other instanceof Timestamps) {
- return Timestamps.DAY.satisfiesOrderOf(other);
- } else if (other instanceof Dates) {
- return Dates.DAY.satisfiesOrderOf(other);
- } else if (other instanceof Days || other instanceof Months || other instanceof Years) {
- return true;
- }
-
- return false;
+ public Type getResultType(Type sourceType) {
+ return Types.DateType.get();
}
@Override
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Hours.java b/api/src/main/java/org/apache/iceberg/transforms/Hours.java
index afc14516f3cd..2ff79f6a66a7 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Hours.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Hours.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.transforms;
import java.io.ObjectStreamException;
+import java.time.temporal.ChronoUnit;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -30,19 +31,21 @@ static Hours get() {
return (Hours) INSTANCE;
}
+ // The hour transform's granularity is always ChronoUnit.HOURS.
+ @Override
+ protected ChronoUnit granularity() {
+ return ChronoUnit.HOURS;
+ }
+
@Override
@SuppressWarnings("unchecked")
protected Transform toEnum(Type type) {
- if (type.typeId() == Type.TypeID.TIMESTAMP) {
- return (Transform) Timestamps.HOUR;
- }
-
- throw new IllegalArgumentException("Unsupported type: " + type);
+ return (Transform)
+ fromSourceType(type, null, Timestamps.MICROS_TO_HOUR, Timestamps.NANOS_TO_HOUR);
}
@Override
public boolean canTransform(Type type) {
- return type.typeId() == Type.TypeID.TIMESTAMP;
+ return type.typeId() == Type.TypeID.TIMESTAMP || type.typeId() == Type.TypeID.TIMESTAMP_NANO;
}
@Override
@@ -50,24 +53,6 @@ public Type getResultType(Type sourceType) {
return Types.IntegerType.get();
}
- @Override
- public boolean satisfiesOrderOf(Transform, ?> other) {
- if (this == other) {
- return true;
- }
-
- if (other instanceof Timestamps) {
- return other == Timestamps.HOUR;
- } else if (other instanceof Hours
- || other instanceof Days
- || other instanceof Months
- || other instanceof Years) {
- return true;
- }
-
- return false;
- }
-
@Override
public String toHumanString(Type alwaysInt, Integer value) {
return value != null ? TransformUtil.humanHour(value) : "null";
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Months.java b/api/src/main/java/org/apache/iceberg/transforms/Months.java
index 8fa4d42385f7..73ec50e5dd9a 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Months.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Months.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.transforms;
import java.io.ObjectStreamException;
+import java.time.temporal.ChronoUnit;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -31,38 +32,19 @@ static Months get() {
}
@Override
- @SuppressWarnings("unchecked")
- protected Transform toEnum(Type type) {
- switch (type.typeId()) {
- case DATE:
- return (Transform) Dates.MONTH;
- case TIMESTAMP:
- return (Transform) Timestamps.MONTH;
- default:
- throw new IllegalArgumentException("Unsupported type: " + type);
- }
+ protected ChronoUnit granularity() {
+ return ChronoUnit.MONTHS;
}
@Override
- public Type getResultType(Type sourceType) {
- return Types.IntegerType.get();
+ protected Transform toEnum(Type type) {
+ return (Transform)
+ fromSourceType(type, Dates.MONTH, Timestamps.MICROS_TO_MONTH, Timestamps.NANOS_TO_MONTH);
}
@Override
- public boolean satisfiesOrderOf(Transform, ?> other) {
- if (this == other) {
- return true;
- }
-
- if (other instanceof Timestamps) {
- return Timestamps.MONTH.satisfiesOrderOf(other);
- } else if (other instanceof Dates) {
- return Dates.MONTH.satisfiesOrderOf(other);
- } else if (other instanceof Months || other instanceof Years) {
- return true;
- }
-
- return false;
+ public Type getResultType(Type sourceType) {
+ return Types.IntegerType.get();
}
@Override
diff --git a/api/src/main/java/org/apache/iceberg/transforms/PartitionSpecVisitor.java b/api/src/main/java/org/apache/iceberg/transforms/PartitionSpecVisitor.java
index e4796478bf28..0d80ef88a296 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/PartitionSpecVisitor.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/PartitionSpecVisitor.java
@@ -122,16 +122,23 @@ static R visit(Schema schema, PartitionField field, PartitionSpecVisitor
int width = ((Truncate>) transform).width();
return visitor.truncate(field.fieldId(), sourceName, field.sourceId(), width);
} else if (transform == Dates.YEAR
- || transform == Timestamps.YEAR
+ || transform == Timestamps.MICROS_TO_YEAR
+ || transform == Timestamps.NANOS_TO_YEAR
|| transform instanceof Years) {
return visitor.year(field.fieldId(), sourceName, field.sourceId());
} else if (transform == Dates.MONTH
- || transform == Timestamps.MONTH
+ || transform == Timestamps.MICROS_TO_MONTH
+ || transform == Timestamps.NANOS_TO_MONTH
|| transform instanceof Months) {
return visitor.month(field.fieldId(), sourceName, field.sourceId());
- } else if (transform == Dates.DAY || transform == Timestamps.DAY || transform instanceof Days) {
+ } else if (transform == Dates.DAY
+ || transform == Timestamps.MICROS_TO_DAY
+ || transform == Timestamps.NANOS_TO_DAY
+ || transform instanceof Days) {
return visitor.day(field.fieldId(), sourceName, field.sourceId());
- } else if (transform == Timestamps.HOUR || transform instanceof Hours) {
+ } else if (transform == Timestamps.MICROS_TO_HOUR
+ || transform == Timestamps.NANOS_TO_HOUR
+ || transform instanceof Hours) {
return visitor.hour(field.fieldId(), sourceName, field.sourceId());
} else if (transform instanceof VoidTransform) {
return visitor.alwaysNull(field.fieldId(), sourceName, field.sourceId());
diff --git a/api/src/main/java/org/apache/iceberg/transforms/SortOrderVisitor.java b/api/src/main/java/org/apache/iceberg/transforms/SortOrderVisitor.java
index 680e095270fb..62cc9d3cdb33 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/SortOrderVisitor.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/SortOrderVisitor.java
@@ -85,21 +85,26 @@ static List visit(SortOrder sortOrder, SortOrderVisitor visitor) {
visitor.truncate(
sourceName, field.sourceId(), width, field.direction(), field.nullOrder()));
} else if (transform == Dates.YEAR
- || transform == Timestamps.YEAR
+ || transform == Timestamps.MICROS_TO_YEAR
+ || transform == Timestamps.NANOS_TO_YEAR
|| transform instanceof Years) {
results.add(
visitor.year(sourceName, field.sourceId(), field.direction(), field.nullOrder()));
} else if (transform == Dates.MONTH
- || transform == Timestamps.MONTH
+ || transform == Timestamps.MICROS_TO_MONTH
+ || transform == Timestamps.NANOS_TO_MONTH
|| transform instanceof Months) {
results.add(
visitor.month(sourceName, field.sourceId(), field.direction(), field.nullOrder()));
} else if (transform == Dates.DAY
- || transform == Timestamps.DAY
+ || transform == Timestamps.MICROS_TO_DAY
+ || transform == Timestamps.NANOS_TO_DAY
|| transform instanceof Days) {
results.add(
visitor.day(sourceName, field.sourceId(), field.direction(), field.nullOrder()));
- } else if (transform == Timestamps.HOUR || transform instanceof Hours) {
+ } else if (transform == Timestamps.MICROS_TO_HOUR
+ || transform == Timestamps.NANOS_TO_HOUR
+ || transform instanceof Hours) {
results.add(
visitor.hour(sourceName, field.sourceId(), field.direction(), field.nullOrder()));
} else if (transform instanceof UnknownTransform) {
diff --git a/api/src/main/java/org/apache/iceberg/transforms/TimeTransform.java b/api/src/main/java/org/apache/iceberg/transforms/TimeTransform.java
index 01ea8130aa60..c348fda52b02 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/TimeTransform.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/TimeTransform.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.transforms;
+import java.time.temporal.ChronoUnit;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundTransform;
import org.apache.iceberg.expressions.UnboundPredicate;
@@ -25,6 +26,24 @@
import org.apache.iceberg.util.SerializableFunction;
abstract class TimeTransform implements Transform {
+ protected static R fromSourceType(Type type, R dateResult, R microsResult, R nanosResult) {
+ switch (type.typeId()) {
+ case DATE:
+ if (dateResult != null) {
+ return dateResult;
+ }
+ break;
+ case TIMESTAMP:
+ return microsResult;
+ case TIMESTAMP_NANO:
+ return nanosResult;
+ }
+
+ throw new IllegalArgumentException("Unsupported type: " + type);
+ }
+
+ protected abstract ChronoUnit granularity();
+
protected abstract Transform toEnum(Type type);
@Override
@@ -37,9 +56,29 @@ public boolean preservesOrder() {
return true;
}
+ @Override
+ public boolean satisfiesOrderOf(Transform, ?> other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (other instanceof Dates) {
+ return TransformUtil.satisfiesOrderOf(granularity(), ((Dates) other).granularity());
+ } else if (other instanceof Timestamps) {
+ return TransformUtil.satisfiesOrderOf(granularity(), ((Timestamps) other).granularity());
+ } else if (other instanceof TimeTransform) {
+ return TransformUtil.satisfiesOrderOf(
+ granularity(), ((TimeTransform>) other).granularity());
+ }
+
+ return false;
+ }
+
@Override
public boolean canTransform(Type type) {
- return type.typeId() == Type.TypeID.DATE || type.typeId() == Type.TypeID.TIMESTAMP;
+ return type.typeId() == Type.TypeID.DATE
+ || type.typeId() == Type.TypeID.TIMESTAMP
+ || type.typeId() == Type.TypeID.TIMESTAMP_NANO;
}
@Override
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java b/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java
index b5b50e9d42b2..8b8c2ca0a96b 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java
@@ -32,53 +32,29 @@
import org.apache.iceberg.util.SerializableFunction;
enum Timestamps implements Transform {
- YEAR(ChronoUnit.YEARS, "year"),
- MONTH(ChronoUnit.MONTHS, "month"),
- DAY(ChronoUnit.DAYS, "day"),
- HOUR(ChronoUnit.HOURS, "hour");
+ MICROS_TO_YEAR(ChronoUnit.YEARS, "year", MicrosToYears.INSTANCE),
+ MICROS_TO_MONTH(ChronoUnit.MONTHS, "month", MicrosToMonths.INSTANCE),
+ MICROS_TO_DAY(ChronoUnit.DAYS, "day", MicrosToDays.INSTANCE),
+ MICROS_TO_HOUR(ChronoUnit.HOURS, "hour", MicrosToHours.INSTANCE),
- @Immutable
- static class Apply implements SerializableFunction {
- private final ChronoUnit granularity;
-
- Apply(ChronoUnit granularity) {
- this.granularity = granularity;
- }
-
- @Override
- public Integer apply(Long timestampMicros) {
- if (timestampMicros == null) {
- return null;
- }
-
- switch (granularity) {
- case YEARS:
- return DateTimeUtil.microsToYears(timestampMicros);
- case MONTHS:
- return DateTimeUtil.microsToMonths(timestampMicros);
- case DAYS:
- return DateTimeUtil.microsToDays(timestampMicros);
- case HOURS:
- return DateTimeUtil.microsToHours(timestampMicros);
- default:
- throw new UnsupportedOperationException("Unsupported time unit: " + granularity);
- }
- }
- }
+ NANOS_TO_YEAR(ChronoUnit.YEARS, "year", NanosToYears.INSTANCE),
+ NANOS_TO_MONTH(ChronoUnit.MONTHS, "month", NanosToMonths.INSTANCE),
+ NANOS_TO_DAY(ChronoUnit.DAYS, "day", NanosToDays.INSTANCE),
+ NANOS_TO_HOUR(ChronoUnit.HOURS, "hour", NanosToHours.INSTANCE);
private final ChronoUnit granularity;
private final String name;
- private final Apply apply;
+ private final SerializableFunction apply;
- Timestamps(ChronoUnit granularity, String name) {
- this.granularity = granularity;
+ Timestamps(ChronoUnit granularity, String name, SerializableFunction apply) {
this.name = name;
- this.apply = new Apply(granularity);
+ this.granularity = granularity;
+ this.apply = apply;
}
@Override
- public Integer apply(Long timestampMicros) {
- return apply.apply(timestampMicros);
+ public Integer apply(Long timestamp) {
+ return apply.apply(timestamp);
}
@Override
@@ -89,7 +65,7 @@ public SerializableFunction bind(Type type) {
@Override
public boolean canTransform(Type type) {
- return type.typeId() == Type.TypeID.TIMESTAMP;
+ return type.typeId() == Type.TypeID.TIMESTAMP || type.typeId() == Type.TypeID.TIMESTAMP_NANO;
}
@Override
@@ -100,6 +76,10 @@ public Type getResultType(Type sourceType) {
return Types.IntegerType.get();
}
+ ChronoUnit granularity() {
+ return granularity;
+ }
+
@Override
public boolean preservesOrder() {
return true;
@@ -111,12 +91,12 @@ public boolean satisfiesOrderOf(Transform, ?> other) {
return true;
}
- if (other instanceof Timestamps) {
- // test the granularity, in hours. hour(ts) => 1 hour, day(ts) => 24 hours, and hour satisfies
- // the order of day
- Timestamps otherTransform = (Timestamps) other;
- return granularity.getDuration().toHours()
- <= otherTransform.granularity.getDuration().toHours();
+ if (other instanceof Dates) {
+ return TransformUtil.satisfiesOrderOf(granularity, ((Dates) other).granularity());
+ } else if (other instanceof Timestamps) {
+ return TransformUtil.satisfiesOrderOf(granularity, ((Timestamps) other).granularity());
+ } else if (other instanceof TimeTransform) {
+ return TransformUtil.satisfiesOrderOf(granularity, ((TimeTransform>) other).granularity());
}
return false;
@@ -197,4 +177,116 @@ public String toString() {
public String dedupName() {
return "time";
}
+
+ @Immutable
+ static class MicrosToYears implements SerializableFunction {
+ static final MicrosToYears INSTANCE = new MicrosToYears();
+
+ @Override
+ public Integer apply(Long micros) {
+ if (micros == null) {
+ return null;
+ }
+
+ return DateTimeUtil.microsToYears(micros);
+ }
+ }
+
+ @Immutable
+ static class MicrosToMonths implements SerializableFunction {
+ static final MicrosToMonths INSTANCE = new MicrosToMonths();
+
+ @Override
+ public Integer apply(Long micros) {
+ if (micros == null) {
+ return null;
+ }
+
+ return DateTimeUtil.microsToMonths(micros);
+ }
+ }
+
+ @Immutable
+ static class MicrosToDays implements SerializableFunction {
+ static final MicrosToDays INSTANCE = new MicrosToDays();
+
+ @Override
+ public Integer apply(Long micros) {
+ if (micros == null) {
+ return null;
+ }
+
+ return DateTimeUtil.microsToDays(micros);
+ }
+ }
+
+ @Immutable
+ static class MicrosToHours implements SerializableFunction {
+ static final MicrosToHours INSTANCE = new MicrosToHours();
+
+ @Override
+ public Integer apply(Long micros) {
+ if (micros == null) {
+ return null;
+ }
+
+ return DateTimeUtil.microsToHours(micros);
+ }
+ }
+
+ @Immutable
+ static class NanosToYears implements SerializableFunction {
+ static final NanosToYears INSTANCE = new NanosToYears();
+
+ @Override
+ public Integer apply(Long nanos) {
+ if (nanos == null) {
+ return null;
+ }
+
+ return DateTimeUtil.nanosToYears(nanos);
+ }
+ }
+
+ @Immutable
+ static class NanosToMonths implements SerializableFunction {
+ static final NanosToMonths INSTANCE = new NanosToMonths();
+
+ @Override
+ public Integer apply(Long nanos) {
+ if (nanos == null) {
+ return null;
+ }
+
+ return DateTimeUtil.nanosToMonths(nanos);
+ }
+ }
+
+ @Immutable
+ static class NanosToDays implements SerializableFunction {
+ static final NanosToDays INSTANCE = new NanosToDays();
+
+ @Override
+ public Integer apply(Long nanos) {
+ if (nanos == null) {
+ return null;
+ }
+
+ return DateTimeUtil.nanosToDays(nanos);
+ }
+ }
+
+ @Immutable
+ static class NanosToHours implements SerializableFunction {
+ static final NanosToHours INSTANCE = new NanosToHours();
+
+ @Override
+ public Integer apply(Long nanos) {
+ if (nanos == null) {
+ return null;
+ }
+
+ return DateTimeUtil.nanosToHours(nanos);
+ }
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Transform.java b/api/src/main/java/org/apache/iceberg/transforms/Transform.java
index 5a56b672b1b1..78312b58b12f 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Transform.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Transform.java
@@ -181,6 +181,12 @@ default String toHumanString(Type type, T value) {
} else {
return TransformUtil.humanTimestampWithoutZone((Long) value);
}
+ case TIMESTAMP_NANO:
+ if (((Types.TimestampNanoType) type).shouldAdjustToUTC()) {
+ return TransformUtil.humanTimestampNanoWithZone((Long) value);
+ } else {
+ return TransformUtil.humanTimestampNanoWithoutZone((Long) value);
+ }
case FIXED:
case BINARY:
if (value instanceof ByteBuffer) {
diff --git a/api/src/main/java/org/apache/iceberg/transforms/TransformUtil.java b/api/src/main/java/org/apache/iceberg/transforms/TransformUtil.java
index 53bc23a49888..710019225e09 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/TransformUtil.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/TransformUtil.java
@@ -26,6 +26,8 @@
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.Base64;
+import java.util.Locale;
+import org.apache.iceberg.util.DateTimeUtil;
class TransformUtil {
@@ -35,19 +37,25 @@ private TransformUtil() {}
private static final int EPOCH_YEAR = EPOCH.getYear();
static String humanYear(int yearOrdinal) {
- return String.format("%04d", EPOCH_YEAR + yearOrdinal);
+ return String.format(Locale.ROOT, "%04d", EPOCH_YEAR + yearOrdinal);
}
static String humanMonth(int monthOrdinal) {
return String.format(
+ Locale.ROOT,
"%04d-%02d",
- EPOCH_YEAR + Math.floorDiv(monthOrdinal, 12), 1 + Math.floorMod(monthOrdinal, 12));
+ EPOCH_YEAR + Math.floorDiv(monthOrdinal, 12),
+ 1 + Math.floorMod(monthOrdinal, 12));
}
static String humanDay(int dayOrdinal) {
OffsetDateTime day = EPOCH.plusDays(dayOrdinal);
return String.format(
- "%04d-%02d-%02d", day.getYear(), day.getMonth().getValue(), day.getDayOfMonth());
+ Locale.ROOT,
+ "%04d-%02d-%02d",
+ day.getYear(),
+ day.getMonth().getValue(),
+ day.getDayOfMonth());
}
static String humanTime(Long microsFromMidnight) {
@@ -55,22 +63,40 @@ static String humanTime(Long microsFromMidnight) {
}
static String humanTimestampWithZone(Long timestampMicros) {
- return ChronoUnit.MICROS.addTo(EPOCH, timestampMicros).toString();
+ return DateTimeUtil.microsToIsoTimestamptz(timestampMicros);
}
static String humanTimestampWithoutZone(Long timestampMicros) {
- return ChronoUnit.MICROS.addTo(EPOCH, timestampMicros).toLocalDateTime().toString();
+ return DateTimeUtil.microsToIsoTimestamp(timestampMicros);
+ }
+
+ static String humanTimestampNanoWithZone(Long timestampNanos) {
+ return DateTimeUtil.nanosToIsoTimestamptz(timestampNanos);
+ }
+
+ static String humanTimestampNanoWithoutZone(Long timestampNanos) {
+ return DateTimeUtil.nanosToIsoTimestamp(timestampNanos);
}
static String humanHour(int hourOrdinal) {
OffsetDateTime time = EPOCH.plusHours(hourOrdinal);
return String.format(
+ Locale.ROOT,
"%04d-%02d-%02d-%02d",
- time.getYear(), time.getMonth().getValue(), time.getDayOfMonth(), time.getHour());
+ time.getYear(),
+ time.getMonth().getValue(),
+ time.getDayOfMonth(),
+ time.getHour());
}
static String base64encode(ByteBuffer buffer) {
// use direct encoding because all of the encoded bytes are in ASCII
return StandardCharsets.ISO_8859_1.decode(Base64.getEncoder().encode(buffer)).toString();
}
+
+ static boolean satisfiesOrderOf(ChronoUnit leftGranularity, ChronoUnit rightGranularity) {
+ // test the granularity, in hours. hour(ts) => 1 hour, day(ts) => 24 hours, and hour satisfies
+ // the order of day
+ return leftGranularity.getDuration().toHours() <= rightGranularity.getDuration().toHours();
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Transforms.java b/api/src/main/java/org/apache/iceberg/transforms/Transforms.java
index a1ce33ddd6da..aacd4d430069 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Transforms.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Transforms.java
@@ -23,7 +23,6 @@
import java.util.regex.Pattern;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
/**
@@ -68,6 +67,10 @@ private Transforms() {}
return new UnknownTransform<>(transform);
}
+ /**
+ * @deprecated use {@link #identity()} instead; will be removed in 2.0.0
+ */
+ @Deprecated
public static Transform, ?> fromString(Type type, String transform) {
Matcher widthMatcher = HAS_WIDTH.matcher(transform);
if (widthMatcher.matches()) {
@@ -80,22 +83,20 @@ private Transforms() {}
}
}
- if (transform.equalsIgnoreCase("identity")) {
- return Identity.get(type);
- }
-
- try {
- if (type.typeId() == Type.TypeID.TIMESTAMP) {
- return Timestamps.valueOf(transform.toUpperCase(Locale.ENGLISH));
- } else if (type.typeId() == Type.TypeID.DATE) {
- return Dates.valueOf(transform.toUpperCase(Locale.ENGLISH));
- }
- } catch (IllegalArgumentException ignored) {
- // fall through to return unknown transform
- }
-
- if (transform.equalsIgnoreCase("void")) {
- return VoidTransform.get();
+ String lowerTransform = transform.toLowerCase(Locale.ENGLISH);
+ switch (lowerTransform) {
+ case "identity":
+ return Identity.get(type);
+ case "year":
+ return Years.get().toEnum(type);
+ case "month":
+ return Months.get().toEnum(type);
+ case "day":
+ return Days.get().toEnum(type);
+ case "hour":
+ return Hours.get().toEnum(type);
+ case "void":
+ return VoidTransform.get();
}
return new UnknownTransform<>(transform);
@@ -125,14 +126,7 @@ public static Transform identity(Type type) {
@Deprecated
@SuppressWarnings("unchecked")
public static Transform year(Type type) {
- switch (type.typeId()) {
- case DATE:
- return (Transform) Dates.YEAR;
- case TIMESTAMP:
- return (Transform) Timestamps.YEAR;
- default:
- throw new IllegalArgumentException("Cannot partition type " + type + " by year");
- }
+ return (Transform) Years.get().toEnum(type);
}
/**
@@ -146,14 +140,7 @@ public static Transform year(Type type) {
@Deprecated
@SuppressWarnings("unchecked")
public static Transform month(Type type) {
- switch (type.typeId()) {
- case DATE:
- return (Transform) Dates.MONTH;
- case TIMESTAMP:
- return (Transform) Timestamps.MONTH;
- default:
- throw new IllegalArgumentException("Cannot partition type " + type + " by month");
- }
+ return (Transform) Months.get().toEnum(type);
}
/**
@@ -167,14 +154,7 @@ public static Transform month(Type type) {
@Deprecated
@SuppressWarnings("unchecked")
public static Transform day(Type type) {
- switch (type.typeId()) {
- case DATE:
- return (Transform) Dates.DAY;
- case TIMESTAMP:
- return (Transform) Timestamps.DAY;
- default:
- throw new IllegalArgumentException("Cannot partition type " + type + " by day");
- }
+ return (Transform) Days.get().toEnum(type);
}
/**
@@ -188,9 +168,7 @@ public static Transform day(Type type) {
@Deprecated
@SuppressWarnings("unchecked")
public static Transform hour(Type type) {
- Preconditions.checkArgument(
- type.typeId() == Type.TypeID.TIMESTAMP, "Cannot partition type %s by hour", type);
- return (Transform) Timestamps.HOUR;
+ return (Transform) Hours.get().toEnum(type);
}
/**
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
index 670c6002a97a..a111e4ca394b 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Truncate.java
@@ -43,7 +43,9 @@ static Truncate get(int width) {
return new Truncate<>(width);
}
- /** @deprecated will be removed in 2.0.0 */
+ /**
+ * @deprecated will be removed in 2.0.0
+ */
@Deprecated
@SuppressWarnings("unchecked")
static & SerializableFunction> R get(Type type, int width) {
diff --git a/api/src/main/java/org/apache/iceberg/transforms/Years.java b/api/src/main/java/org/apache/iceberg/transforms/Years.java
index 6c1eee578506..2920a37dc692 100644
--- a/api/src/main/java/org/apache/iceberg/transforms/Years.java
+++ b/api/src/main/java/org/apache/iceberg/transforms/Years.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.transforms;
import java.io.ObjectStreamException;
+import java.time.temporal.ChronoUnit;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -31,38 +32,19 @@ static Years get() {
}
@Override
- @SuppressWarnings("unchecked")
- protected Transform toEnum(Type type) {
- switch (type.typeId()) {
- case DATE:
- return (Transform) Dates.YEAR;
- case TIMESTAMP:
- return (Transform) Timestamps.YEAR;
- default:
- throw new IllegalArgumentException("Unsupported type: " + type);
- }
+ protected ChronoUnit granularity() {
+ return ChronoUnit.YEARS;
}
@Override
- public Type getResultType(Type sourceType) {
- return Types.IntegerType.get();
+ protected Transform toEnum(Type type) {
+ return (Transform)
+ fromSourceType(type, Dates.YEAR, Timestamps.MICROS_TO_YEAR, Timestamps.NANOS_TO_YEAR);
}
@Override
- public boolean satisfiesOrderOf(Transform, ?> other) {
- if (this == other) {
- return true;
- }
-
- if (other instanceof Timestamps) {
- return Timestamps.YEAR.satisfiesOrderOf(other);
- } else if (other instanceof Dates) {
- return Dates.YEAR.satisfiesOrderOf(other);
- } else if (other instanceof Years) {
- return true;
- }
-
- return false;
+ public Type getResultType(Type sourceType) {
+ return Types.IntegerType.get();
}
@Override
diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java
index a803afac104f..98416c6943db 100644
--- a/api/src/main/java/org/apache/iceberg/types/Comparators.java
+++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java
@@ -41,6 +41,8 @@ private Comparators() {}
.put(Types.TimeType.get(), Comparator.naturalOrder())
.put(Types.TimestampType.withZone(), Comparator.naturalOrder())
.put(Types.TimestampType.withoutZone(), Comparator.naturalOrder())
+ .put(Types.TimestampNanoType.withZone(), Comparator.naturalOrder())
+ .put(Types.TimestampNanoType.withoutZone(), Comparator.naturalOrder())
.put(Types.StringType.get(), Comparators.charSequences())
.put(Types.UUIDType.get(), Comparator.naturalOrder())
.put(Types.BinaryType.get(), Comparators.unsignedBytes())
@@ -321,9 +323,9 @@ private CharSeqComparator() {}
* represented using two Java characters (using UTF-16 surrogate pairs). Character by character
* comparison may yield incorrect results while comparing a 4 byte UTF-8 character to a java
* char. Character by character comparison works as expected if both characters are <= 3 byte
- * UTF-8 character or both characters are 4 byte UTF-8 characters.
- * isCharInUTF16HighSurrogateRange method detects a 4-byte character and considers that
- * character to be lexicographically greater than any 3 byte or lower UTF-8 character.
+ * UTF-8 character or both characters are 4 byte UTF-8 characters. isCharHighSurrogate method
+ * detects a high surrogate (4-byte character) and considers that character to be
+ * lexicographically greater than any 3 byte or lower UTF-8 character.
*/
@Override
public int compare(CharSequence s1, CharSequence s2) {
diff --git a/api/src/main/java/org/apache/iceberg/types/Conversions.java b/api/src/main/java/org/apache/iceberg/types/Conversions.java
index 1d2539514954..e18c7b4362e6 100644
--- a/api/src/main/java/org/apache/iceberg/types/Conversions.java
+++ b/api/src/main/java/org/apache/iceberg/types/Conversions.java
@@ -97,6 +97,7 @@ public static ByteBuffer toByteBuffer(Type.TypeID typeId, Object value) {
case LONG:
case TIME:
case TIMESTAMP:
+ case TIMESTAMP_NANO:
return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(0, (long) value);
case FLOAT:
return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putFloat(0, (float) value);
@@ -146,6 +147,7 @@ private static Object internalFromByteBuffer(Type type, ByteBuffer buffer) {
case LONG:
case TIME:
case TIMESTAMP:
+ case TIMESTAMP_NANO:
if (tmp.remaining() < 8) {
// type was later promoted to long
return (long) tmp.getInt();
diff --git a/api/src/main/java/org/apache/iceberg/types/Type.java b/api/src/main/java/org/apache/iceberg/types/Type.java
index 5062b54d10e1..571bf9a14e43 100644
--- a/api/src/main/java/org/apache/iceberg/types/Type.java
+++ b/api/src/main/java/org/apache/iceberg/types/Type.java
@@ -37,6 +37,7 @@ enum TypeID {
DATE(Integer.class),
TIME(Long.class),
TIMESTAMP(Long.class),
+ TIMESTAMP_NANO(Long.class),
STRING(CharSequence.class),
UUID(java.util.UUID.class),
FIXED(ByteBuffer.class),
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index 07d06dcc5a89..8a9184569aec 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -181,11 +181,36 @@ public static Map indexQuotedNameById(
return indexer.byId();
}
+ /**
+ * Creates a mapping from lower-case field names to their corresponding field IDs.
+ *
+ * <p>This method iterates over the fields of the provided struct and maps each field's name
+ * (converted to lower-case) to its ID. If two fields have the same lower-case name, an
+ * {@code IllegalArgumentException} is thrown.
+ *
+ * @param struct the struct type whose fields are to be indexed
+ * @return a map where the keys are lower-case field names and the values are field IDs
+ * @throws IllegalArgumentException if two fields have the same lower-case name
+ */
public static Map indexByLowerCaseName(Types.StructType struct) {
Map indexByLowerCaseName = Maps.newHashMap();
+
+ IndexByName indexer = new IndexByName();
+ visit(struct, indexer);
+ Map byId = indexer.byId();
+
indexByName(struct)
.forEach(
- (name, integer) -> indexByLowerCaseName.put(name.toLowerCase(Locale.ROOT), integer));
+ (name, fieldId) -> {
+ String key = name.toLowerCase(Locale.ROOT);
+ Integer existingId = indexByLowerCaseName.put(key, fieldId);
+ Preconditions.checkArgument(
+ existingId == null || existingId.equals(fieldId),
+ "Cannot build lower case index: %s and %s collide",
+ byId.get(existingId),
+ byId.get(fieldId));
+ indexByLowerCaseName.put(key, fieldId);
+ });
return indexByLowerCaseName;
}
@@ -496,6 +521,7 @@ private static int estimateSize(Type type) {
case DOUBLE:
case TIME:
case TIMESTAMP:
+ case TIMESTAMP_NANO:
// longs and doubles occupy 8 bytes
// times and timestamps are internally represented as longs
return 8;
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index ce6caa4721df..4bb1674f3be5 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -27,6 +27,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -49,6 +50,8 @@ private Types() {}
.put(TimeType.get().toString(), TimeType.get())
.put(TimestampType.withZone().toString(), TimestampType.withZone())
.put(TimestampType.withoutZone().toString(), TimestampType.withoutZone())
+ .put(TimestampNanoType.withZone().toString(), TimestampNanoType.withZone())
+ .put(TimestampNanoType.withoutZone().toString(), TimestampNanoType.withoutZone())
.put(StringType.get().toString(), StringType.get())
.put(UUIDType.get().toString(), UUIDType.get())
.put(BinaryType.get().toString(), BinaryType.get())
@@ -259,6 +262,59 @@ public int hashCode() {
}
}
+ public static class TimestampNanoType extends PrimitiveType {
+ private static final TimestampNanoType INSTANCE_WITH_ZONE = new TimestampNanoType(true);
+ private static final TimestampNanoType INSTANCE_WITHOUT_ZONE = new TimestampNanoType(false);
+
+ public static TimestampNanoType withZone() {
+ return INSTANCE_WITH_ZONE;
+ }
+
+ public static TimestampNanoType withoutZone() {
+ return INSTANCE_WITHOUT_ZONE;
+ }
+
+ private final boolean adjustToUTC;
+
+ private TimestampNanoType(boolean adjustToUTC) {
+ this.adjustToUTC = adjustToUTC;
+ }
+
+ public boolean shouldAdjustToUTC() {
+ return adjustToUTC;
+ }
+
+ @Override
+ public TypeID typeId() {
+ return TypeID.TIMESTAMP_NANO;
+ }
+
+ @Override
+ public String toString() {
+ if (shouldAdjustToUTC()) {
+ return "timestamptz_ns";
+ } else {
+ return "timestamp_ns";
+ }
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (!(other instanceof TimestampNanoType)) {
+ return false;
+ }
+
+ return adjustToUTC == ((TimestampNanoType) other).adjustToUTC;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(TimestampNanoType.class, adjustToUTC);
+ }
+ }
+
public static class StringType extends PrimitiveType {
private static final StringType INSTANCE = new StringType();
@@ -317,7 +373,7 @@ public TypeID typeId() {
@Override
public String toString() {
- return String.format("fixed[%d]", length);
+ return String.format(Locale.ROOT, "fixed[%d]", length);
}
@Override
@@ -388,7 +444,7 @@ public TypeID typeId() {
@Override
public String toString() {
- return String.format("decimal(%d, %d)", precision, scale);
+ return String.format(Locale.ROOT, "decimal(%d, %d)", precision, scale);
}
@Override
@@ -414,27 +470,94 @@ public int hashCode() {
public static class NestedField implements Serializable {
public static NestedField optional(int id, String name, Type type) {
- return new NestedField(true, id, name, type, null);
+ return new NestedField(true, id, name, type, null, null, null);
}
public static NestedField optional(int id, String name, Type type, String doc) {
- return new NestedField(true, id, name, type, doc);
+ return new NestedField(true, id, name, type, doc, null, null);
}
public static NestedField required(int id, String name, Type type) {
- return new NestedField(false, id, name, type, null);
+ return new NestedField(false, id, name, type, null, null, null);
}
public static NestedField required(int id, String name, Type type, String doc) {
- return new NestedField(false, id, name, type, doc);
+ return new NestedField(false, id, name, type, doc, null, null);
}
public static NestedField of(int id, boolean isOptional, String name, Type type) {
- return new NestedField(isOptional, id, name, type, null);
+ return new NestedField(isOptional, id, name, type, null, null, null);
}
public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) {
- return new NestedField(isOptional, id, name, type, doc);
+ return new NestedField(isOptional, id, name, type, doc, null, null);
+ }
+
+ /**
+ * Returns a {@link Builder} pre-populated with every property of the given field (optionality,
+ * name, id, type, doc and both defaults).
+ */
+ public static Builder from(NestedField field) {
+ return new Builder(field);
+ }
+
+ /**
+ * Starts a {@link Builder} for a required field with the given name; the id and type still have
+ * to be supplied on the builder.
+ */
+ public static Builder required(String name) {
+ return new Builder(false, name);
+ }
+
+ /**
+ * Starts a {@link Builder} for an optional field with the given name; the id and type still have
+ * to be supplied on the builder.
+ */
+ public static Builder optional(String name) {
+ return new Builder(true, name);
+ }
+
+ /**
+  * Fluent builder for {@link NestedField} instances.
+  *
+  * <p>Optionality and name are fixed when the builder is created. The id and type must be set
+  * before {@link #build()} is called; doc and the two default values are optional and stay null
+  * when not set.
+  */
+ public static class Builder {
+   private final boolean isOptional;
+   private final String name;
+   // boxed so that "not set yet" can be told apart from a real id
+   private Integer id = null;
+   private Type type = null;
+   private String doc = null;
+   private Object initialDefault = null;
+   private Object writeDefault = null;
+
+   private Builder(boolean isFieldOptional, String fieldName) {
+     isOptional = isFieldOptional;
+     name = fieldName;
+   }
+
+   // copy constructor: starts from every property of an existing field
+   private Builder(NestedField toCopy) {
+     this.isOptional = toCopy.isOptional;
+     this.name = toCopy.name;
+     this.id = toCopy.id;
+     this.type = toCopy.type;
+     this.doc = toCopy.doc;
+     this.initialDefault = toCopy.initialDefault;
+     this.writeDefault = toCopy.writeDefault;
+   }
+
+   /** Sets the field id; required before {@link #build()}. */
+   public Builder withId(int fieldId) {
+     id = fieldId;
+     return this;
+   }
+
+   /** Sets the field type; required before {@link #build()}. */
+   public Builder ofType(Type fieldType) {
+     type = fieldType;
+     return this;
+   }
+
+   /** Sets the optional documentation string. */
+   public Builder withDoc(String fieldDoc) {
+     doc = fieldDoc;
+     return this;
+   }
+
+   /** Sets the initial default; it is converted to the field type when the field is built. */
+   public Builder withInitialDefault(Object fieldInitialDefault) {
+     initialDefault = fieldInitialDefault;
+     return this;
+   }
+
+   /** Sets the write default; it is converted to the field type when the field is built. */
+   public Builder withWriteDefault(Object fieldWriteDefault) {
+     writeDefault = fieldWriteDefault;
+     return this;
+   }
+
+   /**
+    * Creates the field from the values collected so far.
+    *
+    * @return a new {@link NestedField}
+    * @throws NullPointerException if the id, name or type was never set
+    * @throws IllegalArgumentException if a non-null default is given for a nested type
+    */
+   public NestedField build() {
+     // The constructor takes a primitive int, so a missing id would otherwise surface as a
+     // message-less NullPointerException from auto-unboxing. Name and type are validated by the
+     // constructor itself.
+     Preconditions.checkNotNull(id, "Id cannot be null");
+     return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault);
+   }
 }
private final boolean isOptional;
@@ -442,8 +565,17 @@ public static NestedField of(int id, boolean isOptional, String name, Type type,
private final String name;
private final Type type;
private final String doc;
-
- private NestedField(boolean isOptional, int id, String name, Type type, String doc) {
+ private final Object initialDefault;
+ private final Object writeDefault;
+
+ private NestedField(
+ boolean isOptional,
+ int id,
+ String name,
+ Type type,
+ String doc,
+ Object initialDefault,
+ Object writeDefault) {
Preconditions.checkNotNull(name, "Name cannot be null");
Preconditions.checkNotNull(type, "Type cannot be null");
this.isOptional = isOptional;
@@ -451,6 +583,19 @@ private NestedField(boolean isOptional, int id, String name, Type type, String d
this.name = name;
this.type = type;
this.doc = doc;
+ this.initialDefault = castDefault(initialDefault, type);
+ this.writeDefault = castDefault(writeDefault, type);
+ }
+
+ /**
+  * Converts a default value into the representation used for {@code type}.
+  *
+  * @param defaultValue the raw default, or null when the field has none
+  * @param type the field type the default must be converted to
+  * @return the converted value, or null when {@code defaultValue} is null
+  * @throws IllegalArgumentException if a non-null default is supplied for a nested type
+  */
+ private static Object castDefault(Object defaultValue, Type type) {
+   // nested types may only carry a null default
+   if (type.isNestedType() && defaultValue != null) {
+     throw new IllegalArgumentException(
+         String.format("Invalid default value for %s: %s (must be null)", type, defaultValue));
+   }
+
+   return defaultValue == null ? null : Expressions.lit(defaultValue).to(type).value();
 }
public boolean isOptional() {
@@ -461,7 +606,7 @@ public NestedField asOptional() {
if (isOptional) {
return this;
}
- return new NestedField(true, id, name, type, doc);
+ return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
}
public boolean isRequired() {
@@ -472,11 +617,15 @@ public NestedField asRequired() {
if (!isOptional) {
return this;
}
- return new NestedField(false, id, name, type, doc);
+ return new NestedField(false, id, name, type, doc, initialDefault, writeDefault);
}
+ /**
+ * @deprecated will be removed in 2.0.0; use {@link Builder#withId(int)} instead
+ */
+ @Deprecated
public NestedField withFieldId(int newId) {
- return new NestedField(isOptional, newId, name, type, doc);
+ return new NestedField(isOptional, newId, name, type, doc, initialDefault, writeDefault);
}
public int fieldId() {
@@ -495,9 +644,18 @@ public String doc() {
return doc;
}
+ /** Returns the initial default as stored after conversion to the field type, or null if unset. */
+ public Object initialDefault() {
+ return initialDefault;
+ }
+
+ /** Returns the write default as stored after conversion to the field type, or null if unset. */
+ public Object writeDefault() {
+ return writeDefault;
+ }
+
@Override
public String toString() {
- return String.format("%d: %s: %s %s", id, name, isOptional ? "optional" : "required", type)
+ return String.format(
+ Locale.ROOT, "%d: %s: %s %s", id, name, isOptional ? "optional" : "required", type)
+ (doc != null ? " (" + doc + ")" : "");
}
diff --git a/api/src/main/java/org/apache/iceberg/util/DataFileSet.java b/api/src/main/java/org/apache/iceberg/util/DataFileSet.java
new file mode 100644
index 000000000000..27cbee088ad4
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/DataFileSet.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.util.Objects;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+/**
+ * A {@link WrapperSet} of {@link DataFile} elements. Each file is held in a {@code DataFileWrapper}
+ * whose equality and hash code are derived from {@code DataFile.location()} only.
+ */
+public class DataFileSet extends WrapperSet {
+ // one reusable wrapper per thread, created on first use and initially holding a null file
+ private static final ThreadLocal WRAPPERS =
+ ThreadLocal.withInitial(() -> DataFileWrapper.wrap(null));
+
+ private DataFileSet() {
+ // needed for serialization/deserialization
+ }
+
+ // passes already-wrapped elements straight to the WrapperSet constructor
+ private DataFileSet(Iterable> wrappers) {
+ super(wrappers);
+ }
+
+ /** Creates a new set through the no-arg constructor. */
+ public static DataFileSet create() {
+ return new DataFileSet();
+ }
+
+ /**
+ * Creates a set from the given files, wrapping each one in a new {@code DataFileWrapper}.
+ *
+ * <p>Null elements are rejected with the message "Invalid object: null". The check runs inside
+ * the {@code Iterables.transform} function, i.e. when the transformed iterable is consumed.
+ */
+ public static DataFileSet of(Iterable extends DataFile> iterable) {
+ return new DataFileSet(
+ Iterables.transform(
+ iterable,
+ obj -> {
+ Preconditions.checkNotNull(obj, "Invalid object: null");
+ return DataFileWrapper.wrap(obj);
+ }));
+ }
+
+ /**
+ * Returns the calling thread's wrapper from {@code WRAPPERS}.
+ *
+ * <p>NOTE(review): presumably WrapperSet re-points this instance via {@code set(...)} for lookups
+ * so that no new wrapper is allocated; confirm against WrapperSet.
+ */
+ @Override
+ protected Wrapper wrapper() {
+ return WRAPPERS.get();
+ }
+
+ /** Wraps the given file in a new {@code DataFileWrapper}. */
+ @Override
+ protected Wrapper wrap(DataFile dataFile) {
+ return DataFileWrapper.wrap(dataFile);
+ }
+
+ /** Returns {@code DataFile.class} as the element class of this set. */
+ @Override
+ protected Class elementClass() {
+ return DataFile.class;
+ }
+
+ /**
+ * Mutable holder for a {@link DataFile} that compares and hashes by {@code file.location()}.
+ *
+ * <p>equals, hashCode and toString dereference {@code file} without a null check, so they throw
+ * a NullPointerException while the wrapper holds null (as the thread-local instance does until
+ * {@code set} is called).
+ */
+ private static class DataFileWrapper implements Wrapper {
+ private DataFile file;
+
+ private DataFileWrapper(DataFile file) {
+ this.file = file;
+ }
+
+ private static DataFileWrapper wrap(DataFile dataFile) {
+ return new DataFileWrapper(dataFile);
+ }
+
+ @Override
+ public DataFile get() {
+ return file;
+ }
+
+ // replaces the wrapped file in place and returns this same wrapper
+ @Override
+ public Wrapper set(DataFile dataFile) {
+ this.file = dataFile;
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof DataFileWrapper)) {
+ return false;
+ }
+
+ DataFileWrapper that = (DataFileWrapper) o;
+ // identity of a data file is its location; no other attribute is compared
+ return Objects.equals(file.location(), that.file.location());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(file.location());
+ }
+
+ @Override
+ public String toString() {
+ return file.location();
+ }
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index a2f5301f44a9..e26e7098cb22 100644
--- a/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -27,6 +27,7 @@
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoUnit;
+import java.util.Locale;
public class DateTimeUtil {
private DateTimeUtil() {}
@@ -35,6 +36,15 @@ private DateTimeUtil() {}
public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
public static final long MICROS_PER_MILLIS = 1000L;
public static final long MICROS_PER_SECOND = 1_000_000L;
+ private static final long NANOS_PER_SECOND = 1_000_000_000L;
+ private static final long NANOS_PER_MICRO = 1_000L;
+
+ private static final DateTimeFormatter FORMATTER =
+ new DateTimeFormatterBuilder()
+ .parseCaseInsensitive()
+ .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
+ .appendOffset("+HH:MM:ss", "+00:00")
+ .toFormatter(Locale.ROOT);
public static LocalDate dateFromDays(int daysFromEpoch) {
return ChronoUnit.DAYS.addTo(EPOCH_DAY, daysFromEpoch);
@@ -60,6 +70,10 @@ public static LocalDateTime timestampFromMicros(long microsFromEpoch) {
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime();
}
+ /**
+ * Adds {@code nanosFromEpoch} nanoseconds to {@code EPOCH} and returns the result as a
+ * {@link LocalDateTime}, dropping the offset.
+ */
+ public static LocalDateTime timestampFromNanos(long nanosFromEpoch) {
+ return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch).toLocalDateTime();
+ }
+
public static long microsFromInstant(Instant instant) {
return ChronoUnit.MICROS.between(EPOCH, instant.atOffset(ZoneOffset.UTC));
}
@@ -68,6 +82,10 @@ public static long microsFromTimestamp(LocalDateTime dateTime) {
return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
}
+ /**
+ * Returns the nanoseconds between {@code EPOCH} and the given date-time, interpreting the
+ * date-time at the UTC offset.
+ */
+ public static long nanosFromTimestamp(LocalDateTime dateTime) {
+ return ChronoUnit.NANOS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
+ }
+
public static long microsToMillis(long micros) {
// When the timestamp is negative, i.e before 1970, we need to adjust the milliseconds portion.
// Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
@@ -75,6 +93,14 @@ public static long microsToMillis(long micros) {
return Math.floorDiv(micros, MICROS_PER_MILLIS);
}
+ /**
+ * Converts nanoseconds to microseconds, rounding toward negative infinity ({@link Math#floorDiv})
+ * so that negative (pre-epoch) values are handled consistently with positive ones.
+ */
+ public static long nanosToMicros(long nanos) {
+ return Math.floorDiv(nanos, NANOS_PER_MICRO);
+ }
+
+ /**
+ * Converts microseconds to nanoseconds.
+ *
+ * @throws ArithmeticException if the result overflows a long (see {@link Math#multiplyExact})
+ */
+ public static long microsToNanos(long micros) {
+ return Math.multiplyExact(micros, NANOS_PER_MICRO);
+ }
+
public static OffsetDateTime timestamptzFromMicros(long microsFromEpoch) {
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch);
}
@@ -83,6 +109,10 @@ public static long microsFromTimestamptz(OffsetDateTime dateTime) {
return ChronoUnit.MICROS.between(EPOCH, dateTime);
}
+ /** Returns the nanoseconds between {@code EPOCH} and the given offset date-time. */
+ public static long nanosFromTimestamptz(OffsetDateTime dateTime) {
+ return ChronoUnit.NANOS.between(EPOCH, dateTime);
+ }
+
public static String formatTimestampMillis(long millis) {
return Instant.ofEpochMilli(millis).toString().replace("Z", "+00:00");
}
@@ -97,13 +127,12 @@ public static String microsToIsoTime(long micros) {
public static String microsToIsoTimestamptz(long micros) {
LocalDateTime localDateTime = timestampFromMicros(micros);
- DateTimeFormatter zeroOffsetFormatter =
- new DateTimeFormatterBuilder()
- .parseCaseInsensitive()
- .append(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
- .appendOffset("+HH:MM:ss", "+00:00")
- .toFormatter();
- return localDateTime.atOffset(ZoneOffset.UTC).format(zeroOffsetFormatter);
+ return localDateTime.atOffset(ZoneOffset.UTC).format(FORMATTER);
+ }
+
+ /**
+  * Formats a nanosecond timestamp as an ISO local date-time at the UTC offset, using the shared
+  * {@code FORMATTER}.
+  */
+ public static String nanosToIsoTimestamptz(long nanos) {
+   return timestampFromNanos(nanos).atOffset(ZoneOffset.UTC).format(FORMATTER);
 }
public static String microsToIsoTimestamp(long micros) {
@@ -111,6 +140,11 @@ public static String microsToIsoTimestamp(long micros) {
return localDateTime.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
}
+ /** Formats a nanosecond timestamp with {@link DateTimeFormatter#ISO_LOCAL_DATE_TIME}. */
+ public static String nanosToIsoTimestamp(long nanos) {
+   return timestampFromNanos(nanos).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+ }
+
public static int isoDateToDays(String dateString) {
return daysFromDate(LocalDate.parse(dateString, DateTimeFormatter.ISO_LOCAL_DATE));
}
@@ -124,6 +158,11 @@ public static long isoTimestamptzToMicros(String timestampString) {
OffsetDateTime.parse(timestampString, DateTimeFormatter.ISO_DATE_TIME));
}
+ /**
+  * Parses the text with {@link DateTimeFormatter#ISO_DATE_TIME} into an offset date-time and
+  * returns its distance from {@code EPOCH} in nanoseconds.
+  */
+ public static long isoTimestamptzToNanos(CharSequence timestampString) {
+   OffsetDateTime parsed =
+       OffsetDateTime.parse(timestampString, DateTimeFormatter.ISO_DATE_TIME);
+   return nanosFromTimestamptz(parsed);
+ }
+
public static boolean isUTCTimestamptz(String timestampString) {
OffsetDateTime offsetDateTime =
OffsetDateTime.parse(timestampString, DateTimeFormatter.ISO_DATE_TIME);
@@ -135,6 +174,11 @@ public static long isoTimestampToMicros(String timestampString) {
LocalDateTime.parse(timestampString, DateTimeFormatter.ISO_LOCAL_DATE_TIME));
}
+ /**
+  * Parses the text with {@link DateTimeFormatter#ISO_LOCAL_DATE_TIME} and returns its distance
+  * from {@code EPOCH} in nanoseconds, treating the local date-time as UTC.
+  */
+ public static long isoTimestampToNanos(CharSequence timestampString) {
+   LocalDateTime parsed =
+       LocalDateTime.parse(timestampString, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+   return nanosFromTimestamp(parsed);
+ }
+
public static int daysToYears(int days) {
return convertDays(days, ChronoUnit.YEARS);
}
@@ -185,6 +229,36 @@ private static int convertMicros(long micros, ChronoUnit granularity) {
}
}
+ /**
+ * Returns the result of {@code convertNanos} for {@link ChronoUnit#YEARS}, narrowed to an int.
+ *
+ * @throws ArithmeticException if the value does not fit in an int (see {@link Math#toIntExact})
+ */
+ public static int nanosToYears(long nanos) {
+ return Math.toIntExact(convertNanos(nanos, ChronoUnit.YEARS));
+ }
+
+ /**
+ * Returns the result of {@code convertNanos} for {@link ChronoUnit#MONTHS}, narrowed to an int.
+ *
+ * @throws ArithmeticException if the value does not fit in an int (see {@link Math#toIntExact})
+ */
+ public static int nanosToMonths(long nanos) {
+ return Math.toIntExact(convertNanos(nanos, ChronoUnit.MONTHS));
+ }
+
+ /**
+ * Returns the result of {@code convertNanos} for {@link ChronoUnit#DAYS}, narrowed to an int.
+ *
+ * @throws ArithmeticException if the value does not fit in an int (see {@link Math#toIntExact})
+ */
+ public static int nanosToDays(long nanos) {
+ return Math.toIntExact(convertNanos(nanos, ChronoUnit.DAYS));
+ }
+
+ /**
+ * Returns the result of {@code convertNanos} for {@link ChronoUnit#HOURS}, narrowed to an int.
+ *
+ * @throws ArithmeticException if the value does not fit in an int (see {@link Math#toIntExact})
+ */
+ public static int nanosToHours(long nanos) {
+ return Math.toIntExact(convertNanos(nanos, ChronoUnit.HOURS));
+ }
+
+ /**
+  * Counts the {@code granularity} units between {@code EPOCH} and a nanosecond timestamp,
+  * rounding toward negative infinity so that instants before the epoch land in unit -1, -2, ...
+  *
+  * @param nanos nanoseconds relative to {@code EPOCH}; may be negative
+  * @param granularity the unit to count in
+  * @return the floored number of units between {@code EPOCH} and the timestamp
+  */
+ private static long convertNanos(long nanos, ChronoUnit granularity) {
+   if (nanos >= 0) {
+     long epochSecond = Math.floorDiv(nanos, NANOS_PER_SECOND);
+     long nanoAdjustment = Math.floorMod(nanos, NANOS_PER_SECOND);
+     return granularity.between(EPOCH, toOffsetDateTime(epochSecond, nanoAdjustment));
+   } else {
+     // add 1 nano to the value to account for the case where there is exactly 1 unit between
+     // the timestamp and epoch because the result will always be decremented.
+     //
+     // Both the second and the sub-second part must come from the shifted value: when
+     // nanos + 1 lands on a whole second, taking the seconds from the unshifted value yields an
+     // instant that is one second too early, which can cross a unit boundary
+     // (e.g. 1969-12-31T23:00:00.999999999Z would be reported as hour -2 instead of -1).
+     long shifted = nanos + 1;
+     long epochSecond = Math.floorDiv(shifted, NANOS_PER_SECOND);
+     long nanoAdjustment = Math.floorMod(shifted, NANOS_PER_SECOND);
+     return granularity.between(EPOCH, toOffsetDateTime(epochSecond, nanoAdjustment)) - 1;
+   }
+ }
+
private static OffsetDateTime toOffsetDateTime(long epochSecond, long nanoAdjustment) {
return Instant.ofEpochSecond(epochSecond, nanoAdjustment).atOffset(ZoneOffset.UTC);
}
diff --git a/api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java b/api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java
new file mode 100644
index 000000000000..bbe9824963fc
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.util.Objects;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+
+/**
+ * A {@link WrapperSet} of {@link DeleteFile} elements. Each file is held in a
+ * {@code DeleteFileWrapper} whose equality and hash code are derived from
+ * {@code DeleteFile.location()} only.
+ */
+public class DeleteFileSet extends WrapperSet {
+ // one reusable wrapper per thread, created on first use and initially holding a null file
+ private static final ThreadLocal WRAPPERS =
+ ThreadLocal.withInitial(() -> DeleteFileWrapper.wrap(null));
+
+ private DeleteFileSet() {
+ // needed for serialization/deserialization
+ }
+
+ // passes already-wrapped elements straight to the WrapperSet constructor
+ private DeleteFileSet(Iterable> wrappers) {
+ super(wrappers);
+ }
+
+ /** Creates a new set through the no-arg constructor. */
+ public static DeleteFileSet create() {
+ return new DeleteFileSet();
+ }
+
+ /**
+ * Creates a set from the given files, wrapping each one in a new {@code DeleteFileWrapper}.
+ *
+ * <p>Null elements are rejected with the message "Invalid object: null". The check runs inside
+ * the {@code Iterables.transform} function, i.e. when the transformed iterable is consumed.
+ */
+ public static DeleteFileSet of(Iterable extends DeleteFile> iterable) {
+ return new DeleteFileSet(
+ Iterables.transform(
+ iterable,
+ obj -> {
+ Preconditions.checkNotNull(obj, "Invalid object: null");
+ return DeleteFileWrapper.wrap(obj);
+ }));
+ }
+
+ /**
+ * Returns the calling thread's wrapper from {@code WRAPPERS}.
+ *
+ * <p>NOTE(review): presumably WrapperSet re-points this instance via {@code set(...)} for lookups
+ * so that no new wrapper is allocated; confirm against WrapperSet.
+ */
+ @Override
+ protected Wrapper wrapper() {
+ return WRAPPERS.get();
+ }
+
+ /** Wraps the given file in a new {@code DeleteFileWrapper}. */
+ @Override
+ protected Wrapper wrap(DeleteFile deleteFile) {
+ return DeleteFileWrapper.wrap(deleteFile);
+ }
+
+ /** Returns {@code DeleteFile.class} as the element class of this set. */
+ @Override
+ protected Class elementClass() {
+ return DeleteFile.class;
+ }
+
+ /**
+ * Mutable holder for a {@link DeleteFile} that compares and hashes by {@code file.location()}.
+ *
+ * <p>equals, hashCode and toString dereference {@code file} without a null check, so they throw
+ * a NullPointerException while the wrapper holds null (as the thread-local instance does until
+ * {@code set} is called).
+ */
+ private static class DeleteFileWrapper implements Wrapper {
+ private DeleteFile file;
+
+ private DeleteFileWrapper(DeleteFile file) {
+ this.file = file;
+ }
+
+ private static DeleteFileWrapper wrap(DeleteFile deleteFile) {
+ return new DeleteFileWrapper(deleteFile);
+ }
+
+ @Override
+ public DeleteFile get() {
+ return file;
+ }
+
+ // replaces the wrapped file in place and returns this same wrapper
+ @Override
+ public Wrapper set(DeleteFile deleteFile) {
+ this.file = deleteFile;
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof DeleteFileWrapper)) {
+ return false;
+ }
+
+ DeleteFileWrapper that = (DeleteFileWrapper) o;
+ // this needs to be updated once deletion vector support is added
+ return Objects.equals(file.location(), that.file.location());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(file.location());
+ }
+
+ @Override
+ public String toString() {
+ return file.location();
+ }
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java b/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
index 4dd2afa123ac..a1bb3f497196 100644
--- a/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/UnicodeUtil.java
@@ -82,9 +82,9 @@ public static Literal truncateStringMax(Literal inpu
for (int i = length - 1; i >= 0; i--) {
// Get the offset in the truncated string buffer where the number of unicode characters = i
int offsetByCodePoint = truncatedStringBuilder.offsetByCodePoints(0, i);
- int nextCodePoint = truncatedStringBuilder.codePointAt(offsetByCodePoint) + 1;
+ int nextCodePoint = incrementCodePoint(truncatedStringBuilder.codePointAt(offsetByCodePoint));
// No overflow
- if (nextCodePoint != 0 && Character.isValidCodePoint(nextCodePoint)) {
+ if (nextCodePoint != 0) {
truncatedStringBuilder.setLength(offsetByCodePoint);
// Append next code point to the truncated substring
truncatedStringBuilder.appendCodePoint(nextCodePoint);
@@ -93,4 +93,24 @@ public static Literal truncateStringMax(Literal inpu
}
return null; // Cannot find a valid upper bound
}
+
+ /**
+  * Returns the Unicode scalar value that follows {@code codePoint}, or 0 when {@code codePoint}
+  * is already {@link Character#MAX_CODE_POINT}.
+  *
+  * <p>Surrogate code points (U+D800..U+DFFF) are not Unicode scalar values, and any UTF-8 byte
+  * sequence that would map to them is ill-formed, so they are rejected as input and skipped as
+  * output. See https://www.unicode.org/versions/Unicode16.0.0/core-spec/chapter-3/#G27288
+  *
+  * @throws IllegalArgumentException if {@code codePoint} is a surrogate
+  */
+ private static int incrementCodePoint(int codePoint) {
+   boolean isSurrogate =
+       codePoint >= Character.MIN_SURROGATE && codePoint <= Character.MAX_SURROGATE;
+   Preconditions.checkArgument(!isSurrogate, "invalid code point: %s", codePoint);
+
+   if (codePoint == Character.MAX_CODE_POINT) {
+     // overflow
+     return 0;
+   }
+
+   // jump over the surrogate block when sitting directly below it
+   return codePoint == Character.MIN_SURROGATE - 1 ? Character.MAX_SURROGATE + 1 : codePoint + 1;
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/util/WrapperSet.java b/api/src/main/java/org/apache/iceberg/util/WrapperSet.java
new file mode 100644
index 000000000000..e589f435e158
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/util/WrapperSet.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+
+/**
+ * A custom set for a {@link Wrapper} of the given type that maintains insertion order and does not
+ * allow null elements.
+ *
+ * @param The type to wrap in a {@link Wrapper} instance.
+ */
+abstract class WrapperSet implements Set, Serializable {
+ private final Set> set = Sets.newLinkedHashSet();
+
+ protected WrapperSet(Iterable