Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions crates/iceberg/src/spec/manifest/_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use serde_derive::{Deserialize, Serialize};
use serde_with::serde_as;

use super::{Datum, ManifestEntry, Schema, Struct};
use crate::spec::{Literal, RawLiteral, StructType, Type};
use crate::spec::{FormatVersion, Literal, RawLiteral, StructType, Type};
use crate::{Error, ErrorKind};

#[derive(Serialize, Deserialize)]
Expand All @@ -40,7 +40,7 @@ impl ManifestEntryV2 {
snapshot_id: value.snapshot_id,
sequence_number: value.sequence_number,
file_sequence_number: value.file_sequence_number,
data_file: DataFileSerde::try_from(value.data_file, partition_type, false)?,
data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V2)?,
})
}

Expand Down Expand Up @@ -74,7 +74,7 @@ impl ManifestEntryV1 {
Ok(Self {
status: value.status as i32,
snapshot_id: value.snapshot_id.unwrap_or_default(),
data_file: DataFileSerde::try_from(value.data_file, partition_type, true)?,
data_file: DataFileSerde::try_from(value.data_file, partition_type, FormatVersion::V1)?,
})
}

Expand Down Expand Up @@ -129,9 +129,13 @@ impl DataFileSerde {
pub fn try_from(
value: super::DataFile,
partition_type: &StructType,
is_version_1: bool,
format_version: FormatVersion,
) -> Result<Self, Error> {
let block_size_in_bytes = if is_version_1 { Some(0) } else { None };
let block_size_in_bytes = if format_version == FormatVersion::V1 {
Some(0)
} else {
None
};
Ok(Self {
content: value.content as i32,
file_path: value.file_path,
Expand Down Expand Up @@ -292,16 +296,23 @@ fn parse_i64_entry(v: Vec<I64Entry>) -> Result<HashMap<i32, u64>, Error> {
Ok(m)
}

#[allow(unused_mut)]
fn to_i64_entry(entries: HashMap<i32, u64>) -> Result<Vec<I64Entry>, Error> {
entries
let mut i64_entries = entries
.iter()
.map(|e| {
Ok(I64Entry {
key: *e.0,
value: (*e.1).try_into()?,
})
})
.collect()
.collect::<Result<Vec<_>, Error>>()?;

// Ensure that the order is deterministic during testing
#[cfg(test)]
i64_entries.sort_by_key(|e| e.key);

Ok(i64_entries)
}

#[cfg(test)]
Expand Down
8 changes: 6 additions & 2 deletions crates/iceberg/src/spec/manifest/data_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,12 @@ pub fn write_data_files_to_avro<W: Write>(
let mut writer = AvroWriter::new(&avro_schema, writer);

for data_file in data_files {
let value = to_value(DataFileSerde::try_from(data_file, partition_type, true)?)?
.resolve(&avro_schema)?;
let value = to_value(DataFileSerde::try_from(
data_file,
partition_type,
FormatVersion::V1,
)?)?
.resolve(&avro_schema)?;
writer.append(value)?;
}

Expand Down
191 changes: 191 additions & 0 deletions crates/iceberg/src/spec/manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use super::{
UNASSIGNED_SEQUENCE_NUMBER,
};
use crate::error::Result;
use crate::{Error, ErrorKind};

/// A manifest contains metadata and a list of entries.
#[derive(Debug, PartialEq, Eq, Clone)]
Expand Down Expand Up @@ -119,12 +120,47 @@ impl Manifest {
}
}

/// Serialize a DataFile to a JSON string.
pub fn serialize_data_file_to_json(
data_file: DataFile,
partition_type: &super::StructType,
format_version: FormatVersion,
) -> Result<String> {
let serde = _serde::DataFileSerde::try_from(data_file, partition_type, format_version)?;
serde_json::to_string(&serde).map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Failed to serialize DataFile to JSON!".to_string(),
)
.with_source(e)
})
}

/// Deserialize a DataFile from a JSON string.
pub fn deserialize_data_file_from_json(
json: &str,
partition_spec_id: i32,
partition_type: &super::StructType,
schema: &Schema,
) -> Result<DataFile> {
let serde = serde_json::from_str::<_serde::DataFileSerde>(json).map_err(|e| {
Error::new(
ErrorKind::DataInvalid,
"Failed to deserialize JSON to DataFile!".to_string(),
)
.with_source(e)
})?;

serde.try_into(partition_spec_id, partition_type, schema)
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs;
use std::sync::Arc;

use serde_json::Value;
use tempfile::TempDir;

use super::*;
Expand Down Expand Up @@ -1056,4 +1092,159 @@ mod tests {
assert!(!partitions[2].clone().contains_null);
assert_eq!(partitions[2].clone().contains_nan, Some(false));
}

Comment thread
CTTY marked this conversation as resolved.
#[test]
fn test_data_file_serialization() {
// Create a simple schema
let schema = Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![1])
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap();

// Create a partition spec
let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_partition_field("id", "id_partition", Transform::Identity)
.unwrap()
.build()
.unwrap();

// Get partition type from the partition spec
let partition_type = partition_spec.partition_type(&schema).unwrap();

// Create a vector of DataFile objects
let data_files = vec![
DataFileBuilder::default()
.content(DataContentType::Data)
.file_format(DataFileFormat::Parquet)
.file_path("path/to/file1.parquet".to_string())
.file_size_in_bytes(1024)
.record_count(100)
.partition_spec_id(1)
.partition(Struct::empty())
.column_sizes(HashMap::from([(1, 512), (2, 1024)]))
.value_counts(HashMap::from([(1, 100), (2, 500)]))
.null_value_counts(HashMap::from([(1, 0), (2, 1)]))
.build()
.unwrap(),
DataFileBuilder::default()
.content(DataContentType::Data)
.file_format(DataFileFormat::Parquet)
.file_path("path/to/file2.parquet".to_string())
.file_size_in_bytes(2048)
.record_count(200)
.partition_spec_id(1)
.partition(Struct::empty())
.column_sizes(HashMap::from([(1, 1024), (2, 2048)]))
.value_counts(HashMap::from([(1, 200), (2, 600)]))
.null_value_counts(HashMap::from([(1, 10), (2, 999)]))
.build()
.unwrap(),
];

// Serialize the DataFile objects
let serialized_files = data_files
.clone()
.into_iter()
.map(|f| serialize_data_file_to_json(f, &partition_type, FormatVersion::V2).unwrap())
.collect::<Vec<String>>();

// Verify we have the expected serialized files
assert_eq!(serialized_files.len(), 2);
let pretty_json1: Value = serde_json::from_str(serialized_files.first().unwrap()).unwrap();
let pretty_json2: Value = serde_json::from_str(serialized_files.get(1).unwrap()).unwrap();
let expected_serialized_file1 = serde_json::json!({
"content": 0,
"file_path": "path/to/file1.parquet",
"file_format": "PARQUET",
"partition": {},
"record_count": 100,
"file_size_in_bytes": 1024,
"column_sizes": [
{ "key": 1, "value": 512 },
{ "key": 2, "value": 1024 }
],
"value_counts": [
{ "key": 1, "value": 100 },
{ "key": 2, "value": 500 }
],
"null_value_counts": [
{ "key": 1, "value": 0 },
{ "key": 2, "value": 1 }
],
"nan_value_counts": [],
"lower_bounds": [],
"upper_bounds": [],
"key_metadata": null,
"split_offsets": [],
"equality_ids": [],
"sort_order_id": null,
"first_row_id": null,
"referenced_data_file": null,
"content_offset": null,
"content_size_in_bytes": null
});
let expected_serialized_file2 = serde_json::json!({
"content": 0,
"file_path": "path/to/file2.parquet",
"file_format": "PARQUET",
"partition": {},
"record_count": 200,
"file_size_in_bytes": 2048,
"column_sizes": [
{ "key": 1, "value": 1024 },
{ "key": 2, "value": 2048 }
],
"value_counts": [
{ "key": 1, "value": 200 },
{ "key": 2, "value": 600 }
],
"null_value_counts": [
{ "key": 1, "value": 10 },
{ "key": 2, "value": 999 }
],
"nan_value_counts": [],
"lower_bounds": [],
"upper_bounds": [],
"key_metadata": null,
"split_offsets": [],
"equality_ids": [],
"sort_order_id": null,
"first_row_id": null,
"referenced_data_file": null,
"content_offset": null,
"content_size_in_bytes": null
});

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some tinkering, I think using serde_json::Value to check the json output gives the most readable test.

Also I found a tricky corner case: DataFile's serializer won't preserve the order of DataFile's array like column_sizes. Right now I just make the array sizes to be one to work around that, maybe there is a better solution?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also I found a tricky corner case: DataFile's serializer won't preserve the order of DataFile's array like column_sizes. Right now I just make the array sizes to be one to work around that, maybe there is a better solution?

I guess the randomness comes from HashMap, but since now you use serde_json::Value to do the check, I guess the order doesn't matter?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it still matters in below outtput. One approach is to change this function to make it determined:

#[cfg(test)]
// sort the keys.

assert_eq!(pretty_json1, expected_serialized_file1);
assert_eq!(pretty_json2, expected_serialized_file2);

// Now deserialize the JSON strings back into DataFile objects
let deserialized_files: Vec<DataFile> = serialized_files
.into_iter()
.map(|json| {
deserialize_data_file_from_json(
&json,
partition_spec.spec_id(),
&partition_type,
&schema,
)
.unwrap()
})
.collect();

// Verify we have the expected number of deserialized files
assert_eq!(deserialized_files.len(), 2);
let deserialized_data_file1 = deserialized_files.first().unwrap();
let deserialized_data_file2 = deserialized_files.get(1).unwrap();
let original_data_file1 = data_files.first().unwrap();
let original_data_file2 = data_files.get(1).unwrap();

assert_eq!(deserialized_data_file1, original_data_file1);
assert_eq!(deserialized_data_file2, original_data_file2);
}
}
Loading