Skip to content

Commit e9fa1f9

Browse files
committed
orchestrator: Use separate DB pools for API, events and bg tasks
1 parent 56a563e commit e9fa1f9

File tree

6 files changed: +99 −43 lines changed

orchestrator/src/api/events/handlers.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ pub async fn stream_events(
371371

372372
// Set project_id for RLS - required, error if it fails
373373
state
374-
.db
374+
.db_sse
375375
.set_project_id(&project_id, false)
376376
.await
377377
.map_err(|e| {
@@ -433,7 +433,7 @@ pub async fn stream_events(
433433

434434
loop {
435435
let events = match state_clone
436-
.db
436+
.db_sse
437437
.get_events(
438438
&topic_clone,
439439
&project_id_clone,
@@ -455,7 +455,7 @@ pub async fn stream_events(
455455
if events.is_empty() {
456456
if let Some(execution_id) = workflow_run_id_clone {
457457
if !execution_completed {
458-
match state_clone.db.get_execution(&execution_id).await {
458+
match state_clone.db_sse.get_execution(&execution_id).await {
459459
Ok(execution) => {
460460
match execution.status.as_str() {
461461
"failed" => {

orchestrator/src/api/workers/handlers.rs

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -201,21 +201,21 @@ pub async fn try_dispatch_execution(state: &AppState) -> anyhow::Result<()> {
201201
loop {
202202
// Claim and assign execution in a single transaction
203203
// Uses SELECT FOR UPDATE SKIP LOCKED to allow multiple orchestrators to work in parallel
204-
match state.db.claim_and_assign_execution_for_push().await {
204+
match state.db_bg.claim_and_assign_execution_for_push().await {
205205
Ok(Some((execution, worker))) => {
206206
// Successfully claimed and assigned, now push
207207
match push_work_to_worker(&worker, &execution).await {
208208
Ok(()) => {
209209
// Push successful - mark execution as running
210-
if let Err(e) = state.db.mark_execution_running(&execution.id).await {
210+
if let Err(e) = state.db_bg.mark_execution_running(&execution.id).await {
211211
tracing::error!(
212212
"Failed to mark execution {} as running: {}",
213213
execution.id,
214214
e
215215
);
216216
// Rollback since we can't mark as running (no error since push succeeded)
217217
if let Err(e) = state
218-
.db
218+
.db_bg
219219
.rollback_execution_assignment(&execution.id, &worker.id, None)
220220
.await
221221
{
@@ -229,7 +229,11 @@ pub async fn try_dispatch_execution(state: &AppState) -> anyhow::Result<()> {
229229
continue;
230230
}
231231
// Update push status
232-
if let Err(e) = state.db.update_worker_push_status(&worker.id, true).await {
232+
if let Err(e) = state
233+
.db_bg
234+
.update_worker_push_status(&worker.id, true)
235+
.await
236+
{
233237
tracing::error!("Failed to update worker push status: {}", e);
234238
}
235239
// Successfully dispatched, continue processing more executions
@@ -243,7 +247,7 @@ pub async fn try_dispatch_execution(state: &AppState) -> anyhow::Result<()> {
243247
);
244248
// Rollback execution assignment with error
245249
if let Err(e) = state
246-
.db
250+
.db_bg
247251
.rollback_execution_assignment(&execution.id, &worker.id, Some(&err))
248252
.await
249253
{
@@ -524,7 +528,7 @@ pub async fn poll_workflow(
524528

525529
// Get project_id from worker and set session variable for RLS
526530
let project_id = state
527-
.db
531+
.db_sse
528532
.get_project_id_from_worker(&worker_id)
529533
.await
530534
.map_err(|e| {
@@ -533,7 +537,7 @@ pub async fn poll_workflow(
533537
})?;
534538

535539
state
536-
.db
540+
.db_sse
537541
.set_project_id(&project_id, false)
538542
.await
539543
.map_err(|e| {
@@ -549,7 +553,7 @@ pub async fn poll_workflow(
549553

550554
// Update heartbeat
551555
state
552-
.db
556+
.db_sse
553557
.update_worker_heartbeat(&worker_id)
554558
.await
555559
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
@@ -568,7 +572,7 @@ pub async fn poll_workflow(
568572
break;
569573
}
570574

571-
match state.db.claim_next_executions(&worker_id).await {
575+
match state.db_sse.claim_next_executions(&worker_id).await {
572576
Ok(Some(execution)) => {
573577
tracing::info!("[poll_workflow] claimed execution: {}", execution.id);
574578
executions.push(execution);
@@ -590,7 +594,10 @@ pub async fn poll_workflow(
590594
worker_id
591595
);
592596

593-
let _ = state.db.update_worker_status(&worker_id, "online").await;
597+
let _ = state
598+
.db_sse
599+
.update_worker_status(&worker_id, "online")
600+
.await;
594601

595602
let responses: Vec<PollWorkflowResponse> = executions
596603
.into_iter()

orchestrator/src/db/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,13 @@ impl Database {
3232
}
3333

3434
/// Log current pool metrics
35-
pub async fn log_pool_metrics(&self) {
35+
pub async fn log_pool_metrics(&self, name: &str) {
3636
let size = self.pool.size();
3737
let num_idle = self.pool.num_idle();
3838
let active = size.saturating_sub(num_idle as u32);
3939

4040
tracing::info!(
41+
pool = name,
4142
pool_size = size,
4243
idle_connections = num_idle,
4344
active_connections = active,

orchestrator/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ pub mod db;
66
pub use db::Database;
77

88
pub struct AppState {
9-
pub db: Database,
9+
pub db: Database, // API handlers (short-lived requests)
10+
pub db_sse: Database, // SSE streaming + long-polling
11+
pub db_bg: Database, // Background tasks
1012
pub local_mode: bool,
1113
}

orchestrator/src/main.rs

Lines changed: 68 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ mod db;
1818
pub use db::Database;
1919

2020
pub struct AppState {
21-
pub db: Database,
21+
pub db: Database, // API handlers (short-lived requests)
22+
pub db_sse: Database, // SSE streaming + long-polling
23+
pub db_bg: Database, // Background tasks
2224
pub local_mode: bool,
2325
}
2426

@@ -216,18 +218,52 @@ async fn main() -> anyhow::Result<()> {
216218
let database_url = std::env::var("DATABASE_URL")
217219
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost/polos".to_string());
218220

219-
let pool = PgPoolOptions::new()
220-
.max_connections(20)
221-
.min_connections(5)
221+
// Pool sizes configurable via env vars
222+
let pool_api_max: u32 = std::env::var("DB_POOL_API_MAX")
223+
.ok()
224+
.and_then(|s| s.parse().ok())
225+
.unwrap_or(10);
226+
let pool_sse_max: u32 = std::env::var("DB_POOL_SSE_MAX")
227+
.ok()
228+
.and_then(|s| s.parse().ok())
229+
.unwrap_or(15);
230+
let pool_bg_max: u32 = std::env::var("DB_POOL_BG_MAX")
231+
.ok()
232+
.and_then(|s| s.parse().ok())
233+
.unwrap_or(10);
234+
235+
let pool_api = PgPoolOptions::new()
236+
.max_connections(pool_api_max)
237+
.min_connections(2)
222238
.connect(&database_url)
223239
.await?;
224240

225-
tracing::info!("Connected to database");
241+
let pool_sse = PgPoolOptions::new()
242+
.max_connections(pool_sse_max)
243+
.min_connections(2)
244+
.connect(&database_url)
245+
.await?;
226246

227-
sqlx::migrate!("./migrations").run(&pool).await?;
247+
let pool_bg = PgPoolOptions::new()
248+
.max_connections(pool_bg_max)
249+
.min_connections(2)
250+
.connect(&database_url)
251+
.await?;
252+
253+
tracing::info!(
254+
api_max = pool_api_max,
255+
sse_max = pool_sse_max,
256+
bg_max = pool_bg_max,
257+
"Connected to database with 3 connection pools"
258+
);
259+
260+
// Run migrations on the API pool only (migrations only need to run once)
261+
sqlx::migrate!("./migrations").run(&pool_api).await?;
228262
tracing::info!("Migrations complete");
229263

230-
let db = Database::new(pool);
264+
let db = Database::new(pool_api);
265+
let db_sse = Database::new(pool_sse);
266+
let db_bg = Database::new(pool_bg);
231267

232268
// Check if local mode can be enabled (only allowed for localhost bind addresses)
233269
let bind_address = std::env::var("BIND_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8080".to_string());
@@ -255,7 +291,9 @@ async fn main() -> anyhow::Result<()> {
255291
}
256292

257293
let state = Arc::new(AppState {
258-
db: db.clone(),
294+
db,
295+
db_sse,
296+
db_bg,
259297
local_mode,
260298
});
261299

@@ -268,8 +306,10 @@ async fn main() -> anyhow::Result<()> {
268306
loop {
269307
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
270308

271-
// Log pool metrics
272-
pool_metrics_state.db.log_pool_metrics().await;
309+
// Log pool metrics for all pools
310+
pool_metrics_state.db.log_pool_metrics("api").await;
311+
pool_metrics_state.db_sse.log_pool_metrics("sse").await;
312+
pool_metrics_state.db_bg.log_pool_metrics("bg").await;
273313

274314
// Sample connection acquisition time
275315
pool_metrics_state
@@ -288,7 +328,7 @@ async fn main() -> anyhow::Result<()> {
288328
.spawn(async move {
289329
loop {
290330
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
291-
match cleanup_state.db.cleanup_stale_workers().await {
331+
match cleanup_state.db_bg.cleanup_stale_workers().await {
292332
Ok((stale_claimed, deleted_workers, marked_offline_workers, orphaned_executions)) => {
293333
if stale_claimed > 0 || marked_offline_workers > 0 || deleted_workers > 0 || orphaned_executions > 0 {
294334
tracing::info!(
@@ -317,7 +357,7 @@ async fn main() -> anyhow::Result<()> {
317357
loop {
318358
tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; // Run every hour
319359
match execution_cleanup_state
320-
.db
360+
.db_bg
321361
.cleanup_old_executions(retention_days)
322362
.await
323363
{
@@ -347,7 +387,7 @@ async fn main() -> anyhow::Result<()> {
347387

348388
// Process expired waits one at a time until none remain
349389
loop {
350-
match wait_resume_state.db.get_and_resume_expired_wait().await {
390+
match wait_resume_state.db_bg.get_and_resume_expired_wait().await {
351391
Ok(Some(expired_wait)) => {
352392
tracing::info!(
353393
"Resumed execution {}: {} from expired wait",
@@ -388,7 +428,7 @@ async fn main() -> anyhow::Result<()> {
388428

389429
// Process event triggers one at a time until none remain
390430
loop {
391-
match trigger_state.db.process_one_event_trigger().await {
431+
match trigger_state.db_bg.process_one_event_trigger().await {
392432
Ok(Some(count)) => {
393433
if count > 0 {
394434
tracing::info!("Processed {} event trigger(s)", count);
@@ -428,7 +468,11 @@ async fn main() -> anyhow::Result<()> {
428468

429469
// Process event waits one at a time until none remain
430470
loop {
431-
match event_wait_state.db.check_and_resume_one_event_wait().await {
471+
match event_wait_state
472+
.db_bg
473+
.check_and_resume_one_event_wait()
474+
.await
475+
{
432476
Ok(Some(count)) => {
433477
if count > 0 {
434478
tracing::info!("Resumed 1 execution waiting on event");
@@ -467,7 +511,7 @@ async fn main() -> anyhow::Result<()> {
467511

468512
// Process scheduled workflows one at a time until none remain
469513
loop {
470-
match schedule_state.db.process_one_scheduled_workflow().await {
514+
match schedule_state.db_bg.process_one_scheduled_workflow().await {
471515
Ok(Some(count)) => {
472516
if count > 0 {
473517
tracing::info!("Processed 1 scheduled workflow");
@@ -520,14 +564,14 @@ async fn main() -> anyhow::Result<()> {
520564
let mut processed_count = 0;
521565
// Process timed-out executions one at a time
522566
loop {
523-
match timeout_monitor_state.db.get_timed_out_executions(10).await {
567+
match timeout_monitor_state.db_bg.get_timed_out_executions(10).await {
524568
Ok(timed_out_executions) => {
525569
if timed_out_executions.is_empty() {
526570
break; // No more timed-out executions
527571
}
528572
for (execution_id, _assigned_to_worker, _push_endpoint_url) in timed_out_executions {
529573
// Cancel the execution (recursively cancels all children)
530-
match timeout_monitor_state.db.cancel_execution(&execution_id, "timeout").await {
574+
match timeout_monitor_state.db_bg.cancel_execution(&execution_id, "timeout").await {
531575
Ok(executions_to_cancel) => {
532576
tracing::info!("Cancelled timed-out execution: {} (and {} children)", execution_id, executions_to_cancel.len().saturating_sub(1));
533577
// Send cancel requests to all workers for all executions being cancelled
@@ -543,7 +587,7 @@ async fn main() -> anyhow::Result<()> {
543587
api::workers::CancelRequestResult::NotFound => {
544588
// Execution not found on worker - mark as cancelled
545589
tracing::info!("Timed-out execution {} not found on worker {} - marking as cancelled", exec_id_clone, worker_id);
546-
if let Err(e) = timeout_state_clone.db.mark_execution_cancelled(&exec_id_clone).await {
590+
if let Err(e) = timeout_state_clone.db_bg.mark_execution_cancelled(&exec_id_clone).await {
547591
tracing::error!("Failed to mark timed-out execution {} as cancelled (not found): {}", exec_id_clone, e);
548592
}
549593
}
@@ -588,7 +632,7 @@ async fn main() -> anyhow::Result<()> {
588632
// Process pending_cancel executions
589633
loop {
590634
match pending_cancel_state
591-
.db
635+
.db_bg
592636
.get_pending_cancel_executions(10)
593637
.await
594638
{
@@ -618,7 +662,7 @@ async fn main() -> anyhow::Result<()> {
618662
if should_directly_cancel {
619663
// Cancelled more than 2 minutes ago - directly mark as cancelled
620664
if let Err(e) = pending_cancel_state
621-
.db
665+
.db_bg
622666
.mark_execution_cancelled(&execution_id)
623667
.await
624668
{
@@ -660,7 +704,7 @@ async fn main() -> anyhow::Result<()> {
660704
worker_id
661705
);
662706
if let Err(e) = pending_cancel_state
663-
.db
707+
.db_bg
664708
.mark_execution_cancelled(&execution_id)
665709
.await
666710
{
@@ -683,7 +727,7 @@ async fn main() -> anyhow::Result<()> {
683727
execution_id
684728
);
685729
if let Err(e) = pending_cancel_state
686-
.db
730+
.db_bg
687731
.mark_execution_cancelled(&execution_id)
688732
.await
689733
{
@@ -706,7 +750,7 @@ async fn main() -> anyhow::Result<()> {
706750
} else {
707751
// No worker assigned - directly mark as cancelled
708752
if let Err(e) = pending_cancel_state
709-
.db
753+
.db_bg
710754
.mark_execution_cancelled(&execution_id)
711755
.await
712756
{

orchestrator/tests/db/executions_test.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,13 @@ async fn test_get_timed_out_executions() {
252252
.await
253253
.expect("Failed to create test worker");
254254

255-
// Manually set status to running and started_at to past (simulating a timed-out execution)
255+
// Manually set status to running and started_at far in the past (simulating a timed-out execution)
256+
// Use a very old started_at so this execution sorts first in the ORDER BY started_at ASC query,
257+
// even when the test DB has accumulated stale timed-out executions from previous runs.
256258
sqlx::query(
257-
"UPDATE workflow_executions
258-
SET status = 'running',
259-
started_at = NOW() - INTERVAL '2 seconds',
259+
"UPDATE workflow_executions
260+
SET status = 'running',
261+
started_at = NOW() - INTERVAL '999 days',
260262
assigned_to_worker = $1
261263
WHERE id = $2",
262264
)

Comments (0)