diff --git a/.gitignore b/.gitignore index 5ae8c34..3e0e929 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,9 @@ dist # Project files .db + +# WAL artifacts +.journal +*.wal +checkpoint.meta +checkpoint.meta.tmp diff --git a/Cargo.lock b/Cargo.lock index 178fd02..2b3cacc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6379,6 +6379,7 @@ name = "uts-journal" version = "0.1.0" dependencies = [ "eyre", + "tempfile", "thiserror 2.0.17", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 3bc242a..5fd53e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -86,6 +86,7 @@ sha1 = "0.11.0-rc.3" sha2 = "0.11.0-rc.3" sha3 = "0.11.0-rc.3" +tempfile = "3" uts-bmt = { path = "crates/bmt" } uts-contracts = { path = "crates/contracts" } uts-core = { path = "crates/core" } diff --git a/crates/calendar/src/main.rs b/crates/calendar/src/main.rs index 53f1bd3..6f8fe78 100644 --- a/crates/calendar/src/main.rs +++ b/crates/calendar/src/main.rs @@ -37,7 +37,7 @@ async fn main() -> eyre::Result<()> { RING_BUFFER_CAPACITY, JournalConfig { consumer_checkpoint: CheckpointConfig { - path: PathBuf::from("./.checkpoint"), + path: PathBuf::from("./.journal/.checkpoint"), ..Default::default() }, wal_dir: PathBuf::from("./.journal"), diff --git a/crates/journal/Cargo.toml b/crates/journal/Cargo.toml index 9221550..11ae72b 100644 --- a/crates/journal/Cargo.toml +++ b/crates/journal/Cargo.toml @@ -15,6 +15,7 @@ tracing = { workspace = true } [dev-dependencies] eyre = { workspace = true } +tempfile = { workspace = true } tokio = { workspace = true, features = ["full"] } [lints] diff --git a/crates/journal/src/lib.rs b/crates/journal/src/lib.rs index ad0be21..55d4edb 100644 --- a/crates/journal/src/lib.rs +++ b/crates/journal/src/lib.rs @@ -55,7 +55,7 @@ impl Default for JournalConfig { /// All index here are monotonic u64, wrapping around on overflow. /// /// Following invariants are maintained: -/// `consumed_index` <= `persisted_index` <= `write_index`. 
+/// `consumed_index` <= `persisted_index` <= `filled_index` <= `write_index`. #[derive(Clone)] pub struct Journal { inner: Arc>, @@ -73,9 +73,14 @@ pub(crate) struct JournalInner { /// Mask for indexing into the ring buffer. index_mask: u64, /// Next Write Position, aka: - /// - Total entries written count. + /// - Total entries reserved count. /// - Position to write the next entry to. write_index: AtomicU64, + /// Filled Boundary, aka: + /// - Total entries that have been fully written to the ring buffer. + /// - Advanced in order after each writer finishes copying data into its reserved slot. + /// - The WAL worker uses this (not `write_index`) to determine how far it can safely read. + filled_index: AtomicU64, /// WAL Committed Boundary, aka.: /// - Total committed entries count. /// - Position has not yet been persisted to durable storage. @@ -129,6 +134,7 @@ impl Journal { waker_buffer, index_mask, write_index: AtomicU64::new(0), + filled_index: AtomicU64::new(0), persisted_index: AtomicU64::new(0), consumed_checkpoint: Checkpoint::new(config.consumer_checkpoint)?, reader_taken: AtomicBool::new(false), @@ -214,10 +220,28 @@ impl Journal { let slot = unsafe { &mut *self.data_slot_ptr(current_written) }; slot.copy_from_slice(data); - // 4. Notify WAL worker if needed. + // 4. Publish the filled slot. + // Spin-wait until all prior slots are filled, then advance `filled_index`. + // The Release ordering ensures the slot write above is visible to the WAL worker + // before it reads `filled_index`. + while self + .inner + .filled_index + .compare_exchange_weak( + current_written, + current_written.wrapping_add(1), + Ordering::Release, + Ordering::Relaxed, + ) + .is_err() + { + std::hint::spin_loop(); + } + + // 5. Notify WAL worker if needed. let committed = self.inner.persisted_index.load(Ordering::Relaxed); - // Explain: Before we wrote to the slot, if there is no pending committed entry, - // There's a chance the WAL worker is sleeping, we need to wake it up. 
+ // Explain: If there is no pending committed entry before ours, + // the WAL worker may be sleeping, so we need to wake it up. if current_written == committed { // Notify the WAL worker thread to persist new entries. self.wal.unpark(); @@ -342,7 +366,7 @@ impl Deref for WakerEntry { #[cfg(test)] pub(crate) mod tests { - use super::*; + use crate::checkpoint::CheckpointConfig; pub const ENTRY_SIZE: usize = 8; pub const TEST_DATA: &[[u8; ENTRY_SIZE]] = &[ @@ -359,9 +383,25 @@ pub(crate) mod tests { ]; pub type Journal = crate::Journal; + /// Create a journal with an isolated temporary directory for WAL and checkpoint files. + /// Returns the journal and the temp dir guard (must be kept alive for the test duration). + pub fn test_journal(capacity: usize) -> (Journal, tempfile::TempDir) { + let tmp = tempfile::tempdir().expect("failed to create temp dir"); + let config = crate::JournalConfig { + consumer_checkpoint: CheckpointConfig { + path: tmp.path().join("checkpoint.meta"), + ..Default::default() + }, + wal_dir: tmp.path().join("wal"), + }; + let journal = + Journal::with_capacity_and_config(capacity, config).expect("failed to create journal"); + (journal, tmp) + } + #[tokio::test(flavor = "current_thread")] async fn try_reader_is_exclusive() -> eyre::Result<()> { - let journal = Journal::with_capacity(2)?; + let (journal, _tmp) = test_journal(2); let reader = journal.try_reader().unwrap(); @@ -376,12 +416,13 @@ pub(crate) mod tests { "reader acquisition should succeed after drop" ); + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn commit_and_read_round_trip() -> eyre::Result<()> { - let journal = Journal::with_capacity(4)?; + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); journal.commit(&TEST_DATA[0]).await; @@ -396,12 +437,13 @@ pub(crate) mod tests { reader.commit()?; assert_eq!(reader.available(), 0); + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn 
commit_returns_error_when_full() -> eyre::Result<()> { - let journal = Journal::with_capacity(2)?; + let (journal, _tmp) = test_journal(2); journal.commit(&TEST_DATA[1]).await; journal.commit(&TEST_DATA[2]).await; @@ -409,13 +451,17 @@ pub(crate) mod tests { let err = journal .try_commit(&TEST_DATA[3]) .expect_err("buffer should report full on third commit"); - assert!(matches!(err, JournalBusy)); + assert!( + matches!(err, crate::error::JournalUnavailable::Full), + "expected Full, got {err:?}" + ); + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn reader_handles_wrap_around_reads() -> eyre::Result<()> { - let journal = Journal::with_capacity(4)?; + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); for entry in TEST_DATA.iter().take(4) { @@ -450,6 +496,7 @@ pub(crate) mod tests { reader.commit()?; assert_eq!(reader.available(), 0); + journal.shutdown()?; Ok(()) } } diff --git a/crates/journal/src/reader.rs b/crates/journal/src/reader.rs index fe59a8a..9bf0702 100644 --- a/crates/journal/src/reader.rs +++ b/crates/journal/src/reader.rs @@ -147,7 +147,7 @@ mod tests { #[tokio::test(flavor = "current_thread")] async fn available_tracks_persisted_entries() -> eyre::Result<()> { - let journal = Journal::with_capacity(4)?; + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); assert_eq!(reader.available(), 0); @@ -161,12 +161,13 @@ mod tests { let slice = reader.read(1); assert_eq!(slice.len(), 1); assert_eq!(reader.available(), 1); + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn commit_updates_shared_consumed_boundary() -> eyre::Result<()> { - let journal = Journal::with_capacity(4)?; + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); for entry in TEST_DATA.iter().take(3) { @@ -184,12 +185,13 @@ mod tests { reader.commit()?; assert_eq!(reader.journal.consumed_checkpoint.current_index(), 2); + journal.shutdown()?; Ok(()) } 
#[tokio::test(flavor = "current_thread")] async fn wait_at_least_resumes_after_persistence() -> eyre::Result<()> { - let journal = Journal::with_capacity(4)?; + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); let journal_clone = journal.clone(); @@ -202,12 +204,13 @@ mod tests { assert_eq!(reader.available(), 1); task.await?; + journal.shutdown()?; Ok(()) } #[tokio::test(flavor = "current_thread")] async fn wait_at_least_waits_for_correct_count() -> eyre::Result<()> { - let journal = Journal::with_capacity(4)?; + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); let journal_clone = journal.clone(); @@ -218,10 +221,11 @@ mod tests { } }); - timeout(Duration::from_secs(1), reader.wait_at_least(3)).await?; + timeout(Duration::from_secs(10), reader.wait_at_least(3)).await?; assert!(reader.available() >= 3); task.await?; + journal.shutdown()?; Ok(()) } @@ -230,7 +234,7 @@ mod tests { expected = "requested (5) exceeds max possible (4): journal.buffer.len()=4, journal.consumed_index=0" )] async fn wait_at_least_exceeds_buffer_size() { - let journal = Journal::with_capacity(4).unwrap(); + let (journal, _tmp) = test_journal(4); let mut reader = journal.reader(); timeout(Duration::from_secs(1), reader.wait_at_least(5)) @@ -243,7 +247,7 @@ mod tests { expected = "requested (5) exceeds max possible (4): journal.buffer.len()=4, journal.consumed_index=0" )] async fn wait_at_least_dirty_read_exceeds_available() { - let journal = Journal::with_capacity(4).unwrap(); + let (journal, _tmp) = test_journal(4); journal.commit(&TEST_DATA[0]).await; let mut reader = journal.reader(); diff --git a/crates/journal/src/wal.rs b/crates/journal/src/wal.rs index deb8768..89ac7a5 100644 --- a/crates/journal/src/wal.rs +++ b/crates/journal/src/wal.rs @@ -15,8 +15,8 @@ use std::{ thread::JoinHandle, }; -const MAX_SPINS: usize = 10_000; -const IO_BATCH_LIMIT: u64 = 128; +const MAX_SPIN: usize = 100; +const MAX_IO_BATCH: u64 = 128; /// Write-Ahead Log 
/// @@ -38,6 +38,18 @@ struct WalInner { shutdown_flag: Arc, } +impl Drop for WalInner { + fn drop(&mut self) { + // Signal the WAL worker to exit if it hasn't been shut down yet. + // This prevents orphaned worker threads from spinning after the journal is dropped. + if !self.shutdown_flag.swap(true, Ordering::AcqRel) { + if let Some(worker) = self.worker.lock().unwrap().as_ref() { + worker.thread().unpark(); + } + } + } +} + impl fmt::Debug for Wal { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("Wal") @@ -159,20 +171,19 @@ impl WalWorker { let segment_size = journal.capacity() as u64; let mut persisted = journal.persisted_index.load(Ordering::Acquire); - let mut spins = 0; + let mut spin_count = 0; while !shutdown_flag.load(Ordering::Acquire) { - let written = journal.write_index.load(Ordering::Acquire); - let available = written.wrapping_sub(persisted); + let filled = journal.filled_index.load(Ordering::Acquire); + let available = filled.wrapping_sub(persisted); if available > 0 { - // reset spins counter - spins = 0; + spin_count = 0; - let available = available.min(IO_BATCH_LIMIT); + let available = available.min(MAX_IO_BATCH); let target_index = persisted + available; - // write entries to segment files, rotating files as needed + // write up to MAX_IO_BATCH available entries to segment files, rotating files as needed for i in persisted..target_index { let seg_id = i / segment_size; if seg_id != current_segment_id { @@ -221,17 +232,15 @@ continue; } - // busy-wait + park - if spins < MAX_SPINS { - // busy-wait + // no new data to persist, spin for a while before parking + if spin_count <= MAX_SPIN { + spin_count += 1; std::hint::spin_loop(); - spins += 1; - } else { - // park the thread - thread::park(); - // reset spins counter on wake - spins = 0; + continue; } + + // park until unparked by a new commit + thread::park(); } } } @@ -291,6 +300,7 @@ fn recover( } journal.write_index.store(write_index, Ordering::Relaxed); +
journal.filled_index.store(write_index, Ordering::Relaxed); journal .persisted_index .store(write_index, Ordering::Relaxed); @@ -393,3 +403,434 @@ fn truncate_old_segments(base_dir: PathBuf, current_segment_id: u64) -> io::Resu } Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + JournalConfig, + checkpoint::CheckpointConfig, + tests::{ENTRY_SIZE, TEST_DATA, test_journal}, + }; + use std::sync::atomic::Ordering; + + type Journal = crate::Journal; + + /// Helper: create a journal config pointing at the given temp directory. + fn test_config(tmp: &Path) -> JournalConfig { + JournalConfig { + consumer_checkpoint: CheckpointConfig { + path: tmp.join("checkpoint.meta"), + ..Default::default() + }, + wal_dir: tmp.join("wal"), + } + } + + // ── Segment file helpers ───────────────────────────────────────────── + + #[test] + fn format_segment_file_name_pads_to_twelve_digits() { + assert_eq!(format_segment_file_name(0), "000000000000.wal"); + assert_eq!(format_segment_file_name(1), "000000000001.wal"); + assert_eq!( + format_segment_file_name(999_999_999_999), + "999999999999.wal" + ); + } + + #[test] + fn open_segment_file_creates_file() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + let f = open_segment_file(tmp.path(), 0)?; + assert!(tmp.path().join("000000000000.wal").exists()); + drop(f); + Ok(()) + } + + // ── scan_segments ──────────────────────────────────────────────────── + + #[test] + fn scan_segments_empty_directory() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + let segments = scan_segments(tmp.path())?; + assert!(segments.is_empty()); + Ok(()) + } + + #[test] + fn scan_segments_returns_sorted_ids() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + // Create files out of order. 
+ File::create(tmp.path().join("000000000002.wal"))?; + File::create(tmp.path().join("000000000000.wal"))?; + File::create(tmp.path().join("000000000001.wal"))?; + + let segments = scan_segments(tmp.path())?; + assert_eq!(segments, vec![0, 1, 2]); + Ok(()) + } + + #[test] + fn scan_segments_skips_non_wal_files() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + File::create(tmp.path().join("000000000000.wal"))?; + File::create(tmp.path().join("readme.txt"))?; + + let segments = scan_segments(tmp.path())?; + assert_eq!(segments, vec![0]); + Ok(()) + } + + #[test] + fn scan_segments_rejects_invalid_wal_names() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + File::create(tmp.path().join("not_a_number.wal"))?; + + let err = scan_segments(tmp.path()).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + Ok(()) + } + + // ── truncate_old_segments ──────────────────────────────────────────── + + #[test] + fn truncate_old_segments_removes_two_behind() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + File::create(tmp.path().join(format_segment_file_name(0)))?; + File::create(tmp.path().join(format_segment_file_name(1)))?; + File::create(tmp.path().join(format_segment_file_name(2)))?; + + truncate_old_segments(tmp.path().to_path_buf(), 2)?; + assert!( + !tmp.path().join(format_segment_file_name(0)).exists(), + "segment 0 should be deleted when current is 2" + ); + assert!(tmp.path().join(format_segment_file_name(1)).exists()); + assert!(tmp.path().join(format_segment_file_name(2)).exists()); + Ok(()) + } + + #[test] + fn truncate_old_segments_noop_when_too_few() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + File::create(tmp.path().join(format_segment_file_name(0)))?; + File::create(tmp.path().join(format_segment_file_name(1)))?; + + // current_segment_id = 1, checked_sub(2) underflows → noop + truncate_old_segments(tmp.path().to_path_buf(), 1)?; + assert!(tmp.path().join(format_segment_file_name(0)).exists()); + 
assert!(tmp.path().join(format_segment_file_name(1)).exists()); + Ok(()) + } + + // ── WAL persistence end-to-end ─────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn wal_persists_entries_to_segment_file() -> eyre::Result<()> { + let (journal, tmp) = test_journal(4); + + journal.commit(&TEST_DATA[0]).await; + journal.commit(&TEST_DATA[1]).await; + + // WAL file should exist and contain the two entries. + let wal_path = tmp.path().join("wal").join(format_segment_file_name(0)); + assert!(wal_path.exists(), "WAL segment file should exist"); + + let data = fs::read(&wal_path)?; + assert_eq!( + data.len(), + ENTRY_SIZE * 2, + "segment should contain exactly 2 entries" + ); + assert_eq!(&data[..ENTRY_SIZE], &TEST_DATA[0]); + assert_eq!(&data[ENTRY_SIZE..ENTRY_SIZE * 2], &TEST_DATA[1]); + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn wal_segment_rotation() -> eyre::Result<()> { + // Capacity 4 → segment_size = 4 entries per segment. + let (journal, tmp) = test_journal(4); + let mut reader = journal.reader(); + + // Fill first segment (4 entries). + for entry in TEST_DATA.iter().take(4) { + journal.commit(entry).await; + } + + // Consume some entries to free ring buffer space. + reader.read(4); + reader.commit()?; + + // Write one more entry, causing rotation to segment 1. 
+ journal.commit(&TEST_DATA[4]).await; + + let seg0 = tmp.path().join("wal").join(format_segment_file_name(0)); + let seg1 = tmp.path().join("wal").join(format_segment_file_name(1)); + assert!(seg0.exists(), "segment 0 should exist"); + assert!(seg1.exists(), "segment 1 should exist after rotation"); + + let seg1_data = fs::read(&seg1)?; + assert_eq!(seg1_data.len(), ENTRY_SIZE); + assert_eq!(&seg1_data[..ENTRY_SIZE], &TEST_DATA[4]); + Ok(()) + } + + // ── Recovery ───────────────────────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn recover_from_empty_dir() -> eyre::Result<()> { + let tmp = tempfile::tempdir()?; + let config = test_config(tmp.path()); + + // First journal writes nothing. + let journal = Journal::with_capacity_and_config(4, config.clone())?; + journal.shutdown()?; + drop(journal); + + // Second journal recovers with indices at 0. + let journal = Journal::with_capacity_and_config(4, config)?; + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 0); + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 0); + journal.shutdown()?; + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn recover_replays_data_into_ring_buffer() -> eyre::Result<()> { + let tmp = tempfile::tempdir()?; + let config = test_config(tmp.path()); + + // Write entries and shut down. + { + let journal = Journal::with_capacity_and_config(4, config.clone())?; + journal.commit(&TEST_DATA[0]).await; + journal.commit(&TEST_DATA[1]).await; + journal.commit(&TEST_DATA[2]).await; + journal.shutdown()?; + } + + // Recover and verify data via reader. 
+ { + let journal = Journal::with_capacity_and_config(4, config)?; + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 3); + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 3); + + let mut reader = journal.reader(); + let entries = reader.read(3); + assert_eq!(entries.len(), 3); + assert_eq!(entries[0], TEST_DATA[0]); + assert_eq!(entries[1], TEST_DATA[1]); + assert_eq!(entries[2], TEST_DATA[2]); + journal.shutdown()?; + } + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn recover_respects_consumed_checkpoint() -> eyre::Result<()> { + let tmp = tempfile::tempdir()?; + let config = test_config(tmp.path()); + + // Write entries, consume some, then shut down. + { + let journal = Journal::with_capacity_and_config(4, config.clone())?; + journal.commit(&TEST_DATA[0]).await; + journal.commit(&TEST_DATA[1]).await; + journal.commit(&TEST_DATA[2]).await; + let mut reader = journal.reader(); + reader.read(2); + reader.commit()?; + journal.shutdown()?; + } + + // Recover - reader should only see unconsumed entries. + { + let journal = Journal::with_capacity_and_config(4, config)?; + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 3); + let mut reader = journal.reader(); + let entries = reader.read(10); + assert_eq!(entries.len(), 1, "only 1 unconsumed entry should remain"); + assert_eq!(entries[0], TEST_DATA[2]); + journal.shutdown()?; + } + Ok(()) + } + + #[test] + fn recover_truncates_partial_last_segment() -> io::Result<()> { + let tmp = tempfile::tempdir()?; + let wal_dir = tmp.path().join("wal"); + fs::create_dir_all(&wal_dir)?; + + // Write 2.5 entries worth of data (partial entry at end). 
+ let mut data = Vec::new(); + data.extend_from_slice(&TEST_DATA[0]); + data.extend_from_slice(&TEST_DATA[1]); + data.extend_from_slice(&[0xFF; ENTRY_SIZE / 2]); // partial write + fs::write(wal_dir.join(format_segment_file_name(0)), &data)?; + + let config = test_config(tmp.path()); + let journal = Journal::with_capacity_and_config(4, config).unwrap(); + + // Should recover 2 valid entries (truncated partial write). + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 2); + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 2); + + // Verify truncated file size. + let file_size = fs::metadata(wal_dir.join(format_segment_file_name(0)))?.len(); + assert_eq!(file_size, (ENTRY_SIZE * 2) as u64); + journal.shutdown().unwrap(); + Ok(()) + } + + #[test] + fn recover_detects_incomplete_non_last_segment() { + let tmp = tempfile::tempdir().unwrap(); + let wal_dir = tmp.path().join("wal"); + fs::create_dir_all(&wal_dir).unwrap(); + + // Segment 0 is incomplete (not full), segment 1 exists. + // For capacity 4, a full segment should be 4 * ENTRY_SIZE bytes. + fs::write( + wal_dir.join(format_segment_file_name(0)), + &[0u8; ENTRY_SIZE * 2], + ) + .unwrap(); + fs::write(wal_dir.join(format_segment_file_name(1)), &[0u8; 0]).unwrap(); + + let config = test_config(tmp.path()); + let err = Journal::with_capacity_and_config(4, config).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } + + #[test] + fn recover_detects_consumed_ahead_of_written() { + let tmp = tempfile::tempdir().unwrap(); + let wal_dir = tmp.path().join("wal"); + fs::create_dir_all(&wal_dir).unwrap(); + + // Write a checkpoint claiming index 10 consumed... + let checkpoint_path = tmp.path().join("checkpoint.meta"); + fs::write(&checkpoint_path, &10u64.to_le_bytes()).unwrap(); + + // ...but WAL only has 2 entries. 
+ let mut data = Vec::new(); + data.extend_from_slice(&TEST_DATA[0]); + data.extend_from_slice(&TEST_DATA[1]); + fs::write(wal_dir.join(format_segment_file_name(0)), &data).unwrap(); + + let config = JournalConfig { + consumer_checkpoint: CheckpointConfig { + path: checkpoint_path, + ..Default::default() + }, + wal_dir, + }; + let err = Journal::with_capacity_and_config(4, config).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidData); + } + + // ── Shutdown ───────────────────────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn shutdown_rejects_new_commits() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(4); + + journal.commit(&TEST_DATA[0]).await; + journal.shutdown()?; + + let err = journal + .try_commit(&TEST_DATA[1]) + .expect_err("commit after shutdown should fail"); + assert!( + matches!(err, crate::error::JournalUnavailable::Shutdown), + "expected Shutdown, got {err:?}" + ); + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn shutdown_is_idempotent() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(4); + journal.commit(&TEST_DATA[0]).await; + journal.shutdown()?; + journal.shutdown()?; + Ok(()) + } + + // ── WAL worker wakes on commit ─────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn commit_future_resolves_after_wal_persistence() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(4); + + // The commit future should only resolve after the WAL worker persists. + journal.commit(&TEST_DATA[0]).await; + + // If we got here, the persisted_index must have advanced. 
+ assert!( + journal.inner.persisted_index.load(Ordering::Acquire) >= 1, + "persisted_index should be >= 1 after commit future resolves" + ); + Ok(()) + } + + #[tokio::test(flavor = "current_thread")] + async fn multiple_commits_advance_persisted_index() -> eyre::Result<()> { + let (journal, _tmp) = test_journal(4); + + for i in 0..4 { + journal.commit(&TEST_DATA[i]).await; + } + + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 4); + Ok(()) + } + + // ── Recovery across segments ───────────────────────────────────────── + + #[tokio::test(flavor = "current_thread")] + async fn recover_across_segment_rotation() -> eyre::Result<()> { + let tmp = tempfile::tempdir()?; + let config = test_config(tmp.path()); + + // Fill more than one segment. Capacity 4 → segment_size = 4. + { + let journal = Journal::with_capacity_and_config(4, config.clone())?; + let mut reader = journal.reader(); + + // Fill first segment. + for entry in TEST_DATA.iter().take(4) { + journal.commit(entry).await; + } + + // Consume to free ring buffer. + reader.read(4); + reader.commit()?; + + // Write into second segment. + journal.commit(&TEST_DATA[4]).await; + journal.commit(&TEST_DATA[5]).await; + + // Consume second batch. + reader.read(2); + reader.commit()?; + + journal.shutdown()?; + } + + // Recover and write more. + { + let journal = Journal::with_capacity_and_config(4, config)?; + assert_eq!(journal.inner.write_index.load(Ordering::Acquire), 6); + + // Should be able to write new entries. + journal.commit(&TEST_DATA[6]).await; + assert_eq!(journal.inner.persisted_index.load(Ordering::Acquire), 7); + journal.shutdown()?; + } + Ok(()) + } +}