Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ dist

# Project files
.db

# WAL artifacts
.journal
*.wal
checkpoint.meta
checkpoint.meta.tmp
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ sha1 = "0.11.0-rc.3"
sha2 = "0.11.0-rc.3"
sha3 = "0.11.0-rc.3"

tempfile = "3"
uts-bmt = { path = "crates/bmt" }
uts-contracts = { path = "crates/contracts" }
uts-core = { path = "crates/core" }
Expand Down
2 changes: 1 addition & 1 deletion crates/calendar/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> eyre::Result<()> {
RING_BUFFER_CAPACITY,
JournalConfig {
consumer_checkpoint: CheckpointConfig {
path: PathBuf::from("./.checkpoint"),
path: PathBuf::from("./.journal/.checkpoint"),
..Default::default()
},
wal_dir: PathBuf::from("./.journal"),
Expand Down
1 change: 1 addition & 0 deletions crates/journal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ tracing = { workspace = true }

[dev-dependencies]
eyre = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["full"] }

[lints]
Expand Down
69 changes: 58 additions & 11 deletions crates/journal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl Default for JournalConfig {
/// All indices here are monotonic u64 values that wrap around on overflow.
///
/// The following invariants are maintained:
/// `consumed_index` <= `persisted_index` <= `write_index`.
/// `consumed_index` <= `persisted_index` <= `filled_index` <= `write_index`.
#[derive(Clone)]
pub struct Journal<const ENTRY_SIZE: usize> {
inner: Arc<JournalInner<{ ENTRY_SIZE }>>,
Expand All @@ -73,9 +73,14 @@ pub(crate) struct JournalInner<const ENTRY_SIZE: usize> {
/// Mask for indexing into the ring buffer.
index_mask: u64,
/// Next Write Position, aka:
/// - Total entries written count.
/// - Total entries reserved count.
/// - Position to write the next entry to.
write_index: AtomicU64,
/// Filled Boundary, aka:
/// - Total entries that have been fully written to the ring buffer.
/// - Advanced in order after each writer finishes copying data into its reserved slot.
/// - The WAL worker uses this (not `write_index`) to determine how far it can safely read.
filled_index: AtomicU64,
/// WAL Committed Boundary, aka:
/// - Total committed entries count.
/// - Position that has not yet been persisted to durable storage.
Expand Down Expand Up @@ -129,6 +134,7 @@ impl<const ENTRY_SIZE: usize> Journal<ENTRY_SIZE> {
waker_buffer,
index_mask,
write_index: AtomicU64::new(0),
filled_index: AtomicU64::new(0),
persisted_index: AtomicU64::new(0),
consumed_checkpoint: Checkpoint::new(config.consumer_checkpoint)?,
reader_taken: AtomicBool::new(false),
Expand Down Expand Up @@ -214,10 +220,28 @@ impl<const ENTRY_SIZE: usize> Journal<ENTRY_SIZE> {
let slot = unsafe { &mut *self.data_slot_ptr(current_written) };
slot.copy_from_slice(data);

// 4. Notify WAL worker if needed.
// 4. Publish the filled slot.
// Spin-wait until all prior slots are filled, then advance `filled_index`.
// The Release ordering ensures the slot write above is visible to the WAL worker
// before it reads `filled_index`.
while self
.inner
.filled_index
.compare_exchange_weak(
current_written,
current_written.wrapping_add(1),
Ordering::Release,
Ordering::Relaxed,
)
.is_err()
{
std::hint::spin_loop();
}

// 5. Notify WAL worker if needed.
let committed = self.inner.persisted_index.load(Ordering::Relaxed);
// Explain: Before we wrote to the slot, if there is no pending committed entry,
// There's a chance the WAL worker is sleeping, we need to wake it up.
// Explain: If there is no pending committed entry before ours,
// the WAL worker may be sleeping, so we need to wake it up.
if current_written == committed {
// Notify the WAL worker thread to persist new entries.
self.wal.unpark();
Expand Down Expand Up @@ -342,7 +366,7 @@ impl Deref for WakerEntry {

#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::checkpoint::CheckpointConfig;

pub const ENTRY_SIZE: usize = 8;
pub const TEST_DATA: &[[u8; ENTRY_SIZE]] = &[
Expand All @@ -359,9 +383,25 @@ pub(crate) mod tests {
];
pub type Journal = crate::Journal<ENTRY_SIZE>;

/// Build a test journal whose WAL directory and checkpoint file live inside a fresh,
/// isolated temporary directory.
///
/// Returns the journal together with the temp dir guard. The guard must be kept alive
/// for the duration of the test so the backing files remain available.
///
/// # Panics
///
/// Panics if the temporary directory or the journal cannot be created.
pub fn test_journal(capacity: usize) -> (Journal, tempfile::TempDir) {
    let scratch = tempfile::tempdir().expect("failed to create temp dir");
    let root = scratch.path();

    // Both artifacts are placed under the same per-test root.
    let consumer_checkpoint = CheckpointConfig {
        path: root.join("checkpoint.meta"),
        ..Default::default()
    };
    let wal_dir = root.join("wal");

    let journal = Journal::with_capacity_and_config(
        capacity,
        crate::JournalConfig {
            consumer_checkpoint,
            wal_dir,
        },
    )
    .expect("failed to create journal");

    (journal, scratch)
}

#[tokio::test(flavor = "current_thread")]
async fn try_reader_is_exclusive() -> eyre::Result<()> {
let journal = Journal::with_capacity(2)?;
let (journal, _tmp) = test_journal(2);

let reader = journal.try_reader().unwrap();

Expand All @@ -376,12 +416,13 @@ pub(crate) mod tests {
"reader acquisition should succeed after drop"
);

journal.shutdown()?;
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn commit_and_read_round_trip() -> eyre::Result<()> {
let journal = Journal::with_capacity(4)?;
let (journal, _tmp) = test_journal(4);
let mut reader = journal.reader();

journal.commit(&TEST_DATA[0]).await;
Expand All @@ -396,26 +437,31 @@ pub(crate) mod tests {

reader.commit()?;
assert_eq!(reader.available(), 0);
journal.shutdown()?;
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn commit_returns_error_when_full() -> eyre::Result<()> {
let journal = Journal::with_capacity(2)?;
let (journal, _tmp) = test_journal(2);

journal.commit(&TEST_DATA[1]).await;
journal.commit(&TEST_DATA[2]).await;

let err = journal
.try_commit(&TEST_DATA[3])
.expect_err("buffer should report full on third commit");
assert!(matches!(err, JournalBusy));
assert!(
matches!(err, crate::error::JournalUnavailable::Full),
"expected Full, got {err:?}"
);
journal.shutdown()?;
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn reader_handles_wrap_around_reads() -> eyre::Result<()> {
let journal = Journal::with_capacity(4)?;
let (journal, _tmp) = test_journal(4);
let mut reader = journal.reader();

for entry in TEST_DATA.iter().take(4) {
Expand Down Expand Up @@ -450,6 +496,7 @@ pub(crate) mod tests {

reader.commit()?;
assert_eq!(reader.available(), 0);
journal.shutdown()?;
Ok(())
}
}
18 changes: 11 additions & 7 deletions crates/journal/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ mod tests {

#[tokio::test(flavor = "current_thread")]
async fn available_tracks_persisted_entries() -> eyre::Result<()> {
let journal = Journal::with_capacity(4)?;
let (journal, _tmp) = test_journal(4);
let mut reader = journal.reader();

assert_eq!(reader.available(), 0);
Expand All @@ -161,12 +161,13 @@ mod tests {
let slice = reader.read(1);
assert_eq!(slice.len(), 1);
assert_eq!(reader.available(), 1);
journal.shutdown()?;
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn commit_updates_shared_consumed_boundary() -> eyre::Result<()> {
let journal = Journal::with_capacity(4)?;
let (journal, _tmp) = test_journal(4);
let mut reader = journal.reader();

for entry in TEST_DATA.iter().take(3) {
Expand All @@ -184,12 +185,13 @@ mod tests {

reader.commit()?;
assert_eq!(reader.journal.consumed_checkpoint.current_index(), 2);
journal.shutdown()?;
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn wait_at_least_resumes_after_persistence() -> eyre::Result<()> {
let journal = Journal::with_capacity(4)?;
let (journal, _tmp) = test_journal(4);
let mut reader = journal.reader();

let journal_clone = journal.clone();
Expand All @@ -202,12 +204,13 @@ mod tests {
assert_eq!(reader.available(), 1);

task.await?;
journal.shutdown()?;
Ok(())
}

#[tokio::test(flavor = "current_thread")]
async fn wait_at_least_waits_for_correct_count() -> eyre::Result<()> {
let journal = Journal::with_capacity(4)?;
let (journal, _tmp) = test_journal(4);
let mut reader = journal.reader();

let journal_clone = journal.clone();
Expand All @@ -218,10 +221,11 @@ mod tests {
}
});

timeout(Duration::from_secs(1), reader.wait_at_least(3)).await?;
timeout(Duration::from_secs(10), reader.wait_at_least(3)).await?;
assert!(reader.available() >= 3);

task.await?;
journal.shutdown()?;
Ok(())
}

Expand All @@ -230,7 +234,7 @@ mod tests {
expected = "requested (5) exceeds max possible (4): journal.buffer.len()=4, journal.consumed_index=0"
)]
async fn wait_at_least_exceeds_buffer_size() {
let journal = Journal::with_capacity(4).unwrap();
let (journal, _tmp) = test_journal(4);
let mut reader = journal.reader();

timeout(Duration::from_secs(1), reader.wait_at_least(5))
Expand All @@ -243,7 +247,7 @@ mod tests {
expected = "requested (5) exceeds max possible (4): journal.buffer.len()=4, journal.consumed_index=0"
)]
async fn wait_at_least_dirty_read_exceeds_available() {
let journal = Journal::with_capacity(4).unwrap();
let (journal, _tmp) = test_journal(4);
journal.commit(&TEST_DATA[0]).await;

let mut reader = journal.reader();
Expand Down
Loading