Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions core/src/main/java/org/apache/iceberg/ContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class ContentFileParser {
private static final String REFERENCED_DATA_FILE = "referenced-data-file";
private static final String CONTENT_OFFSET = "content-offset";
private static final String CONTENT_SIZE = "content-size-in-bytes";
private static final String CONTENT_DATA = "data";
private static final String CONTENT_POSITION_DELETES = "position-deletes";
private static final String CONTENT_EQUALITY_DELETES = "equality-deletes";

private ContentFileParser() {}

Expand Down Expand Up @@ -84,9 +88,13 @@ public static void toJson(ContentFile<?> contentFile, PartitionSpec spec, JsonGe
// as it isn't used and BaseFile constructor doesn't support it.

generator.writeNumberField(SPEC_ID, contentFile.specId());
generator.writeStringField(CONTENT, contentFile.content().name());
// Since 1.11, we serialize content as lowercase kebab-case values like "equality-deletes"
generator.writeStringField(
CONTENT, contentFile.content().name().toLowerCase(Locale.ENGLISH).replace('_', '-'));
generator.writeStringField(FILE_PATH, contentFile.location());
generator.writeStringField(FILE_FORMAT, contentFile.format().name());
// Since 1.11, we serialize format as lower-case strings like "parquet"
generator.writeStringField(
FILE_FORMAT, contentFile.format().name().toLowerCase(Locale.ENGLISH));

if (contentFile.partition() != null) {
generator.writeFieldName(PARTITION);
Expand Down Expand Up @@ -147,7 +155,7 @@ public static ContentFile<?> fromJson(JsonNode jsonNode, Map<Integer, PartitionS
int specId = JsonUtil.getInt(SPEC_ID, jsonNode);
PartitionSpec spec = specsById.get(specId);
Preconditions.checkArgument(spec != null, "Invalid partition specId: %s", specId);
FileContent fileContent = FileContent.valueOf(JsonUtil.getString(CONTENT, jsonNode));
FileContent fileContent = fileContentFromJson(JsonUtil.getString(CONTENT, jsonNode));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: consider naming this fileContentFromString.

I also wouldn't be opposed to introducing a FileContent.fromString API that does what fileContentFromString does below.

String filePath = JsonUtil.getString(FILE_PATH, jsonNode);
FileFormat fileFormat = FileFormat.fromString(JsonUtil.getString(FILE_FORMAT, jsonNode));

Expand Down Expand Up @@ -345,4 +353,23 @@ private static PartitionData partitionFromJson(

return partitionData;
}

private static FileContent fileContentFromJson(String content) {
switch (content) {
Comment thread
geruh marked this conversation as resolved.
case CONTENT_DATA:
return FileContent.DATA;
case CONTENT_POSITION_DELETES:
return FileContent.POSITION_DELETES;
case CONTENT_EQUALITY_DELETES:
return FileContent.EQUALITY_DELETES;
default:
// In 1.10 and before, file content is serialized as the FileContent enum value
try {
return FileContent.valueOf(content);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format("Invalid file content value: '%s'", content), e);
}
}
}
}
124 changes: 96 additions & 28 deletions core/src/test/java/org/apache/iceberg/TestContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ public void testDeleteFile(PartitionSpec spec, DeleteFile deleteFile, String exp
public void testPartitionJsonArrayWrongSize() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(TestBase.SCHEMA).identity("data").build();
String jsonStr =
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":[],\"file-size-in-bytes\":10,"
"{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":[],\"file-size-in-bytes\":10,"
+ "\"record-count\":1}";

JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
Expand All @@ -124,8 +124,8 @@ public void testPartitionJsonArrayWrongSize() throws Exception {
public void testPartitionJsonInvalidType() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(TestBase.SCHEMA).identity("data").build();
String jsonStr =
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":\"invalid\",\"file-size-in-bytes\":10,"
"{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":\"invalid\",\"file-size-in-bytes\":10,"
+ "\"record-count\":1}";

JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
Expand All @@ -139,8 +139,8 @@ public void testPartitionJsonInvalidType() throws Exception {
public void testParsesFieldIdPartitionMap() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(TestBase.SCHEMA).identity("data").build();
String legacyJson =
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":\"foo\"},\"file-size-in-bytes\":10,"
"{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":{\"1000\":\"foo\"},\"file-size-in-bytes\":10,"
+ "\"record-count\":1}";

JsonNode jsonNode = JsonUtil.mapper().readTree(legacyJson);
Expand All @@ -155,8 +155,8 @@ public void testParsesFieldIdPartitionMap() throws Exception {
public void testPartitionStructObjectContainsExtraField() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(TestBase.SCHEMA).identity("data").build();
String jsonStr =
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":\"foo\",\"9999\":\"bar\"},"
"{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":{\"1000\":\"foo\",\"9999\":\"bar\"},"
+ "\"file-size-in-bytes\":10,\"record-count\":1}";

JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
Expand All @@ -169,8 +169,8 @@ public void testPartitionStructObjectContainsExtraField() throws Exception {
public void testPartitionStructObjectEmptyIsNull() throws Exception {
PartitionSpec spec = PartitionSpec.builderFor(TestBase.SCHEMA).identity("data").build();
String jsonStr =
"{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":{},\"file-size-in-bytes\":10,"
"{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":{},\"file-size-in-bytes\":10,"
+ "\"record-count\":1}";

JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
Expand Down Expand Up @@ -210,6 +210,74 @@ public void testPartitionArrayRespectsSpecOrder() throws Exception {
assertThat(deserializedContentFile.partition().get(1, String.class)).isEqualTo("foo");
}

@Test
public void testInvalidContentType() throws Exception {
  // "invalid-content" is neither a kebab-case content value nor a FileContent enum name,
  // so parsing is expected to fail with the parser's own error message.
  String json =
      "{\"spec-id\":0,"
          + "\"content\":\"invalid-content\","
          + "\"file-path\":\"/path/to/file.parquet\","
          + "\"file-format\":\"parquet\","
          + "\"partition\":{},"
          + "\"file-size-in-bytes\":1,"
          + "\"record-count\":1}";
  JsonNode contentNode = JsonUtil.mapper().readTree(json);
  Map<Integer, PartitionSpec> specsById = Map.of(0, PartitionSpec.unpartitioned());

  assertThatThrownBy(() -> ContentFileParser.fromJson(contentNode, specsById))
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage("Invalid file content value: 'invalid-content'");
}

@Test
public void testUppercaseFileFormat() throws Exception {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: can we add a deserialization test case and make it a parameterized test?
{serializedContent, serializedFormat, expectedDeserializedContent, expectedDeserializedFormat}
.....
{"DATA", "PARQUET", "DATA", "PARQUET"}
{"data", "parquet", "DATA", "PARQUET"}
{"POSITION_DELETES", "PUFFIN", "POSITION_DELETES", "PUFFIN"}
{"position-deletes", "puffin", "POSITION_DELETES", "PUFFIN"}

String jsonStr =
"{\"spec-id\":0,"
+ "\"content\":\"data\","
+ "\"file-path\":\"/path/to/file.parquet\","
+ "\"file-format\":\"PARQUET\","
+ "\"partition\":{},"
+ "\"file-size-in-bytes\":1,"
+ "\"record-count\":1}";

JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
ContentFile<?> deserializedContentFile =
ContentFileParser.fromJson(jsonNode, Map.of(0, PartitionSpec.unpartitioned()));
assertThat(deserializedContentFile.format()).isEqualTo(FileFormat.PARQUET);
}

@ParameterizedTest
@MethodSource("enumContentTypeCases")
public void testEnumContentTypeSerialization(FileContent content, String expectedJsonContent)
throws Exception {
String jsonStr =
"{\"spec-id\":0,"
+ "\"content\":\""
+ content.name()
+ "\","
+ "\"file-path\":\"/path/to/data.parquet\","
+ "\"file-format\":\"parquet\","
Comment thread
geruh marked this conversation as resolved.
+ "\"partition\":{},"
+ "\"file-size-in-bytes\":1,"
+ "\"record-count\":1}";

JsonNode jsonNode = JsonUtil.mapper().readTree(jsonStr);
ContentFile<?> deserializedContentFile =
ContentFileParser.fromJson(jsonNode, Map.of(0, PartitionSpec.unpartitioned()));
assertThat(deserializedContentFile.content()).isEqualTo(content);

String serializedStr =
ContentFileParser.toJson(deserializedContentFile, PartitionSpec.unpartitioned());
assertThat(serializedStr).contains("\"content\":\"" + expectedJsonContent + "\"");
}

// Cases for testEnumContentTypeSerialization: each FileContent value paired with the
// content string expected in the serialized JSON.
private static Stream<Arguments> enumContentTypeCases() {
  Stream.Builder<Arguments> cases = Stream.builder();
  cases.add(Arguments.of(FileContent.DATA, "data"));
  cases.add(Arguments.of(FileContent.POSITION_DELETES, "position-deletes"));
  cases.add(Arguments.of(FileContent.EQUALITY_DELETES, "equality-deletes"));
  return cases.build();
}

private static Stream<Arguments> provideSpecAndDataFile() {
return Stream.of(
Arguments.of(
Expand Down Expand Up @@ -271,18 +339,18 @@ private static DataFile dataFileWithOnlyNanCounts(PartitionSpec spec) {

private static String dataFileJsonWithRequiredOnly(PartitionSpec spec) {
if (spec.isUnpartitioned()) {
return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"PARQUET\","
return "{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"parquet\","
+ "\"partition\":[],\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}";
} else {
return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"PARQUET\","
return "{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data-a.parquet\",\"file-format\":\"parquet\","
+ "\"partition\":[1],\"file-size-in-bytes\":10,\"record-count\":1,\"sort-order-id\":0}";
}
}

private static String dataFileJsonWithAllOptional(PartitionSpec spec) {
if (spec.isUnpartitioned()) {
return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":[],\"file-size-in-bytes\":350,\"record-count\":10,"
return "{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data-with-stats.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":[],\"file-size-in-bytes\":350,\"record-count\":10,"
+ "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+ "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+ "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
Expand All @@ -292,8 +360,8 @@ private static String dataFileJsonWithAllOptional(PartitionSpec spec) {
+ "\"key-metadata\":\"00000000000000000000000000000000\","
+ "\"split-offsets\":[128,256],\"sort-order-id\":1}";
} else {
return "{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/path/to/data-with-stats.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":[1],\"file-size-in-bytes\":350,\"record-count\":10,"
return "{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/path/to/data-with-stats.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":[1],\"file-size-in-bytes\":350,\"record-count\":10,"
+ "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+ "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+ "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
Expand Down Expand Up @@ -388,8 +456,8 @@ private static DeleteFile deleteFileWithDataRef(PartitionSpec spec) {
}

private static String deleteFileWithDataRefJson() {
return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":[4],\"file-size-in-bytes\":1234,"
return "{\"spec-id\":0,\"content\":\"position-deletes\",\"file-path\":\"/path/to/delete.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":[4],\"file-size-in-bytes\":1234,"
+ "\"record-count\":10,\"referenced-data-file\":\"/path/to/data/file.parquet\"}";
}

Expand All @@ -414,8 +482,8 @@ private static DeleteFile dv(PartitionSpec spec) {
}

private static String dvJson() {
return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete.puffin\","
+ "\"file-format\":\"PUFFIN\",\"partition\":[4],\"file-size-in-bytes\":1234,\"record-count\":10,"
return "{\"spec-id\":0,\"content\":\"position-deletes\",\"file-path\":\"/path/to/delete.puffin\","
+ "\"file-format\":\"puffin\",\"partition\":[4],\"file-size-in-bytes\":1234,\"record-count\":10,"
+ "\"referenced-data-file\":\"/path/to/data/file.parquet\",\"content-offset\":4,\"content-size-in-bytes\":40}";
}

Expand Down Expand Up @@ -487,18 +555,18 @@ private static DeleteFile deleteFileWithAllOptional(PartitionSpec spec) {

private static String deleteFileJsonWithRequiredOnly(PartitionSpec spec) {
if (spec.isUnpartitioned()) {
return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":[],\"file-size-in-bytes\":1234,\"record-count\":9}";
return "{\"spec-id\":0,\"content\":\"position-deletes\",\"file-path\":\"/path/to/delete-a.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":[],\"file-size-in-bytes\":1234,\"record-count\":9}";
} else {
return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete-a.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":[9],\"file-size-in-bytes\":1234,\"record-count\":9}";
return "{\"spec-id\":0,\"content\":\"position-deletes\",\"file-path\":\"/path/to/delete-a.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":[9],\"file-size-in-bytes\":1234,\"record-count\":9}";
}
}

private static String deleteFileJsonWithAllOptional(PartitionSpec spec) {
if (spec.isUnpartitioned()) {
return "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":[],\"file-size-in-bytes\":1234,\"record-count\":10,"
return "{\"spec-id\":0,\"content\":\"equality-deletes\",\"file-path\":\"/path/to/delete-with-stats.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":[],\"file-size-in-bytes\":1234,\"record-count\":10,"
+ "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+ "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+ "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
Expand All @@ -508,8 +576,8 @@ private static String deleteFileJsonWithAllOptional(PartitionSpec spec) {
+ "\"key-metadata\":\"00000000000000000000000000000000\","
+ "\"split-offsets\":[128],\"equality-ids\":[3],\"sort-order-id\":1}";
} else {
return "{\"spec-id\":0,\"content\":\"EQUALITY_DELETES\",\"file-path\":\"/path/to/delete-with-stats.parquet\","
+ "\"file-format\":\"PARQUET\",\"partition\":[9],\"file-size-in-bytes\":1234,\"record-count\":10,"
return "{\"spec-id\":0,\"content\":\"equality-deletes\",\"file-path\":\"/path/to/delete-with-stats.parquet\","
+ "\"file-format\":\"parquet\",\"partition\":[9],\"file-size-in-bytes\":1234,\"record-count\":10,"
+ "\"column-sizes\":{\"keys\":[3,4],\"values\":[100,200]},"
+ "\"value-counts\":{\"keys\":[3,4],\"values\":[90,180]},"
+ "\"null-value-counts\":{\"keys\":[3,4],\"values\":[10,20]},"
Expand Down
12 changes: 6 additions & 6 deletions core/src/test/java/org/apache/iceberg/TestDataTaskParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ public void missingFields() throws Exception {
+ "{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+ "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+ "\"value\":\"string\",\"value-required\":true}}]},"
+ "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+ "\"metadata-file\":{\"spec-id\":0,\"content\":\"data\","
+ "\"file-path\":\"/tmp/metadata2.json\","
+ "\"file-format\":\"METADATA\",\"partition\":[],"
+ "\"file-format\":\"metadata\",\"partition\":[],"
+ "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0}"
+ "}";
JsonNode missingTableRowsNode = mapper.reader().readTree(missingTableRowsStr);
Expand All @@ -172,8 +172,8 @@ public void testDataTaskParsesFieldIdPartitionMap() {
+ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]},"
+ "\"projection\":{\"type\":\"struct\",\"schema-id\":0,"
+ "\"fields\":[{\"id\":1,\"name\":\"committed_at\",\"required\":true,\"type\":\"timestamptz\"}]},"
+ "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\",\"file-path\":\"/tmp/metadata.json\","
+ "\"file-format\":\"METADATA\",\"partition\":{},\"file-size-in-bytes\":0,\"record-count\":1,\"sort-order-id\":0},"
+ "\"metadata-file\":{\"spec-id\":0,\"content\":\"data\",\"file-path\":\"/tmp/metadata.json\","
+ "\"file-format\":\"metadata\",\"partition\":{},\"file-size-in-bytes\":0,\"record-count\":1,\"sort-order-id\":0},"
+ "\"rows\":[{\"1\":\"2009-02-13T23:31:30+00:00\"}]}";

StaticDataTask deserializedTask = (StaticDataTask) ScanTaskParser.fromJson(jsonStr, true);
Expand Down Expand Up @@ -263,9 +263,9 @@ private String snapshotsDataTaskJson() {
+ "{\"id\":6,\"name\":\"summary\",\"required\":false,\"type\":{\"type\":\"map\","
+ "\"key-id\":7,\"key\":\"string\",\"value-id\":8,"
+ "\"value\":\"string\",\"value-required\":true}}]},"
+ "\"metadata-file\":{\"spec-id\":0,\"content\":\"DATA\","
+ "\"metadata-file\":{\"spec-id\":0,\"content\":\"data\","
+ "\"file-path\":\"/tmp/metadata2.json\","
+ "\"file-format\":\"METADATA\",\"partition\":[],"
+ "\"file-format\":\"metadata\",\"partition\":[],"
+ "\"file-size-in-bytes\":0,\"record-count\":2,\"sort-order-id\":0},"
+ "\"rows\":[{\"1\":\"2009-02-13T23:31:30+00:00\",\"2\":1,\"4\":\"append\","
+ "\"5\":\"file:/tmp/manifest1.avro\","
Expand Down
Loading