diff --git a/bottomless/src/bottomless_wal.rs b/bottomless/src/bottomless_wal.rs index 819256b588..213cf47360 100644 --- a/bottomless/src/bottomless_wal.rs +++ b/bottomless/src/bottomless_wal.rs @@ -1,5 +1,6 @@ use std::ffi::c_int; use std::sync::{Arc, Mutex}; +use std::time::Instant; use crate::completion_progress::SavepointTracker; use libsql_sys::ffi::{SQLITE_BUSY, SQLITE_IOERR_WRITE}; @@ -91,6 +92,7 @@ impl WrapWal for BottomlessWalWrapper { Ok(num_frames) } + #[tracing::instrument(skip_all, fields(in_wal = in_wal, backfilled = backfilled))] fn checkpoint( &mut self, wrapped: &mut T, @@ -104,8 +106,9 @@ impl WrapWal for BottomlessWalWrapper { in_wal: Option<&mut i32>, backfilled: Option<&mut i32>, ) -> Result<()> { + let before = Instant::now(); { - tracing::trace!("bottomless checkpoint"); + tracing::trace!("bottomless checkpoint: {mode:?}"); /* In order to avoid partial checkpoints, passive checkpoint ** mode is not allowed. Only TRUNCATE checkpoints are accepted, @@ -143,6 +146,7 @@ impl WrapWal for BottomlessWalWrapper { ); return Err(Error::new(SQLITE_IOERR_WRITE)); } + tracing::debug!("commited after {:?}", before.elapsed()); if let Err(e) = runtime.block_on(replicator.wait_until_snapshotted()) { tracing::error!( "Failed to wait for S3 replicator to confirm database snapshot backup: {}", @@ -150,6 +154,7 @@ impl WrapWal for BottomlessWalWrapper { ); return Err(Error::new(SQLITE_IOERR_WRITE)); } + tracing::debug!("snapshotted after {:?}", before.elapsed()); Ok(()) })??; @@ -166,6 +171,8 @@ impl WrapWal for BottomlessWalWrapper { backfilled, )?; + tracing::debug!("underlying checkpoint call after {:?}", before.elapsed()); + #[allow(clippy::await_holding_lock)] // uncontended -> only gets called under a libSQL write lock { @@ -173,7 +180,7 @@ impl WrapWal for BottomlessWalWrapper { self.try_with_replicator(|replicator| { if let Err(e) = runtime.block_on(async move { replicator.new_generation().await; - replicator.snapshot_main_db_file().await + replicator.snapshot_main_db_file(false).await }) { tracing::error!("Failed to snapshot the main db file during checkpoint: {e}"); return Err(Error::new(SQLITE_IOERR_WRITE)); @@ -182,6 +189,8 @@ impl WrapWal for BottomlessWalWrapper { })??; } + tracing::debug!("checkpoint finnished after {:?}", before.elapsed()); + Ok(()) } } diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 9d88392379..1a02a4690a 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -69,6 +69,7 @@ pub struct Replicator { join_set: JoinSet<()>, upload_progress: Arc>, last_uploaded_frame_no: Receiver, + skip_snapshot: bool, } #[derive(Debug)] @@ -113,6 +114,8 @@ pub struct Options { pub s3_upload_max_parallelism: usize, /// Max number of retries for S3 operations pub s3_max_retries: u32, + /// Skip snapshot upload per checkpoint. + pub skip_snapshot: bool, } impl Options { @@ -202,6 +205,17 @@ impl Options { other ), }; + let skip_snapshot = match env_var_or("LIBSQL_BOTTOMLESS_SKIP_SNAPSHOT", false) + .to_lowercase() + .as_ref() + { + "yes" | "true" | "1" | "y" | "t" => true, + "no" | "false" | "0" | "n" | "f" => false, + other => bail!( + "Invalid LIBSQL_BOTTOMLESS_SKIP_SNAPSHOT environment variable: {}", + other + ), + }; let s3_max_retries = env_var_or("LIBSQL_BOTTOMLESS_S3_MAX_RETRIES", 10).parse::()?; let cipher = match encryption_cipher { Some(cipher) => Cipher::from_str(&cipher)?, @@ -226,6 +240,7 @@ impl Options { region, bucket_name, s3_max_retries, + skip_snapshot, }) } } @@ -386,6 +401,7 @@ impl Replicator { encryption_config: options.encryption_config, max_frames_per_batch: options.max_frames_per_batch, s3_upload_max_parallelism: options.s3_upload_max_parallelism, + skip_snapshot: options.skip_snapshot, join_set, upload_progress, last_uploaded_frame_no, @@ -901,7 +917,12 @@ impl Replicator { // Sends the main database file to S3 - if -wal file is present, it's replicated // too - it means that the local file was detected to be newer than its remote // counterpart. - pub async fn snapshot_main_db_file(&mut self) -> Result>> { + pub async fn snapshot_main_db_file(&mut self, force: bool) -> Result>> { + if self.skip_snapshot && !force { + tracing::trace!("database snapshot skipped"); + let _ = self.snapshot_notifier.send(Ok(self.generation().ok())); + return Ok(None); + } if !self.main_db_exists_and_not_empty().await { let generation = self.generation()?; tracing::debug!( @@ -1301,6 +1322,14 @@ impl Replicator { ); match wal_pages.cmp(&last_consistent_frame) { std::cmp::Ordering::Equal => { + if local_counter == [0u8; 4] && wal_pages == 0 { + if self.get_dependency(&generation).await?.is_some() { + // empty generation and empty local state, but we have a dependency + // to previous generation: restore required + return Ok(None); + } + } + tracing::info!( "Remote generation is up-to-date, reusing it in this session" ); diff --git a/libsql-server/src/connection/connection_manager.rs b/libsql-server/src/connection/connection_manager.rs index 18e3ee88f8..979a21592d 100644 --- a/libsql-server/src/connection/connection_manager.rs +++ b/libsql-server/src/connection/connection_manager.rs @@ -258,6 +258,7 @@ impl WrapWal for ManagedConnectionWalWrapper { } else { mode }; + tracing::debug!("attempted checkpoint mode: {mode:?}"); let ret = wrapped.checkpoint( db, mode, diff --git a/libsql-server/src/connection/libsql.rs b/libsql-server/src/connection/libsql.rs index 2f793994e5..9001024b82 100644 --- a/libsql-server/src/connection/libsql.rs +++ b/libsql-server/src/connection/libsql.rs @@ -417,8 +417,9 @@ impl Connection { ); unsafe { - extern "C" fn do_nothing(_: *mut c_void, _: c_int) -> c_int { - 1 + const MAX_RETRIES: c_int = 8; + extern "C" fn do_nothing(_: *mut c_void, n: c_int) -> c_int { + (n < MAX_RETRIES) as _ } libsql_sys::ffi::sqlite3_busy_handler( conn.handle(), diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs index 18be5cf234..ec6546b640 100644 --- a/libsql-server/src/namespace/meta_store.rs +++ b/libsql-server/src/namespace/meta_store.rs @@ -126,6 +126,7 @@ pub async fn metastore_connection_maker( max_batch_interval: config.backup_interval, s3_upload_max_parallelism: 32, s3_max_retries: 10, + skip_snapshot: false, }; let mut replicator = bottomless::replicator::Replicator::with_options( db_path.join("data").to_str().unwrap(), @@ -137,7 +138,7 @@ pub async fn metastore_connection_maker( match action { bottomless::replicator::RestoreAction::SnapshotMainDbFile => { replicator.new_generation().await; - if let Some(_handle) = replicator.snapshot_main_db_file().await? { + if let Some(_handle) = replicator.snapshot_main_db_file(true).await? { tracing::trace!( "got snapshot handle after restore with generation upgrade" ); diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index e963e71ba5..66d546383c 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -924,7 +924,7 @@ pub async fn init_bottomless_replicator( match action { bottomless::replicator::RestoreAction::SnapshotMainDbFile => { replicator.new_generation().await; - if let Some(_handle) = replicator.snapshot_main_db_file().await? { + if let Some(_handle) = replicator.snapshot_main_db_file(true).await? { tracing::trace!("got snapshot handle after restore with generation upgrade"); } // Restoration process only leaves the local WAL file if it was diff --git a/libsql-server/src/replication/primary/replication_logger_wal.rs b/libsql-server/src/replication/primary/replication_logger_wal.rs index c7f49ce68d..6d7a268a81 100644 --- a/libsql-server/src/replication/primary/replication_logger_wal.rs +++ b/libsql-server/src/replication/primary/replication_logger_wal.rs @@ -121,7 +121,7 @@ impl ReplicationLoggerWalWrapper { self.buffer.clear(); } - pub fn logger(&self) -> Arc { + pub(crate) fn logger(&self) -> Arc { self.logger.clone() } }