From 3249784dfb43111b414a25d6a5bddb5de744361e Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Thu, 26 Mar 2026 13:47:33 +0100 Subject: [PATCH 1/3] feat(service): Implement BigTable-backed ChangeLog for tiered storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a BigTable implementation of the ChangeLog trait so that in-flight mutations in TieredStorage are persisted for crash recovery. The ChangeLog records each mutation (object ID, old and new storage locations) before work begins and removes it on clean completion. On startup, TieredStorage scans for stale entries — those whose cell timestamp is older than a staleness threshold — and re-runs the cleanup to converge any partially-completed state. Stale entries are claimed via a BigTable CheckAndMutateRow CAS: an entry is only claimed if its timestamp is still below the threshold, so concurrent recovery instances cannot process the same entry twice. Rows stream lazily from BigTable as the caller polls, with CAS claims interleaved with row delivery. The changelog backend is now explicitly configured via a new ChangeLogConfig enum on TieredStorageConfig (Noop | BigTable), defaulting to Noop. This decouples the changelog from the HV backend config and lets operators opt in independently. The heartbeat-bump mechanism that would keep active-operation timestamps fresh is not yet implemented; the staleness threshold is currently fixed at 30 seconds. Co-Authored-By: Claude --- devservices/run-bigtable.sh | 3 +- objectstore-service/src/backend/bigtable.rs | 440 ++++++++++++++++++- objectstore-service/src/backend/changelog.rs | 50 ++- objectstore-service/src/backend/mod.rs | 31 +- objectstore-service/src/backend/tiered.rs | 9 +- 5 files changed, 510 insertions(+), 23 deletions(-) diff --git a/devservices/run-bigtable.sh b/devservices/run-bigtable.sh index 5b8a02eb..692650fb 100755 --- a/devservices/run-bigtable.sh +++ b/devservices/run-bigtable.sh @@ -14,7 +14,8 @@ for _ in {1..4}; do done cbt deletetable objectstore || true -cbt createtable objectstore families=fg,fm +cbt createtable objectstore families=fg,fm,fc cbt setgcpolicy objectstore fg maxage=1s +cbt setgcpolicy objectstore fc maxversions=1 wait $EMULATOR_PID diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index 4f290c39..347a2862 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -36,11 +36,12 @@ use std::time::{Duration, SystemTime}; use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell}; use bigtable_rs::google::bigtable::v2::{self, mutation}; use bytes::Bytes; -use futures_util::TryStreamExt; +use futures_util::{StreamExt, TryStreamExt}; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use serde::{Deserialize, Serialize}; use tonic::Code; +use crate::backend::changelog::{Change, ChangeId, ChangeLog, ChangeStream}; use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, @@ -144,6 +145,8 @@ const REQUEST_RETRY_COUNT: usize = 2; /// How many times to retry a CAS mutation before giving up and returning an error. const CAS_RETRY_COUNT: usize = 3; +/// Column qualifier for changelog entry data in the `fc` family. +const COLUMN_CHANGE: &[u8] = b"c"; /// Column that stores the raw payload (compressed). const COLUMN_PAYLOAD: &[u8] = b"p"; /// Column that stores metadata in JSON. @@ -162,6 +165,19 @@ const FILTER_META: &[u8] = b"^[mrt]$"; const FAMILY_GC: &str = "fg"; /// Column family that uses manual garbage collection. const FAMILY_MANUAL: &str = "fm"; +/// Column family for the write-ahead changelog. Requires `max_versions = 1` GC policy. +const FAMILY_CHANGELOG: &str = "fc"; + +/// Row key prefix for all changelog entries. +/// +/// All changelog rows have keys of the form `~changelog/{change_id}`. The `~` prefix +/// sorts after all regular storage paths, isolating changelog rows from data. +const CHANGE_PREFIX: &[u8] = b"~changelog/"; +/// Minimum age of a changelog entry's cell timestamp before recovery may claim it. +/// +/// Entries younger than this threshold are assumed to belong to an active operation +/// and are skipped by the recovery scan. +const CHANGE_THRESHOLD: Duration = Duration::from_secs(30); /// BigTable storage backend for high-volume, low-latency object storage. pub struct BigTableBackend { @@ -1153,6 +1169,266 @@ fn is_retryable(error: &BigTableError) -> bool { } } +/// Builds the BigTable row key for a changelog entry: `~changelog/{change_id}`. +fn change_key(id: &ChangeId) -> Vec { + format!("~changelog/{id}").into_bytes() +} + +/// Parses a changelog row key back into a [`ChangeId`]. +/// +/// Returns `None` if the key does not match the `~changelog/{uuid}` format. +fn parse_change_id(row_key: &[u8]) -> Option { + let s = std::str::from_utf8(row_key).ok()?; + let uuid_str = s.strip_prefix("~changelog/")?; + ChangeId::from_uuid_str(uuid_str) +} + +/// Returns the current time as a millisecond-aligned microsecond BigTable timestamp. +fn now_micros() -> Result { + let millis = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .map_err(|e| Error::Generic { + context: "system time is before UNIX epoch".to_owned(), + cause: Some(Box::new(e)), + })? + .as_millis(); + let micros = millis * 1000; + micros.try_into().map_err(|e| Error::Generic { + context: format!("timestamp {micros}µs overflows i64"), + cause: Some(Box::new(e)), + }) +} + +/// Builds a row filter matching changelog entries whose heartbeat is stale. +/// +/// The filter matches the `d` column where the most recent cell timestamp is +/// strictly older than `cutoff_micros` (i.e., the operation that wrote it has +/// not bumped its heartbeat recently enough). +fn changelog_staleness_filter(cutoff_micros: i64) -> v2::RowFilter { + v2::RowFilter { + filter: Some(v2::row_filter::Filter::Chain(v2::row_filter::Chain { + filters: vec![ + column_filter(COLUMN_CHANGE), + v2::RowFilter { + filter: Some(v2::row_filter::Filter::CellsPerColumnLimitFilter(1)), + }, + v2::RowFilter { + filter: Some(v2::row_filter::Filter::TimestampRangeFilter( + v2::TimestampRange { + start_timestamp_micros: 0, + end_timestamp_micros: cutoff_micros, + }, + )), + }, + ], + })), + } +} + +/// Serialized form of a [`Change`] stored in the changelog column. +/// +/// Object IDs are stored as storage path strings so the format is +/// self-describing and independent of the in-memory representation. +#[derive(Debug, Deserialize, Serialize)] +struct ChangeRecord { + id: String, + new: Option, + old: Option, +} + +fn serialize_change(change: &Change) -> Result> { + let record = ChangeRecord { + id: change.id.as_storage_path().to_string(), + new: change + .new + .as_ref() + .map(|id| id.as_storage_path().to_string()), + old: change + .old + .as_ref() + .map(|id| id.as_storage_path().to_string()), + }; + serde_json::to_vec(&record) + .map_err(|cause| Error::serde("failed to serialize changelog entry", cause)) +} + +fn deserialize_change(data: &[u8]) -> Result { + let record: ChangeRecord = serde_json::from_slice(data) + .map_err(|cause| Error::serde("failed to deserialize changelog entry", cause))?; + let id = ObjectId::from_storage_path(&record.id) + .ok_or_else(|| Error::generic("corrupt changelog entry: invalid object id"))?; + let new = parse_optional_object_id(record.new.as_deref())?; + let old = parse_optional_object_id(record.old.as_deref())?; + Ok(Change { id, new, old }) +} + +fn parse_optional_object_id(path: Option<&str>) -> Result> { + path.map(|p| { + ObjectId::from_storage_path(p) + .ok_or_else(|| Error::generic("corrupt changelog entry: invalid storage path")) + }) + .transpose() +} + +/// Claims a stale changelog entry via compare-and-swap. +/// +/// Atomically rewrites the entry with a fresh timestamp iff the cell timestamp +/// is still older than `cutoff_micros`. Returns `true` if claimed, `false` if +/// another instance already refreshed it. +/// +/// Takes only the two fields needed from [`BigTableBackend`] so the caller +/// can avoid cloning the entire backend for the `'static` stream closure. +async fn claim_changelog_entry( + bigtable: &BigTableConnection, + table_path: &str, + id: &ChangeId, + change: &Change, + cutoff_micros: i64, +) -> Result { + let row_key = change_key(id); + let value = serialize_change(change)?; + let timestamp_micros = now_micros()?; + let claim_mutation = mutation(mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_CHANGELOG.to_owned(), + column_qualifier: COLUMN_CHANGE.to_owned(), + timestamp_micros, + value, + })); + + let filter = changelog_staleness_filter(cutoff_micros); + let request = v2::CheckAndMutateRowRequest { + table_name: table_path.to_owned(), + row_key, + predicate_filter: Some(filter), + true_mutations: vec![claim_mutation], + false_mutations: vec![], + ..Default::default() + }; + + let result = retry("changelog-claim", || async { + bigtable + .client() + .check_and_mutate_row(request.clone()) + .await + }) + .await?; + + Ok(result.predicate_matched) +} + +/// Parses a raw BigTable row into a `(ChangeId, Change)` pair. +/// +/// Returns `None` if the row key is not a changelog entry or the `c` column is absent +/// (both indicate rows that should be silently skipped). +fn changelog_entry_from_row( + row_key: Vec, + cells: Vec, +) -> Result> { + let id = match parse_change_id(&row_key) { + Some(id) => id, + None => return Ok(None), + }; + let cell = match cells.into_iter().find(|c| c.qualifier == COLUMN_CHANGE) { + Some(cell) => cell, + None => return Ok(None), + }; + let change = deserialize_change(&cell.value)?; + Ok(Some((id, change))) +} + +/// Processes one row from a changelog scan: deserializes it and attempts a CAS claim. +/// +/// Returns `Ok(Some(...))` if the row was claimed, `Ok(None)` if it was intentionally +/// skipped (already claimed by another instance, or not a valid changelog row), and +/// `Err` if a recoverable error occurred that the caller should log. +async fn process_scan_row( + result: Result<(Vec, Vec), bigtable_rs::bigtable::Error>, + bigtable: &BigTableConnection, + table_path: &str, + cutoff_micros: i64, +) -> Result> { + let (row_key, cells) = result.map_err(|e| Error::Generic { + context: "changelog scan row error".to_owned(), + cause: Some(Box::new(e)), + })?; + + let Some((id, change)) = changelog_entry_from_row(row_key, cells)? else { + return Ok(None); + }; + + let claimed = claim_changelog_entry(bigtable, table_path, &id, &change, cutoff_micros).await?; + Ok(claimed.then_some((id, change))) +} + +#[async_trait::async_trait] +impl ChangeLog for BigTableBackend { + async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> { + let mutation = mutation(mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_CHANGELOG.to_owned(), + column_qualifier: COLUMN_CHANGE.to_owned(), + timestamp_micros: now_micros()?, + value: serialize_change(change)?, + })); + + self.mutate(change_key(id), [mutation], "changelog-record") + .await?; + + Ok(()) + } + + async fn remove(&self, id: &ChangeId) -> Result<()> { + self.mutate(change_key(id), [delete_row_mutation()], "changelog-remove") + .await?; + + Ok(()) + } + + async fn scan(&self) -> Result { + // Compute the staleness cutoff: entries with cell timestamp older than + // this were written by operations that are no longer bumping heartbeats. + let staleness_threshold_micros = CHANGE_THRESHOLD.as_millis() as i64 * 1000; + let cutoff_micros = now_micros()? - staleness_threshold_micros; + + let request = v2::ReadRowsRequest { + table_name: self.table_path.clone(), + filter: Some(changelog_staleness_filter(cutoff_micros)), + ..Default::default() + }; + + // Open a server-streaming RPC. Rows arrive lazily as the caller polls, + // so CAS claims are interleaved with row delivery without buffering the + // full result set. Mid-stream gRPC errors skip the affected row and are + // logged; the initial connection error is retried then propagates as Err. + let row_stream = retry("changelog-scan", || async { + self.bigtable + .client() + .stream_rows_with_prefix(request.clone(), CHANGE_PREFIX.to_vec()) + .await + }) + .await?; + + // Clone only the two fields needed for CAS claims. + let bigtable = self.bigtable.clone(); + let table_path = self.table_path.clone(); + + let stream = row_stream.filter_map(move |result| { + let bigtable = bigtable.clone(); + let table_path = table_path.clone(); + async move { + match process_scan_row(result, &bigtable, &table_path, cutoff_micros).await { + Ok(entry) => entry, + Err(e) => { + objectstore_log::error!(!!&e, "Failed to process changelog scan row"); + None + } + } + } + }); + + Ok(stream.boxed()) + } +} + #[cfg(test)] mod tests { use std::collections::BTreeMap; @@ -2023,4 +2299,166 @@ mod tests { Ok(()) } + + // --- Section 5: ChangeLog --- + + fn make_change(key: &str) -> (ObjectId, Change) { + let id = ObjectId::random(ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + }); + let new_blob = ObjectId::random(ObjectContext { + usecase: "testing".into(), + scopes: Scopes::from_iter([Scope::create("testing", "value").unwrap()]), + }); + let change = Change { + id: id.clone(), + new: Some(new_blob), + old: None, + }; + let _ = key; // for test naming clarity only + (id, change) + } + + /// Record then remove: scan returns nothing. + #[tokio::test] + async fn changelog_record_remove_scan() -> Result<()> { + let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); + let backend = create_test_backend().await?; + let change_id = crate::backend::changelog::ChangeId::new(); + let (_, change) = make_change("cr-obj"); + + backend.record(&change_id, &change).await?; + backend.remove(&change_id).await?; + + let entries: Vec<_> = backend.scan().await?.collect().await; + assert!( + entries.iter().all(|(id, _)| id != &change_id), + "removed entry must not appear in scan" + ); + + Ok(()) + } + + /// Remove a nonexistent entry: should not error. + #[tokio::test] + async fn changelog_remove_nonexistent_is_ok() -> Result<()> { + let backend = create_test_backend().await?; + let change_id = crate::backend::changelog::ChangeId::new(); + + backend.remove(&change_id).await?; + + Ok(()) + } + + /// A freshly recorded entry is NOT stale yet, so scan skips it. + #[tokio::test] + async fn changelog_fresh_entry_not_returned_by_scan() -> Result<()> { + let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); + let backend = create_test_backend().await?; + let change_id = crate::backend::changelog::ChangeId::new(); + let (_, change) = make_change("fresh-obj"); + + backend.record(&change_id, &change).await?; + + let entries: Vec<_> = backend.scan().await?.collect().await; + // The freshly-written entry has a current timestamp — scan must skip it. + assert!( + entries.iter().all(|(id, _)| id != &change_id), + "fresh entry must not appear in scan" + ); + + // Clean up. + backend.remove(&change_id).await?; + + Ok(()) + } + + // Serializes all changelog tests that call `scan()`. + // + // All bigtable changelog tests share the same emulator table. Any concurrent `scan()` call + // reads ALL stale entries in the table and tries to claim them via CAS, so tests that write + // stale entries and then assert scan returns them can lose their entry to a sibling test's + // scan. Holding this lock for the duration of every test that calls `scan()` prevents that. + // + // `std::sync::Mutex` (not `tokio::sync::Mutex`) is required because each `#[tokio::test]` + // creates its own runtime; a tokio mutex is runtime-bound and cannot synchronize across them. + static CHANGELOG_SCAN_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(()); + + /// Write an entry with an old timestamp directly (bypassing heartbeat), + /// then verify scan claims it. + #[tokio::test] + async fn changelog_stale_entry_claimed_by_scan() -> Result<()> { + use crate::backend::changelog::ChangeId; + + let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); + let backend = create_test_backend().await?; + let change_id = ChangeId::new(); + let (_, change) = make_change("stale-obj"); + + // Write the entry with a timestamp that is already past the staleness threshold. + let stale_micros = now_micros()? - CHANGE_THRESHOLD.as_millis() as i64 * 1000 * 2; + let value = serialize_change(&change)?; + let set_cell = mutation(mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_CHANGELOG.to_owned(), + column_qualifier: COLUMN_CHANGE.to_owned(), + timestamp_micros: stale_micros, + value, + })); + backend + .mutate(change_key(&change_id), [set_cell], "test-setup") + .await?; + + // Scan should claim and return it. + let entries: Vec<_> = backend.scan().await?.collect().await; + assert!( + entries.iter().any(|(id, _)| id == &change_id), + "stale entry must be claimed by scan" + ); + + // Clean up. + backend.remove(&change_id).await?; + + Ok(()) + } + + /// A stale entry claimed by one scan instance is not claimed again by a concurrent scan. + #[tokio::test] + async fn changelog_stale_entry_claimed_once() -> Result<()> { + use crate::backend::changelog::ChangeId; + + let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); + let backend = create_test_backend().await?; + let change_id = ChangeId::new(); + let (_, change) = make_change("once-obj"); + + // Write a stale entry. + let stale_micros = now_micros()? - CHANGE_THRESHOLD.as_millis() as i64 * 1000 * 2; + let value = serialize_change(&change)?; + let set_cell = mutation(mutation::Mutation::SetCell(mutation::SetCell { + family_name: FAMILY_CHANGELOG.to_owned(), + column_qualifier: COLUMN_CHANGE.to_owned(), + timestamp_micros: stale_micros, + value, + })); + backend + .mutate(change_key(&change_id), [set_cell], "test-setup") + .await?; + + // First scan claims the entry (bumps timestamp to now). + let first: Vec<_> = backend.scan().await?.collect().await; + let claimed_first = first.iter().any(|(id, _)| id == &change_id); + + // Second scan should NOT return the same entry (timestamp is now fresh). + let second: Vec<_> = backend.scan().await?.collect().await; + let claimed_second = second.iter().any(|(id, _)| id == &change_id); + + assert!(claimed_first, "first scan must claim the entry"); + assert!(!claimed_second, "second scan must not reclaim the entry"); + + // Clean up. + backend.remove(&change_id).await?; + + Ok(()) + } } diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 0e05bb84..06ed0ab8 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -24,6 +24,8 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; +use futures_util::StreamExt; +use futures_util::stream::BoxStream; use tokio_util::task::TaskTracker; use tokio_util::task::task_tracker::TaskTrackerToken; @@ -36,6 +38,13 @@ const INITIAL_BACKOFF: Duration = Duration::from_millis(100); /// Maximum delay for exponential backoff retries in background cleanup tasks. const MAX_BACKOFF: Duration = Duration::from_secs(30); +/// Stream of changes claimed during a recovery scan. +/// +/// Each item is a `(ChangeId, Change)` pair that has been atomically claimed +/// from durable storage. Items are yielded lazily — the CAS claim for each +/// entry happens as the caller polls the stream. +pub type ChangeStream = BoxStream<'static, (ChangeId, Change)>; + /// Unique identifier for a change log entry. /// /// Generated per-operation as a UUIDv7. In durable storage, scoped to the @@ -49,6 +58,11 @@ impl ChangeId { pub fn new() -> Self { Self(uuid::Uuid::now_v7()) } + + /// Parses a `ChangeId` from its UUID string representation. + pub(crate) fn from_uuid_str(s: &str) -> Option { + uuid::Uuid::parse_str(s).ok().map(Self) + } } impl fmt::Display for ChangeId { @@ -142,13 +156,13 @@ impl ChangeManager { // Hold one token for the duration of recovery to prevent premature shutdown. let _token = self.tracker.token(); - let entries = + let mut stream = self.changelog.scan().await.inspect_err(|e| { objectstore_log::error!(!!e, "Failed to run changelog recovery") })?; // NB: Intentionally clean up sequentially to reduce load on the system. - for (id, change) in entries { + while let Some((id, change)) = stream.next().await { let state = ChangeState { id, change, @@ -191,15 +205,12 @@ pub trait ChangeLog: fmt::Debug + Send + Sync { /// a nonexistent entry is not an error (idempotent). async fn remove(&self, id: &ChangeId) -> Result<()>; - /// Returns all outstanding changes eligible for recovery. - /// - /// During normal operation this returns only the calling instance's - /// entries. During recovery of a dead instance, the implementation - /// may return that instance's entries after the caller has claimed - /// ownership (via heartbeat CAS). + /// Scans for outstanding changes and returns them as a lazy stream. /// - /// The returned entries are unordered. - async fn scan(&self) -> Result>; + /// For durable implementations this performs atomic claiming via + /// compare-and-swap so that concurrent instances do not double-process + /// the same entry. Items where the claim fails are silently skipped. + async fn scan(&self) -> Result; } /// In-memory [`ChangeLog`] for tests and deployments without durable logging. @@ -225,13 +236,15 @@ impl ChangeLog for InMemoryChangeLog { Ok(()) } - async fn scan(&self) -> Result> { + async fn scan(&self) -> Result { + // TODO: Check if we should return anything here. This should probably yield empty since + // there are no abandoned in-memory entries ever. let entries = self.entries.lock().expect("lock poisoned"); - let result = entries + let items: Vec<(ChangeId, Change)> = entries .iter() .map(|(id, change)| (id.clone(), change.clone())) .collect(); - Ok(result) + Ok(futures_util::stream::iter(items).boxed()) } } @@ -253,8 +266,8 @@ impl ChangeLog for NoopChangeLog { Ok(()) } - async fn scan(&self) -> Result> { - Ok(Vec::new()) + async fn scan(&self) -> Result { + Ok(futures_util::stream::empty().boxed()) } } @@ -444,7 +457,7 @@ mod tests { log.record(&id, &change).await.unwrap(); - let entries = log.scan().await.unwrap(); + let entries: Vec<_> = log.scan().await.unwrap().collect().await; assert_eq!(entries.len(), 1); assert_eq!(entries[0].0, id); } @@ -462,7 +475,7 @@ mod tests { log.record(&id, &change).await.unwrap(); log.remove(&id).await.unwrap(); - let entries = log.scan().await.unwrap(); + let entries: Vec<_> = log.scan().await.unwrap().collect().await; assert!(entries.is_empty()); } @@ -505,7 +518,8 @@ mod tests { // Log entry must survive so recovery can clean up the orphaned blob. let rt = tokio::runtime::Runtime::new().unwrap(); - let entries = rt.block_on(log.scan()).unwrap(); + let entries: Vec<_> = + rt.block_on(async { log.scan().await.unwrap().collect::>().await }); assert_eq!(entries.len(), 1, "log entry must persist"); } } diff --git a/objectstore-service/src/backend/mod.rs b/objectstore-service/src/backend/mod.rs index 401aad1b..84e36127 100644 --- a/objectstore-service/src/backend/mod.rs +++ b/objectstore-service/src/backend/mod.rs @@ -61,7 +61,7 @@ pub async fn from_config(config: StorageConfig) -> Result { let hv = hv_from_config(c.high_volume).await?; let lt = from_leaf_config(*c.long_term).await?; - let log = Box::new(changelog::NoopChangeLog); + let log = changelog_from_config(c.changelog).await?; Box::new(tiered::TieredStorage::new(hv, lt, log)) } // All non-Tiered variants are handled by from_leaf_config. A wildcard @@ -104,3 +104,32 @@ async fn hv_from_config( HighVolumeStorageConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?), }) } + +/// Configuration for the write-ahead changelog in a [`tiered::TieredStorageConfig`]. +/// +/// The changelog records in-flight mutations for crash recovery. Defaults to +/// [`ChangeLogConfig::Noop`], which disables crash recovery. +#[derive(Debug, Clone, Default, Deserialize, Serialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum ChangeLogConfig { + /// Disables the changelog. In-flight mutations are not recoverable after a crash. + #[default] + Noop, + /// [Google Bigtable] changelog backend. + /// + /// Uses a dedicated BigTable connection to record mutations without + /// interfering with the critical path. + /// + /// [Google Bigtable]: https://cloud.google.com/bigtable + BigTable(bigtable::BigTableConfig), +} + +/// Constructs a [`changelog::ChangeLog`] from the given config. +async fn changelog_from_config( + config: ChangeLogConfig, +) -> anyhow::Result> { + Ok(match config { + ChangeLogConfig::Noop => Box::new(changelog::NoopChangeLog), + ChangeLogConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?), + }) +} diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index ec626ca2..486a1d8b 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -111,7 +111,7 @@ use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, }; -use crate::backend::{HighVolumeStorageConfig, StorageConfig}; +use crate::backend::{ChangeLogConfig, HighVolumeStorageConfig, StorageConfig}; use crate::error::Result; use crate::id::ObjectId; use crate::stream::{ClientStream, SizedPeek}; @@ -159,6 +159,11 @@ pub struct TieredStorageConfig { pub high_volume: HighVolumeStorageConfig, /// Backend for large, long-term objects. pub long_term: Box, + /// Write-ahead changelog for crash recovery of in-flight mutations. + /// + /// Defaults to [`ChangeLogConfig::Noop`], which disables crash recovery. + #[serde(default)] + pub changelog: ChangeLogConfig, } /// Two-tier storage backend that routes objects by size. @@ -1280,7 +1285,7 @@ mod tests { assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up"); // The log entry must be gone once cleanup completes. - let entries = log.scan().await.unwrap(); + let entries: Vec<_> = log.scan().await.unwrap().collect().await; assert!( entries.is_empty(), "changelog entry not removed after cleanup" From b257cc9df1792f91989e5f741d7504a2cdb214f3 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 27 Mar 2026 09:16:17 +0100 Subject: [PATCH 2/3] feat(service): Add changelog heartbeat and replace stream scan with bounded Vec - Add a heartbeat task spawned per in-flight change that re-records the entry every 10s, keeping it alive in durable storage while cleanup is in progress. The heartbeat is aborted before log removal to prevent re-recording a deleted entry. - Rewrite ChangeManager::recover() as an infinite polling loop: exponential backoff on scan errors, fixed wait on empty results, immediate re-poll after processing entries. - Replace the lazy ChangeStream (BoxStream) with an eager bounded Vec. The scan now accepts a max count, passed as rows_limit to Bigtable's ReadRows RPC, so the streaming connection is closed before cleanup begins. This avoids holding a long-lived gRPC stream open across slow sequential cleanup. --- objectstore-service/src/backend/bigtable.rs | 77 ++++----- objectstore-service/src/backend/changelog.rs | 166 +++++++++++-------- objectstore-service/src/backend/tiered.rs | 6 +- 3 files changed, 133 insertions(+), 116 deletions(-) diff --git a/objectstore-service/src/backend/bigtable.rs b/objectstore-service/src/backend/bigtable.rs index 347a2862..568b63d8 100644 --- a/objectstore-service/src/backend/bigtable.rs +++ b/objectstore-service/src/backend/bigtable.rs @@ -36,12 +36,12 @@ use std::time::{Duration, SystemTime}; use bigtable_rs::bigtable::{BigTableConnection, Error as BigTableError, RowCell}; use bigtable_rs::google::bigtable::v2::{self, mutation}; use bytes::Bytes; -use futures_util::{StreamExt, TryStreamExt}; +use futures_util::TryStreamExt; use objectstore_types::metadata::{ExpirationPolicy, Metadata}; use serde::{Deserialize, Serialize}; use tonic::Code; -use crate::backend::changelog::{Change, ChangeId, ChangeLog, ChangeStream}; +use crate::backend::changelog::{Change, ChangeId, ChangeLog}; use crate::backend::common::{ Backend, DeleteResponse, GetResponse, HighVolumeBackend, MetadataResponse, PutResponse, TieredGet, TieredMetadata, TieredWrite, Tombstone, @@ -1342,16 +1342,12 @@ fn changelog_entry_from_row( /// skipped (already claimed by another instance, or not a valid changelog row), and /// `Err` if a recoverable error occurred that the caller should log. async fn process_scan_row( - result: Result<(Vec, Vec), bigtable_rs::bigtable::Error>, + row_key: Vec, + cells: Vec, bigtable: &BigTableConnection, table_path: &str, cutoff_micros: i64, ) -> Result> { - let (row_key, cells) = result.map_err(|e| Error::Generic { - context: "changelog scan row error".to_owned(), - cause: Some(Box::new(e)), - })?; - let Some((id, change)) = changelog_entry_from_row(row_key, cells)? else { return Ok(None); }; @@ -1383,7 +1379,7 @@ impl ChangeLog for BigTableBackend { Ok(()) } - async fn scan(&self) -> Result { + async fn scan(&self, max: u16) -> Result> { // Compute the staleness cutoff: entries with cell timestamp older than // this were written by operations that are no longer bumping heartbeats. let staleness_threshold_micros = CHANGE_THRESHOLD.as_millis() as i64 * 1000; @@ -1392,40 +1388,36 @@ impl ChangeLog for BigTableBackend { let request = v2::ReadRowsRequest { table_name: self.table_path.clone(), filter: Some(changelog_staleness_filter(cutoff_micros)), + rows_limit: max.into(), ..Default::default() }; - // Open a server-streaming RPC. Rows arrive lazily as the caller polls, - // so CAS claims are interleaved with row delivery without buffering the - // full result set. Mid-stream gRPC errors skip the affected row and are - // logged; the initial connection error is retried then propagates as Err. - let row_stream = retry("changelog-scan", || async { + let rows = retry("changelog-scan", || async { self.bigtable .client() - .stream_rows_with_prefix(request.clone(), CHANGE_PREFIX.to_vec()) + .read_rows_with_prefix(request.clone(), CHANGE_PREFIX.to_vec()) .await }) .await?; - // Clone only the two fields needed for CAS claims. - let bigtable = self.bigtable.clone(); - let table_path = self.table_path.clone(); - - let stream = row_stream.filter_map(move |result| { - let bigtable = bigtable.clone(); - let table_path = table_path.clone(); - async move { - match process_scan_row(result, &bigtable, &table_path, cutoff_micros).await { - Ok(entry) => entry, - Err(e) => { - objectstore_log::error!(!!&e, "Failed to process changelog scan row"); - None - } - } + let mut entries = Vec::with_capacity(rows.len()); + for (row_key, cells) in rows { + match process_scan_row( + row_key, + cells, + &self.bigtable, + &self.table_path, + cutoff_micros, + ) + .await + { + Ok(Some(entry)) => entries.push(entry), + Ok(None) => {} + Err(e) => objectstore_log::error!(!!&e, "Failed to process changelog scan row"), } - }); + } - Ok(stream.boxed()) + Ok(entries) } } @@ -1437,6 +1429,7 @@ mod tests { use objectstore_types::scope::{Scope, Scopes}; use super::*; + use crate::backend::changelog::ChangeId; use crate::id::ObjectContext; use crate::stream; @@ -2325,13 +2318,13 @@ mod tests { async fn changelog_record_remove_scan() -> Result<()> { let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); let backend = create_test_backend().await?; - let change_id = crate::backend::changelog::ChangeId::new(); + let change_id = ChangeId::new(); let (_, change) = make_change("cr-obj"); backend.record(&change_id, &change).await?; backend.remove(&change_id).await?; - let entries: Vec<_> = backend.scan().await?.collect().await; + let entries: Vec<_> = backend.scan(100).await?; assert!( entries.iter().all(|(id, _)| id != &change_id), "removed entry must not appear in scan" @@ -2344,7 +2337,7 @@ mod tests { #[tokio::test] async fn changelog_remove_nonexistent_is_ok() -> Result<()> { let backend = create_test_backend().await?; - let change_id = crate::backend::changelog::ChangeId::new(); + let change_id = ChangeId::new(); backend.remove(&change_id).await?; @@ -2356,12 +2349,12 @@ mod tests { async fn changelog_fresh_entry_not_returned_by_scan() -> Result<()> { let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); let backend = create_test_backend().await?; - let change_id = crate::backend::changelog::ChangeId::new(); + let change_id = ChangeId::new(); let (_, change) = make_change("fresh-obj"); backend.record(&change_id, &change).await?; - let entries: Vec<_> = backend.scan().await?.collect().await; + let entries: Vec<_> = backend.scan(100).await?; // The freshly-written entry has a current timestamp — scan must skip it. assert!( entries.iter().all(|(id, _)| id != &change_id), @@ -2389,8 +2382,6 @@ mod tests { /// then verify scan claims it. #[tokio::test] async fn changelog_stale_entry_claimed_by_scan() -> Result<()> { - use crate::backend::changelog::ChangeId; - let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); let backend = create_test_backend().await?; let change_id = ChangeId::new(); @@ -2410,7 +2401,7 @@ mod tests { .await?; // Scan should claim and return it. - let entries: Vec<_> = backend.scan().await?.collect().await; + let entries: Vec<_> = backend.scan(100).await?; assert!( entries.iter().any(|(id, _)| id == &change_id), "stale entry must be claimed by scan" @@ -2425,8 +2416,6 @@ mod tests { /// A stale entry claimed by one scan instance is not claimed again by a concurrent scan. #[tokio::test] async fn changelog_stale_entry_claimed_once() -> Result<()> { - use crate::backend::changelog::ChangeId; - let _guard = CHANGELOG_SCAN_LOCK.lock().unwrap(); let backend = create_test_backend().await?; let change_id = ChangeId::new(); @@ -2446,11 +2435,11 @@ mod tests { .await?; // First scan claims the entry (bumps timestamp to now). - let first: Vec<_> = backend.scan().await?.collect().await; + let first: Vec<_> = backend.scan(100).await?; let claimed_first = first.iter().any(|(id, _)| id == &change_id); // Second scan should NOT return the same entry (timestamp is now fresh). - let second: Vec<_> = backend.scan().await?.collect().await; + let second: Vec<_> = backend.scan(100).await?; let claimed_second = second.iter().any(|(id, _)| id == &change_id); assert!(claimed_first, "first scan must claim the entry"); diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index 06ed0ab8..a9b229f1 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -24,10 +24,8 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; -use futures_util::StreamExt; -use futures_util::stream::BoxStream; +use tokio::task::JoinHandle; use tokio_util::task::TaskTracker; -use tokio_util::task::task_tracker::TaskTrackerToken; use crate::backend::common::{Backend, HighVolumeBackend, TieredMetadata}; use crate::error::Result; @@ -37,13 +35,12 @@ use crate::id::ObjectId; const INITIAL_BACKOFF: Duration = Duration::from_millis(100); /// Maximum delay for exponential backoff retries in background cleanup tasks. const MAX_BACKOFF: Duration = Duration::from_secs(30); - -/// Stream of changes claimed during a recovery scan. -/// -/// Each item is a `(ChangeId, Change)` pair that has been atomically claimed -/// from durable storage. Items are yielded lazily — the CAS claim for each -/// entry happens as the caller polls the stream. -pub type ChangeStream = BoxStream<'static, (ChangeId, Change)>; +/// Maximum number of stale entries returned per [`ChangeLog::scan`] call. +const SCAN_COUNT: u16 = 100; +/// How often the heartbeat task re-records a change to extend its lifetime. +const REFRESH_INTERVAL: Duration = Duration::from_secs(10); +/// Time after which a change is considered expired if not completed or refreshed. +pub const EXPIRY: Duration = REFRESH_INTERVAL.saturating_mul(3); /// Unique identifier for a change log entry. /// @@ -132,49 +129,73 @@ impl ChangeManager { /// When the [`ChangeGuard`] is dropped, a background process is spawned to /// clean up any unreferenced objects in LT storage. pub async fn record(self: Arc, change: Change) -> Result { - let token = self.tracker.token(); - let id = ChangeId::new(); self.changelog.record(&id, &change).await?; let state = ChangeState { - id, - change, + id: id.clone(), + change: change.clone(), phase: ChangePhase::Recorded, manager: self.clone(), - _token: token, + heartbeat: Some(self.clone().spawn_heartbeat(id, change)), }; Ok(ChangeGuard { state: Some(state) }) } - /// Scans the changelog for outstanding entries and runs cleanup for each. + fn spawn_heartbeat(self: Arc, id: ChangeId, change: Change) -> JoinHandle<()> { + let token = self.tracker.token(); + + tokio::spawn(async move { + // TODO: Explain why the token is here now. + let _token = token; + + loop { + tokio::time::sleep(REFRESH_INTERVAL).await; + // TODO: Ensure we cancel cleanup if we keep failing here! + let _ = self.changelog.record(&id, &change).await; + } + }) + } + + /// Polls the changelog for stale entries and runs cleanup for each. /// - /// Spawn this into a background task at startup to recover from any orphaned objects after a - /// crash. During normal operation, this should return an empty list and have no effect. - pub async fn recover(self: Arc) -> Result<()> { - // Hold one token for the duration of recovery to prevent premature shutdown. - let _token = self.tracker.token(); - - let mut stream = - self.changelog.scan().await.inspect_err(|e| { - objectstore_log::error!(!!e, "Failed to run changelog recovery") - })?; - - // NB: Intentionally clean up sequentially to reduce load on the system. - while let Some((id, change)) = stream.next().await { - let state = ChangeState { - id, - change, - phase: ChangePhase::Recovered, - manager: self.clone(), - _token: self.tracker.token(), - }; - - state.cleanup().await; - } + /// Runs as an infinite background loop. On each iteration: + /// - If the scan fails, waits with exponential backoff before retrying. + /// - If the scan returns entries, cleans them up sequentially then loops immediately. + /// - If the scan is empty, waits [`REFRESH_INTERVAL`] before polling again. + /// + /// Spawn this at startup to recover from any orphaned objects after a crash. + pub async fn recover(self: Arc) { + let mut backoff = INITIAL_BACKOFF; - Ok(()) + loop { + match self.changelog.scan(SCAN_COUNT).await { + Err(e) => { + objectstore_log::error!(!!&e, "Failed to run changelog recovery"); + tokio::time::sleep(backoff).await; + backoff = (backoff.mul_f32(1.5)).min(MAX_BACKOFF); + } + Ok(entries) if entries.is_empty() => { + backoff = INITIAL_BACKOFF; + tokio::time::sleep(REFRESH_INTERVAL).await; + } + Ok(entries) => { + backoff = INITIAL_BACKOFF; + // NB: Intentionally sequential to reduce load on the system. + for (id, change) in entries { + let state = ChangeState { + id: id.clone(), + change: change.clone(), + phase: ChangePhase::Recovered, + manager: self.clone(), + heartbeat: Some(self.clone().spawn_heartbeat(id, change)), + }; + state.cleanup().await; + } + } + } + } } } @@ -205,12 +226,12 @@ pub trait ChangeLog: fmt::Debug + Send + Sync { /// a nonexistent entry is not an error (idempotent). async fn remove(&self, id: &ChangeId) -> Result<()>; - /// Scans for outstanding changes and returns them as a lazy stream. + /// Returns up to `max` changes eligible for recovery. /// - /// For durable implementations this performs atomic claiming via - /// compare-and-swap so that concurrent instances do not double-process - /// the same entry. Items where the claim fails are silently skipped. - async fn scan(&self) -> Result; + /// During normal operation, this returns an empty list. After a crash, it + /// may discover stale entries from another instance, claim them, and + /// return them for cleanup. + async fn scan(&self, max: u16) -> Result>; } /// In-memory [`ChangeLog`] for tests and deployments without durable logging. @@ -222,6 +243,18 @@ pub struct InMemoryChangeLog { entries: Arc>>, } +impl InMemoryChangeLog { + /// Returns `true` if the log contains no entries. + pub fn is_empty(&self) -> bool { + self.entries.lock().expect("lock poisoned").is_empty() + } + + /// Returns a snapshot of all entries currently in the log. + pub fn entries(&self) -> HashMap { + self.entries.lock().expect("lock poisoned").clone() + } +} + #[async_trait::async_trait] impl ChangeLog for InMemoryChangeLog { async fn record(&self, id: &ChangeId, change: &Change) -> Result<()> { @@ -236,15 +269,8 @@ impl ChangeLog for InMemoryChangeLog { Ok(()) } - async fn scan(&self) -> Result { - // TODO: Check if we should return anything here. This should probably yield empty since - // there are no abandoned in-memory entries ever. - let entries = self.entries.lock().expect("lock poisoned"); - let items: Vec<(ChangeId, Change)> = entries - .iter() - .map(|(id, change)| (id.clone(), change.clone())) - .collect(); - Ok(futures_util::stream::iter(items).boxed()) + async fn scan(&self, _max: u16) -> Result> { + Ok(vec![]) } } @@ -266,8 +292,8 @@ impl ChangeLog for NoopChangeLog { Ok(()) } - async fn scan(&self) -> Result { - Ok(futures_util::stream::empty().boxed()) + async fn scan(&self, _max: u16) -> Result> { + Ok(vec![]) } } @@ -304,7 +330,7 @@ struct ChangeState { change: Change, phase: ChangePhase, manager: Arc, - _token: TaskTrackerToken, + heartbeat: Option>, } impl ChangeState { @@ -314,7 +340,7 @@ impl ChangeState { } /// Determines tombstone state and runs cleanup for unreferenced objects. - async fn cleanup(self) { + async fn cleanup(mut self) { let current = match self.phase { // For `Recovered`, we must first check the state of the tombstone. ChangePhase::Recovered => self.read_tombstone().await, @@ -375,7 +401,13 @@ impl ChangeState { } /// Removes this change's log entry, retrying with exponential backoff until success. - async fn cleanup_log(&self) { + /// + /// Aborts the heartbeat task first so it cannot re-record after removal. + async fn cleanup_log(&mut self) { + if let Some(handle) = self.heartbeat.take() { + handle.abort(); + } + let mut delay = INITIAL_BACKOFF; while self.manager.changelog.remove(&self.id).await.is_err() { tokio::time::sleep(delay).await; @@ -386,6 +418,10 @@ impl ChangeState { impl Drop for ChangeState { fn drop(&mut self) { + if let Some(handle) = self.heartbeat.take() { + handle.abort(); + } + if self.phase != ChangePhase::Completed { objectstore_log::error!( change = ?self.change, @@ -457,9 +493,9 @@ mod tests { log.record(&id, &change).await.unwrap(); - let entries: Vec<_> = log.scan().await.unwrap().collect().await; + let entries = log.entries(); assert_eq!(entries.len(), 1); - assert_eq!(entries[0].0, id); + assert!(entries.contains_key(&id)); } #[tokio::test] @@ -475,8 +511,7 @@ mod tests { log.record(&id, &change).await.unwrap(); log.remove(&id).await.unwrap(); - let entries: Vec<_> = log.scan().await.unwrap().collect().await; - assert!(entries.is_empty()); + assert!(log.is_empty()); } #[tokio::test] @@ -517,9 +552,6 @@ mod tests { drop(guard); // Log entry must survive so recovery can clean up the orphaned blob. - let rt = tokio::runtime::Runtime::new().unwrap(); - let entries: Vec<_> = - rt.block_on(async { log.scan().await.unwrap().collect::>().await }); - assert_eq!(entries.len(), 1, "log entry must persist"); + assert_eq!(log.entries().len(), 1, "log entry must persist"); } } diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 486a1d8b..23329645 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -1285,10 +1285,6 @@ mod tests { assert!(lt_inner.is_empty(), "orphaned LT blob was not cleaned up"); // The log entry must be gone once cleanup completes. - let entries: Vec<_> = log.scan().await.unwrap().collect().await; - assert!( - entries.is_empty(), - "changelog entry not removed after cleanup" - ); + assert!(log.is_empty(), "changelog entry not removed after cleanup"); } } From 37e63ecffe65dce300d422be0bb97a512718bc8b Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 27 Mar 2026 11:27:09 +0100 Subject: [PATCH 3/3] feat(service): Abort in-process cleanup when changelog heartbeat fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If changelog heartbeats fail consistently, the write-ahead log entry will expire and be claimed by recovery on another instance. Scheduling local cleanup in addition would be redundant and add load during incidents where Bigtable is already degraded — recovery runs cleanup sequentially and in a controlled way. - Heartbeat now returns on the first error (after internal retries); the tracker token is released and the JoinHandle becomes finished. - ChangeGuard::is_valid() exposes whether the heartbeat is still running. - put_long_term checks is_valid() before the HV CAS and aborts if the heartbeat has stopped, preventing the race where recovery reads a pre-CAS tombstone and deletes the newly uploaded LT blob. - ChangeGuard::drop skips scheduling local cleanup when is_valid() is false, leaving the log entry for recovery to handle. --- objectstore-service/src/backend/changelog.rs | 26 +++++++++++++++++--- objectstore-service/src/backend/tiered.rs | 11 ++++++++- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/objectstore-service/src/backend/changelog.rs b/objectstore-service/src/backend/changelog.rs index a9b229f1..187aa081 100644 --- a/objectstore-service/src/backend/changelog.rs +++ b/objectstore-service/src/backend/changelog.rs @@ -147,13 +147,16 @@ impl ChangeManager { let token = self.tracker.token(); tokio::spawn(async move { - // TODO: Explain why the token is here now. + // Prevent shutdown while the change is active in this instance. let _token = token; loop { tokio::time::sleep(REFRESH_INTERVAL).await; - // TODO: Ensure we cancel cleanup if we keep failing here! - let _ = self.changelog.record(&id, &change).await; + if self.changelog.record(&id, &change).await.is_err() { + // `record` retries internally. If it fails repeatedly, stop the heartbeat and + // let recovery from a different instance handle the expired entry. + return; + } } }) } @@ -414,6 +417,11 @@ impl ChangeState { delay = (delay.mul_f32(1.5)).min(MAX_BACKOFF); } } + + /// Returns `true` if the change entry is still held by this instance. + fn is_valid(&self) -> bool { + self.heartbeat.as_ref().is_some_and(|h| !h.is_finished()) + } } impl Drop for ChangeState { @@ -444,17 +452,27 @@ pub struct ChangeGuard { impl ChangeGuard { /// Advances the operation to the given phase. Zero-cost, no I/O. - pub(crate) fn advance(&mut self, phase: ChangePhase) { + pub fn advance(&mut self, phase: ChangePhase) { if let Some(ref mut state) = self.state { state.phase = phase; } } + + /// Returns `true` if the change entry is still held by this instance. + /// + /// This can return false if the internal heartbeats failed to refresh the entry in the log, + /// causing it to be claimed by another instance during recovery. Proceeding with tombstone + /// writes during this time can lead to inconsistencies. + pub fn is_valid(&self) -> bool { + self.state.as_ref().is_some_and(|s| s.is_valid()) + } } impl Drop for ChangeGuard { fn drop(&mut self) { if let Some(state) = self.state.take() && state.phase != ChangePhase::Completed + && state.is_valid() && let Ok(handle) = tokio::runtime::Handle::try_current() { handle.spawn(state.cleanup()); diff --git a/objectstore-service/src/backend/tiered.rs b/objectstore-service/src/backend/tiered.rs index 23329645..4b582407 100644 --- a/objectstore-service/src/backend/tiered.rs +++ b/objectstore-service/src/backend/tiered.rs @@ -112,7 +112,7 @@ use crate::backend::common::{ TieredGet, TieredMetadata, TieredWrite, Tombstone, }; use crate::backend::{ChangeLogConfig, HighVolumeStorageConfig, StorageConfig}; -use crate::error::Result; +use crate::error::{Error, Result}; use crate::id::ObjectId; use crate::stream::{ClientStream, SizedPeek}; @@ -324,6 +324,15 @@ impl TieredStorage { .await?; guard.advance(ChangePhase::Written); + if !guard.is_valid() { + // The log is no longer synchronized due to backend failures. Abort since writing now + // could lead to an orphan. + return Err(Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "failed to keep write-ahead log synchronize", + ))); + } + // 3. CAS commit: write tombstone only if HV state matches what we saw. let tombstone = Tombstone { target: new.clone(),