From 247d3e34a47a0ef585daf103331363e885cc4fca Mon Sep 17 00:00:00 2001 From: Leonid Ryzhyk Date: Thu, 12 Dec 2024 00:35:15 -0800 Subject: [PATCH] Retry object store reads on temporary errors. I noticed that, when reading many Parquet files from S3, one of the reads fails occasionally with a temporary error such as "connection closed before message completed". I think such transient failures are normal for S3, and the client is expected to retry the read in this case. However, opendal was not configured to perform a retry. This commit adds a retry layer to opendal with the default configuration (exponential backoff with up to 3 retries). This solves the issue for me. It might be useful to make this behavior configurable in the future, but default settings should work most of the time. Signed-off-by: Leonid Ryzhyk --- crates/iceberg/src/io/storage.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs index b1f799a84e..cc726bb888 100644 --- a/crates/iceberg/src/io/storage.rs +++ b/crates/iceberg/src/io/storage.rs @@ -17,6 +17,7 @@ use std::sync::Arc; +use opendal::layers::RetryLayer; #[cfg(feature = "storage-gcs")] use opendal::services::GcsConfig; #[cfg(feature = "storage-s3")] @@ -94,7 +95,7 @@ impl Storage { path: &'a impl AsRef<str>, ) -> crate::Result<(Operator, &'a str)> { let path = path.as_ref(); - match self { + let (operator, relative_path): (Operator, &str) = match self { #[cfg(feature = "storage-memory")] Storage::Memory(op) => { if let Some(stripped) = path.strip_prefix("memory:/") { @@ -155,7 +156,13 @@ impl Storage { ErrorKind::FeatureUnsupported, "No storage service has been enabled", )), - } + }?; + + // Transient errors are common for object stores; however there's no + // harm in retrying temporary failures for other storage backends as well. + let operator = operator.layer(RetryLayer::new()); + + Ok((operator, relative_path)) } /// Parse scheme.