diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index d32107f0..034b29c5 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -9,7 +9,7 @@ use tonic::async_trait; use crate::config::Config; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, FailedTasksForwarder}; +use crate::store::types::{BucketRange, FailedTasksForwarder, StatusUpdate}; use crate::test_utils::make_activations; use super::*; @@ -127,11 +127,7 @@ impl ActivationStore for MockStore { unimplemented!() } - async fn set_status_batch( - &self, - _ids: &[String], - _status: ActivationStatus, - ) -> Result { + async fn set_status_batch(&self, _updates: &[StatusUpdate]) -> Result { unimplemented!() } diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 103a0a10..b1403ae1 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -18,6 +18,7 @@ use tracing::{debug, error, instrument, warn}; use crate::config::{Config, DeliveryMode}; use crate::store::activation::ActivationStatus; use crate::store::traits::ActivationStore; +pub use crate::store::types::StatusUpdate; pub struct TaskbrokerServer { pub store: Arc, @@ -107,16 +108,17 @@ impl ConsumerService for TaskbrokerServer { let max_attempts = request.get_ref().max_attempts; let delay_on_retry = request.get_ref().delay_on_retry; - // Use batching channel if available and we don't need to update retry state. - // If max_attempts or delay_on_retry is Some, we can't use batching API to update the - // activation, and have to fall back to individual set_status. - if let Some(ref tx) = self.update_tx - && max_attempts.is_none() - && delay_on_retry.is_none() - { - tx.send((id, status)) - .await - .map_err(|_| Status::internal("Status update channel closed"))?; + // Use the batching channel if available. Retry-state updates (max_attempts / + // delay_on_retry) are batched too — the flusher's set_status_batch applies them. + if let Some(ref tx) = self.update_tx { + tx.send(StatusUpdate { + id, + status, + max_attempts, + delay_on_retry, + }) + .await + .map_err(|_| Status::internal("Status update channel closed"))?; metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed()); return Ok(Response::new(SetTaskStatusResponse { task: None })); @@ -227,12 +229,11 @@ impl ConsumerService for TaskbrokerServer { let start_time = Instant::now(); let updates = request.get_ref().updates.clone(); - // Updates can be broken into different batches based on the status and the retry state. - let mut batches: HashMap> = HashMap::new(); - let mut retry_updates: Vec = Vec::new(); - + // Validate and convert every update into a StatusUpdate. Retry state (max_attempts / + // delay_on_retry) rides along and is applied by set_status_batch in the same transaction. + let mut status_updates: Vec = Vec::with_capacity(updates.len()); + let mut sizes_by_status: HashMap = HashMap::new(); for update in updates { - let id = update.id.clone(); let status: ActivationStatus = TaskActivationStatus::try_from(update.status) .map_err(|e| { Status::invalid_argument(format!("Unable to deserialize status: {e:?}")) @@ -246,153 +247,90 @@ impl ConsumerService for TaskbrokerServer { if status == ActivationStatus::Failure { metrics::counter!("grpc_server.set_status.failure").increment(1); } - if update.max_attempts.is_some() || update.delay_on_retry.is_some() { - retry_updates.push(update); - } else { - batches.entry(status).or_default().push(id); - } + *sizes_by_status.entry(status).or_default() += 1; + status_updates.push(StatusUpdate { + id: update.id, + status, + max_attempts: update.max_attempts, + delay_on_retry: update.delay_on_retry, + }); } - // For each status and batch of IDs, update the status for those IDs. In the case of a failure, return an error to the worker. - // Track and log a warning if the number of rows updated is less than the number of IDs requested. metrics::histogram!("grpc_server.set_batch_activation_status.num_batches") - .record(batches.len() as f64); - for (status, ids) in batches { - let requested = ids.len() as u64; - metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64); - match self.store.set_status_batch(&ids, status).await { - Ok(affected) => { - metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string()) - .record((requested - affected) as f64); - if affected < requested { - metrics::histogram!( - "grpc_server.set_batch_activation_status.partial", - "status" => status.to_string() - ) - .record((requested - affected) as f64); - warn!( - ?status, - requested, affected, "Updated fewer rows than IDs requested in batch" - ); - } - } - Err(e) => { - metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested); - metrics::histogram!("grpc_server.set_batch_activation_status.duration") - .record(start_time.elapsed()); - error!("Failed to set batch activation status: {:?}", e); - return Err(Status::internal("Failed to set batch activation status")); - } - } + .record(sizes_by_status.len() as f64); + for (status, size) in &sizes_by_status { + metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()) + .record(*size as f64); } - metrics::histogram!("grpc_server.set_batch_activation_status.retry_updates") - .record(retry_updates.len() as f64); - for update in retry_updates { - let id = update.id.clone(); - let status: ActivationStatus = TaskActivationStatus::try_from(update.status) - .map_err(|e| { - Status::invalid_argument(format!("Unable to deserialize status: {e:?}")) - })? - .into(); - let max_attempts = update.max_attempts; - let delay_on_retry = update.delay_on_retry; - match self - .store - .set_status(&id, status, max_attempts, delay_on_retry) - .await - { - Ok(Some(_)) => metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(1), - Ok(None) => metrics::counter!("grpc_server.set_status", "result" => "not_found", "status" => status.to_string()).increment(1), - - Err(e) => { - metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(1); - - error!("Unable to update status of activation in batch {:?} to {:?}: {:?}", id, status, e); - - metrics::histogram!("grpc_server.set_batch_activation_status.duration").record(start_time.elapsed()); - return Err(Status::internal(format!( - "Unable to update status of activation in batch {id:?} to {status:?}" - ))); + let requested = status_updates.len() as u64; + match self.store.set_status_batch(&status_updates).await { + Ok(affected) => { + metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff") + .record((requested - affected) as f64); + if affected < requested { + metrics::histogram!("grpc_server.set_batch_activation_status.partial") + .record((requested - affected) as f64); + warn!( + requested, + affected, "Updated fewer rows than IDs requested in batch" + ); } } + Err(e) => { + metrics::counter!("grpc_server.set_status", "result" => "error") + .increment(requested); + metrics::histogram!("grpc_server.set_batch_activation_status.duration") + .record(start_time.elapsed()); + error!("Failed to set batch activation status: {:?}", e); + return Err(Status::internal("Failed to set batch activation status")); + } } + metrics::histogram!("grpc_server.set_batch_activation_status.duration") .record(start_time.elapsed()); return Ok(Response::new(SetBatchActivationStatusResponse {})); } } -pub type StatusUpdate = (String, ActivationStatus); - pub async fn flush_updates(store: Arc, buffer: &mut Vec) { if buffer.is_empty() { return; } - let mut by_status: HashMap> = HashMap::new(); - - for (id, status) in buffer.drain(..) { - by_status.entry(status).or_default().push(id); - } - - for (status, ids) in by_status { - let requested = ids.len() as u64; - let st = status.to_string(); - - metrics::histogram!("grpc_server.flush_updates.requested", "status" => st.clone()) - .record(requested as f64); - - match store.set_status_batch(&ids, status).await { - Ok(affected) => { - metrics::histogram!( - "grpc_server.flush_updates.affected", - "status" => st.clone() - ) - .record(affected as f64); - - metrics::counter!( - "grpc_server.flush_updates.updated", - "status" => st.clone() - ) - .increment(affected); + let requested = buffer.len() as u64; + metrics::histogram!("grpc_server.flush_updates.requested").record(requested as f64); - metrics::counter!("grpc_server.flush_updates", "result" => "ok").increment(1); - - if affected < requested { - metrics::counter!( - "grpc_server.flush_updates.partial", - "status" => st.clone() - ) - .increment(1); + match store.set_status_batch(buffer).await { + Ok(affected) => { + metrics::histogram!("grpc_server.flush_updates.affected").record(affected as f64); + metrics::counter!("grpc_server.flush_updates.updated").increment(affected); + metrics::counter!("grpc_server.flush_updates", "result" => "ok").increment(1); - warn!( - ?status, - requested, affected, "Updated fewer rows than IDs requested from server" - ); - } - - debug!( - ?status, - affected, requested, "Flushed status batch from server" + if affected < requested { + metrics::counter!("grpc_server.flush_updates.partial").increment(1); + warn!( + requested, + affected, "Updated fewer rows than IDs requested from server" ); } - Err(e) => { - metrics::counter!("grpc_server.flush_updates", "result" => "error").increment(1); + debug!(affected, requested, "Flushed status batch from server"); - error!( - ?status, - requested, - error = ?e, - "Failed to flush status batch from server" - ); + // Successfully flushed; clear the buffer. + buffer.clear(); + } - // Push failed updates back into the buffer so they can be retried on next flush - for id in ids { - buffer.push((id, status)); - } - } + Err(e) => { + metrics::counter!("grpc_server.flush_updates", "result" => "error").increment(1); + + error!( + requested, + error = ?e, + "Failed to flush status batch from server" + ); + + // Leave the buffer intact so the updates are retried on the next flush. } } } diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index d503b36c..41670f9e 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -11,7 +11,7 @@ use tokio::sync::mpsc; use tonic::{Code, Request}; use crate::config::{Config, DeliveryMode}; -use crate::grpc::server::{StatusUpdate, TaskbrokerServer}; +use crate::grpc::server::{StatusUpdate, TaskbrokerServer, flush_updates}; use crate::store::activation::ActivationStatus; use crate::test_utils::{create_config, create_test_store, make_activations}; @@ -457,9 +457,9 @@ async fn test_set_task_status_forwards_to_update_channel(#[case] adapter: &str) "push path returns no next task from the store" ); - let (id, status) = update_rx.recv().await.expect("status update on channel"); - assert_eq!(id, "id_0"); - assert_eq!(status, ActivationStatus::Complete); + let update = update_rx.recv().await.expect("status update on channel"); + assert_eq!(update.id, "id_0"); + assert_eq!(update.status, ActivationStatus::Complete); let row = store.get_by_id("id_0").await.unwrap().expect("row exists"); assert_eq!( @@ -632,8 +632,8 @@ async fn test_set_batch_activation_status_retry(#[case] adapter: &str) { update_tx: None, }; - // max_attempts is set, so this update is applied individually via set_status - // rather than being grouped into a batch. + // max_attempts / delay_on_retry are set, so set_status_batch also updates the + // activation's retry_state in the same transaction as the status flip. let request = SetBatchActivationStatusRequest { updates: vec![SetTaskStatusRequest { id: "id_0".to_string(), @@ -651,6 +651,10 @@ async fn test_set_batch_activation_status_retry(#[case] adapter: &str) { let row = store.get_by_id("id_0").await.unwrap().expect("row exists"); assert_eq!(row.status, ActivationStatus::Retry); + let activation = TaskActivation::decode(&row.activation as &[u8]).unwrap(); + let retry_state = activation.retry_state.expect("retry_state persisted"); + assert_eq!(retry_state.max_attempts, 5); + assert_eq!(retry_state.delay_on_retry, Some(60)); } #[tokio::test] @@ -671,8 +675,9 @@ async fn test_set_batch_activation_status_mixed(#[case] adapter: &str) { update_tx: None, }; - // Two batched conclusion updates plus one retry update that carries retry - // state, exercising both the batch path and the individual set_status path. + // Two conclusion updates plus one retry update that carries retry state, all + // handled by a single set_status_batch call that groups by status and updates + // the retry update's blob. let request = SetBatchActivationStatusRequest { updates: vec![ SetTaskStatusRequest { @@ -710,4 +715,56 @@ async fn test_set_batch_activation_status_mixed(#[case] adapter: &str) { assert_eq!(row1.status, ActivationStatus::Complete); let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists"); assert_eq!(row2.status, ActivationStatus::Retry); + let activation2 = TaskActivation::decode(&row2.activation as &[u8]).unwrap(); + let retry_state = activation2.retry_state.expect("retry_state persisted"); + assert_eq!(retry_state.max_attempts, 3); + assert_eq!(retry_state.delay_on_retry, Some(60)); +} + +#[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +async fn test_flush_updates_mixed_statuses_and_retry(#[case] adapter: &str) { + let store = create_test_store(adapter).await; + + let activations = make_activations(3); + store.store(&activations).await.unwrap(); + + // A single flush carrying multiple statuses, including a retry update that also + // mutates retry_state. The flusher groups by status and applies the retry blob. + let mut buffer = vec![ + StatusUpdate { + id: "id_0".to_string(), + status: ActivationStatus::Complete, + max_attempts: None, + delay_on_retry: None, + }, + StatusUpdate { + id: "id_1".to_string(), + status: ActivationStatus::Failure, + max_attempts: None, + delay_on_retry: None, + }, + StatusUpdate { + id: "id_2".to_string(), + status: ActivationStatus::Retry, + max_attempts: Some(7), + delay_on_retry: Some(90), + }, + ]; + + flush_updates(store.clone(), &mut buffer).await; + assert!(buffer.is_empty(), "buffer is drained on a successful flush"); + + let row0 = store.get_by_id("id_0").await.unwrap().expect("row exists"); + assert_eq!(row0.status, ActivationStatus::Complete); + let row1 = store.get_by_id("id_1").await.unwrap().expect("row exists"); + assert_eq!(row1.status, ActivationStatus::Failure); + let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists"); + assert_eq!(row2.status, ActivationStatus::Retry); + let activation2 = TaskActivation::decode(&row2.activation as &[u8]).unwrap(); + let retry_state = activation2.retry_state.expect("retry_state persisted"); + assert_eq!(retry_state.max_attempts, 7); + assert_eq!(retry_state.delay_on_retry, Some(90)); } diff --git a/src/push/tests.rs b/src/push/tests.rs index ce1013b6..07ab3d2f 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -11,7 +11,7 @@ use crate::config::Config; use crate::push::updater::test_eager_updater; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; -use crate::store::types::FailedTasksForwarder; +use crate::store::types::{FailedTasksForwarder, StatusUpdate}; use crate::test_utils::make_activations; use crate::worker::test_worker_map; @@ -75,7 +75,7 @@ impl ActivationStore for MockStore { Ok(None) } - async fn set_status_batch(&self, _ids: &[String], _status: ActivationStatus) -> Result { + async fn set_status_batch(&self, _updates: &[StatusUpdate]) -> Result { Ok(0) } diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index cd3c4d4a..000f8a14 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -23,7 +23,7 @@ use crate::push::compute_claim_lease_ms; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::retry::retry_query; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder, StatusUpdate}; /// Run migrations. pub async fn migrate(config: &StoreConfig) -> Result<()> { @@ -882,26 +882,107 @@ impl ActivationStore for PostgresStore { #[instrument(skip_all)] #[framed] - async fn set_status_batch( - &self, - ids: &[String], - status: ActivationStatus, - ) -> Result { - if ids.is_empty() { + async fn set_status_batch(&self, updates: &[StatusUpdate]) -> Result { + if updates.is_empty() { return Ok(0); } + // Group ids by status. Updates that carry retry state are tracked separately so we can + // update their activation blobs after flipping the status. + let mut ids_by_status: HashMap> = HashMap::new(); + let mut retry_params: HashMap<&str, (Option, Option)> = HashMap::new(); + for update in updates { + ids_by_status + .entry(update.status) + .or_default() + .push(&update.id); + if update.max_attempts.is_some() || update.delay_on_retry.is_some() { + retry_params.insert(&update.id, (update.max_attempts, update.delay_on_retry)); + } + } + retry_query(&self.config.retry, "set_status_batch", || async { - let mut conn = self.acquire_write_conn_metric("set_status_batch").await?; + let mut tx = self.begin_write_tx_metric("set_status_batch").await?; + let mut affected = 0u64; + + for (status, ids) in &ids_by_status { + let group_has_retry = ids.iter().any(|id| retry_params.contains_key(*id)); - let result = - sqlx::query("UPDATE inflight_taskactivations SET status = $1 WHERE id = ANY($2)") + if !group_has_retry { + // Hot path: status-only update, no blob read required. + let result = sqlx::query( + "UPDATE inflight_taskactivations SET status = $1 WHERE id = ANY($2)", + ) .bind(status.to_string()) .bind(ids) - .execute(&mut *conn) + .execute(&mut *tx) .await?; + affected += result.rows_affected(); + continue; + } - Ok(result.rows_affected()) + // Retry path: flip the status and read back the current blobs so we can update + // retry_state for the rows that need it. + let rows: Vec = sqlx::query_as( + "UPDATE inflight_taskactivations SET status = $1 WHERE id = ANY($2) RETURNING *, kafka_offset AS offset", + ) + .bind(status.to_string()) + .bind(ids) + .fetch_all(&mut *tx) + .await?; + affected += rows.len() as u64; + + let mut changed_ids: Vec = Vec::new(); + let mut changed_blobs: Vec> = Vec::new(); + for row in &rows { + let Some((max_attempts, delay_on_retry)) = + retry_params.get(&*row.id).copied() + else { + continue; + }; + + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + let mut needs_update = false; + let task_retry_state = activation.retry_state.get_or_insert_default(); + + // Only update the blob if a value actually changed. This should rarely happen + // after the first retry, since max_attempts comes from the task's retry + // decorator which stays constant across retries. + if let Some(max_attempts) = max_attempts + && task_retry_state.max_attempts != max_attempts + { + task_retry_state.max_attempts = max_attempts; + needs_update = true; + } + + if let Some(delay_on_retry) = delay_on_retry + && task_retry_state.delay_on_retry != Some(delay_on_retry) + { + task_retry_state.delay_on_retry = Some(delay_on_retry); + needs_update = true; + } + + if needs_update { + changed_ids.push(row.id.to_string()); + changed_blobs.push(activation.encode_to_vec()); + } + } + + if !changed_ids.is_empty() { + sqlx::query( + "UPDATE inflight_taskactivations AS t SET activation = d.activation \ + FROM UNNEST($1::text[], $2::bytea[]) AS d(id, activation) \ + WHERE t.id = d.id", + ) + .bind(&changed_ids) + .bind(&changed_blobs) + .execute(&mut *tx) + .await?; + } + } + + tx.commit().await?; + Ok(affected) }) .await } diff --git a/src/store/adapters/sqlite.rs b/src/store/adapters/sqlite.rs index cdd051aa..b0c072f0 100644 --- a/src/store/adapters/sqlite.rs +++ b/src/store/adapters/sqlite.rs @@ -1,4 +1,5 @@ use std::borrow::Cow; +use std::collections::HashMap; use std::str::FromStr; use std::time::Instant; @@ -29,7 +30,7 @@ use crate::config::Config; use crate::push::compute_claim_lease_ms; use crate::store::activation::{Activation, ActivationStatus}; use crate::store::traits::ActivationStore; -use crate::store::types::{BucketRange, FailedTasksForwarder}; +use crate::store::types::{BucketRange, FailedTasksForwarder, StatusUpdate}; /// Database representation of an [`Activation`], used for both reads and /// writes. @@ -839,34 +840,98 @@ impl ActivationStore for SqliteStore { } #[instrument(skip_all)] - async fn set_status_batch( - &self, - ids: &[String], - status: ActivationStatus, - ) -> Result { - if ids.is_empty() { + async fn set_status_batch(&self, updates: &[StatusUpdate]) -> Result { + if updates.is_empty() { return Ok(0); } - let mut conn = self.acquire_write_conn_metric("set_status_batch").await?; + // Group ids by status. Updates carrying retry state are tracked separately so we can + // update their activation blobs after flipping the status. + let mut ids_by_status: HashMap> = HashMap::new(); + let mut retry_params: HashMap<&str, (Option, Option)> = HashMap::new(); + for update in updates { + ids_by_status + .entry(update.status) + .or_default() + .push(&update.id); + if update.max_attempts.is_some() || update.delay_on_retry.is_some() { + retry_params.insert(&update.id, (update.max_attempts, update.delay_on_retry)); + } + } - let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); + let mut tx = self.begin_write_tx_metric("set_status_batch").await?; + let mut affected = 0u64; - query_builder - .push("SET status = ") - .push_bind(status) - .push(" WHERE id IN ("); + for (status, ids) in &ids_by_status { + let mut query_builder = QueryBuilder::new("UPDATE inflight_taskactivations "); + query_builder + .push("SET status = ") + .push_bind(*status) + .push(" WHERE id IN ("); + let mut separated = query_builder.separated(", "); + for id in ids.iter() { + separated.push_bind(*id); + } + separated.push_unseparated(")"); - let mut separated = query_builder.separated(", "); + let result = query_builder.build().execute(&mut *tx).await?; + affected += result.rows_affected(); - for id in ids.iter() { - separated.push_bind(id); - } + if !ids.iter().any(|id| retry_params.contains_key(*id)) { + continue; + } - separated.push_unseparated(")"); + // Retry path: update retry_state for the rows that need it. + for id in ids.iter() { + let Some((max_attempts, delay_on_retry)) = retry_params.get(*id).copied() else { + continue; + }; - let result = query_builder.build().execute(&mut *conn).await?; - Ok(result.rows_affected()) + let row: Option = + sqlx::query_as("SELECT * FROM inflight_taskactivations WHERE id = $1") + .bind(*id) + .fetch_optional(&mut *tx) + .await?; + let Some(row) = row else { + continue; + }; + + let mut activation = TaskActivation::decode(&row.activation as &[u8])?; + let mut needs_update = false; + let task_retry_state = activation.retry_state.get_or_insert_default(); + + // Only update the blob if a value actually changed. This should rarely happen + // after the first retry, since max_attempts comes from the task's retry decorator + // which stays constant across retries. + if let Some(max_attempts) = max_attempts + && task_retry_state.max_attempts != max_attempts + { + task_retry_state.max_attempts = max_attempts; + needs_update = true; + } + + if let Some(delay_on_retry) = delay_on_retry + && task_retry_state.delay_on_retry != Some(delay_on_retry) + { + task_retry_state.delay_on_retry = Some(delay_on_retry); + needs_update = true; + } + + if needs_update { + let updated_activation = activation.encode_to_vec(); + sqlx::query( + "UPDATE inflight_taskactivations SET activation = $1 WHERE id = $2", + ) + .bind(&updated_activation) + .bind(*id) + .execute(&mut *tx) + .await?; + } + } + } + + tx.commit().await?; + Ok(affected) } #[instrument(skip_all)] diff --git a/src/store/traits.rs b/src/store/traits.rs index db5b1825..c5a596fb 100644 --- a/src/store/traits.rs +++ b/src/store/traits.rs @@ -7,7 +7,7 @@ use tokio::join; use tracing::warn; use crate::store::activation::{Activation, ActivationStatus}; -use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; +use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder, StatusUpdate}; #[async_trait] pub trait ActivationStore: Send + Sync { @@ -87,11 +87,14 @@ pub trait ActivationStore: Send + Sync { ) -> Result, Error>; /// Update the status of multiple activations in one batch. - async fn set_status_batch( - &self, - ids: &[String], - status: ActivationStatus, - ) -> Result; + /// + /// Updates that carry `max_attempts`/`delay_on_retry` (for Retry status) also have their + /// activation's `retry_state` updated. The status and any blob updates for a given activation + /// are applied within a single transaction so that a concurrent reader (e.g. upkeep) never + /// observes a `Retry` status with a stale `retry_state`. + /// + /// Returns the number of rows whose status was updated. + async fn set_status_batch(&self, updates: &[StatusUpdate]) -> Result; /// COUNT OPERATIONS /// Get the age of the oldest pending activation in seconds diff --git a/src/store/types.rs b/src/store/types.rs index 0f308fdb..dae354ac 100644 --- a/src/store/types.rs +++ b/src/store/types.rs @@ -1,5 +1,19 @@ +use crate::store::activation::ActivationStatus; + pub type BucketRange = (i16, i16); +/// A status update for a single activation. +/// +/// When `max_attempts` or `delay_on_retry` is set (for Retry status), the +/// activation's `retry_state` is also updated alongside the status column. +#[derive(Clone, Debug)] +pub struct StatusUpdate { + pub id: String, + pub status: ActivationStatus, + pub max_attempts: Option, + pub delay_on_retry: Option, +} + pub struct FailedTasksForwarder { pub to_discard: Vec<(String, Vec)>, pub to_deadletter: Vec<(String, Vec)>,