diff --git a/devservices/run-bigtable.sh b/devservices/run-bigtable.sh index 5b8a02eb..692650fb 100755 --- a/devservices/run-bigtable.sh +++ b/devservices/run-bigtable.sh @@ -14,7 +14,8 @@ for _ in {1..4}; do done cbt deletetable objectstore || true -cbt createtable objectstore families=fg,fm +cbt createtable objectstore families=fg,fm,fc cbt setgcpolicy objectstore fg maxage=1s +cbt setgcpolicy objectstore fc maxversions=1 wait $EMULATOR_PID diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index 4f290c39..568b63d8 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -41,6 +41,7 @@ use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use serde::{Deserialize, Serialize}; use tonic::Code; +use crate::backend::changelog::{Change, ChangeId, ChangeLog}; use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, @@ -144,6 +145,8 @@ const REQUEST_RETRY_COUNT: usize = 2; /// How many times to retry a CAS mutation before giving up and returning an error. const CAS_RETRY_COUNT: usize = 3; +/// Column qualifier for changelog entry data in the `fc` family. +const COLUMN_CHANGE: &[u8] = b"c"; /// Column that stores the raw payload (compressed). const COLUMN_PAYLOAD: &[u8] = b"p"; /// Column that stores metadata in JSON. @@ -162,6 +165,19 @@ const FILTER_META: &[u8] = b"^[mrt]$"; const FAMILY_GC: &str = "fg"; /// Column family that uses manual garbage collection. const FAMILY_MANUAL: &str = "fm"; +/// Column family for the write-ahead changelog. Requires `max_versions = 1` GC policy. +const FAMILY_CHANGELOG: &str = "fc"; + +/// Row key prefix for all changelog entries. +/// +/// All changelog rows have keys of the form `~changelog/{change_id}`. The `~` prefix +/// sorts after all regular storage paths, isolating changelog rows from data. 
+const CHANGE_PREFIX: &[u8] = b"~changelog/";
+/// Minimum age of a changelog entry's cell timestamp before recovery may claim it.
+///
+/// Entries younger than this threshold are assumed to belong to an active operation
+/// and are skipped by the recovery scan.
+const CHANGE_THRESHOLD: Duration = Duration::from_secs(30);
 
 /// BigTable storage backend for high-volume, low-latency object storage.
 pub struct BigTableBackend {
@@ -1153,6 +1169,258 @@ fn is_retryable(error: &BigTableError) -> bool {
     }
 }
 
+/// Builds the BigTable row key for a changelog entry: `~changelog/{change_id}`.
+fn change_key(id: &ChangeId) -> Vec<u8> {
+    format!("~changelog/{id}").into_bytes()
+}
+
+/// Parses a changelog row key back into a [`ChangeId`].
+///
+/// Returns `None` if the key does not match the `~changelog/{uuid}` format.
+fn parse_change_id(row_key: &[u8]) -> Option<ChangeId> {
+    let s = std::str::from_utf8(row_key).ok()?;
+    let uuid_str = s.strip_prefix("~changelog/")?;
+    ChangeId::from_uuid_str(uuid_str)
+}
+
+/// Returns the current time as a millisecond-aligned microsecond BigTable timestamp.
+fn now_micros() -> Result<i64> {
+    let millis = SystemTime::now()
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .map_err(|e| Error::Generic {
+            context: "system time is before UNIX epoch".to_owned(),
+            cause: Some(Box::new(e)),
+        })?
+        .as_millis();
+    let micros = millis * 1000;
+    micros.try_into().map_err(|e| Error::Generic {
+        context: format!("timestamp {micros}µs overflows i64"),
+        cause: Some(Box::new(e)),
+    })
+}
+
+/// Builds a row filter matching changelog entries whose heartbeat is stale.
+///
+/// The filter matches the `c` column where the most recent cell timestamp is
+/// strictly older than `cutoff_micros` (i.e., the operation that wrote it has
+/// not bumped its heartbeat recently enough).
+fn changelog_staleness_filter(cutoff_micros: i64) -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_CHANGE), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::CellsPerColumnLimitFilter(1)), + }, + v2::RowFilter { + filter: Some(v2::row_filter::Filter::TimestampRangeFilter( + v2::TimestampRange { + start_timestamp_micros: 0, + end_timestamp_micros: cutoff_micros, + }, + )), + }, + ], + })), + } +} + +/// Serialized form of a [`Change`] stored in the changelog column. +/// +/// Object IDs are stored as storage path strings so the format is +/// self-describing and independent of the in-memory representation. +#[derive(Debug, Deserialize, Serialize)] +struct ChangeRecord { + id: String, + new: Option, + old: Option, +} + +fn serialize_change(change: &Change) -> Result> { + let record = ChangeRecord { + id: change.id.as_storage_path().to_string(), + new: change + .new + .as_ref() + .map(|id| id.as_storage_path().to_string()), + old: change + .old + .as_ref() + .map(|id| id.as_storage_path().to_string()), + }; + serde_json::to_vec(&record) + .map_err(|cause| Error::serde("failed to serialize changelog entry", cause)) +} + +fn deserialize_change(data: &[u8]) -> Result { + let record: ChangeRecord = serde_json::from_slice(data) + .map_err(|cause| Error::serde("failed to deserialize changelog entry", cause))?; + let id = ObjectId::from_storage_path(&record.id) + .ok_or_else(|| Error::generic("corrupt changelog entry: invalid object id"))?; + let new = parse_optional_object_id(record.new.as_deref())?; + let old = parse_optional_object_id(record.old.as_deref())?; + Ok(Change { id, new, old }) +} + +fn parse_optional_object_id(path: Option<&str>) -> Result> { + path.map(|p| { + ObjectId::from_storage_path(p) + .ok_or_else(|| Error::generic("corrupt changelog entry: invalid storage path")) + }) + .transpose() +} + +/// Claims a stale changelog entry via compare-and-swap. 
+/// +/// Atomically rewrites the entry with a fresh timestamp iff the cell timestamp +/// is still older than `cutoff_micros`. Returns `true` if claimed, `false` if +/// another instance already refreshed it. +/// +/// Takes only the two fields needed from [`BigTableBackend`] so the caller +/// can avoid cloning the entire backend for the `'static` stream closure. +async fn claim_changelog_entry( + bigtable: &BigTableConnection, + table_path: &str, + id: &ChangeId, + change: &Change, + cutoff_micros: i64, +) -> Result { + let row_key = change_key(id); + let value = serialize_change(change)?; + let timestamp_micros = now_micros()?; + let claim_mutation = mutation(mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_CHANGELOG.to_owned(), + column_qualifier: COLUMN_CHANGE.to_owned(), + timestamp_micros, + value, + })); + + let filter = changelog_staleness_filter(cutoff_micros); + let request = v2::CheckAndMutateRowRequest { + table_name: table_path.to_owned(), + row_key, + predicate_filter: Some(filter), + true_mutations: vec![claim_mutation], + false_mutations: vec![], + ..Default::default() + }; + + let result = retry("changelog-claim", || async { + bigtable + .client() + .check_and_mutate_row(request.clone()) + .await + }) + .await?; + + Ok(result.predicate_matched) +} + +/// Parses a raw BigTable row into a `(ChangeId, Change)` pair. +/// +/// Returns `None` if the row key is not a changelog entry or the `c` column is absent +/// (both indicate rows that should be silently skipped). +fn changelog_entry_from_row( + row_key: Vec, + cells: Vec, +) -> Result> { + let id = match parse_change_id(&row_key) { + Some(id) => id, + None => return Ok(None), + }; + let cell = match cells.into_iter().find(|c| c.qualifier == COLUMN_CHANGE) { + Some(cell) => cell, + None => return Ok(None), + }; + let change = deserialize_change(&cell.value)?; + Ok(Some((id, change))) +} + +/// Processes one row from a changelog scan: deserializes it and attempts a CAS claim. 
+/// +/// Returns `Ok(Some(...))` if the row was claimed, `Ok(None)` if it was intentionally +/// skipped (already claimed by another instance, or not a valid changelog row), and +/// `Err` if a recoverable error occurred that the caller should log. +async fn process_scan_row( + row_key: Vec, + cells: Vec, + bigtable: &BigTableConnection, + table_path: &str, + cutoff_micros: i64, +) -> Result> { + let Some((id, change)) = changelog_entry_from_row(row_key, cells)? else { + return Ok(None); + }; + + let claimed = claim_changelog_entry(bigtable, table_path, &id, &change, cutoff_micros).await?; + Ok(claimed.then_some((id, change))) +} + +#[async_trait::async_trait] +impl ChangeLog for BigTableBackend { + async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> { + let mutation = mutation(mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_CHANGELOG.to_owned(), + column_qualifier: COLUMN_CHANGE.to_owned(), + timestamp_micros: now_micros()?, + value: serialize_change(change)?, + })); + + self.mutate(change_key(id), [mutation], "changelog-record") + .await?; + + Ok(()) + } + + async fn remove(&self, id: &ChangeId) -> Result<()> { + self.mutate(change_key(id), [delete_row_mutation()], "changelog-remove") + .await?; + + Ok(()) + } + + async fn scan(&self, max: u16) -> Result> { + // Compute the staleness cutoff: entries with cell timestamp older than + // this were written by operations that are no longer bumping heartbeats. + let staleness_threshold_micros = CHANGE_THRESHOLD.as_millis() as i64 * 1000; + let cutoff_micros = now_micros()? 
- staleness_threshold_micros; + + let request = v2::ReadRowsRequest { + table_name: self.table_path.clone(), + filter: Some(changelog_staleness_filter(cutoff_micros)), + rows_limit: max.into(), + ..Default::default() + }; + + let rows = retry("changelog-scan", || async { + self.bigtable + .client() + .read_rows_with_prefix(request.clone(), CHANGE_PREFIX.to_vec()) + .await + }) + .await?; + + let mut entries = Vec::with_capacity(rows.len()); + for (row_key, cells) in rows { + match process_scan_row( + row_key, + cells, + &self.bigtable, + &self.table_path, + cutoff_micros, + ) + .await + { + Ok(Some(entry)) => entries.push(entry), + Ok(None) => {} + Err(e) => objectstore_log::error!(!!&e, "Failed to process changelog scan row"), + } + } + + Ok(entries) + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -1161,6 +1429,7 @@ mod tests { use objectstore_types::scope::{Scope, Scopes}; use super::*; + use crate::backend::changelog::ChangeId; use crate::id::ObjectContext; use crate::stream; @@ -2023,4 +2292,162 @@ mod tests { Ok(()) } + + // --- Section 5: ChangeLog --- + + fn make_change(key: &str) -> (ObjectId, Change) { + let id = ObjectId::random(ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + }); + let new_blob = ObjectId::random(ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + }); + let change = Change { + id: id.clone(), + new: Some(new_blob), + old: None, + }; + let _ = key; // for test naming clarity only + (id, change) + } + + /// Record then remove: scan returns nothing. 
+ #[tokio::test] + async fn changelog_record_remove_scan() -> Result<()> { + let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); + let backend = create_test_backend().await?; + let change_id = ChangeId::new(); + let (_, change) = make_change("cr-obj"); + + backend.record(&change_id, &change).await?; + backend.remove(&change_id).await?; + + let entries: Vec<_> = backend.scan(100).await?; + assert!( + entries.iter().all(|(id, _)| id != &change_id), + "removed entry must not appear in scan" + ); + + Ok(()) + } + + /// Remove a nonexistent entry: should not error. + #[tokio::test] + async fn changelog_remove_nonexistent_is_ok() -> Result<()> { + let backend = create_test_backend().await?; + let change_id = ChangeId::new(); + + backend.remove(&change_id).await?; + + Ok(()) + } + + /// A freshly recorded entry is NOT stale yet, so scan skips it. + #[tokio::test] + async fn changelog_fresh_entry_not_returned_by_scan() -> Result<()> { + let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); + let backend = create_test_backend().await?; + let change_id = ChangeId::new(); + let (_, change) = make_change("fresh-obj"); + + backend.record(&change_id, &change).await?; + + let entries: Vec<_> = backend.scan(100).await?; + // The freshly-written entry has a current timestamp — scan must skip it. + assert!( + entries.iter().all(|(id, _)| id != &change_id), + "fresh entry must not appear in scan" + ); + + // Clean up. + backend.remove(&change_id).await?; + + Ok(()) + } + + // Serializes all changelog tests that call `scan()`. + // + // All bigtable changelog tests share the same emulator table. Any concurrent `scan()` call + // reads ALL stale entries in the table and tries to claim them via CAS, so tests that write + // stale entries and then assert scan returns them can lose their entry to a sibling test's + // scan. Holding this lock for the duration of every test that calls `scan()` prevents that. 
+ // + // `std::sync::Mutex` (not `tokio::sync::Mutex`) is required because each `#[tokio::test]` + // creates its own runtime; a tokio mutex is runtime-bound and cannot synchronize across them. + static CHANGELOG_SCAN_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + + /// Write an entry with an old timestamp directly (bypassing heartbeat), + /// then verify scan claims it. + #[tokio::test] + async fn changelog_stale_entry_claimed_by_scan() -> Result<()> { + let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); + let backend = create_test_backend().await?; + let change_id = ChangeId::new(); + let (_, change) = make_change("stale-obj"); + + // Write the entry with a timestamp that is already past the staleness threshold. + let stale_micros = now_micros()? - CHANGE_THRESHOLD.as_millis() as i64 * 1000 * 2; + let value = serialize_change(&change)?; + let set_cell = mutation(mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_CHANGELOG.to_owned(), + column_qualifier: COLUMN_CHANGE.to_owned(), + timestamp_micros: stale_micros, + value, + })); + backend + .mutate(change_key(&change_id), [set_cell], "test-setup") + .await?; + + // Scan should claim and return it. + let entries: Vec<_> = backend.scan(100).await?; + assert!( + entries.iter().any(|(id, _)| id == &change_id), + "stale entry must be claimed by scan" + ); + + // Clean up. + backend.remove(&change_id).await?; + + Ok(()) + } + + /// A stale entry claimed by one scan instance is not claimed again by a concurrent scan. + #[tokio::test] + async fn changelog_stale_entry_claimed_once() -> Result<()> { + let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); + let backend = create_test_backend().await?; + let change_id = ChangeId::new(); + let (_, change) = make_change("once-obj"); + + // Write a stale entry. + let stale_micros = now_micros()? 
- CHANGE_THRESHOLD.as_millis() as i64 * 1000 * 2; + let value = serialize_change(&change)?; + let set_cell = mutation(mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_CHANGELOG.to_owned(), + column_qualifier: COLUMN_CHANGE.to_owned(), + timestamp_micros: stale_micros, + value, + })); + backend + .mutate(change_key(&change_id), [set_cell], "test-setup") + .await?; + + // First scan claims the entry (bumps timestamp to now). + let first: Vec<_> = backend.scan(100).await?; + let claimed_first = first.iter().any(|(id, _)| id == &change_id); + + // Second scan should NOT return the same entry (timestamp is now fresh). + let second: Vec<_> = backend.scan(100).await?; + let claimed_second = second.iter().any(|(id, _)| id == &change_id); + + assert!(claimed_first, "first scan must claim the entry"); + assert!(!claimed_second, "second scan must not reclaim the entry"); + + // Clean up. + backend.remove(&change_id).await?; + + Ok(()) + } } diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 0e05bb84..187aa081 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -24,8 +24,8 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; +use tokio::task::JoinHandle; use tokio_util::task::TaskTracker; -use tokio_util::task::task_tracker::TaskTrackerToken; use crate::backend::common::{Backend, HighVolumeBackend, TieredMetadata}; use crate::error::Result; @@ -35,6 +35,12 @@ use crate::id::ObjectId; const INITIAL_BACKOFF: Duration = Duration::from_millis(100); /// Maximum delay for exponential backoff retries in background cleanup tasks. const MAX_BACKOFF: Duration = Duration::from_secs(30); +/// Maximum number of stale entries returned per [`ChangeLog::scan`] call. +const SCAN_COUNT: u16 = 100; +/// How often the heartbeat task re-records a change to extend its lifetime. 
+const REFRESH_INTERVAL: Duration = Duration::from_secs(10); +/// Time after which a change is considered expired if not completed or refreshed. +pub const EXPIRY: Duration = REFRESH_INTERVAL.saturating_mul(3); /// Unique identifier for a change log entry. /// @@ -49,6 +55,11 @@ impl ChangeId { pub fn new() -> Self { Self(uuid::Uuid::now_v7()) } + + /// Parses a `ChangeId` from its UUID string representation. + pub(crate) fn from_uuid_str(s: &str) -> Option { + uuid::Uuid::parse_str(s).ok().map(Self) + } } impl fmt::Display for ChangeId { @@ -118,49 +129,76 @@ impl ChangeManager { /// When the [`ChangeGuard`] is dropped, a background process is spawned to /// clean up any unreferenced objects in LT storage. pub async fn record(self: Arc, change: Change) -> Result { - let token = self.tracker.token(); - let id = ChangeId::new(); self.changelog.record(&id, &change).await?; let state = ChangeState { - id, - change, + id: id.clone(), + change: change.clone(), phase: ChangePhase::Recorded, manager: self.clone(), - _token: token, + heartbeat: Some(self.clone().spawn_heartbeat(id, change)), }; Ok(ChangeGuard { state: Some(state) }) } - /// Scans the changelog for outstanding entries and runs cleanup for each. + fn spawn_heartbeat(self: Arc, id: ChangeId, change: Change) -> JoinHandle<()> { + let token = self.tracker.token(); + + tokio::spawn(async move { + // Prevent shutdown while the change is active in this instance. + let _token = token; + + loop { + tokio::time::sleep(REFRESH_INTERVAL).await; + if self.changelog.record(&id, &change).await.is_err() { + // `record` retries internally. If it fails repeatedly, stop the heartbeat and + // let recovery from a different instance handle the expired entry. + return; + } + } + }) + } + + /// Polls the changelog for stale entries and runs cleanup for each. /// - /// Spawn this into a background task at startup to recover from any orphaned objects after a - /// crash. 
During normal operation, this should return an empty list and have no effect. - pub async fn recover(self: Arc) -> Result<()> { - // Hold one token for the duration of recovery to prevent premature shutdown. - let _token = self.tracker.token(); - - let entries = - self.changelog.scan().await.inspect_err(|e| { - objectstore_log::error!(!!e, "Failed to run changelog recovery") - })?; - - // NB: Intentionally clean up sequentially to reduce load on the system. - for (id, change) in entries { - let state = ChangeState { - id, - change, - phase: ChangePhase::Recovered, - manager: self.clone(), - _token: self.tracker.token(), - }; - - state.cleanup().await; - } + /// Runs as an infinite background loop. On each iteration: + /// - If the scan fails, waits with exponential backoff before retrying. + /// - If the scan returns entries, cleans them up sequentially then loops immediately. + /// - If the scan is empty, waits [`REFRESH_INTERVAL`] before polling again. + /// + /// Spawn this at startup to recover from any orphaned objects after a crash. + pub async fn recover(self: Arc) { + let mut backoff = INITIAL_BACKOFF; - Ok(()) + loop { + match self.changelog.scan(SCAN_COUNT).await { + Err(e) => { + objectstore_log::error!(!!&e, "Failed to run changelog recovery"); + tokio::time::sleep(backoff).await; + backoff = (backoff.mul_f32(1.5)).min(MAX_BACKOFF); + } + Ok(entries) if entries.is_empty() => { + backoff = INITIAL_BACKOFF; + tokio::time::sleep(REFRESH_INTERVAL).await; + } + Ok(entries) => { + backoff = INITIAL_BACKOFF; + // NB: Intentionally sequential to reduce load on the system. + for (id, change) in entries { + let state = ChangeState { + id: id.clone(), + change: change.clone(), + phase: ChangePhase::Recovered, + manager: self.clone(), + heartbeat: Some(self.clone().spawn_heartbeat(id, change)), + }; + state.cleanup().await; + } + } + } + } } } @@ -191,15 +229,12 @@ pub trait ChangeLog: fmt::Debug + Send + Sync { /// a nonexistent entry is not an error (idempotent). 
async fn remove(&self, id: &ChangeId) -> Result<()>; - /// Returns all outstanding changes eligible for recovery. - /// - /// During normal operation this returns only the calling instance's - /// entries. During recovery of a dead instance, the implementation - /// may return that instance's entries after the caller has claimed - /// ownership (via heartbeat CAS). + /// Returns up to `max` changes eligible for recovery. /// - /// The returned entries are unordered. - async fn scan(&self) -> Result>; + /// During normal operation, this returns an empty list. After a crash, it + /// may discover stale entries from another instance, claim them, and + /// return them for cleanup. + async fn scan(&self, max: u16) -> Result>; } /// In-memory [`ChangeLog`] for tests and deployments without durable logging. @@ -211,6 +246,18 @@ pub struct InMemoryChangeLog { entries: Arc>>, } +impl InMemoryChangeLog { + /// Returns `true` if the log contains no entries. + pub fn is_empty(&self) -> bool { + self.entries.lock().expect("lock poisoned").is_empty() + } + + /// Returns a snapshot of all entries currently in the log. 
+    pub fn entries(&self) -> HashMap<ChangeId, Change> {
+        self.entries.lock().expect("lock poisoned").clone()
+    }
+}
+
 #[async_trait::async_trait]
 impl ChangeLog for InMemoryChangeLog {
     async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> {
@@ -225,13 +272,8 @@ impl ChangeLog for InMemoryChangeLog {
         Ok(())
     }
 
-    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
-        let entries = self.entries.lock().expect("lock poisoned");
-        let result = entries
-            .iter()
-            .map(|(id, change)| (id.clone(), change.clone()))
-            .collect();
-        Ok(result)
+    async fn scan(&self, _max: u16) -> Result<Vec<(ChangeId, Change)>> {
+        Ok(vec![])
     }
 }
 
@@ -253,8 +295,8 @@ impl ChangeLog for NoopChangeLog {
         Ok(())
     }
 
-    async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
-        Ok(Vec::new())
+    async fn scan(&self, _max: u16) -> Result<Vec<(ChangeId, Change)>> {
+        Ok(vec![])
     }
 }
 
@@ -291,7 +333,7 @@ struct ChangeState {
     change: Change,
     phase: ChangePhase,
     manager: Arc<ChangeManager>,
-    _token: TaskTrackerToken,
+    heartbeat: Option<JoinHandle<()>>,
 }
 
 impl ChangeState {
@@ -301,7 +343,7 @@ impl ChangeState {
     }
 
     /// Determines tombstone state and runs cleanup for unreferenced objects.
-    async fn cleanup(self) {
+    async fn cleanup(mut self) {
         let current = match self.phase {
             // For `Recovered`, we must first check the state of the tombstone.
             ChangePhase::Recovered => self.read_tombstone().await,
@@ -362,17 +404,32 @@ impl ChangeState {
     }
 
     /// Removes this change's log entry, retrying with exponential backoff until success.
-    async fn cleanup_log(&self) {
+    ///
+    /// Aborts the heartbeat task first so it cannot re-record after removal.
+    async fn cleanup_log(&mut self) {
+        if let Some(handle) = self.heartbeat.take() {
+            handle.abort();
+        }
+
         let mut delay = INITIAL_BACKOFF;
         while self.manager.changelog.remove(&self.id).await.is_err() {
             tokio::time::sleep(delay).await;
             delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF);
         }
     }
+
+    /// Returns `true` if the change entry is still held by this instance.
+ fn is_valid(&self) -> bool { + self.heartbeat.as_ref().is_some_and(|h| !h.is_finished()) + } } impl Drop for ChangeState { fn drop(&mut self) { + if let Some(handle) = self.heartbeat.take() { + handle.abort(); + } + if self.phase != ChangePhase::Completed { objectstore_log::error!( change = ?self.change, @@ -395,17 +452,27 @@ pub struct ChangeGuard { impl ChangeGuard { /// Advances the operation to the given phase. Zero-cost, no I/O. - pub(crate) fn advance(&mut self, phase: ChangePhase) { + pub fn advance(&mut self, phase: ChangePhase) { if let Some(ref mut state) = self.state { state.phase = phase; } } + + /// Returns `true` if the change entry is still held by this instance. + /// + /// This can return false if the internal heartbeats failed to refresh the entry in the log, + /// causing it to be claimed by another instance during recovery. Proceeding with tombstone + /// writes during this time can lead to inconsistencies. + pub fn is_valid(&self) -> bool { + self.state.as_ref().is_some_and(|s| s.is_valid()) + } } impl Drop for ChangeGuard { fn drop(&mut self) { if let Some(state) = self.state.take() && state.phase != ChangePhase::Completed + && state.is_valid() && let Ok(handle) = tokio::runtime::Handle::try_current() { handle.spawn(state.cleanup()); @@ -444,9 +511,9 @@ mod tests { log.record(&id, &change).await.unwrap(); - let entries = log.scan().await.unwrap(); + let entries = log.entries(); assert_eq!(entries.len(), 1); - assert_eq!(entries[0].0, id); + assert!(entries.contains_key(&id)); } #[tokio::test] @@ -462,8 +529,7 @@ mod tests { log.record(&id, &change).await.unwrap(); log.remove(&id).await.unwrap(); - let entries = log.scan().await.unwrap(); - assert!(entries.is_empty()); + assert!(log.is_empty()); } #[tokio::test] @@ -504,8 +570,6 @@ mod tests { drop(guard); // Log entry must survive so recovery can clean up the orphaned blob. 
- let rt = tokio::runtime::Runtime::new().unwrap(); - let entries = rt.block_on(log.scan()).unwrap(); - assert_eq!(entries.len(), 1, "log entry must persist"); + assert_eq!(log.entries().len(), 1, "log entry must persist"); } } diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index 401aad1b..84e36127 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -61,7 +61,7 @@ pub async fn from_config(config: StorageConfig) -> Result { let hv = hv_from_config(c.high_volume).await?; let lt = from_leaf_config(*c.long_term).await?; - let log = Box::new(changelog::NoopChangeLog); + let log = changelog_from_config(c.changelog).await?; Box::new(tiered::TieredStorage::new(hv, lt, log)) } // All non-Tiered variants are handled by from_leaf_config. A wildcard @@ -104,3 +104,32 @@ async fn hv_from_config( HighVolumeStorageConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?), }) } + +/// Configuration for the write-ahead changelog in a [`tiered::TieredStorageConfig`]. +/// +/// The changelog records in-flight mutations for crash recovery. Defaults to +/// [`ChangeLogConfig::Noop`], which disables crash recovery. +#[derive(Debug, Clone, Default, Deserialize, Serialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum ChangeLogConfig { + /// Disables the changelog. In-flight mutations are not recoverable after a crash. + #[default] + Noop, + /// [Google Bigtable] changelog backend. + /// + /// Uses a dedicated BigTable connection to record mutations without + /// interfering with the critical path. + /// + /// [Google Bigtable]: https://cloud.google.com/bigtable + BigTable(bigtable::BigTableConfig), +} + +/// Constructs a [`changelog::ChangeLog`] from the given config. 
+async fn changelog_from_config(
+    config: ChangeLogConfig,
+) -> anyhow::Result<Box<dyn changelog::ChangeLog>> {
+    Ok(match config {
+        ChangeLogConfig::Noop => Box::new(changelog::NoopChangeLog),
+        ChangeLogConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?),
+    })
+}
diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs
index ec626ca2..4b582407 100644
--- a/objectstore-service/src/backend/tiered.rs
+++ b/objectstore-service/src/backend/tiered.rs
@@ -111,8 +111,8 @@ use crate::backend::common::{
     Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse,
     TieredGet, TieredMetadata, TieredWrite, Tombstone,
 };
-use crate::backend::{HighVolumeStorageConfig, StorageConfig};
-use crate::error::Result;
+use crate::backend::{ChangeLogConfig, HighVolumeStorageConfig, StorageConfig};
+use crate::error::{Error, Result};
 use crate::id::ObjectId;
 use crate::stream::{ClientStream, SizedPeek};
 
@@ -159,6 +159,11 @@ pub struct TieredStorageConfig {
     pub high_volume: HighVolumeStorageConfig,
     /// Backend for large, long-term objects.
     pub long_term: Box<StorageConfig>,
+    /// Write-ahead changelog for crash recovery of in-flight mutations.
+    ///
+    /// Defaults to [`ChangeLogConfig::Noop`], which disables crash recovery.
+    #[serde(default)]
+    pub changelog: ChangeLogConfig,
 }
 
 /// Two-tier storage backend that routes objects by size.
@@ -319,6 +324,15 @@ impl TieredStorage {
             .await?;
         guard.advance(ChangePhase::Written);
 
+        if !guard.is_valid() {
+            // The log is no longer synchronized due to backend failures. Abort since writing now
+            // could lead to an orphan.
+            return Err(Error::Io(std::io::Error::new(
+                std::io::ErrorKind::TimedOut,
+                "failed to keep write-ahead log synchronized",
+            )));
+        }
+
let tombstone = Tombstone { target: new.clone(), @@ -1280,10 +1294,6 @@ mod tests { assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up"); // The log entry must be gone once cleanup completes. - let entries = log.scan().await.unwrap(); - assert!( - entries.is_empty(), - "changelog entry not removed after cleanup" - ); + assert!(log.is_empty(), "changelog entry not removed after cleanup"); } }