Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ dist

# Project files
.db

# WAL artifacts
.journal
*.wal
checkpoint.meta
checkpoint.meta.tmp
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ sha1 = "0.11.0-rc.3"
sha2 = "0.11.0-rc.3"
sha3 = "0.11.0-rc.3"

tempfile = "3"
uts-bmt = { path = "crates/bmt" }
uts-contracts = { path = "crates/contracts" }
uts-core = { path = "crates/core" }
Expand Down
21 changes: 16 additions & 5 deletions crates/calendar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ use axum::{
use digest::{OutputSizeUser, typenum::Unsigned};
use rocksdb::DB;
use sha3::Keccak256;
use std::{env, sync::Arc};
use std::{env, path::PathBuf, sync::Arc};
use tower_http::{cors, cors::CorsLayer};
use tracing::info;
use uts_calendar::{AppState, routes, shutdown_signal, time};
use uts_contracts::uts::UniversalTimestamps;
use uts_journal::Journal;
use uts_journal::{Journal, JournalConfig, checkpoint::CheckpointConfig};
use uts_stamper::{Stamper, StamperConfig};

const RING_BUFFER_CAPACITY: usize = 1 << 20; // 1 million entries
Expand All @@ -33,8 +33,16 @@ async fn main() -> eyre::Result<()> {
))?;

// journal
// TODO: graceful shutdown
let journal = Journal::with_capacity(RING_BUFFER_CAPACITY);
let journal = Journal::with_capacity_and_config(
RING_BUFFER_CAPACITY,
JournalConfig {
consumer_checkpoint: CheckpointConfig {
path: PathBuf::from("./.journal/.checkpoint"),
..Default::default()
},
wal_dir: PathBuf::from("./.journal"),
},
)?;

let key = MnemonicBuilder::from_phrase(env::var("MNEMONIC")?.as_str())
.index(0u32)?
Expand Down Expand Up @@ -78,7 +86,7 @@ async fn main() -> eyre::Result<()> {
.route("/timestamp/{commitment}", get(routes::ots::get_timestamp))
.with_state(Arc::new(AppState {
signer,
journal,
journal: journal.clone(),
db,
}))
.layer(
Expand All @@ -93,5 +101,8 @@ async fn main() -> eyre::Result<()> {
.with_graceful_shutdown(shutdown_signal())
.await?;

// this will join the journal's background task and ensure flush of all pending commits
journal.shutdown()?;

Ok(())
}
4 changes: 3 additions & 1 deletion crates/journal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ repository.workspace = true
version.workspace = true

[dependencies]
dyn-clone = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
eyre = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }

[lints]
Expand Down
181 changes: 181 additions & 0 deletions crates/journal/src/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use std::{
    fs,
    fs::File,
    io,
    io::Read,
    path::{Path, PathBuf},
    sync::{Mutex, atomic::AtomicU64},
    time::{Duration, Instant},
};

use tracing::{instrument, warn};

/// Configuration for checkpointing mechanism.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    /// Path to the checkpoint file where the last persisted index is stored. This file will be
    /// created if it does not exist, and updated atomically when a new checkpoint is flushed to
    /// disk.
    pub path: PathBuf,
    /// Flush interval for checkpointing, used to determine when to flush the persisted checkpoint
    /// to disk.
    pub min_interval: Duration,
    /// Flush threshold for checkpointing, used to determine when to flush the persisted checkpoint
    /// to disk based on the number of new entries since the last flush.
    pub min_advance: u64,
    /// Suffix for temporary checkpoint file when performing checkpointing. The checkpoint will be
    /// atomically renamed to the final checkpoint file after flush.
    ///
    /// NOTE(review): the default value includes a leading dot (".tmp") — confirm how this suffix
    /// is joined to `path` so the temp file comes out as e.g. `checkpoint.meta.tmp` and not
    /// `checkpoint.meta..tmp`.
    pub temp_suffix: &'static str,
}

impl Default for CheckpointConfig {
    /// Defaults: `checkpoint.meta` in the current working directory, flushed at most once per
    /// second or after 128 newly consumed entries, with a `.tmp`-suffixed temp file.
    fn default() -> Self {
        Self {
            path: PathBuf::from("checkpoint.meta"),
            min_interval: Duration::from_secs(1),
            min_advance: 128,
            temp_suffix: ".tmp",
        }
    }
}

/// Checkpointing for tracking `consumed_index`.
#[derive(Debug)]
pub struct Checkpoint {
    /// Static configuration: checkpoint path, temp-file suffix, and flush policy.
    config: CheckpointConfig,
    /// Latest index reported via `update`; readable lock-free and may run ahead of the
    /// value persisted on disk.
    current: AtomicU64,
    /// Mutable flush state (temp path, persisted index, last flush time), serialized by a mutex.
    inner: Mutex<CheckpointInner>,
}

#[derive(Debug)]
struct CheckpointInner {
    /// Path of the temporary file that is written first and then atomically renamed over the
    /// final checkpoint file (`config.path`).
    temp_path: PathBuf,

    /// The index of the last persisted checkpoint. This is updated when a new checkpoint is
    /// flushed to disk.
    persisted_index: u64,
    /// Time of the last successful flush; used to enforce `min_interval`.
    last_flush_time: Instant,
}

impl Checkpoint {
/// Creates a new checkpoint instance with the given configuration. This will attempt to recover
/// the last persisted checkpoint index from disk, and initialize the internal state accordingly.
#[instrument(skip_all, err)]
pub fn new(config: CheckpointConfig) -> io::Result<Self> {
let parent = config.path.parent().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"parent directory does not exist",
))?;
fs::create_dir_all(parent)?;

let mut inner = CheckpointInner {
temp_path: config.path.with_added_extension(config.temp_suffix),

persisted_index: 0,
last_flush_time: Instant::now(),
};
let recovered = inner.recover(&config)?;

Ok(Self {
config,
current: AtomicU64::new(recovered),
inner: Mutex::new(inner),
})
}

/// Returns the last persisted checkpoint index, which is updated when a new checkpoint is
/// flushed to disk. This requires acquiring the lock on the inner state, and may lag behind
/// the current index until the next flush to disk.
#[instrument(skip(self), ret)]
pub fn persisted_index(&self) -> u64 {
let inner = self.inner.lock().unwrap();
inner.persisted_index
}

/// Returns the current checkpoint index, which may be ahead of the last persisted index.
/// This is updated atomically when `update` is called, and can be read without acquiring the
/// lock on the inner state.
///
/// The persisted index may lag behind the current index until the next flush to disk.
#[instrument(skip(self), ret)]
pub fn current_index(&self) -> u64 {
self.current.load(std::sync::atomic::Ordering::Acquire)
}

/// Updates the current checkpoint index. This will trigger a flush to disk if:
/// - the new index has advanced by at least `min_advance` since the last persisted index
/// - or, the time since the last flush has exceeded `min_interval`.
#[instrument(skip(self), err)]
pub fn update(&self, new_index: u64) -> io::Result<()> {
let mut inner = self.inner.lock().unwrap();
self.current
.store(new_index, std::sync::atomic::Ordering::Release);
inner.update(new_index, &self.config, false)
}

/// Flush the current checkpoint to disk immediately, regardless of the configured flush
/// interval and flush threshold.
#[instrument(skip(self), err)]
pub fn flush(&self) -> io::Result<()> {
let mut inner = self.inner.lock().unwrap();
let new_index = self.current.load(std::sync::atomic::Ordering::Acquire);
inner.update(new_index, &self.config, true)
}
}

impl CheckpointInner {
    /// Recovers the last persisted checkpoint index from disk.
    ///
    /// A valid temp file means a previous flush completed its write but crashed before (or
    /// during) the rename, so its content is the newest checkpoint: adopt it and finish the
    /// rename. An invalid or missing temp file falls back to the final checkpoint file; a
    /// missing final file means no checkpoint was ever persisted (index 0).
    #[instrument(skip(self), err)]
    fn recover(&mut self, config: &CheckpointConfig) -> io::Result<u64> {
        // Try to recover from the temp checkpoint file first
        if let Ok(index) = recover_from_disk(&self.temp_path) {
            self.persisted_index = index;
            fs::rename(&self.temp_path, &config.path)?;
        } else {
            match recover_from_disk(&config.path) {
                Ok(index) => self.persisted_index = index,
                Err(e) if e.kind() == io::ErrorKind::NotFound => self.persisted_index = 0,
                Err(e) => return Err(e),
            }
        }
        Ok(self.persisted_index)
    }

    /// Advances the persisted checkpoint to `new_index`, flushing to disk when `forced`, when
    /// the index has advanced by at least `min_advance`, or when `min_interval` has elapsed
    /// since the last flush. Non-advancing indexes are ignored; a regression is logged.
    fn update(
        &mut self,
        new_index: u64,
        config: &CheckpointConfig,
        forced: bool,
    ) -> io::Result<()> {
        if new_index == self.persisted_index {
            // Nothing new to persist. Returning silently (instead of warning) avoids log spam
            // when an idle consumer repeatedly reports the same index.
            return Ok(());
        }
        if new_index < self.persisted_index {
            warn!(
                "New checkpoint index {} is not greater than persisted index {}, skipping update",
                new_index, self.persisted_index
            );
            return Ok(());
        }

        let now = Instant::now();
        let should_flush = new_index - self.persisted_index >= config.min_advance;
        let timeouts = now.duration_since(self.last_flush_time) >= config.min_interval;
        if forced || should_flush || timeouts {
            // Write-then-rename keeps the final checkpoint file atomic: a reader sees either
            // the old complete file or the new complete file, never a partial write.
            fs::write(&self.temp_path, new_index.to_le_bytes())?;
            fs::rename(&self.temp_path, &config.path)?;
            self.persisted_index = new_index;
            self.last_flush_time = now;
        }
        Ok(())
    }
}

/// Reads a checkpoint file and returns the index stored in it.
///
/// The file must contain exactly 8 bytes: the index as a little-endian `u64`.
///
/// # Errors
///
/// Propagates I/O errors from opening/reading the file (notably `NotFound` when the file does
/// not exist, which callers use to detect a fresh start), and returns `InvalidData` when the
/// file is not exactly 8 bytes long (e.g. a partially written temp file).
fn recover_from_disk(path: &Path) -> io::Result<u64> {
    let bytes = fs::read(path)?;
    let buf: [u8; 8] = bytes
        .as_slice()
        .try_into()
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid checkpoint file"))?;
    Ok(u64::from_le_bytes(buf))
}
22 changes: 10 additions & 12 deletions crates/journal/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::fmt;

/// Error indicating that the journal buffer is full.
#[derive(Debug)]
pub struct BufferFull;

impl fmt::Display for BufferFull {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Journal buffer is full")
}
/// Error returned when the journal cannot currently accept new entries.
#[derive(Debug, thiserror::Error)]
pub enum JournalUnavailable {
    /// The journal is shutting down, no new entries can be accepted.
    #[error("journal is shutting down")]
    Shutdown,
    /// The journal buffer is full, new entries cannot be accepted until some entries are consumed
    /// and the buffer has space.
    #[error("journal buffer is full")]
    Full,
}

impl std::error::Error for BufferFull {}
Loading