diff --git a/.gitignore b/.gitignore index 5ae8c34..3e0e929 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,9 @@ dist # Project files .db + +# WAL artifacts +.journal +*.wal +checkpoint.meta +checkpoint.meta.tmp diff --git a/Cargo.lock b/Cargo.lock index 3a81e53..2b3cacc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6378,9 +6378,11 @@ dependencies = [ name = "uts-journal" version = "0.1.0" dependencies = [ - "dyn-clone", "eyre", + "tempfile", + "thiserror 2.0.17", "tokio", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 3bc242a..5fd53e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ sha1 = "0.11.0-rc.3" sha2 = "0.11.0-rc.3" sha3 = "0.11.0-rc.3" +tempfile = "3" uts-bmt = { path = "crates/bmt" } uts-contracts = { path = "crates/contracts" } uts-core = { path = "crates/core" } diff --git a/crates/calendar/src/main.rs b/crates/calendar/src/main.rs index 21182a2..6f8fe78 100644 --- a/crates/calendar/src/main.rs +++ b/crates/calendar/src/main.rs @@ -12,12 +12,12 @@ use axum::{ use digest::{OutputSizeUser, typenum::Unsigned}; use rocksdb::DB; use sha3::Keccak256; -use std::{env, sync::Arc}; +use std::{env, path::PathBuf, sync::Arc}; use tower_http::{cors, cors::CorsLayer}; use tracing::info; use uts_calendar::{AppState, routes, shutdown_signal, time}; use uts_contracts::uts::UniversalTimestamps; -use uts_journal::Journal; +use uts_journal::{Journal, JournalConfig, checkpoint::CheckpointConfig}; use uts_stamper::{Stamper, StamperConfig}; const RING_BUFFER_CAPACITY: usize = 1 << 20; // 1 million entries @@ -33,8 +33,16 @@ async fn main() -> eyre::Result<()> { ))?; // journal - // TODO: graceful shutdown - let journal = Journal::with_capacity(RING_BUFFER_CAPACITY); + let journal = Journal::with_capacity_and_config( + RING_BUFFER_CAPACITY, + JournalConfig { + consumer_checkpoint: CheckpointConfig { + path: PathBuf::from("./.journal/.checkpoint"), + ..Default::default() + }, + wal_dir: PathBuf::from("./.journal"), + }, + )?; let key = 
MnemonicBuilder::from_phrase(env::var("MNEMONIC")?.as_str()) .index(0u32)? @@ -78,7 +86,7 @@ async fn main() -> eyre::Result<()> { .route("/timestamp/{commitment}", get(routes::ots::get_timestamp)) .with_state(Arc::new(AppState { signer, - journal, + journal: journal.clone(), db, })) .layer( @@ -93,5 +101,8 @@ async fn main() -> eyre::Result<()> { .with_graceful_shutdown(shutdown_signal()) .await?; + // this will join the journal's background task and ensure flush of all pending commits + journal.shutdown()?; + Ok(()) } diff --git a/crates/journal/Cargo.toml b/crates/journal/Cargo.toml index c603867..11ae72b 100644 --- a/crates/journal/Cargo.toml +++ b/crates/journal/Cargo.toml @@ -10,10 +10,12 @@ repository.workspace = true version.workspace = true [dependencies] -dyn-clone = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } [dev-dependencies] eyre = { workspace = true } +tempfile = { workspace = true } tokio = { workspace = true, features = ["full"] } [lints] diff --git a/crates/journal/src/checkpoint.rs b/crates/journal/src/checkpoint.rs new file mode 100644 index 0000000..b5108de --- /dev/null +++ b/crates/journal/src/checkpoint.rs @@ -0,0 +1,181 @@ +use std::{ + fs, + fs::File, + io, + io::Read, + path::{Path, PathBuf}, + sync::{Mutex, atomic::AtomicU64}, + time::{Duration, Instant}, +}; + +/// Configuration for checkpointing mechanism. +#[derive(Debug, Clone)] +pub struct CheckpointConfig { + /// Path to the checkpoint file where the last persisted index is stored. This file will be + /// created if it does not exist, and updated atomically when a new checkpoint is flushed to + /// disk. + pub path: PathBuf, + /// Flush interval for checkpointing, used to determine when to flush the persisted checkpoint + /// to disk. + pub min_interval: Duration, + /// Flush threshold for checkpointing, used to determine when to flush the persisted checkpoint + /// to disk based on the number of new entries since the last flush. 
+ pub min_advance: u64, + /// Suffix for temporary checkpoint file when performing checkpointing. The checkpoint will be + /// atomically renamed to the final checkpoint file after flush. + pub temp_suffix: &'static str, +} + +impl Default for CheckpointConfig { + fn default() -> Self { + Self { + path: PathBuf::from("checkpoint.meta"), + min_interval: Duration::from_secs(1), + min_advance: 128, + temp_suffix: ".tmp", + } + } +} + +/// Checkpointing for tracking `consumed_index`. +#[derive(Debug)] +pub struct Checkpoint { + config: CheckpointConfig, + current: AtomicU64, + inner: Mutex, +} + +#[derive(Debug)] +struct CheckpointInner { + temp_path: PathBuf, + + /// The index of the last persisted checkpoint. This is updated when a new checkpoint is + /// flushed to disk. + persisted_index: u64, + last_flush_time: Instant, +} + +impl Checkpoint { + /// Creates a new checkpoint instance with the given configuration. This will attempt to recover + /// the last persisted checkpoint index from disk, and initialize the internal state accordingly. + #[instrument(skip_all, err)] + pub fn new(config: CheckpointConfig) -> io::Result { + let parent = config.path.parent().ok_or(io::Error::new( + io::ErrorKind::NotFound, + "parent directory does not exist", + ))?; + fs::create_dir_all(parent)?; + + let mut inner = CheckpointInner { + temp_path: config.path.with_added_extension(config.temp_suffix), + + persisted_index: 0, + last_flush_time: Instant::now(), + }; + let recovered = inner.recover(&config)?; + + Ok(Self { + config, + current: AtomicU64::new(recovered), + inner: Mutex::new(inner), + }) + } + + /// Returns the last persisted checkpoint index, which is updated when a new checkpoint is + /// flushed to disk. This requires acquiring the lock on the inner state, and may lag behind + /// the current index until the next flush to disk. 
+ #[instrument(skip(self), ret)] + pub fn persisted_index(&self) -> u64 { + let inner = self.inner.lock().unwrap(); + inner.persisted_index + } + + /// Returns the current checkpoint index, which may be ahead of the last persisted index. + /// This is updated atomically when `update` is called, and can be read without acquiring the + /// lock on the inner state. + /// + /// The persisted index may lag behind the current index until the next flush to disk. + #[instrument(skip(self), ret)] + pub fn current_index(&self) -> u64 { + self.current.load(std::sync::atomic::Ordering::Acquire) + } + + /// Updates the current checkpoint index. This will trigger a flush to disk if: + /// - the new index has advanced by at least `min_advance` since the last persisted index + /// - or, the time since the last flush has exceeded `min_interval`. + #[instrument(skip(self), err)] + pub fn update(&self, new_index: u64) -> io::Result<()> { + let mut inner = self.inner.lock().unwrap(); + self.current + .store(new_index, std::sync::atomic::Ordering::Release); + inner.update(new_index, &self.config, false) + } + + /// Flush the current checkpoint to disk immediately, regardless of the configured flush + /// interval and flush threshold. 
+ #[instrument(skip(self), err)] + pub fn flush(&self) -> io::Result<()> { + let mut inner = self.inner.lock().unwrap(); + let new_index = self.current.load(std::sync::atomic::Ordering::Acquire); + inner.update(new_index, &self.config, true) + } +} + +impl CheckpointInner { + #[instrument(skip(self), err)] + fn recover(&mut self, config: &CheckpointConfig) -> io::Result { + // Try to recover from the temp checkpoint file first + if let Ok(index) = recover_from_disk(&self.temp_path) { + self.persisted_index = index; + fs::rename(&self.temp_path, &config.path)?; + } else { + match recover_from_disk(&config.path) { + Ok(index) => self.persisted_index = index, + Err(e) if e.kind() == io::ErrorKind::NotFound => self.persisted_index = 0, + Err(e) => return Err(e), + } + } + Ok(self.persisted_index) + } + + fn update( + &mut self, + new_index: u64, + config: &CheckpointConfig, + forced: bool, + ) -> io::Result<()> { + if new_index <= self.persisted_index { + warn!( + "New checkpoint index {} is not greater than persisted index {}, skipping update", + new_index, self.persisted_index + ); + return Ok(()); + } + + let now = Instant::now(); + let should_flush = new_index - self.persisted_index >= config.min_advance; + let timeouts = now.duration_since(self.last_flush_time) >= config.min_interval; + if forced || should_flush || timeouts { + fs::write(&self.temp_path, &new_index.to_le_bytes())?; + fs::rename(&self.temp_path, &config.path)?; + self.persisted_index = new_index; + self.last_flush_time = now; + } + Ok(()) + } +} + +fn recover_from_disk(path: &Path) -> io::Result { + let mut file = File::open(&path)?; + let metadata = file.metadata()?; + if metadata.len() != 8 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid checkpoint file", + )); + } + let mut buf = [0u8; 8]; + file.read_exact(&mut buf)?; + let index = u64::from_le_bytes(buf); + Ok(index) +} diff --git a/crates/journal/src/error.rs b/crates/journal/src/error.rs index 74bdbc1..3355ad5 100644 
--- a/crates/journal/src/error.rs +++ b/crates/journal/src/error.rs @@ -1,13 +1,11 @@ -use std::fmt; - -/// Error indicating that the journal buffer is full. -#[derive(Debug)] -pub struct BufferFull; - -impl fmt::Display for BufferFull { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Journal buffer is full") - } +/// Error indicating that the journal buffer is not available now. +#[derive(Debug, thiserror::Error)] +pub enum JournalUnavailable { + /// The journal is shutting down, no new entries can be accepted. + #[error("journal is shutting down")] + Shutdown, + /// The journal buffer is full, new entries cannot be accepted until some entries are consumed + /// and the buffer has space. + #[error("journal buffer is full")] + Full, } - -impl std::error::Error for BufferFull {} diff --git a/crates/journal/src/lib.rs b/crates/journal/src/lib.rs index be4f093..34c28b3 100644 --- a/crates/journal/src/lib.rs +++ b/crates/journal/src/lib.rs @@ -1,14 +1,18 @@ //! Journal implementation for UTS +#[macro_use] +extern crate tracing; use crate::{ - error::BufferFull, + checkpoint::{Checkpoint, CheckpointConfig}, + error::JournalUnavailable, reader::JournalReader, - wal::{DummyWal, Wal}, + wal::Wal, }; use std::{ cell::UnsafeCell, - fmt, + fmt, io, ops::Deref, + path::PathBuf, pin::Pin, sync::{ Arc, Mutex, @@ -17,6 +21,8 @@ use std::{ task::{Poll, Waker}, }; +/// Checkpointing +pub mod checkpoint; /// Error types. pub mod error; /// Journal reader. @@ -24,17 +30,37 @@ pub mod reader; /// Write-Ahead Log backend. pub mod wal; -/// A journal for storing fixed-size entries in a ring buffer. +/// Configuration for the journal. +#[derive(Debug, Clone)] +pub struct JournalConfig { + /// Configuration for the consumer checkpoint, which tracks the `consumed_index` of the journal. 
+ pub consumer_checkpoint: CheckpointConfig, + /// Directory for the write-ahead log (WAL) backend, which persists committed entries to disk + /// for durability and crash recovery, allowing the journal to recover from crashes without data + /// loss. + pub wal_dir: PathBuf, +} + +impl Default for JournalConfig { + fn default() -> Self { + Self { + consumer_checkpoint: CheckpointConfig::default(), + wal_dir: PathBuf::from("wal"), + } + } +} + +/// An `At-Least-Once` journal for storing fixed-size entries in a ring buffer. /// /// All index here are monotonic u64, wrapping around on overflow. /// /// Following invariants are maintained: -/// `consumed_index` <= `persisted_index` <= `write_index` +/// `consumed_index` <= `persisted_index` <= `filled_index` <= `write_index`. #[derive(Clone)] pub struct Journal { inner: Arc>, /// Wal backend for recovery. - wal: Box, + wal: Wal<{ ENTRY_SIZE }>, } pub(crate) struct JournalInner { @@ -47,9 +73,14 @@ pub(crate) struct JournalInner { /// Mask for indexing into the ring buffer. index_mask: u64, /// Next Write Position, aka: - /// - Total entries written count. + /// - Total entries reserved count. /// - Position to write the next entry to. write_index: AtomicU64, + /// Filled Boundary, aka: + /// - Total entries that have been fully written to the ring buffer. + /// - Advanced in order after each writer finishes copying data into its reserved slot. + /// - The WAL worker uses this (not `write_index`) to determine how far it can safely read. + filled_index: AtomicU64, /// WAL Committed Boundary, aka.: /// - Total committed entries count. /// - Position has not yet been persisted to durable storage. @@ -57,11 +88,13 @@ pub(crate) struct JournalInner { /// Free Boundary, aka.: /// - Total consumed entries count. /// - Position that has not yet been consumed by readers. - consumed_index: AtomicU64, + consumed_checkpoint: Checkpoint, /// Whether a reader has taken ownership of this journal. 
reader_taken: AtomicBool, /// Waker for the consumer to notify new persisted entries. consumer_wait: Mutex>, + /// Shutdown flag + shutdown: AtomicBool, } unsafe impl Sync for JournalInner {} @@ -77,7 +110,14 @@ impl Journal { /// Create a new journal with the specified capacity. /// /// The capacity will be rounded up to the next power of two. - pub fn with_capacity(capacity: usize) -> Self { + pub fn with_capacity(capacity: usize) -> io::Result { + Self::with_capacity_and_config(capacity, JournalConfig::default()) + } + + /// Create a new journal with the specified capacity. + /// + /// The capacity will be rounded up to the next power of two. + pub fn with_capacity_and_config(capacity: usize, config: JournalConfig) -> io::Result { let capacity = capacity.next_power_of_two(); let index_mask = capacity as u64 - 1; @@ -94,21 +134,23 @@ impl Journal { waker_buffer, index_mask, write_index: AtomicU64::new(0), + filled_index: AtomicU64::new(0), persisted_index: AtomicU64::new(0), - consumed_index: AtomicU64::new(0), + consumed_checkpoint: Checkpoint::new(config.consumer_checkpoint)?, reader_taken: AtomicBool::new(false), consumer_wait: Mutex::new(None), + shutdown: AtomicBool::new(false), }); - let wal = Box::new(DummyWal::new(inner.clone())); + let wal = Wal::new(config.wal_dir, inner.clone())?; - Self { inner, wal } + Ok(Self { inner, wal }) } /// Get the capacity of the journal. #[inline] fn capacity(&self) -> usize { - self.inner.buffer.len() + self.inner.capacity() } /// Acquires a reader for this journal. @@ -135,7 +177,9 @@ impl Journal { /// /// # Panics /// - /// Panics if the journal is full. + /// Panics if: + /// - the journal is full. + /// - the journal is shut down. 
pub fn commit(&self, data: &[u8; ENTRY_SIZE]) -> CommitFuture<'_, ENTRY_SIZE> { self.try_commit(data).expect("Journal buffer is full") } @@ -147,13 +191,17 @@ impl Journal { pub fn try_commit( &self, data: &[u8; ENTRY_SIZE], - ) -> Result, BufferFull> { + ) -> Result, JournalUnavailable> { + if self.inner.shutdown.load(Ordering::Acquire) { + return Err(JournalUnavailable::Shutdown); + } + let mut current_written = self.inner.write_index.load(Ordering::Relaxed); loop { // 1. Check if there is space in the buffer. - let consumed = self.inner.consumed_index.load(Ordering::Acquire); + let consumed = self.inner.consumed_checkpoint.current_index(); if current_written.wrapping_sub(consumed) >= self.capacity() as u64 { - return Err(BufferFull); + return Err(JournalUnavailable::Full); } // 2. Try to reserve a slot. @@ -172,10 +220,28 @@ impl Journal { let slot = unsafe { &mut *self.data_slot_ptr(current_written) }; slot.copy_from_slice(data); - // 4. Notify WAL worker if needed. + // 4. Publish the filled slot. + // Spin-wait until all prior slots are filled, then advance `filled_index`. + // The Release ordering ensures the slot write above is visible to the WAL worker + // before it reads `filled_index`. + while self + .inner + .filled_index + .compare_exchange_weak( + current_written, + current_written.wrapping_add(1), + Ordering::Release, + Ordering::Relaxed, + ) + .is_err() + { + std::hint::spin_loop(); + } + + // 5. Notify WAL worker if needed. let committed = self.inner.persisted_index.load(Ordering::Relaxed); - // Explain: Before we wrote to the slot, if there is no pending committed entry, - // There's a chance the WAL worker is sleeping, we need to wake it up. + // Explain: If there is no pending committed entry before ours, + // the WAL worker may be sleeping, so we need to wake it up. if current_written == committed { // Notify the WAL worker thread to persist new entries. 
self.wal.unpark(); @@ -188,6 +254,15 @@ impl Journal { }) } + /// Shut down the journal, flushing all checkpoints and shutting down the WAL. + pub fn shutdown(&self) -> io::Result<()> { + self.inner.shutdown.store(true, Ordering::SeqCst); + + self.inner.consumed_checkpoint.flush()?; + self.wal.shutdown(); + Ok(()) + } + /// Get a mut ptr to the slot at the given index. #[inline] fn data_slot_ptr(&self, index: u64) -> *mut [u8; ENTRY_SIZE] { @@ -202,6 +277,12 @@ impl Journal { } impl JournalInner { + /// Get the capacity of the journal. + #[inline] + fn capacity(&self) -> usize { + self.buffer.len() + } + /// Get a mut ptr to the slot at the given index. #[inline] const fn data_slot_ptr(&self, index: u64) -> *mut [u8; ENTRY_SIZE] { @@ -285,7 +366,7 @@ impl Deref for WakerEntry { #[cfg(test)] pub(crate) mod tests { - use super::*; + use crate::checkpoint::CheckpointConfig; pub const ENTRY_SIZE: usize = 8; pub const TEST_DATA: &[[u8; ENTRY_SIZE]] = &[ @@ -302,9 +383,25 @@ pub(crate) mod tests { ]; pub type Journal = crate::Journal; + /// Create a journal with an isolated temporary directory for WAL and checkpoint files. + /// Returns the journal and the temp dir guard (must be kept alive for the test duration). 
+ pub fn test_journal(capacity: usize) -> (Journal, tempfile::TempDir) { + let tmp = tempfile::tempdir().expect("failed to create temp dir"); + let config = crate::JournalConfig { + consumer_checkpoint: CheckpointConfig { + path: tmp.path().join("checkpoint.meta"), + ..Default::default() + }, + wal_dir: tmp.path().join("wal"), + }; + let journal = + Journal::with_capacity_and_config(capacity, config).expect("failed to create journal"); + (journal, tmp) + } + #[tokio::test(flavor = "current_thread")] - async fn try_reader_is_exclusive() { - let journal = Journal::with_capacity(2); + async fn try_reader_is_exclusive() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(2); let reader = journal.try_reader().unwrap(); @@ -318,11 +415,14 @@ pub(crate) mod tests { journal.try_reader().is_some(), "reader acquisition should succeed after drop" ); + + journal.shutdown()?; + Ok(()) } #[tokio::test(flavor = "current_thread")] async fn commit_and_read_round_trip() -> eyre::Result<()> { - let journal = Journal::with_capacity(4); + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); journal.commit(&TEST_DATA[0]).await; @@ -335,14 +435,15 @@ pub(crate) mod tests { assert_eq!(entries[1], TEST_DATA[1]); } - reader.commit(); + reader.commit()?; assert_eq!(reader.available(), 0); + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn commit_returns_error_when_full() -> eyre::Result<()> { - let journal = Journal::with_capacity(2); + let (journal, _tmp) = test_journal(2); journal.commit(&TEST_DATA[1]).await; journal.commit(&TEST_DATA[2]).await; @@ -350,13 +451,17 @@ pub(crate) mod tests { let err = journal .try_commit(&TEST_DATA[3]) .expect_err("buffer should report full on third commit"); - assert!(matches!(err, BufferFull)); + assert!( + matches!(err, crate::error::JournalUnavailable::Full), + "expected Full, got {err:?}" + ); + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn 
reader_handles_wrap_around_reads() -> eyre::Result<()> { - let journal = Journal::with_capacity(4); + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); for entry in TEST_DATA.iter().take(4) { @@ -369,7 +474,7 @@ pub(crate) mod tests { assert_eq!(entries[0], TEST_DATA[0]); assert_eq!(entries[1], TEST_DATA[1]); } - reader.commit(); + reader.commit()?; for entry in TEST_DATA.iter().skip(4).take(2) { journal.commit(entry).await; @@ -389,8 +494,9 @@ pub(crate) mod tests { assert_eq!(entries[1], TEST_DATA[5]); } - reader.commit(); + reader.commit()?; assert_eq!(reader.available(), 0); + journal.shutdown()?; Ok(()) } } diff --git a/crates/journal/src/reader.rs b/crates/journal/src/reader.rs index cea3f36..9bf0702 100644 --- a/crates/journal/src/reader.rs +++ b/crates/journal/src/reader.rs @@ -1,6 +1,6 @@ use crate::{ConsumerWait, JournalInner}; use std::{ - fmt, + fmt, io, pin::Pin, sync::{Arc, atomic::Ordering}, task::{Context, Poll}, @@ -30,13 +30,16 @@ impl Drop for JournalReader { impl JournalReader { pub(super) fn new(journal: Arc>) -> Self { - let consumed = journal.consumed_index.load(Ordering::Acquire); + let consumed = journal.consumed_checkpoint.current_index(); Self { journal, consumed } } /// Returns the number of available entries that are settled but not yet consumed by this reader. 
#[inline] pub fn available(&self) -> usize { + if self.journal.shutdown.load(Ordering::Acquire) { + return 0; + } let persisted = self.journal.persisted_index.load(Ordering::Acquire); persisted.wrapping_sub(self.consumed) as usize } @@ -55,7 +58,7 @@ impl JournalReader { // - didn't commit previously read entries, then asks for more than new entries than the buffer can hold // this is considered a misuse of the API / design flaw in the caller, so we panics let journal_buffer_size = self.journal.buffer.len() as u64; - let current_consumed = self.journal.consumed_index.load(Ordering::Acquire); + let current_consumed = self.journal.consumed_checkpoint.current_index(); let max_possible_target = current_consumed.wrapping_add(journal_buffer_size); if target_index > max_possible_target { panic!( @@ -132,22 +135,19 @@ impl JournalReader { } /// Commit current consumed index. - pub fn commit(&mut self) { - self.journal - .consumed_index - .store(self.consumed, Ordering::Release); + pub fn commit(&mut self) -> io::Result<()> { + self.journal.consumed_checkpoint.update(self.consumed) } } #[cfg(test)] mod tests { use crate::tests::*; - use std::sync::atomic::Ordering; use tokio::time::{Duration, sleep, timeout}; #[tokio::test(flavor = "current_thread")] async fn available_tracks_persisted_entries() -> eyre::Result<()> { - let journal = Journal::with_capacity(4); + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); assert_eq!(reader.available(), 0); @@ -161,12 +161,13 @@ mod tests { let slice = reader.read(1); assert_eq!(slice.len(), 1); assert_eq!(reader.available(), 1); + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn commit_updates_shared_consumed_boundary() -> eyre::Result<()> { - let journal = Journal::with_capacity(4); + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); for entry in TEST_DATA.iter().take(3) { @@ -177,19 +178,20 @@ mod tests { assert_eq!(slice.len(), 2); 
assert_eq!(reader.available(), 1); assert_eq!( - reader.journal.consumed_index.load(Ordering::Acquire), + reader.journal.consumed_checkpoint.current_index(), 0, "global consumed boundary should not advance before commit", ); - reader.commit(); - assert_eq!(reader.journal.consumed_index.load(Ordering::Acquire), 2); + reader.commit()?; + assert_eq!(reader.journal.consumed_checkpoint.current_index(), 2); + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn wait_at_least_resumes_after_persistence() -> eyre::Result<()> { - let journal = Journal::with_capacity(4); + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); let journal_clone = journal.clone(); @@ -202,12 +204,13 @@ mod tests { assert_eq!(reader.available(), 1); task.await?; + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn wait_at_least_waits_for_correct_count() -> eyre::Result<()> { - let journal = Journal::with_capacity(4); + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); let journal_clone = journal.clone(); @@ -218,10 +221,11 @@ mod tests { } }); - timeout(Duration::from_secs(1), reader.wait_at_least(3)).await?; + timeout(Duration::from_secs(10), reader.wait_at_least(3)).await?; assert!(reader.available() >= 3); task.await?; + journal.shutdown()?; Ok(()) } @@ -230,7 +234,7 @@ mod tests { expected = "requested (5) exceeds max possible (4): journal.buffer.len()=4, journal.consumed_index=0" )] async fn wait_at_least_exceeds_buffer_size() { - let journal = Journal::with_capacity(4); + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); timeout(Duration::from_secs(1), reader.wait_at_least(5)) @@ -243,7 +247,7 @@ mod tests { expected = "requested (5) exceeds max possible (4): journal.buffer.len()=4, journal.consumed_index=0" )] async fn wait_at_least_dirty_read_exceeds_available() { - let journal = Journal::with_capacity(4); + let (journal, _tmp) = test_journal(4); 
journal.commit(&TEST_DATA[0]).await; let mut reader = journal.reader(); diff --git a/crates/journal/src/wal.rs b/crates/journal/src/wal.rs index 392f13a..97666b6 100644 --- a/crates/journal/src/wal.rs +++ b/crates/journal/src/wal.rs @@ -1,115 +1,856 @@ use crate::JournalInner; -use dyn_clone::DynClone; use std::{ - sync::{Arc, atomic::Ordering}, + fmt, + fmt::Formatter, + fs, + fs::{File, OpenOptions}, + io, + io::{BufWriter, Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + sync::{ + Arc, Mutex, + atomic::{AtomicBool, Ordering}, + }, thread, thread::JoinHandle, - time::Duration, }; -const MAX_SPINS: usize = 10_000; -const IO_BATCH_LIMIT: u64 = 128; +const MAX_SPIN: usize = 100; +const MAX_IO_BATCH: u64 = 128; -/// Write-Ahead Log Trait +/// Write-Ahead Log /// /// Busy-Wait + Parking when there's no work to do. -pub trait Wal: DynClone + Send + Sync + 'static { - /// Unpark the WAL worker thread to notify new data is available. - fn unpark(&self); - /// Shutdown the WAL worker thread. - fn shutdown(&self) { - // Default implementation does nothing. - // Specific implementations can override this method to provide shutdown logic. - } -} - -dyn_clone::clone_trait_object!(Wal); - +/// +/// Using segmented log files named as `{base_dir}/{segment_id}.wal`, where `segment_id` is a +/// monotonically increasing integer. +/// +/// Each segment file contains a fixed number of entries +/// (at least to be the size of the journal buffer) to simplify recovery and management. 
#[derive(Clone)] -pub(crate) struct DummyWal { - worker: Arc>, +pub struct Wal { + inner: Arc>, } -struct WalWorker { +struct WalInner { + worker: Mutex>>, journal: Arc>, + shutdown_flag: Arc, } -impl DummyWal { - pub(crate) fn new(journal: Arc>) -> Self { - let worker = WalWorker { journal }; - let worker = thread::spawn(move || { - worker.run(); - }); - Self { - worker: Arc::new(worker), +impl Drop for WalInner { + fn drop(&mut self) { + // Signal the WAL worker to exit if it hasn't been shut down yet. + // This prevents orphaned worker threads from spinning after the journal is dropped. + if !self.shutdown_flag.swap(true, Ordering::AcqRel) { + if let Some(worker) = self.worker.lock().unwrap().as_ref() { + worker.thread().unpark(); + } } } } -impl Wal for DummyWal { - fn unpark(&self) { - self.worker.thread().unpark(); +impl fmt::Debug for Wal { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("Wal") + .field( + "write_index", + &self.inner.journal.write_index.load(Ordering::Acquire), + ) + .field( + "persisted_index", + &self.inner.journal.persisted_index.load(Ordering::Acquire), + ) + .finish() } } -impl WalWorker { - fn run(self) { - let Self { journal } = self; +impl Wal { + /// Create a new WAL instance with the given base directory for storing log segments and + /// a reference to the journal. This will recover existing segments from the base directory, + /// and start a background worker thread to handle persistence of log entries. 
+ pub(crate) fn new>( + base_dir: P, + journal: Arc>, + ) -> io::Result { + let base_dir = base_dir.as_ref(); + fs::create_dir_all(base_dir)?; + if !base_dir.is_dir() { + return Err(io::Error::new( + io::ErrorKind::Other, + "Base path is not a directory", + )); + } + + let shutdown_flag = Arc::new(AtomicBool::new(false)); + + let current_segment_id = recover(base_dir, &journal)?; + let mut current_file = open_segment_file(base_dir, current_segment_id)?; + current_file.seek(SeekFrom::End(0))?; + + let worker = WalWorker { + current_segment_id, + current_file: BufWriter::new(current_file), + persisted_index: journal.persisted_index.load(Ordering::Acquire), - let mut persisted = 0; - let mut spins = 0; + base_dir: base_dir.to_path_buf(), + journal: journal.clone(), + shutdown_flag: shutdown_flag.clone(), + }; - loop { - let written = journal.write_index.load(Ordering::Acquire); - let available = written.wrapping_sub(persisted); + let handle = thread::Builder::new() + .name("wal-worker".to_string()) + .spawn(move || { + let mut worker = worker; + worker.run() + }) + .expect("Failed to spawn WAL worker thread"); + + let inner = WalInner { + worker: Mutex::new(Some(handle)), + journal, + shutdown_flag, + }; + + Ok(Self { + inner: Arc::new(inner), + }) + } + + /// Unpark the WAL worker thread to wake it up if it's parked. + /// This should be called after new entries are written to the journal, to ensure that the + /// worker thread can persist them in a timely manner. + pub fn unpark(&self) { + if self.inner.shutdown_flag.load(Ordering::Acquire) { + return; + } + self.inner + .worker + .lock() + .unwrap() + .as_ref() + .expect("WAL worker thread should be running") + .thread() + .unpark(); + } + + /// Shut down the WAL worker thread gracefully. This will set the shutdown flag, unpark the + /// worker thread if it's parked, and wait for it to finish. 
+ pub fn shutdown(&self) { + if self.inner.shutdown_flag.swap(true, Ordering::AcqRel) { + // already shutdown + return; + } + let worker = self + .inner + .worker + .lock() + .unwrap() + .take() + .expect("WAL worker thread should be running"); + worker.thread().unpark(); + worker.join().expect("Failed to join WAL worker thread"); + } +} + +struct WalWorker { + current_segment_id: u64, + current_file: BufWriter, + persisted_index: u64, + + base_dir: PathBuf, + journal: Arc>, + shutdown_flag: Arc, +} + +impl WalWorker { + fn run(&mut self) { + let mut spin_count = 0; + + while !self.shutdown_flag.load(Ordering::Acquire) { + let filled = self.journal.filled_index.load(Ordering::Acquire); + let available = filled.wrapping_sub(self.persisted_index); if available > 0 { - // reset spins counter - spins = 0; - - // take as much as we can, limited by IO_BATCH_LIMIT - let batch_size = available.min(IO_BATCH_LIMIT); - let target_index = persisted + batch_size; - - // simulate IO - // TODO: replace with real IO - thread::sleep(Duration::from_millis(1)); - - // notify waiters only after data is "persisted" - for i in persisted..target_index { - let entry = journal.waker_slot(i); - if let Some(waker) = entry.lock().unwrap().take() { - waker.wake(); - } - } - - // update persisted index - persisted = target_index; - journal.persisted_index.store(persisted, Ordering::Release); - - // notify consumer if needed - let mut guard = journal.consumer_wait.lock().unwrap(); - if let Some(wait) = guard.as_ref() { - // Only wake if the target_index is reached - if persisted >= wait.target_index { - guard.take().unwrap().waker.wake(); - } - } + spin_count = 0; + + let new_persisted_index = self + .write(available.min(MAX_IO_BATCH)) + .expect("Failed to write WAL entries"); + + self.notify_writer(new_persisted_index); + self.update_index(new_persisted_index); + self.notify_consumer(); continue; } - // busy-wait + park - if spins < MAX_SPINS { - // busy-wait + // no new data to persist, spin 
for a while before parking + if spin_count <= MAX_SPIN { + spin_count += 1; std::hint::spin_loop(); - spins += 1; - } else { - // park the thread - thread::park(); - // reset spins counter on wake - spins = 0; + continue; + } + + // park until unparked by a new commit + thread::park(); + } + + // cleanup before exiting: persist any remaining entries + let filled = self.journal.filled_index.load(Ordering::Acquire); + let available = filled.wrapping_sub(self.persisted_index); + let new_persisted_index = self + .write(available.min(MAX_IO_BATCH)) + .expect("Failed to write WAL entries"); + + self.notify_writer(new_persisted_index); + self.update_index(new_persisted_index); + self.notify_consumer(); + } + + /// update the persisted index + fn update_index(&mut self, new_persisted_index: u64) { + self.persisted_index = new_persisted_index; + self.journal + .persisted_index + .store(self.persisted_index, Ordering::Release); + } + + /// Write `n` entries from the journal to the current WAL segment file, rotating files as needed. 
+    fn write(&mut self, n: u64) -> io::Result<u64> {
+        let segment_size: u64 = self.journal.capacity() as u64;
+        let new_persisted_index = self.persisted_index + n;
+
+        // write ALL available entries to segment files, rotating files as needed
+        for i in self.persisted_index..new_persisted_index {
+            let seg_id = i / segment_size;
+            if seg_id != self.current_segment_id {
+                // rotate to new segment file
+                self.current_segment_id = seg_id;
+                let new_file = open_segment_file(&self.base_dir, seg_id)?;
+                self.current_file = BufWriter::new(new_file);
+                let base_dir = self.base_dir.clone();
+                thread::spawn(move || truncate_old_segments(base_dir, seg_id));
+            }
+
+            self.current_file
+                .write_all(unsafe { &*self.journal.data_slot_ptr(i) })?;
+        }
+
+        self.current_file.flush()?;
+        self.current_file.get_ref().sync_all()?;
+        Ok(new_persisted_index)
+    }
+
+    fn notify_writer(&mut self, new_persisted_index: u64) {
+        // notify waiters only after data is persisted
+        for i in self.persisted_index..new_persisted_index {
+            let entry = self.journal.waker_slot(i);
+            if let Some(waker) = entry.lock().unwrap().take() {
+                waker.wake();
+            }
+        }
+    }
+
+    fn notify_consumer(&mut self) {
+        // notify consumer if needed
+        let mut guard = self.journal.consumer_wait.lock().unwrap();
+        if let Some(wait) = guard.as_ref() {
+            // Only wake if the new_persisted_index is reached
+            if self.persisted_index >= wait.target_index {
+                guard.take().unwrap().waker.wake();
             }
         }
     }
 }
+
+fn recover(
+    base_dir: &Path,
+    journal: &JournalInner<{ ENTRY_SIZE }>,
+) -> io::Result<u64> {
+    let mut segments = scan_segments(base_dir)?;
+    if segments.is_empty() {
+        info!("No WAL segments found, starting fresh");
+        return Ok(0);
+    }
+
+    // remove the last segment for recovery, as it may be incomplete
+    let last_segment_id = segments.pop().expect("segments is not empty");
+
+    let segment_size = journal.capacity() as u64;
+    let complete_segment_size = segment_size * ENTRY_SIZE as u64;
+    for segment_id in segments.iter().copied() {
+        let path = 
base_dir.join(format_segment_file_name(segment_id)); + let metadata = fs::metadata(&path)?; + let file_size = metadata.len(); + + if file_size != complete_segment_size { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("Incomplete WAL segment: {}", path.display()), + )); + } + } + + // handle last segment, which may be partially written + let path = base_dir.join(format_segment_file_name(last_segment_id)); + let metadata = fs::metadata(&path)?; + let file_size = metadata.len(); + let valid_count = file_size / ENTRY_SIZE as u64; + if file_size % ENTRY_SIZE as u64 != 0 { + warn!("Detected partial write in last segment #{last_segment_id}. Truncating."); + let f = OpenOptions::new().write(true).open(&path)?; + f.set_len(valid_count * ENTRY_SIZE as u64)?; + f.sync_all()?; + } + + let write_index = if last_segment_id == 0 { + valid_count + } else { + last_segment_id * segment_size + valid_count + }; + // consumed_checkpoint just recovered. + let consumed_index = journal.consumed_checkpoint.persisted_index(); + // Data loss happens, don't continue to recover, as it may cause more damage. + // User intervention is needed to fix the issue. 
+    if consumed_index > write_index {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidData,
+            format!(
+                "Consumed index {consumed_index} is greater than recovered write index {write_index}"
+            ),
+        ));
+    }
+
+    journal.write_index.store(write_index, Ordering::Relaxed);
+    journal.filled_index.store(write_index, Ordering::Relaxed);
+    journal
+        .persisted_index
+        .store(write_index, Ordering::Relaxed);
+    info!("WAL Recovered: write_index={write_index}, consumed_index={consumed_index}");
+
+    // replay data to ring buffer
+    replay_data(&base_dir, journal)?;
+    Ok(last_segment_id)
+}
+
+fn scan_segments(base_dir: &Path) -> io::Result<Vec<u64>> {
+    let mut segments: Vec<u64> = Vec::new();
+
+    let entries = fs::read_dir(&base_dir)?;
+
+    for entry in entries {
+        let entry = entry?;
+        let file_type = entry.file_type()?;
+        if !file_type.is_file() {
+            continue;
+        }
+
+        let file_name = entry.file_name();
+        let Some(file_name) = file_name.to_str().and_then(|n| n.strip_suffix(".wal")) else {
+            continue;
+        };
+
+        let Ok(id) = file_name.parse::<u64>() else {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("Invalid WAL file name: {}", entry.path().display()),
+            ));
+        };
+        segments.push(id);
+    }
+    segments.sort_unstable();
+
+    Ok(segments)
+}
+
+fn replay_data(
+    base_dir: &Path,
+    journal: &JournalInner<{ ENTRY_SIZE }>,
+) -> io::Result<()> {
+    let segment_size = journal.capacity() as u64;
+    let entry_size = ENTRY_SIZE as u64;
+
+    let write_index = journal.write_index.load(Ordering::Relaxed);
+    let consumed_index = journal.consumed_checkpoint.persisted_index();
+
+    let mut current_file: Option<File> = None;
+    let mut current_seg_id: u64 = u64::MAX;
+
+    for idx in consumed_index..write_index {
+        let seg_id = idx / segment_size;
+
+        if seg_id != current_seg_id {
+            current_file = Some(open_segment_file(&base_dir, seg_id)?);
+            current_seg_id = seg_id;
+        }
+
+        let offset = (idx % segment_size) * entry_size;
+        let slot_ptr = journal.data_slot_ptr(idx);
+
+        if let Some(ref mut f) = 
current_file {
+            f.seek(SeekFrom::Start(offset))?;
+            let buffer = unsafe { &mut *slot_ptr };
+            f.read_exact(buffer)?;
+        }
+    }
+    Ok(())
+}
+
+#[inline]
+fn format_segment_file_name(segment_id: u64) -> String {
+    format!("{segment_id:012}.wal")
+}
+
+fn open_segment_file(base_dir: &Path, segment_id: u64) -> io::Result<File> {
+    let path = base_dir.join(format_segment_file_name(segment_id));
+    OpenOptions::new()
+        .read(true)
+        .write(true)
+        .create(true)
+        .open(path)
+}
+
+#[instrument(err)]
+fn truncate_old_segments(base_dir: PathBuf, current_segment_id: u64) -> io::Result<()> {
+    let Some(to_delete) = current_segment_id.checked_sub(2) else {
+        // no segments to truncate
+        return Ok(());
+    };
+    let path = base_dir.join(format_segment_file_name(to_delete));
+    if path.exists() {
+        fs::remove_file(path)?;
+    }
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{
+        JournalConfig,
+        checkpoint::CheckpointConfig,
+        tests::{ENTRY_SIZE, TEST_DATA, test_journal},
+    };
+    use std::sync::atomic::Ordering;
+
+    type Journal = crate::Journal<ENTRY_SIZE>;
+
+    /// Helper: create a journal config pointing at the given temp directory. 
+ fn test_config(tmp: &Path) -> JournalConfig { + JournalConfig { + consumer_checkpoint: CheckpointConfig { + path: tmp.join("checkpoint.meta"), + ..Default::default() + }, + wal_dir: tmp.join("wal"), + } + } + + // ── Segment file helpers ───────────────────────────────────────────── + + #[test] + fn format_segment_file_name_pads_to_twelve_digits() { + assert_eq!(format_segment_file_name(0), "000000000000.wal"); + assert_eq!(format_segment_file_name(1), "000000000001.wal"); + assert_eq!( + format_segment_file_name(999_999_999_999), + "999999999999.wal" + ); + } + + #[test] + fn open_segment_file_creates_file() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + let f = open_segment_file(tmp.path(), 0)?; + assert!(tmp.path().join("000000000000.wal").exists()); + drop(f); + Ok(()) + } + + // ── scan_segments ──────────────────────────────────────────────────── + + #[test] + fn scan_segments_empty_directory() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + let segments = scan_segments(tmp.path())?; + assert!(segments.is_empty()); + Ok(()) + } + + #[test] + fn scan_segments_returns_sorted_ids() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + // Create files out of order. 
+ File::create(tmp.path().join("000000000002.wal"))?; + File::create(tmp.path().join("000000000000.wal"))?; + File::create(tmp.path().join("000000000001.wal"))?; + + let segments = scan_segments(tmp.path())?; + assert_eq!(segments, vec![0, 1, 2]); + Ok(()) + } + + #[test] + fn scan_segments_skips_non_wal_files() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + File::create(tmp.path().join("000000000000.wal"))?; + File::create(tmp.path().join("readme.txt"))?; + + let segments = scan_segments(tmp.path())?; + assert_eq!(segments, vec![0]); + Ok(()) + } + + #[test] + fn scan_segments_rejects_invalid_wal_names() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + File::create(tmp.path().join("not_a_number.wal"))?; + + let err = scan_segments(tmp.path()).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + Ok(()) + } + + // ── truncate_old_segments ──────────────────────────────────────────── + + #[test] + fn truncate_old_segments_removes_two_behind() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + File::create(tmp.path().join(format_segment_file_name(0)))?; + File::create(tmp.path().join(format_segment_file_name(1)))?; + File::create(tmp.path().join(format_segment_file_name(2)))?; + + truncate_old_segments(tmp.path().to_path_buf(), 2)?; + assert!( + !tmp.path().join(format_segment_file_name(0)).exists(), + "segment 0 should be deleted when current is 2" + ); + assert!(tmp.path().join(format_segment_file_name(1)).exists()); + assert!(tmp.path().join(format_segment_file_name(2)).exists()); + Ok(()) + } + + #[test] + fn truncate_old_segments_noop_when_too_few() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + File::create(tmp.path().join(format_segment_file_name(0)))?; + File::create(tmp.path().join(format_segment_file_name(1)))?; + + // current_segment_id = 1, checked_sub(2) underflows → noop + truncate_old_segments(tmp.path().to_path_buf(), 1)?; + assert!(tmp.path().join(format_segment_file_name(0)).exists()); + 
assert!(tmp.path().join(format_segment_file_name(1)).exists()); + Ok(()) + } + + // ── WAL persistence end-to-end ─────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn wal_persists_entries_to_segment_file() -> eyre::Result<()> { + let (journal, tmp) = test_journal(4); + + journal.commit(&TEST_DATA[0]).await; + journal.commit(&TEST_DATA[1]).await; + + // WAL file should exist and contain the two entries. + let wal_path = tmp.path().join("wal").join(format_segment_file_name(0)); + assert!(wal_path.exists(), "WAL segment file should exist"); + + let data = fs::read(&wal_path)?; + assert_eq!( + data.len(), + ENTRY_SIZE * 2, + "segment should contain exactly 2 entries" + ); + assert_eq!(&data[..ENTRY_SIZE], &TEST_DATA[0]); + assert_eq!(&data[ENTRY_SIZE..ENTRY_SIZE * 2], &TEST_DATA[1]); + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn wal_segment_rotation() -> eyre::Result<()> { + // Capacity 4 → segment_size = 4 entries per segment. + let (journal, tmp) = test_journal(4); + let mut reader = journal.reader(); + + // Fill first segment (4 entries). + for entry in TEST_DATA.iter().take(4) { + journal.commit(entry).await; + } + + // Consume some entries to free ring buffer space. + reader.read(4); + reader.commit()?; + + // Write one more entry, causing rotation to segment 1. 
+ journal.commit(&TEST_DATA[4]).await; + + let seg0 = tmp.path().join("wal").join(format_segment_file_name(0)); + let seg1 = tmp.path().join("wal").join(format_segment_file_name(1)); + assert!(seg0.exists(), "segment 0 should exist"); + assert!(seg1.exists(), "segment 1 should exist after rotation"); + + let seg1_data = fs::read(&seg1)?; + assert_eq!(seg1_data.len(), ENTRY_SIZE); + assert_eq!(&seg1_data[..ENTRY_SIZE], &TEST_DATA[4]); + Ok(()) + } + + // ── Recovery ───────────────────────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn recover_from_empty_dir() -> eyre::Result<()> { + let tmp = tempfile::tempdir()?; + let config = test_config(tmp.path()); + + // First journal writes nothing. + let journal = Journal::with_capacity_and_config(4, config.clone())?; + journal.shutdown()?; + drop(journal); + + // Second journal recovers with indices at 0. + let journal = Journal::with_capacity_and_config(4, config)?; + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 0); + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 0); + journal.shutdown()?; + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn recover_replays_data_into_ring_buffer() -> eyre::Result<()> { + let tmp = tempfile::tempdir()?; + let config = test_config(tmp.path()); + + // Write entries and shut down. + { + let journal = Journal::with_capacity_and_config(4, config.clone())?; + journal.commit(&TEST_DATA[0]).await; + journal.commit(&TEST_DATA[1]).await; + journal.commit(&TEST_DATA[2]).await; + journal.shutdown()?; + } + + // Recover and verify data via reader. 
+ { + let journal = Journal::with_capacity_and_config(4, config)?; + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 3); + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 3); + + let mut reader = journal.reader(); + let entries = reader.read(3); + assert_eq!(entries.len(), 3); + assert_eq!(entries[0], TEST_DATA[0]); + assert_eq!(entries[1], TEST_DATA[1]); + assert_eq!(entries[2], TEST_DATA[2]); + journal.shutdown()?; + } + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn recover_respects_consumed_checkpoint() -> eyre::Result<()> { + let tmp = tempfile::tempdir()?; + let config = test_config(tmp.path()); + + // Write entries, consume some, then shut down. + { + let journal = Journal::with_capacity_and_config(4, config.clone())?; + journal.commit(&TEST_DATA[0]).await; + journal.commit(&TEST_DATA[1]).await; + journal.commit(&TEST_DATA[2]).await; + let mut reader = journal.reader(); + reader.read(2); + reader.commit()?; + journal.shutdown()?; + } + + // Recover - reader should only see unconsumed entries. + { + let journal = Journal::with_capacity_and_config(4, config)?; + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 3); + let mut reader = journal.reader(); + let entries = reader.read(10); + assert_eq!(entries.len(), 1, "only 1 unconsumed entry should remain"); + assert_eq!(entries[0], TEST_DATA[2]); + journal.shutdown()?; + } + Ok(()) + } + + #[test] + fn recover_truncates_partial_last_segment() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + let wal_dir = tmp.path().join("wal"); + fs::create_dir_all(&wal_dir)?; + + // Write 2.5 entries worth of data (partial entry at end). 
+ let mut data = Vec::new(); + data.extend_from_slice(&TEST_DATA[0]); + data.extend_from_slice(&TEST_DATA[1]); + data.extend_from_slice(&[0xFF; ENTRY_SIZE / 2]); // partial write + fs::write(wal_dir.join(format_segment_file_name(0)), &data)?; + + let config = test_config(tmp.path()); + let journal = Journal::with_capacity_and_config(4, config).unwrap(); + + // Should recover 2 valid entries (truncated partial write). + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 2); + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 2); + + // Verify truncated file size. + let file_size = fs::metadata(wal_dir.join(format_segment_file_name(0)))?.len(); + assert_eq!(file_size, (ENTRY_SIZE * 2) as u64); + journal.shutdown().unwrap(); + Ok(()) + } + + #[test] + fn recover_detects_incomplete_non_last_segment() { + let tmp = tempfile::tempdir().unwrap(); + let wal_dir = tmp.path().join("wal"); + fs::create_dir_all(&wal_dir).unwrap(); + + // Segment 0 is incomplete (not full), segment 1 exists. + // For capacity 4, a full segment should be 4 * ENTRY_SIZE bytes. + fs::write( + wal_dir.join(format_segment_file_name(0)), + &[0u8; ENTRY_SIZE * 2], + ) + .unwrap(); + fs::write(wal_dir.join(format_segment_file_name(1)), &[0u8; 0]).unwrap(); + + let config = test_config(tmp.path()); + let err = Journal::with_capacity_and_config(4, config).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } + + #[test] + fn recover_detects_consumed_ahead_of_written() { + let tmp = tempfile::tempdir().unwrap(); + let wal_dir = tmp.path().join("wal"); + fs::create_dir_all(&wal_dir).unwrap(); + + // Write a checkpoint claiming index 10 consumed... + let checkpoint_path = tmp.path().join("checkpoint.meta"); + fs::write(&checkpoint_path, &10u64.to_le_bytes()).unwrap(); + + // ...but WAL only has 2 entries. 
+ let mut data = Vec::new(); + data.extend_from_slice(&TEST_DATA[0]); + data.extend_from_slice(&TEST_DATA[1]); + fs::write(wal_dir.join(format_segment_file_name(0)), &data).unwrap(); + + let config = JournalConfig { + consumer_checkpoint: CheckpointConfig { + path: checkpoint_path, + ..Default::default() + }, + wal_dir, + }; + let err = Journal::with_capacity_and_config(4, config).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } + + // ── Shutdown ───────────────────────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn shutdown_rejects_new_commits() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(4); + + journal.commit(&TEST_DATA[0]).await; + journal.shutdown()?; + + let err = journal + .try_commit(&TEST_DATA[1]) + .expect_err("commit after shutdown should fail"); + assert!( + matches!(err, crate::error::JournalUnavailable::Shutdown), + "expected Shutdown, got {err:?}" + ); + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn shutdown_is_idempotent() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(4); + journal.commit(&TEST_DATA[0]).await; + journal.shutdown()?; + journal.shutdown()?; + Ok(()) + } + + // ── WAL worker wakes on commit ─────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn commit_future_resolves_after_wal_persistence() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(4); + + // The commit future should only resolve after the WAL worker persists. + journal.commit(&TEST_DATA[0]).await; + + // If we got here, the persisted_index must have advanced. 
+ assert!( + journal.inner.persisted_index.load(Ordering::Acquire) >= 1, + "persisted_index should be >= 1 after commit future resolves" + ); + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn multiple_commits_advance_persisted_index() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(4); + + for i in 0..4 { + journal.commit(&TEST_DATA[i]).await; + } + + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 4); + Ok(()) + } + + // ── Recovery across segments ───────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn recover_across_segment_rotation() -> eyre::Result<()> { + let tmp = tempfile::tempdir()?; + let config = test_config(tmp.path()); + + // Fill more than one segment. Capacity 4 → segment_size = 4. + { + let journal = Journal::with_capacity_and_config(4, config.clone())?; + let mut reader = journal.reader(); + + // Fill first segment. + for entry in TEST_DATA.iter().take(4) { + journal.commit(entry).await; + } + + // Consume to free ring buffer. + reader.read(4); + reader.commit()?; + + // Write into second segment. + journal.commit(&TEST_DATA[4]).await; + journal.commit(&TEST_DATA[5]).await; + + // Consume second batch. + reader.read(2); + reader.commit()?; + + journal.shutdown()?; + } + + // Recover and write more. + { + let journal = Journal::with_capacity_and_config(4, config)?; + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 6); + + // Should be able to write new entries. 
+ journal.commit(&TEST_DATA[6]).await; + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 7); + journal.shutdown()?; + } + Ok(()) + } +} diff --git a/crates/stamper/src/lib.rs b/crates/stamper/src/lib.rs index c66f0d2..ed087f3 100644 --- a/crates/stamper/src/lib.rs +++ b/crates/stamper/src/lib.rs @@ -299,6 +299,6 @@ where } self.cache.push_back(merkle_tree); self.cache_index.insert(root, self.cache.len() - 1); - self.reader.commit(); + self.reader.commit().expect("Failed to commit read entries"); // FIXME: handle error properly } }