diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index c51f6a6a89..11a9ffd0bc 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -681,6 +681,7 @@ impl Catalog for GlueCatalog { table_info.file_io(), table_info.metadata(), table_info.metadata_location(), + table_info.encryption_manager(), ) .await } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index d778a3d5fc..c76dab8ed3 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -628,6 +628,7 @@ impl Catalog for HmsCatalog { table_info.file_io(), table_info.metadata(), table_info.metadata_location(), + table_info.encryption_manager(), ) .await } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c7bf9d0cfd..4e6413fb46 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -778,6 +778,7 @@ impl Catalog for SqlCatalog { table_info.file_io(), table_info.metadata(), table_info.metadata_location(), + table_info.encryption_manager(), ) .await } diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 3ae01a23df..2f2e2a215e 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -347,6 +347,7 @@ impl Catalog for MemoryCatalog { table_info.file_io(), table_info.metadata(), table_info.metadata_location(), + table_info.encryption_manager(), ) .await } diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index d450f9df80..3e4e1f6e45 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -22,6 +22,7 @@ use std::collections::HashSet; use futures::{TryStreamExt, stream}; use crate::Result; +use crate::encryption::EncryptionManager; use crate::io::FileIO; use crate::spec::TableMetadata; @@ -40,6 +41,7 @@ pub async fn drop_table_data( io: &FileIO, metadata: &TableMetadata, metadata_location: Option<&str>, + encryption_manager: Option<&EncryptionManager>, ) -> Result<()> { let mut manifest_lists_to_delete: HashSet = HashSet::new(); let mut manifests_to_delete: HashSet = HashSet::new(); @@ -47,7 +49,9 @@ pub async fn drop_table_data( // Load all manifest lists concurrently let results: Vec<_> = futures::future::try_join_all(metadata.snapshots().map(|snapshot| async { - let manifest_list = snapshot.load_manifest_list(io, metadata).await?; + let manifest_list = snapshot + .load_manifest_list(io, metadata, encryption_manager) + .await?; Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list)) })) .await?; diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index 4c30ca2ec5..e8d84ea5ce 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -161,9 +161,7 @@ impl<'a> ManifestsTable<'a> { let mut partition_summaries = self.partition_summary_builder()?; if let Some(snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) - .await?; + let manifest_list = self.table.manifest_list_loader(snapshot).load().await?; for manifest in manifest_list.entries() { content.append_value(manifest.content as i32); path.append_value(manifest.manifest_path.clone()); diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 5de45e2acc..87b324a471 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -18,6 +18,7 @@ use std::mem::size_of_val; use std::sync::Arc; +use crate::encryption::EncryptionManager; use crate::io::FileIO; use crate::spec::{ FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef, @@ -44,20 +45,25 @@ pub struct ObjectCache { cache: moka::future::Cache, file_io: FileIO, cache_disabled: bool, + encryption_manager: Option>, } impl ObjectCache { /// Creates a new [`ObjectCache`] /// with the default cache size - pub(crate) fn new(file_io: FileIO) -> Self { - Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES) + pub(crate) fn new(file_io: FileIO, encryption_manager: Option>) -> Self { + Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES, encryption_manager) } /// Creates a new [`ObjectCache`] /// with a specific cache size - pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self { + pub(crate) fn new_with_capacity( + file_io: FileIO, + cache_size_bytes: u64, + encryption_manager: Option>, + ) -> Self { if cache_size_bytes == 0 { - Self::with_disabled_cache(file_io) + Self::with_disabled_cache(file_io, encryption_manager) } else { Self { cache: moka::future::Cache::builder() @@ -69,17 +75,22 @@ impl ObjectCache { .build(), file_io, cache_disabled: false, + encryption_manager, } } } /// Creates a new [`ObjectCache`] /// with caching disabled - pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self { + pub(crate) fn with_disabled_cache( + file_io: FileIO, + encryption_manager: Option>, + ) -> Self { Self { cache: moka::future::Cache::new(0), file_io, cache_disabled: true, + encryption_manager, } } @@ -127,7 +138,11 @@ impl ObjectCache { ) -> Result> { if self.cache_disabled { return snapshot - .load_manifest_list(&self.file_io, table_metadata) + .load_manifest_list( + &self.file_io, + table_metadata, + self.encryption_manager.as_deref(), + ) .await .map(Arc::new); } @@ -174,7 +189,11 @@ impl ObjectCache { table_metadata: &TableMetadataRef, ) -> Result { let manifest_list = snapshot - .load_manifest_list(&self.file_io, table_metadata) + .load_manifest_list( + &self.file_io, + table_metadata, + self.encryption_manager.as_deref(), + ) .await?; Ok(CachedItem::ManifestList(Arc::new(manifest_list))) @@ -319,7 +338,7 @@ mod tests { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone()); + let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone(), None); let result_manifest_list = object_cache .get_manifest_list( @@ -352,7 +371,7 @@ mod tests { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - let object_cache = ObjectCache::new(fixture.table.file_io().clone()); + let object_cache = ObjectCache::new(fixture.table.file_io().clone(), None); // not in cache let result_manifest_list = object_cache diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 3b8a3c934e..d7f82097da 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -26,6 +26,7 @@ use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; +use crate::encryption::{EncryptedInputFile, EncryptionManager}; use crate::error::{Result, timestamp_ms_to_utc}; use crate::io::FileIO; use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata}; @@ -195,12 +196,26 @@ impl Snapshot { } /// Load manifest list. - pub async fn load_manifest_list( + pub(crate) async fn load_manifest_list( &self, file_io: &FileIO, table_metadata: &TableMetadata, + encryption_manager: Option<&EncryptionManager>, ) -> Result { - let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?; + let manifest_list_content = match (&self.encryption_key_id, encryption_manager) { + (Some(_), None) => { + return Err(Error::new( + ErrorKind::PreconditionFailed, + "Snapshot has encryption_key_id but no EncryptionManager configured on Table", + )); + } + (Some(key_id), Some(em)) => { + let key_metadata = em.decrypt_manifest_list_key_metadata(key_id).await?; + let input = file_io.new_input(&self.manifest_list)?; + EncryptedInputFile::new(input, key_metadata).read().await? + } + (None, _) => file_io.new_input(&self.manifest_list)?.read().await?, + }; ManifestList::parse_with_version( &manifest_list_content, // TODO: You don't really need the version since you could just project any Avro in @@ -521,13 +536,82 @@ impl SnapshotRetention { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::sync::Arc; + use bytes::Bytes; use chrono::{TimeZone, Utc}; + use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient}; + use crate::encryption::{EncryptionManager, StandardKeyMetadata}; + use crate::io::FileIO; use crate::spec::TableMetadata; + use crate::spec::manifest_list::{ManifestList, ManifestListWriter}; use crate::spec::snapshot::_serde::SnapshotV1; use crate::spec::snapshot::{Operation, Snapshot, Summary}; + const ENCRYPTION_TEST_V3_METADATA: &str = r#"{ + "format-version": 3, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "memory:///table", + "last-sequence-number": 0, + "last-updated-ms": 1602638573590, + "last-column-id": 1, + "current-schema-id": 0, + "schemas": [{"type": "struct", "schema-id": 0, "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [], + "refs": {}, + "next-row-id": 0 + }"#; + + fn encryption_test_metadata() -> TableMetadata { + serde_json::from_str(ENCRYPTION_TEST_V3_METADATA).unwrap() + } + + fn encryption_test_kms() -> Arc { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + Arc::new(kms) + } + + fn encryption_test_manager() -> EncryptionManager { + EncryptionManager::builder() + .kms_client(encryption_test_kms()) + .table_key_id("master-1") + .build() + } + + async fn write_v3_manifest_list_bytes(io: &FileIO, path: &str) -> Bytes { + let output = io.new_output(path).unwrap(); + let mut writer = ManifestListWriter::v3(output, 1, None, 0, Some(0)); + writer.add_manifests(std::iter::empty()).unwrap(); + writer.close().await.unwrap(); + io.new_input(path).unwrap().read().await.unwrap() + } + + fn snapshot_pointing_at(manifest_list_path: &str, key_id: Option) -> Snapshot { + Snapshot::builder() + .with_snapshot_id(1) + .with_sequence_number(0) + .with_timestamp_ms(0) + .with_manifest_list(manifest_list_path.to_string()) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_encryption_key_id(key_id) + .build() + } + #[test] fn schema() { let record = r#" @@ -729,4 +813,52 @@ mod tests { assert_eq!(v2_snapshot.parent_snapshot_id(), None); assert_eq!(v2_snapshot.schema_id(), None); } + + #[tokio::test] + async fn load_manifest_list_errors_when_encrypted_but_no_manager_configured() { + let io = FileIO::new_with_memory(); + let snapshot = snapshot_pointing_at( + "memory:///table/metadata/manifest-list-enc.avro", + Some("k1".to_string()), + ); + let metadata = encryption_test_metadata(); + + let err = snapshot + .load_manifest_list(&io, &metadata, None) + .await + .unwrap_err(); + assert_eq!(err.kind(), crate::ErrorKind::PreconditionFailed); + } + + #[tokio::test] + async fn load_manifest_list_decrypts_roundtrip() { + let io = FileIO::new_with_memory(); + let plain_path = "memory:///table/metadata/manifest-list-plain.avro"; + let encrypted_path = "memory:///table/metadata/manifest-list-enc.avro"; + + // Build raw manifest list bytes via the standard writer. + let raw_bytes = write_v3_manifest_list_bytes(&io, plain_path).await; + + // Encrypt those bytes to a second path and capture the file's key metadata. + let mgr = encryption_test_manager(); + let encrypted_output = mgr.encrypt(io.new_output(encrypted_path).unwrap()); + let std_key_metadata: StandardKeyMetadata = encrypted_output.key_metadata().clone(); + encrypted_output.write(raw_bytes).await.unwrap(); + + // Wrap the file's key metadata with a KEK and record the resulting wrapped + // entry's id on the snapshot. + let key_id = mgr + .encrypt_manifest_list_key_metadata(&std_key_metadata) + .await + .unwrap(); + + let snapshot = snapshot_pointing_at(encrypted_path, Some(key_id)); + let metadata = encryption_test_metadata(); + + let manifest_list: ManifestList = snapshot + .load_manifest_list(&io, &metadata, Some(&mgr)) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 0); + } } diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index dc21da565c..883f05e5a5 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -128,6 +128,11 @@ pub struct TableProperties { pub cdc_max_chunk_size: usize, /// Content-defined chunking normalization level (gearhash bit adjustment). pub cdc_norm_level: i32, + /// The master key id used to encrypt this table's manifest list and data + /// files. `None` if `encryption.key-id` is not set. + pub encryption_key_id: Option, + /// The encryption data encryption key length in bytes. + pub encryption_data_key_length: usize, } impl TableProperties { @@ -253,6 +258,15 @@ impl TableProperties { "write.parquet.content-defined-chunking.norm-level"; /// Default matches `parquet::file::properties::DEFAULT_CDC_NORM_LEVEL`. pub const PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT: i32 = 0; + + /// Property key for the master key id used to encrypt the table's manifest + /// list and data files as defined in https://iceberg.apache.org/docs/nightly/encryption/. + pub const PROPERTY_ENCRYPTION_KEY_ID: &str = "encryption.key-id"; + + /// Property key for the encryption data encryption key (DEK) length in bytes. + pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH: &str = "encryption.data-key-length"; + /// Default value for the encryption DEK length (16 bytes = AES-128). + pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT: usize = 16; } impl TryFrom<&HashMap> for TableProperties { @@ -322,6 +336,14 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT, )?, + encryption_key_id: props + .get(TableProperties::PROPERTY_ENCRYPTION_KEY_ID) + .cloned(), + encryption_data_key_length: parse_property( + props, + TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH, + TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT, + )?, }) } } diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index d2ba93f854..59c0e0fd61 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -20,12 +20,16 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; +use crate::encryption::kms::KeyManagementClient; +use crate::encryption::{AesKeySize, EncryptionManager}; use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; use crate::runtime::Runtime; use crate::scan::TableScanBuilder; -use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef}; +use crate::spec::{ + FormatVersion, ManifestList, SchemaRef, Snapshot, TableMetadata, TableMetadataRef, +}; use crate::{Error, ErrorKind, Result, TableIdent}; /// Builder to create table scan. @@ -34,6 +38,7 @@ pub struct TableBuilder { metadata_location: Option, metadata: Option, identifier: Option, + kms_client: Option>, readonly: bool, disable_cache: bool, cache_size_bytes: Option, @@ -47,6 +52,7 @@ impl TableBuilder { metadata_location: None, metadata: None, identifier: None, + kms_client: None, readonly: false, disable_cache: false, cache_size_bytes: None, @@ -104,6 +110,16 @@ impl TableBuilder { self } + /// optional - sets the KMS client used to unwrap keys for table encryption. + /// + /// If the table metadata has the `encryption.key-id` property set, a + /// [`KeyManagementClient`] must be provided here so the table can build + /// an [`EncryptionManager`]; otherwise [`Self::build`] will return an error. + pub fn kms_client(mut self, kms_client: Arc) -> Self { + self.kms_client = Some(kms_client); + self + } + /// build the Table pub fn build(self) -> Result { let Self { @@ -111,6 +127,7 @@ impl TableBuilder { metadata_location, metadata, identifier, + kms_client, readonly, disable_cache, cache_size_bytes, @@ -145,15 +162,24 @@ impl TableBuilder { )); }; + let encryption_manager = maybe_configure_encryption(kms_client.as_ref(), &metadata)?; + let object_cache = if disable_cache { - Arc::new(ObjectCache::with_disabled_cache(file_io.clone())) + Arc::new(ObjectCache::with_disabled_cache( + file_io.clone(), + encryption_manager.clone(), + )) } else if let Some(cache_size_bytes) = cache_size_bytes { Arc::new(ObjectCache::new_with_capacity( file_io.clone(), cache_size_bytes, + encryption_manager.clone(), )) } else { - Arc::new(ObjectCache::new(file_io.clone())) + Arc::new(ObjectCache::new( + file_io.clone(), + encryption_manager.clone(), + )) }; Ok(Table { @@ -164,6 +190,7 @@ impl TableBuilder { readonly, object_cache, runtime, + encryption_manager, }) } } @@ -178,6 +205,7 @@ pub struct Table { readonly: bool, object_cache: Arc, runtime: Runtime, + encryption_manager: Option>, } impl Table { @@ -238,6 +266,16 @@ impl Table { self.object_cache.clone() } + /// Returns the [`EncryptionManager`] for this table, if encryption is + /// configured. + /// + /// A manager is present iff the table metadata has the + /// `encryption.key-id` property set and a [`KeyManagementClient`] was + /// supplied to the [`TableBuilder`]. + pub fn encryption_manager(&self) -> Option<&EncryptionManager> { + self.encryption_manager.as_deref() + } + /// Creates a table scan. pub fn scan(&self) -> TableScanBuilder<'_> { TableScanBuilder::new(self) @@ -264,12 +302,39 @@ impl Table { self.metadata.current_schema().clone() } + /// Creates a [`ManifestListLoader`] for the given snapshot. + pub fn manifest_list_loader<'a>(&'a self, snapshot: &'a Snapshot) -> ManifestListLoader<'a> { + ManifestListLoader { + snapshot, + file_io: &self.file_io, + table_metadata: &self.metadata, + encryption_manager: self.encryption_manager.as_deref(), + } + } + /// Create a reader for the table. pub fn reader_builder(&self) -> ArrowReaderBuilder { ArrowReaderBuilder::new(self.file_io.clone(), self.runtime().clone()) } } +/// Loads a [`ManifestList`] for a snapshot. +pub struct ManifestListLoader<'a> { + snapshot: &'a Snapshot, + file_io: &'a FileIO, + table_metadata: &'a TableMetadataRef, + encryption_manager: Option<&'a EncryptionManager>, +} + +impl ManifestListLoader<'_> { + /// Loads and returns the [`ManifestList`] for this snapshot. + pub async fn load(&self) -> Result { + self.snapshot + .load_manifest_list(self.file_io, self.table_metadata, self.encryption_manager) + .await + } +} + /// `StaticTable` is a read-only table struct that can be created from a metadata file or from `TableMetaData` without a catalog. /// It can only be used to read metadata and for table scan. /// # Examples @@ -356,9 +421,58 @@ impl StaticTable { } } +/// If the table metadata sets the `encryption.key-id` property, build an +/// [`EncryptionManager`] for the table. +/// +/// Returns `Ok(None)` if the property is not set. Returns an error if the +/// property is set but no [`KeyManagementClient`] was provided. +fn maybe_configure_encryption( + kms_client: Option<&Arc>, + metadata: &TableMetadataRef, +) -> Result>> { + let table_properties = metadata.table_properties()?; + let Some(table_key_id) = table_properties.encryption_key_id else { + return Ok(None); + }; + + // Encryption is a v3 feature: `encryption-keys` table metadata and the + // snapshot `key-id` field are introduced in format version 3. + if metadata.format_version() < FormatVersion::V3 { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!( + "Table encryption requires format version 3, found {}", + metadata.format_version() + ), + )); + } + + let kms_client = kms_client.ok_or_else(|| { + Error::new( + ErrorKind::PreconditionFailed, + "Table has encryption.key-id set but no KeyManagementClient was provided to TableBuilder", + ) + })?; + + let em = EncryptionManager::builder() + .kms_client(Arc::clone(kms_client)) + .table_key_id(table_key_id) + .encryption_keys(metadata.encryption_keys.clone()) + .key_size(AesKeySize::from_key_length( + table_properties.encryption_data_key_length, + )?) + .build(); + Ok(Some(Arc::new(em))) +} + #[cfg(test)] mod tests { + use std::collections::HashMap; + use super::*; + use crate::encryption::StandardKeyMetadata; + use crate::encryption::kms::MemoryKeyManagementClient; + use crate::spec::{ManifestListWriter, Operation, Snapshot, Summary, TableProperties}; #[tokio::test] async fn test_static_table_from_file() { @@ -432,4 +546,188 @@ mod tests { assert!(!table.readonly()); assert_eq!(table.identifier.name(), "table"); } + + const V3_METADATA: &str = r#"{ + "format-version": 3, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "memory:///table", + "last-sequence-number": 0, + "last-updated-ms": 1602638573590, + "last-column-id": 1, + "current-schema-id": 0, + "schemas": [{"type": "struct", "schema-id": 0, "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [], + "refs": {}, + "next-row-id": 0 + }"#; + + const V2_METADATA: &str = r#"{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "memory:///table", + "last-sequence-number": 0, + "last-updated-ms": 1602638573590, + "last-column-id": 1, + "current-schema-id": 0, + "schemas": [{"type": "struct", "schema-id": 0, "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [], + "refs": {} + }"#; + + fn make_kms() -> Arc { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + Arc::new(kms) + } + + async fn write_empty_manifest_list_bytes(io: &FileIO, path: &str) -> bytes::Bytes { + let output = io.new_output(path).unwrap(); + let mut writer = ManifestListWriter::v3(output, 1, None, 0, Some(0)); + writer.add_manifests(std::iter::empty()).unwrap(); + writer.close().await.unwrap(); + io.new_input(path).unwrap().read().await.unwrap() + } + + #[tokio::test] + async fn table_decrypts_manifest_list_via_object_cache() { + let io = FileIO::new_with_memory(); + let plain_path = "memory:///table/metadata/manifest-list-plain.avro"; + let encrypted_path = "memory:///table/metadata/manifest-list-enc.avro"; + + // Encrypt a real manifest list onto the encrypted path. + let raw = write_empty_manifest_list_bytes(&io, plain_path).await; + let kms = make_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + let encrypted_output = mgr.encrypt(io.new_output(encrypted_path).unwrap()); + let std_km: StandardKeyMetadata = encrypted_output.key_metadata().clone(); + encrypted_output.write(raw).await.unwrap(); + let key_id = mgr + .encrypt_manifest_list_key_metadata(&std_km) + .await + .unwrap(); + + // Snapshot the wrapped keys (manifest-list entry + KEK) the manager produced. + let encryption_keys = mgr.with_encryption_keys(|keys| keys.clone()); + + // Build a TableMetadata with those keys, the encryption.key-id property, + // and a snapshot whose encryption_key_id points at the wrapped entry. + let mut metadata: TableMetadata = serde_json::from_str(V3_METADATA).unwrap(); + metadata.properties.insert( + TableProperties::PROPERTY_ENCRYPTION_KEY_ID.to_string(), + "master-1".to_string(), + ); + metadata.encryption_keys = encryption_keys; + + let snapshot = Snapshot::builder() + .with_snapshot_id(1) + .with_sequence_number(0) + .with_timestamp_ms(0) + .with_manifest_list(encrypted_path.to_string()) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_schema_id(0) + .with_encryption_key_id(Some(key_id)) + .build(); + let snapshot_ref = Arc::new(snapshot); + metadata + .snapshots + .insert(snapshot_ref.snapshot_id(), snapshot_ref.clone()); + metadata.current_snapshot_id = Some(snapshot_ref.snapshot_id()); + + // Build the table with the KMS client, then read via the object cache. + let table = Table::builder() + .file_io(io) + .metadata(metadata) + .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap()) + .kms_client(kms) + .runtime(Runtime::try_current().unwrap()) + .build() + .unwrap(); + assert!(table.encryption_manager().is_some()); + + let manifest_list = table + .object_cache() + .get_manifest_list(&snapshot_ref, &table.metadata_ref()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 0); + } + + #[tokio::test] + async fn table_builder_errors_when_encryption_key_id_set_but_no_kms() { + let mut metadata: TableMetadata = serde_json::from_str(V3_METADATA).unwrap(); + metadata.properties.insert( + TableProperties::PROPERTY_ENCRYPTION_KEY_ID.to_string(), + "master-1".to_string(), + ); + + let err = Table::builder() + .file_io(FileIO::new_with_memory()) + .metadata(metadata) + .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap()) + .runtime(Runtime::try_current().unwrap()) + .build() + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::PreconditionFailed); + } + + #[tokio::test] + async fn table_builder_errors_when_encryption_set_on_pre_v3_table() { + // Encryption is a v3 spec feature; setting encryption.key-id on a v2 table + // must be rejected even when a KMS client is available. + let mut metadata: TableMetadata = serde_json::from_str(V2_METADATA).unwrap(); + metadata.properties.insert( + TableProperties::PROPERTY_ENCRYPTION_KEY_ID.to_string(), + "master-1".to_string(), + ); + + let err = Table::builder() + .file_io(FileIO::new_with_memory()) + .metadata(metadata) + .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap()) + .kms_client(make_kms()) + .runtime(Runtime::try_current().unwrap()) + .build() + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::PreconditionFailed); + } + + #[tokio::test] + async fn table_builder_skips_encryption_when_property_absent() { + let metadata: TableMetadata = serde_json::from_str(V2_METADATA).unwrap(); + let table = Table::builder() + .file_io(FileIO::new_with_memory()) + .metadata(metadata) + .identifier(TableIdent::from_strs(["ns", "plain"]).unwrap()) + .kms_client(make_kms()) + .runtime(Runtime::try_current().unwrap()) + .build() + .unwrap(); + assert!(table.encryption_manager().is_none()); + } } diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..1d263a1719 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -128,11 +128,10 @@ impl SnapshotProduceOperation for FastAppendOperation { return Ok(vec![]); }; - let manifest_list = snapshot - .load_manifest_list( - snapshot_produce.table.file_io(), - &snapshot_produce.table.metadata_ref(), - ) + let manifest_list = snapshot_produce + .table + .manifest_list_loader(snapshot) + .load() .await?; Ok(manifest_list @@ -305,7 +304,7 @@ mod tests { unreachable!() }; let manifest_list = new_snapshot - .load_manifest_list(table.file_io(), table.metadata()) + .load_manifest_list(table.file_io(), table.metadata(), None) .await .unwrap(); assert_eq!(1, manifest_list.entries().len()); diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 04ee1997d0..f1445a6415 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -605,13 +605,8 @@ mod test_row_lineage { assert_eq!(table.metadata().next_row_id(), 30); // Check written manifest for first_row_id - let manifest_list = table - .metadata() - .current_snapshot() - .unwrap() - .load_manifest_list(table.file_io(), table.metadata()) - .await - .unwrap(); + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = table.manifest_list_loader(snapshot).load().await.unwrap(); assert_eq!(manifest_list.entries().len(), 1); let manifest_file = &manifest_list.entries()[0]; @@ -633,13 +628,7 @@ mod test_row_lineage { assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11); // Check written manifest for first_row_id - let manifest_list = table - .metadata() - .current_snapshot() - .unwrap() - .load_manifest_list(table.file_io(), table.metadata()) - .await - .unwrap(); + let manifest_list = table.manifest_list_loader(snapshot).load().await.unwrap(); assert_eq!(manifest_list.entries().len(), 2); let manifest_file = &manifest_list.entries()[1]; assert_eq!(manifest_file.first_row_id, Some(30)); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8f643a7d1e..4b6d05a189 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -172,8 +172,10 @@ impl<'a> SnapshotProducer<'a> { let mut referenced_files = Vec::new(); if let Some(current_snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = current_snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) + let manifest_list = self + .table + .manifest_list_loader(current_snapshot) + .load() .await?; for manifest_list_entry in manifest_list.entries() { let manifest = manifest_list_entry diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 3b3ff3d6b3..bb59e854f5 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -504,8 +504,9 @@ mod tests { let current_snapshot = updated_table.metadata().current_snapshot().unwrap(); // Load the manifest list to verify the data files were added - let manifest_list = current_snapshot - .load_manifest_list(updated_table.file_io(), updated_table.metadata()) + let manifest_list = updated_table + .manifest_list_loader(current_snapshot) + .load() .await?; // There should be at least one manifest