Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions bottomless/src/bottomless_wal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::ffi::c_int;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use crate::completion_progress::SavepointTracker;
use libsql_sys::ffi::{SQLITE_BUSY, SQLITE_IOERR_WRITE};
Expand Down Expand Up @@ -91,6 +92,7 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
Ok(num_frames)
}

#[tracing::instrument(skip_all, fields(in_wal = in_wal, backfilled = backfilled))]
fn checkpoint(
&mut self,
wrapped: &mut T,
Expand All @@ -104,8 +106,9 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
in_wal: Option<&mut i32>,
backfilled: Option<&mut i32>,
) -> Result<()> {
let before = Instant::now();
{
tracing::trace!("bottomless checkpoint");
tracing::trace!("bottomless checkpoint: {mode:?}");

/* In order to avoid partial checkpoints, passive checkpoint
** mode is not allowed. Only TRUNCATE checkpoints are accepted,
Expand Down Expand Up @@ -143,13 +146,15 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
);
return Err(Error::new(SQLITE_IOERR_WRITE));
}
tracing::debug!("commited after {:?}", before.elapsed());
if let Err(e) = runtime.block_on(replicator.wait_until_snapshotted()) {
tracing::error!(
"Failed to wait for S3 replicator to confirm database snapshot backup: {}",
e
);
return Err(Error::new(SQLITE_IOERR_WRITE));
}
tracing::debug!("snapshotted after {:?}", before.elapsed());

Ok(())
})??;
Expand All @@ -166,14 +171,16 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
backfilled,
)?;

tracing::debug!("underlying checkpoint call after {:?}", before.elapsed());

#[allow(clippy::await_holding_lock)]
// uncontended -> only gets called under a libSQL write lock
{
let runtime = tokio::runtime::Handle::current();
self.try_with_replicator(|replicator| {
if let Err(e) = runtime.block_on(async move {
replicator.new_generation().await;
replicator.snapshot_main_db_file().await
replicator.snapshot_main_db_file(false).await
}) {
tracing::error!("Failed to snapshot the main db file during checkpoint: {e}");
return Err(Error::new(SQLITE_IOERR_WRITE));
Expand All @@ -182,6 +189,8 @@ impl<T: Wal> WrapWal<T> for BottomlessWalWrapper {
})??;
}

tracing::debug!("checkpoint finnished after {:?}", before.elapsed());

Ok(())
}
}
31 changes: 30 additions & 1 deletion bottomless/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct Replicator {
join_set: JoinSet<()>,
upload_progress: Arc<Mutex<CompletionProgress>>,
last_uploaded_frame_no: Receiver<u32>,
skip_snapshot: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -113,6 +114,8 @@ pub struct Options {
pub s3_upload_max_parallelism: usize,
/// Max number of retries for S3 operations
pub s3_max_retries: u32,
/// Skip snapshot upload per checkpoint.
pub skip_snapshot: bool,
}

impl Options {
Expand Down Expand Up @@ -202,6 +205,17 @@ impl Options {
other
),
};
let skip_snapshot = match env_var_or("LIBSQL_BOTTOMLESS_SKIP_SNAPSHOT", false)
.to_lowercase()
.as_ref()
{
"yes" | "true" | "1" | "y" | "t" => true,
"no" | "false" | "0" | "n" | "f" => false,
other => bail!(
"Invalid LIBSQL_BOTTOMLESS_SKIP_SNAPSHOT environment variable: {}",
other
),
};
let s3_max_retries = env_var_or("LIBSQL_BOTTOMLESS_S3_MAX_RETRIES", 10).parse::<u32>()?;
let cipher = match encryption_cipher {
Some(cipher) => Cipher::from_str(&cipher)?,
Expand All @@ -226,6 +240,7 @@ impl Options {
region,
bucket_name,
s3_max_retries,
skip_snapshot,
})
}
}
Expand Down Expand Up @@ -386,6 +401,7 @@ impl Replicator {
encryption_config: options.encryption_config,
max_frames_per_batch: options.max_frames_per_batch,
s3_upload_max_parallelism: options.s3_upload_max_parallelism,
skip_snapshot: options.skip_snapshot,
join_set,
upload_progress,
last_uploaded_frame_no,
Expand Down Expand Up @@ -901,7 +917,12 @@ impl Replicator {
// Sends the main database file to S3 - if -wal file is present, it's replicated
// too - it means that the local file was detected to be newer than its remote
// counterpart.
pub async fn snapshot_main_db_file(&mut self) -> Result<Option<JoinHandle<()>>> {
pub async fn snapshot_main_db_file(&mut self, force: bool) -> Result<Option<JoinHandle<()>>> {
if self.skip_snapshot && !force {
tracing::trace!("database snapshot skipped");
let _ = self.snapshot_notifier.send(Ok(self.generation().ok()));
return Ok(None);
}
if !self.main_db_exists_and_not_empty().await {
let generation = self.generation()?;
tracing::debug!(
Expand Down Expand Up @@ -1301,6 +1322,14 @@ impl Replicator {
);
match wal_pages.cmp(&last_consistent_frame) {
std::cmp::Ordering::Equal => {
if local_counter == [0u8; 4] && wal_pages == 0 {
if self.get_dependency(&generation).await?.is_some() {
// empty generation and empty local state, but we have a dependency
// to previous generation: restore required
return Ok(None);
Copy link
Copy Markdown
Contributor Author

@Horusiath Horusiath Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fairly important and could be counted as a bug fix on its own. It (probably) doesn't happen at the moment, but in general it could cause the DB to be restored to an empty state if we didn't manage to snapshot the DB and the WAL log was not present in the current generation.

}
}

tracing::info!(
"Remote generation is up-to-date, reusing it in this session"
);
Expand Down
1 change: 1 addition & 0 deletions libsql-server/src/connection/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ impl WrapWal<Sqlite3Wal> for ManagedConnectionWalWrapper {
} else {
mode
};
tracing::debug!("attempted checkpoint mode: {mode:?}");
let ret = wrapped.checkpoint(
db,
mode,
Expand Down
5 changes: 3 additions & 2 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,9 @@ impl<W: Wal> Connection<W> {
);

unsafe {
extern "C" fn do_nothing(_: *mut c_void, _: c_int) -> c_int {
1
const MAX_RETRIES: c_int = 8;
extern "C" fn do_nothing(_: *mut c_void, n: c_int) -> c_int {
(n < MAX_RETRIES) as _
}
libsql_sys::ffi::sqlite3_busy_handler(
conn.handle(),
Expand Down
3 changes: 2 additions & 1 deletion libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub async fn metastore_connection_maker(
max_batch_interval: config.backup_interval,
s3_upload_max_parallelism: 32,
s3_max_retries: 10,
skip_snapshot: false,
};
let mut replicator = bottomless::replicator::Replicator::with_options(
db_path.join("data").to_str().unwrap(),
Expand All @@ -137,7 +138,7 @@ pub async fn metastore_connection_maker(
match action {
bottomless::replicator::RestoreAction::SnapshotMainDbFile => {
replicator.new_generation().await;
if let Some(_handle) = replicator.snapshot_main_db_file().await? {
if let Some(_handle) = replicator.snapshot_main_db_file(true).await? {
tracing::trace!(
"got snapshot handle after restore with generation upgrade"
);
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ pub async fn init_bottomless_replicator(
match action {
bottomless::replicator::RestoreAction::SnapshotMainDbFile => {
replicator.new_generation().await;
if let Some(_handle) = replicator.snapshot_main_db_file().await? {
if let Some(_handle) = replicator.snapshot_main_db_file(true).await? {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't want to force snapshot generation after db restore, this should be changed to false.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit in creating a snapshot right after we restored from backup?

Copy link
Copy Markdown
Contributor Author

@Horusiath Horusiath Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd say it's mostly related to forking: I haven't tested whether we avoid data loss if we don't snapshot after forking.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably snapshot explicitly after forking, though, and not after other kinds of restores.

tracing::trace!("got snapshot handle after restore with generation upgrade");
}
// Restoration process only leaves the local WAL file if it was
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl ReplicationLoggerWalWrapper {
self.buffer.clear();
}

pub fn logger(&self) -> Arc<ReplicationLogger> {
pub(crate) fn logger(&self) -> Arc<ReplicationLogger> {
self.logger.clone()
}
}
Expand Down