From 6cd8de76b1026368304d98781a964db77412843b Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 18 May 2026 16:05:34 -0700 Subject: [PATCH 1/3] use runtime to delete data files in parallel --- crates/catalog/glue/src/catalog.rs | 1 + crates/catalog/hms/src/catalog.rs | 1 + crates/catalog/sql/src/catalog.rs | 1 + crates/iceberg/src/catalog/memory/catalog.rs | 1 + crates/iceberg/src/catalog/utils.rs | 61 ++++++++++++++------ 5 files changed, 47 insertions(+), 18 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index c51f6a6a89..7326a4f9e9 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -679,6 +679,7 @@ impl Catalog for GlueCatalog { self.drop_table(table).await?; iceberg::drop_table_data( table_info.file_io(), + &self.runtime, table_info.metadata(), table_info.metadata_location(), ) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index d778a3d5fc..e187fa1f5c 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -626,6 +626,7 @@ impl Catalog for HmsCatalog { self.drop_table(table).await?; iceberg::drop_table_data( table_info.file_io(), + &self.runtime, table_info.metadata(), table_info.metadata_location(), ) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c7bf9d0cfd..4bdd2e30aa 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -776,6 +776,7 @@ impl Catalog for SqlCatalog { self.drop_table(table).await?; iceberg::drop_table_data( table_info.file_io(), + &self.runtime, table_info.metadata(), table_info.metadata_location(), ) diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 3ae01a23df..8b4481f3dc 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -345,6 +345,7 @@ impl Catalog for MemoryCatalog { self.drop_table(table_ident).await?; crate::catalog::utils::drop_table_data( table_info.file_io(), + &self.runtime, table_info.metadata(), table_info.metadata_location(), ) diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index d450f9df80..1578ba011c 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -19,11 +19,12 @@ use std::collections::HashSet; -use futures::{TryStreamExt, stream}; +use futures::stream; -use crate::Result; use crate::io::FileIO; +use crate::runtime::Runtime; use crate::spec::TableMetadata; +use crate::{Error, ErrorKind, Result}; const DELETE_CONCURRENCY: usize = 10; @@ -38,6 +39,7 @@ const DELETE_CONCURRENCY: usize = 10; /// may share the same data files. pub async fn drop_table_data( io: &FileIO, + runtime: &Runtime, metadata: &TableMetadata, metadata_location: Option<&str>, ) -> Result<()> { @@ -63,7 +65,7 @@ pub async fn drop_table_data( // Delete data files only if gc.enabled is true, to avoid corrupting shared tables if metadata.table_properties()?.gc_enabled { - delete_data_files(io, &manifests_to_delete).await?; + delete_data_files(io, &runtime, &manifests_to_delete).await?; } // Delete manifest files @@ -105,20 +107,43 @@ pub async fn drop_table_data( } /// Reads manifests concurrently and deletes the data files referenced within. -async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet) -> Result<()> { - stream::iter(manifest_paths.iter().map(Ok)) - .try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move { - let input = io.new_input(manifest_path)?; - let manifest_content = input.read().await?; - let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?; - - let data_file_paths = manifest - .entries() - .iter() - .map(|entry| entry.data_file.file_path().to_string()) - .collect::>(); - - io.delete_stream(stream::iter(data_file_paths)).await +/// +/// Spawns tasks on the IO runtime, bounded by `DELETE_CONCURRENCY` using a +/// semaphore to avoid overwhelming the object store. +async fn delete_data_files( + io: &FileIO, + runtime: &Runtime, + manifest_paths: &HashSet, +) -> Result<()> { + let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(DELETE_CONCURRENCY)); + + let handles: Vec<_> = manifest_paths + .iter() + .map(|manifest_path| { + let io = io.clone(); + let path = manifest_path.clone(); + let sem = semaphore.clone(); + runtime.io().spawn(async move { + let _permit = sem.acquire().await.map_err(|e| { + Error::new(ErrorKind::Unexpected, "semaphore closed").with_source(e) + })?; + + let input = io.new_input(&path)?; + let manifest_content = input.read().await?; + let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?; + + let data_file_paths = manifest + .entries() + .iter() + .map(|entry| entry.data_file.file_path().to_string()) + .collect::>(); + + io.delete_stream(stream::iter(data_file_paths)).await + }) }) - .await + .collect(); + + futures::future::try_join_all(handles.into_iter().map(|h| async move { h.await? })).await?; + + Ok(()) } From 8c951e9380b8be2d70129c43c7278a05b26fbf88 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 18 May 2026 16:12:26 -0700 Subject: [PATCH 2/3] use table_info.runtime() --- crates/catalog/glue/src/catalog.rs | 2 +- crates/catalog/hms/src/catalog.rs | 2 +- crates/catalog/sql/src/catalog.rs | 2 +- crates/iceberg/src/catalog/memory/catalog.rs | 2 +- crates/iceberg/src/table.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 7326a4f9e9..5713752a5a 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -679,7 +679,7 @@ impl Catalog for GlueCatalog { self.drop_table(table).await?; iceberg::drop_table_data( table_info.file_io(), - &self.runtime, + table_info.runtime(), table_info.metadata(), table_info.metadata_location(), ) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index e187fa1f5c..be013156c8 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -626,7 +626,7 @@ impl Catalog for HmsCatalog { self.drop_table(table).await?; iceberg::drop_table_data( table_info.file_io(), - &self.runtime, + table_info.runtime(), table_info.metadata(), table_info.metadata_location(), ) diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 4bdd2e30aa..e75ae47e61 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -776,7 +776,7 @@ impl Catalog for SqlCatalog { self.drop_table(table).await?; iceberg::drop_table_data( table_info.file_io(), - &self.runtime, + table_info.runtime(), table_info.metadata(), table_info.metadata_location(), ) diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 8b4481f3dc..a6bcd4246a 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -345,7 +345,7 @@ impl Catalog for MemoryCatalog { self.drop_table(table_ident).await?; crate::catalog::utils::drop_table_data( table_info.file_io(), - &self.runtime, + table_info.runtime(), table_info.metadata(), table_info.metadata_location(), ) diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index d2ba93f854..1ea43976c2 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -250,7 +250,7 @@ impl Table { } /// Returns the [`Runtime`] for this table. - pub(crate) fn runtime(&self) -> &Runtime { + pub fn runtime(&self) -> &Runtime { &self.runtime } From 9b4c615657685237384f71ad4eec9b1037114e39 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Mon, 18 May 2026 16:20:36 -0700 Subject: [PATCH 3/3] minor --- crates/iceberg/src/catalog/utils.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index 1578ba011c..7a5a6c14e3 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -65,7 +65,7 @@ pub async fn drop_table_data( // Delete data files only if gc.enabled is true, to avoid corrupting shared tables if metadata.table_properties()?.gc_enabled { - delete_data_files(io, &runtime, &manifests_to_delete).await?; + delete_data_files(io, runtime, &manifests_to_delete).await?; } // Delete manifest files