Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tonic::async_trait;
use crate::config::Config;
use crate::store::activation::{Activation, ActivationStatus};
use crate::store::traits::ActivationStore;
use crate::store::types::{BucketRange, FailedTasksForwarder};
use crate::store::types::{BucketRange, FailedTasksForwarder, StatusUpdate};
use crate::test_utils::make_activations;

use super::*;
Expand Down Expand Up @@ -127,11 +127,7 @@ impl ActivationStore for MockStore {
unimplemented!()
}

async fn set_status_batch(
&self,
_ids: &[String],
_status: ActivationStatus,
) -> Result<u64, Error> {
async fn set_status_batch(&self, _updates: &[StatusUpdate]) -> Result<u64, Error> {
unimplemented!()
}

Expand Down
210 changes: 74 additions & 136 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tracing::{debug, error, instrument, warn};
use crate::config::{Config, DeliveryMode};
use crate::store::activation::ActivationStatus;
use crate::store::traits::ActivationStore;
pub use crate::store::types::StatusUpdate;

pub struct TaskbrokerServer {
pub store: Arc<dyn ActivationStore>,
Expand Down Expand Up @@ -107,16 +108,17 @@ impl ConsumerService for TaskbrokerServer {
let max_attempts = request.get_ref().max_attempts;
let delay_on_retry = request.get_ref().delay_on_retry;

// Use batching channel if available and we don't need to update retry state.
// If max_attempts or delay_on_retry is Some, we can't use batching API to update the
// activation, and have to fall back to individual set_status.
if let Some(ref tx) = self.update_tx
&& max_attempts.is_none()
&& delay_on_retry.is_none()
{
tx.send((id, status))
.await
.map_err(|_| Status::internal("Status update channel closed"))?;
// Use the batching channel if available. Retry-state updates (max_attempts /
// delay_on_retry) are batched too — the flusher's set_status_batch applies them.
if let Some(ref tx) = self.update_tx {
tx.send(StatusUpdate {
id,
status,
max_attempts,
delay_on_retry,
})
.await
.map_err(|_| Status::internal("Status update channel closed"))?;

metrics::histogram!("grpc_server.set_status.duration").record(start_time.elapsed());
return Ok(Response::new(SetTaskStatusResponse { task: None }));
Expand Down Expand Up @@ -227,12 +229,11 @@ impl ConsumerService for TaskbrokerServer {
let start_time = Instant::now();
let updates = request.get_ref().updates.clone();

// Updates can be broken into different batches based on the status and the retry state.
let mut batches: HashMap<ActivationStatus, Vec<String>> = HashMap::new();
let mut retry_updates: Vec<SetTaskStatusRequest> = Vec::new();

// Validate and convert every update into a StatusUpdate. Retry state (max_attempts /
// delay_on_retry) rides along and is applied by set_status_batch in the same transaction.
let mut status_updates: Vec<StatusUpdate> = Vec::with_capacity(updates.len());
let mut sizes_by_status: HashMap<ActivationStatus, u64> = HashMap::new();
for update in updates {
let id = update.id.clone();
let status: ActivationStatus = TaskActivationStatus::try_from(update.status)
.map_err(|e| {
Status::invalid_argument(format!("Unable to deserialize status: {e:?}"))
Expand All @@ -246,153 +247,90 @@ impl ConsumerService for TaskbrokerServer {
if status == ActivationStatus::Failure {
metrics::counter!("grpc_server.set_status.failure").increment(1);
}
if update.max_attempts.is_some() || update.delay_on_retry.is_some() {
retry_updates.push(update);
} else {
batches.entry(status).or_default().push(id);
}
*sizes_by_status.entry(status).or_default() += 1;
status_updates.push(StatusUpdate {
id: update.id,
status,
max_attempts: update.max_attempts,
delay_on_retry: update.delay_on_retry,
});
}

// For each status and batch of IDs, update the status for those IDs. In the case of a failure, return an error to the worker.
// Track and log a warning if the number of rows updated is less than the number of IDs requested.
metrics::histogram!("grpc_server.set_batch_activation_status.num_batches")
.record(batches.len() as f64);
for (status, ids) in batches {
let requested = ids.len() as u64;
metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string()).record(requested as f64);
match self.store.set_status_batch(&ids, status).await {
Ok(affected) => {
metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff", "status" => status.to_string())
.record((requested - affected) as f64);
if affected < requested {
metrics::histogram!(
"grpc_server.set_batch_activation_status.partial",
"status" => status.to_string()
)
.record((requested - affected) as f64);
warn!(
?status,
requested, affected, "Updated fewer rows than IDs requested in batch"
);
}
}
Err(e) => {
metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(requested);
metrics::histogram!("grpc_server.set_batch_activation_status.duration")
.record(start_time.elapsed());
error!("Failed to set batch activation status: {:?}", e);
return Err(Status::internal("Failed to set batch activation status"));
}
}
.record(sizes_by_status.len() as f64);
for (status, size) in &sizes_by_status {
metrics::histogram!("grpc_server.set_batch_activation_status.batch_size", "status" => status.to_string())
.record(*size as f64);
}

metrics::histogram!("grpc_server.set_batch_activation_status.retry_updates")
.record(retry_updates.len() as f64);
for update in retry_updates {
let id = update.id.clone();
let status: ActivationStatus = TaskActivationStatus::try_from(update.status)
.map_err(|e| {
Status::invalid_argument(format!("Unable to deserialize status: {e:?}"))
})?
.into();
let max_attempts = update.max_attempts;
let delay_on_retry = update.delay_on_retry;
match self
.store
.set_status(&id, status, max_attempts, delay_on_retry)
.await
{
Ok(Some(_)) => metrics::counter!("grpc_server.set_status", "result" => "ok", "status" => status.to_string()).increment(1),
Ok(None) => metrics::counter!("grpc_server.set_status", "result" => "not_found", "status" => status.to_string()).increment(1),

Err(e) => {
metrics::counter!("grpc_server.set_status", "result" => "error", "status" => status.to_string()).increment(1);

error!("Unable to update status of activation in batch {:?} to {:?}: {:?}", id, status, e);

metrics::histogram!("grpc_server.set_batch_activation_status.duration").record(start_time.elapsed());
return Err(Status::internal(format!(
"Unable to update status of activation in batch {id:?} to {status:?}"
)));
let requested = status_updates.len() as u64;
match self.store.set_status_batch(&status_updates).await {
Ok(affected) => {
metrics::histogram!("grpc_server.set_batch_activation_status.affected_diff")
.record((requested - affected) as f64);
if affected < requested {
metrics::histogram!("grpc_server.set_batch_activation_status.partial")
.record((requested - affected) as f64);
warn!(
requested,
affected, "Updated fewer rows than IDs requested in batch"
);
}
}
Err(e) => {
metrics::counter!("grpc_server.set_status", "result" => "error")
.increment(requested);
metrics::histogram!("grpc_server.set_batch_activation_status.duration")
.record(start_time.elapsed());
error!("Failed to set batch activation status: {:?}", e);
return Err(Status::internal("Failed to set batch activation status"));
}
}

metrics::histogram!("grpc_server.set_batch_activation_status.duration")
.record(start_time.elapsed());
return Ok(Response::new(SetBatchActivationStatusResponse {}));
}
}

pub type StatusUpdate = (String, ActivationStatus);

pub async fn flush_updates(store: Arc<dyn ActivationStore>, buffer: &mut Vec<StatusUpdate>) {
if buffer.is_empty() {
return;
}

let mut by_status: HashMap<ActivationStatus, Vec<String>> = HashMap::new();

for (id, status) in buffer.drain(..) {
by_status.entry(status).or_default().push(id);
}

for (status, ids) in by_status {
let requested = ids.len() as u64;
let st = status.to_string();

metrics::histogram!("grpc_server.flush_updates.requested", "status" => st.clone())
.record(requested as f64);

match store.set_status_batch(&ids, status).await {
Ok(affected) => {
metrics::histogram!(
"grpc_server.flush_updates.affected",
"status" => st.clone()
)
.record(affected as f64);

metrics::counter!(
"grpc_server.flush_updates.updated",
"status" => st.clone()
)
.increment(affected);
let requested = buffer.len() as u64;
metrics::histogram!("grpc_server.flush_updates.requested").record(requested as f64);

metrics::counter!("grpc_server.flush_updates", "result" => "ok").increment(1);

if affected < requested {
metrics::counter!(
"grpc_server.flush_updates.partial",
"status" => st.clone()
)
.increment(1);
match store.set_status_batch(buffer).await {
Ok(affected) => {
metrics::histogram!("grpc_server.flush_updates.affected").record(affected as f64);
metrics::counter!("grpc_server.flush_updates.updated").increment(affected);
metrics::counter!("grpc_server.flush_updates", "result" => "ok").increment(1);

warn!(
?status,
requested, affected, "Updated fewer rows than IDs requested from server"
);
}

debug!(
?status,
affected, requested, "Flushed status batch from server"
if affected < requested {
metrics::counter!("grpc_server.flush_updates.partial").increment(1);
warn!(
requested,
affected, "Updated fewer rows than IDs requested from server"
);
}

Err(e) => {
metrics::counter!("grpc_server.flush_updates", "result" => "error").increment(1);
debug!(affected, requested, "Flushed status batch from server");

error!(
?status,
requested,
error = ?e,
"Failed to flush status batch from server"
);
// Successfully flushed; clear the buffer.
buffer.clear();
}

// Push failed updates back into the buffer so they can be retried on next flush
for id in ids {
buffer.push((id, status));
}
}
Err(e) => {
metrics::counter!("grpc_server.flush_updates", "result" => "error").increment(1);

error!(
requested,
error = ?e,
"Failed to flush status batch from server"
);

// Leave the buffer intact so the updates are retried on the next flush.
}
}
}
73 changes: 65 additions & 8 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::mpsc;
use tonic::{Code, Request};

use crate::config::{Config, DeliveryMode};
use crate::grpc::server::{StatusUpdate, TaskbrokerServer};
use crate::grpc::server::{StatusUpdate, TaskbrokerServer, flush_updates};
use crate::store::activation::ActivationStatus;
use crate::test_utils::{create_config, create_test_store, make_activations};

Expand Down Expand Up @@ -457,9 +457,9 @@ async fn test_set_task_status_forwards_to_update_channel(#[case] adapter: &str)
"push path returns no next task from the store"
);

let (id, status) = update_rx.recv().await.expect("status update on channel");
assert_eq!(id, "id_0");
assert_eq!(status, ActivationStatus::Complete);
let update = update_rx.recv().await.expect("status update on channel");
assert_eq!(update.id, "id_0");
assert_eq!(update.status, ActivationStatus::Complete);

let row = store.get_by_id("id_0").await.unwrap().expect("row exists");
assert_eq!(
Expand Down Expand Up @@ -632,8 +632,8 @@ async fn test_set_batch_activation_status_retry(#[case] adapter: &str) {
update_tx: None,
};

// max_attempts is set, so this update is applied individually via set_status
// rather than being grouped into a batch.
// max_attempts / delay_on_retry are set, so set_status_batch also updates the
// activation's retry_state in the same transaction as the status flip.
let request = SetBatchActivationStatusRequest {
updates: vec![SetTaskStatusRequest {
id: "id_0".to_string(),
Expand All @@ -651,6 +651,10 @@ async fn test_set_batch_activation_status_retry(#[case] adapter: &str) {

let row = store.get_by_id("id_0").await.unwrap().expect("row exists");
assert_eq!(row.status, ActivationStatus::Retry);
let activation = TaskActivation::decode(&row.activation as &[u8]).unwrap();
let retry_state = activation.retry_state.expect("retry_state persisted");
assert_eq!(retry_state.max_attempts, 5);
assert_eq!(retry_state.delay_on_retry, Some(60));
}

#[tokio::test]
Expand All @@ -671,8 +675,9 @@ async fn test_set_batch_activation_status_mixed(#[case] adapter: &str) {
update_tx: None,
};

// Two batched conclusion updates plus one retry update that carries retry
// state, exercising both the batch path and the individual set_status path.
// Two conclusion updates plus one retry update that carries retry state, all
// handled by a single set_status_batch call that groups by status and updates
// the retry update's blob.
let request = SetBatchActivationStatusRequest {
updates: vec![
SetTaskStatusRequest {
Expand Down Expand Up @@ -710,4 +715,56 @@ async fn test_set_batch_activation_status_mixed(#[case] adapter: &str) {
assert_eq!(row1.status, ActivationStatus::Complete);
let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists");
assert_eq!(row2.status, ActivationStatus::Retry);
let activation2 = TaskActivation::decode(&row2.activation as &[u8]).unwrap();
let retry_state = activation2.retry_state.expect("retry_state persisted");
assert_eq!(retry_state.max_attempts, 3);
assert_eq!(retry_state.delay_on_retry, Some(60));
}

#[tokio::test]
#[rstest]
#[case::sqlite("sqlite")]
#[case::postgres("postgres")]
async fn test_flush_updates_mixed_statuses_and_retry(#[case] adapter: &str) {
let store = create_test_store(adapter).await;

let activations = make_activations(3);
store.store(&activations).await.unwrap();

// A single flush carrying multiple statuses, including a retry update that also
// mutates retry_state. The flusher groups by status and applies the retry blob.
let mut buffer = vec![
StatusUpdate {
id: "id_0".to_string(),
status: ActivationStatus::Complete,
max_attempts: None,
delay_on_retry: None,
},
StatusUpdate {
id: "id_1".to_string(),
status: ActivationStatus::Failure,
max_attempts: None,
delay_on_retry: None,
},
StatusUpdate {
id: "id_2".to_string(),
status: ActivationStatus::Retry,
max_attempts: Some(7),
delay_on_retry: Some(90),
},
];

flush_updates(store.clone(), &mut buffer).await;
assert!(buffer.is_empty(), "buffer is drained on a successful flush");

let row0 = store.get_by_id("id_0").await.unwrap().expect("row exists");
assert_eq!(row0.status, ActivationStatus::Complete);
let row1 = store.get_by_id("id_1").await.unwrap().expect("row exists");
assert_eq!(row1.status, ActivationStatus::Failure);
let row2 = store.get_by_id("id_2").await.unwrap().expect("row exists");
assert_eq!(row2.status, ActivationStatus::Retry);
let activation2 = TaskActivation::decode(&row2.activation as &[u8]).unwrap();
let retry_state = activation2.retry_state.expect("retry_state persisted");
assert_eq!(retry_state.max_attempts, 7);
assert_eq!(retry_state.delay_on_retry, Some(90));
}
Loading