Skip to content
Open
1 change: 1 addition & 0 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ impl Catalog for GlueCatalog {
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
table_info.encryption_manager(),
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ impl Catalog for HmsCatalog {
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
table_info.encryption_manager(),
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ impl Catalog for SqlCatalog {
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
table_info.encryption_manager(),
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ impl Catalog for MemoryCatalog {
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
table_info.encryption_manager(),
)
.await
}
Expand Down
6 changes: 5 additions & 1 deletion crates/iceberg/src/catalog/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::collections::HashSet;
use futures::{TryStreamExt, stream};

use crate::Result;
use crate::encryption::EncryptionManager;
use crate::io::FileIO;
use crate::spec::TableMetadata;

Expand All @@ -40,14 +41,17 @@ pub async fn drop_table_data(
io: &FileIO,
metadata: &TableMetadata,
metadata_location: Option<&str>,
encryption_manager: Option<&EncryptionManager>,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public api change here

) -> Result<()> {
let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
let mut manifests_to_delete: HashSet<String> = HashSet::new();

// Load all manifest lists concurrently
let results: Vec<_> =
futures::future::try_join_all(metadata.snapshots().map(|snapshot| async {
let manifest_list = snapshot.load_manifest_list(io, metadata).await?;
let manifest_list = snapshot
.load_manifest_list(io, metadata, encryption_manager)
.await?;
Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list))
}))
.await?;
Expand Down
4 changes: 1 addition & 3 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ impl<'a> ManifestsTable<'a> {
let mut partition_summaries = self.partition_summary_builder()?;

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
let manifest_list = self.table.manifest_list_loader(snapshot).load().await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i32);
path.append_value(manifest.manifest_path.clone());
Expand Down
37 changes: 28 additions & 9 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::mem::size_of_val;
use std::sync::Arc;

use crate::encryption::EncryptionManager;
use crate::io::FileIO;
use crate::spec::{
FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
Expand All @@ -44,20 +45,25 @@ pub struct ObjectCache {
cache: moka::future::Cache<CachedObjectKey, CachedItem>,
file_io: FileIO,
cache_disabled: bool,
encryption_manager: Option<Arc<EncryptionManager>>,
}

impl ObjectCache {
/// Creates a new [`ObjectCache`]
/// with the default cache size
pub(crate) fn new(file_io: FileIO) -> Self {
Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES)
pub(crate) fn new(file_io: FileIO, encryption_manager: Option<Arc<EncryptionManager>>) -> Self {
Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES, encryption_manager)
}

/// Creates a new [`ObjectCache`]
/// with a specific cache size
pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self {
pub(crate) fn new_with_capacity(
file_io: FileIO,
cache_size_bytes: u64,
encryption_manager: Option<Arc<EncryptionManager>>,
) -> Self {
if cache_size_bytes == 0 {
Self::with_disabled_cache(file_io)
Self::with_disabled_cache(file_io, encryption_manager)
} else {
Self {
cache: moka::future::Cache::builder()
Expand All @@ -69,17 +75,22 @@ impl ObjectCache {
.build(),
file_io,
cache_disabled: false,
encryption_manager,
}
}
}

/// Creates a new [`ObjectCache`]
/// with caching disabled
pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self {
pub(crate) fn with_disabled_cache(
file_io: FileIO,
encryption_manager: Option<Arc<EncryptionManager>>,
) -> Self {
Self {
cache: moka::future::Cache::new(0),
file_io,
cache_disabled: true,
encryption_manager,
}
}

Expand Down Expand Up @@ -127,7 +138,11 @@ impl ObjectCache {
) -> Result<Arc<ManifestList>> {
if self.cache_disabled {
return snapshot
.load_manifest_list(&self.file_io, table_metadata)
.load_manifest_list(
&self.file_io,
table_metadata,
self.encryption_manager.as_deref(),
)
.await
.map(Arc::new);
}
Expand Down Expand Up @@ -174,7 +189,11 @@ impl ObjectCache {
table_metadata: &TableMetadataRef,
) -> Result<CachedItem> {
let manifest_list = snapshot
.load_manifest_list(&self.file_io, table_metadata)
.load_manifest_list(
&self.file_io,
table_metadata,
self.encryption_manager.as_deref(),
)
.await?;

Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
Expand Down Expand Up @@ -319,7 +338,7 @@ mod tests {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone());
let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone(), None);

let result_manifest_list = object_cache
.get_manifest_list(
Expand Down Expand Up @@ -352,7 +371,7 @@ mod tests {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let object_cache = ObjectCache::new(fixture.table.file_io().clone());
let object_cache = ObjectCache::new(fixture.table.file_io().clone(), None);

// not in cache
let result_manifest_list = object_cache
Expand Down
136 changes: 134 additions & 2 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize};
use typed_builder::TypedBuilder;

use super::table_metadata::SnapshotLog;
use crate::encryption::{EncryptedInputFile, EncryptionManager};
use crate::error::{Result, timestamp_ms_to_utc};
use crate::io::FileIO;
use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
Expand Down Expand Up @@ -195,12 +196,26 @@ impl Snapshot {
}

/// Load manifest list.
pub async fn load_manifest_list(
pub(crate) async fn load_manifest_list(
&self,
file_io: &FileIO,
table_metadata: &TableMetadata,
encryption_manager: Option<&EncryptionManager>,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public api change here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had some discussion before, and I think we should add sth like ManifestListLoader to avoid such api changes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The api should be as following:

impl Table {
pub fn manifest_list_loader(&self, s: &Snapshot) -> Result<ManifestListLoader> {
   ...
}
}

With this approach, we could hide all table related constructs like Runtime, Cache, FileIO to avoid api changes in future.

) -> Result<ManifestList> {
let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?;
let manifest_list_content = match (&self.encryption_key_id, encryption_manager) {
(Some(_), None) => {
return Err(Error::new(
ErrorKind::PreconditionFailed,
"Snapshot has encryption_key_id but no EncryptionManager configured on Table",
));
}
(Some(key_id), Some(em)) => {
let key_metadata = em.decrypt_manifest_list_key_metadata(key_id).await?;
let input = file_io.new_input(&self.manifest_list)?;
EncryptedInputFile::new(input, key_metadata).read().await?
}
(None, _) => file_io.new_input(&self.manifest_list)?.read().await?,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's non-obvious that there could be an intentionally unused EncryptionManager here, since a table could have unencrypted snapshots in its history. Maybe worth a one-line comment so someone doesn't change this to (None, None).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

};
ManifestList::parse_with_version(
&manifest_list_content,
// TODO: You don't really need the version since you could just project any Avro in
Expand Down Expand Up @@ -521,13 +536,82 @@ impl SnapshotRetention {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{TimeZone, Utc};

use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient};
use crate::encryption::{EncryptionManager, StandardKeyMetadata};
use crate::io::FileIO;
use crate::spec::TableMetadata;
use crate::spec::manifest_list::{ManifestList, ManifestListWriter};
use crate::spec::snapshot::_serde::SnapshotV1;
use crate::spec::snapshot::{Operation, Snapshot, Summary};

const ENCRYPTION_TEST_V3_METADATA: &str = r#"{
"format-version": 3,
"table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
"location": "memory:///table",
"last-sequence-number": 0,
"last-updated-ms": 1602638573590,
"last-column-id": 1,
"current-schema-id": 0,
"schemas": [{"type": "struct", "schema-id": 0, "fields": [
{"id": 1, "name": "x", "required": true, "type": "long"}
]}],
"default-spec-id": 0,
"partition-specs": [{"spec-id": 0, "fields": []}],
"last-partition-id": 1000,
"default-sort-order-id": 0,
"sort-orders": [{"order-id": 0, "fields": []}],
"properties": {},
"snapshots": [],
"snapshot-log": [],
"metadata-log": [],
"refs": {},
"next-row-id": 0
}"#;

fn encryption_test_metadata() -> TableMetadata {
serde_json::from_str(ENCRYPTION_TEST_V3_METADATA).unwrap()
}

fn encryption_test_kms() -> Arc<dyn KeyManagementClient> {
let kms = MemoryKeyManagementClient::new();
kms.add_master_key("master-1").unwrap();
Arc::new(kms)
}

fn encryption_test_manager() -> EncryptionManager {
EncryptionManager::builder()
.kms_client(encryption_test_kms())
.table_key_id("master-1")
.build()
}

async fn write_v3_manifest_list_bytes(io: &FileIO, path: &str) -> Bytes {
let output = io.new_output(path).unwrap();
let mut writer = ManifestListWriter::v3(output, 1, None, 0, Some(0));
writer.add_manifests(std::iter::empty()).unwrap();
writer.close().await.unwrap();
io.new_input(path).unwrap().read().await.unwrap()
}

fn snapshot_pointing_at(manifest_list_path: &str, key_id: Option<String>) -> Snapshot {
Snapshot::builder()
.with_snapshot_id(1)
.with_sequence_number(0)
.with_timestamp_ms(0)
.with_manifest_list(manifest_list_path.to_string())
.with_summary(Summary {
operation: Operation::Append,
additional_properties: HashMap::new(),
})
.with_encryption_key_id(key_id)
.build()
}

#[test]
fn schema() {
let record = r#"
Expand Down Expand Up @@ -729,4 +813,52 @@ mod tests {
assert_eq!(v2_snapshot.parent_snapshot_id(), None);
assert_eq!(v2_snapshot.schema_id(), None);
}

#[tokio::test]
async fn load_manifest_list_errors_when_encrypted_but_no_manager_configured() {
let io = FileIO::new_with_memory();
let snapshot = snapshot_pointing_at(
"memory:///table/metadata/manifest-list-enc.avro",
Some("k1".to_string()),
);
let metadata = encryption_test_metadata();

let err = snapshot
.load_manifest_list(&io, &metadata, None)
.await
.unwrap_err();
assert_eq!(err.kind(), crate::ErrorKind::PreconditionFailed);
}

#[tokio::test]
async fn load_manifest_list_decrypts_roundtrip() {
let io = FileIO::new_with_memory();
let plain_path = "memory:///table/metadata/manifest-list-plain.avro";
let encrypted_path = "memory:///table/metadata/manifest-list-enc.avro";

// Build raw manifest list bytes via the standard writer.
let raw_bytes = write_v3_manifest_list_bytes(&io, plain_path).await;

// Encrypt those bytes to a second path and capture the file's key metadata.
let mgr = encryption_test_manager();
let encrypted_output = mgr.encrypt(io.new_output(encrypted_path).unwrap());
let std_key_metadata: StandardKeyMetadata = encrypted_output.key_metadata().clone();
encrypted_output.write(raw_bytes).await.unwrap();

// Wrap the file's key metadata with a KEK and record the resulting wrapped
// entry's id on the snapshot.
let key_id = mgr
.encrypt_manifest_list_key_metadata(&std_key_metadata)
.await
.unwrap();

let snapshot = snapshot_pointing_at(encrypted_path, Some(key_id));
let metadata = encryption_test_metadata();

let manifest_list: ManifestList = snapshot
.load_manifest_list(&io, &metadata, Some(&mgr))
.await
.unwrap();
assert_eq!(manifest_list.entries().len(), 0);
}
}
22 changes: 22 additions & 0 deletions crates/iceberg/src/spec/table_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ pub struct TableProperties {
pub cdc_max_chunk_size: usize,
/// Content-defined chunking normalization level (gearhash bit adjustment).
pub cdc_norm_level: i32,
/// The master key id used to encrypt this table's manifest list and data
/// files. `None` if `encryption.key-id` is not set.
pub encryption_key_id: Option<String>,
/// The encryption data encryption key length in bytes.
pub encryption_data_key_length: usize,
}

impl TableProperties {
Expand Down Expand Up @@ -253,6 +258,15 @@ impl TableProperties {
"write.parquet.content-defined-chunking.norm-level";
/// Default matches `parquet::file::properties::DEFAULT_CDC_NORM_LEVEL`.
pub const PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT: i32 = 0;

/// Property key for the master key id used to encrypt the table's manifest
/// list and data files as defined in https://iceberg.apache.org/docs/nightly/encryption/.
pub const PROPERTY_ENCRYPTION_KEY_ID: &str = "encryption.key-id";

/// Property key for the encryption data encryption key (DEK) length in bytes.
pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH: &str = "encryption.data-key-length";
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding it. I think I can get rid of the additional argument for data key length In introduced in #2466

/// Default value for the encryption DEK length (16 bytes = AES-128).
pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT: usize = 16;
}

impl TryFrom<&HashMap<String, String>> for TableProperties {
Expand Down Expand Up @@ -322,6 +336,14 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL,
TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT,
)?,
encryption_key_id: props
.get(TableProperties::PROPERTY_ENCRYPTION_KEY_ID)
.cloned(),
encryption_data_key_length: parse_property(
props,
TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH,
TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT,
)?,
})
}
}
Expand Down
Loading
Loading