Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ impl Catalog for GlueCatalog {
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.runtime(),
table_info.metadata(),
table_info.metadata_location(),
)
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ impl Catalog for HmsCatalog {
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.runtime(),
table_info.metadata(),
table_info.metadata_location(),
)
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ impl Catalog for SqlCatalog {
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.runtime(),
table_info.metadata(),
table_info.metadata_location(),
)
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ impl Catalog for MemoryCatalog {
self.drop_table(table_ident).await?;
crate::catalog::utils::drop_table_data(
table_info.file_io(),
table_info.runtime(),
table_info.metadata(),
table_info.metadata_location(),
)
Expand Down
61 changes: 43 additions & 18 deletions crates/iceberg/src/catalog/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

use std::collections::HashSet;

use futures::{TryStreamExt, stream};
use futures::stream;

use crate::Result;
use crate::io::FileIO;
use crate::runtime::Runtime;
use crate::spec::TableMetadata;
use crate::{Error, ErrorKind, Result};

const DELETE_CONCURRENCY: usize = 10;

Expand All @@ -38,6 +39,7 @@ const DELETE_CONCURRENCY: usize = 10;
/// may share the same data files.
pub async fn drop_table_data(
io: &FileIO,
runtime: &Runtime,
metadata: &TableMetadata,
metadata_location: Option<&str>,
) -> Result<()> {
Expand All @@ -63,7 +65,7 @@ pub async fn drop_table_data(

// Delete data files only if gc.enabled is true, to avoid corrupting shared tables
if metadata.table_properties()?.gc_enabled {
delete_data_files(io, &manifests_to_delete).await?;
delete_data_files(io, runtime, &manifests_to_delete).await?;
}

// Delete manifest files
Expand Down Expand Up @@ -105,20 +107,43 @@ pub async fn drop_table_data(
}

/// Reads manifests concurrently and deletes the data files referenced within.
async fn delete_data_files(io: &FileIO, manifest_paths: &HashSet<String>) -> Result<()> {
stream::iter(manifest_paths.iter().map(Ok))
.try_for_each_concurrent(DELETE_CONCURRENCY, |manifest_path| async move {
let input = io.new_input(manifest_path)?;
let manifest_content = input.read().await?;
let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?;

let data_file_paths = manifest
.entries()
.iter()
.map(|entry| entry.data_file.file_path().to_string())
.collect::<Vec<_>>();

io.delete_stream(stream::iter(data_file_paths)).await
///
/// Spawns one task per manifest on the IO runtime. Every task is spawned up
/// front; a semaphore then limits how many of them do work at the same time to
/// `DELETE_CONCURRENCY`, to avoid overwhelming the object store.
async fn delete_data_files(
io: &FileIO,
runtime: &Runtime,
manifest_paths: &HashSet<String>,
) -> Result<()> {
let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(DELETE_CONCURRENCY));

let handles: Vec<_> = manifest_paths
.iter()
.map(|manifest_path| {
let io = io.clone();
let path = manifest_path.clone();
let sem = semaphore.clone();
runtime.io().spawn(async move {
let _permit = sem.acquire().await.map_err(|e| {
Error::new(ErrorKind::Unexpected, "semaphore closed").with_source(e)
})?;

let input = io.new_input(&path)?;
let manifest_content = input.read().await?;
let manifest = crate::spec::Manifest::parse_avro(&manifest_content)?;

let data_file_paths = manifest
.entries()
.iter()
.map(|entry| entry.data_file.file_path().to_string())
.collect::<Vec<_>>();

io.delete_stream(stream::iter(data_file_paths)).await
})
})
.await
.collect();

futures::future::try_join_all(handles.into_iter().map(|h| async move { h.await? })).await?;

Ok(())
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl Table {
}

/// Returns the [`Runtime`] for this table.
pub(crate) fn runtime(&self) -> &Runtime {
pub fn runtime(&self) -> &Runtime {
&self.runtime
}

Expand Down
Loading