Skip to content

Commit cdd6685

Browse files
committed
orchestrator: fix queueing bug, retrieve all data for events
1 parent a5f5adb commit cdd6685

File tree

3 files changed

+63
-42
lines changed

3 files changed

+63
-42
lines changed

orchestrator/src/db/events.rs

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
// Event-related database operations
2+
use chrono::{DateTime, Utc};
23
use sqlx::{postgres::PgRow, Row};
34
use uuid::Uuid;
45

@@ -77,26 +78,30 @@ impl Database {
7778
let mut sequence_ids = Vec::new();
7879
let mut last_event_type: Option<String> = None;
7980
let mut last_data: Option<serde_json::Value> = None;
81+
let mut last_event_id: Option<Uuid> = None;
82+
let mut last_created_at: Option<DateTime<Utc>> = None;
8083

8184
// Publish all events (all for the same topic)
8285
for (event_type, data, execution_id, attempt_number) in events {
8386
// Insert event and return sequence_id
8487
let row: PgRow = sqlx::query(
85-
"INSERT INTO events (id, topic, event_type, data, status, execution_id, attempt_number, project_id)
86-
VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7)
87-
RETURNING sequence_id"
88-
)
89-
.bind(&topic)
90-
.bind(event_type.as_deref())
91-
.bind(data.clone())
92-
.bind("valid")
93-
.bind(execution_id.as_ref())
94-
.bind(attempt_number)
95-
.bind(project_id)
96-
.fetch_one(&mut *tx)
97-
.await?;
88+
"INSERT INTO events (id, topic, event_type, data, status, execution_id, attempt_number, project_id)
89+
VALUES (gen_random_uuid(), $1, $2, $3, $4, $5, $6, $7)
90+
RETURNING sequence_id, id, created_at"
91+
)
92+
.bind(&topic)
93+
.bind(event_type.as_deref())
94+
.bind(data.clone())
95+
.bind("valid")
96+
.bind(execution_id.as_ref())
97+
.bind(attempt_number)
98+
.bind(project_id)
99+
.fetch_one(&mut *tx)
100+
.await?;
98101

99102
let seq_id: i64 = row.get("sequence_id");
103+
last_event_id = Some(row.get("id"));
104+
last_created_at = Some(row.get("created_at"));
100105
sequence_ids.push(seq_id);
101106
// Store last event data for resuming waiting executions
102107
last_event_type = event_type.clone();
@@ -106,18 +111,18 @@ impl Database {
106111
// Check for executions waiting on this topic
107112
// Use FOR UPDATE SKIP LOCKED to allow parallel processing across multiple orchestrator instances
108113
let waiting_rows = sqlx::query(
109-
"SELECT ws.execution_id, COALESCE(ws.root_execution_id, ws.execution_id) as root_execution_id, ws.step_key
110-
FROM wait_steps ws
111-
INNER JOIN workflow_executions e ON ws.execution_id = e.id
112-
WHERE e.status = 'waiting'
113-
AND ws.wait_type = 'event'
114-
AND ws.wait_topic = $1
115-
AND ws.step_key IS NOT NULL
116-
FOR UPDATE SKIP LOCKED"
117-
)
118-
.bind(&topic)
119-
.fetch_all(&mut *tx)
120-
.await?;
114+
"SELECT ws.execution_id, COALESCE(ws.root_execution_id, ws.execution_id) as root_execution_id, ws.step_key
115+
FROM wait_steps ws
116+
INNER JOIN workflow_executions e ON ws.execution_id = e.id
117+
WHERE e.status = 'waiting'
118+
AND ws.wait_type = 'event'
119+
AND ws.wait_topic = $1
120+
AND ws.step_key IS NOT NULL
121+
FOR UPDATE SKIP LOCKED"
122+
)
123+
.bind(&topic)
124+
.fetch_all(&mut *tx)
125+
.await?;
121126

122127
// Resume each waiting execution (use last event data)
123128
for row in waiting_rows {
@@ -130,6 +135,8 @@ impl Database {
130135
"topic": topic,
131136
"event_type": last_event_type,
132137
"data": last_data,
138+
"id": last_event_id,
139+
"created_at": last_created_at,
133140
});
134141

135142
// Get project_id from execution
@@ -240,7 +247,9 @@ impl Database {
240247
'sequence_id', sequence_id,
241248
'topic', wait_topic,
242249
'event_type', event_type,
243-
'data', data
250+
'data', data,
251+
'id', event_id,
252+
'created_at', created_at
244253
),
245254
NULL::jsonb, -- error
246255
true, -- success

orchestrator/src/db/workers.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -201,40 +201,53 @@ impl Database {
201201

202202
// Get one queued execution with SELECT FOR UPDATE SKIP LOCKED
203203
// This allows multiple orchestrators to work on different executions in parallel
204-
// Uses CTE to efficiently check queue concurrency limits
204+
// Uses CTEs to efficiently check queue concurrency limits AND worker availability
205+
// Only selects executions that have at least one available worker for their deployment_id
205206
let execution_row = sqlx::query(
206207
"WITH running_counts AS (
207-
SELECT
208+
SELECT
208209
queue_name,
209210
deployment_id,
210211
COALESCE(concurrency_key, '') as concurrency_key,
211212
COUNT(*) as running_count
212213
FROM workflow_executions
213214
WHERE status IN ('claimed', 'running')
214215
GROUP BY queue_name, deployment_id, COALESCE(concurrency_key, '')
216+
),
217+
available_deployments AS (
218+
-- Get deployment_ids that have at least one available worker
219+
SELECT DISTINCT current_deployment_id
220+
FROM workers
221+
WHERE mode = 'push'
222+
AND status = 'online'
223+
AND current_execution_count < max_concurrent_executions
224+
AND push_failure_count < push_failure_threshold
225+
AND last_heartbeat > NOW() - INTERVAL '60 seconds'
215226
)
216-
SELECT e.id, e.workflow_id, e.status, e.payload, e.result, e.error,
217-
e.created_at, e.started_at, e.completed_at,
218-
e.deployment_id, e.parent_execution_id, e.root_execution_id,
219-
e.retry_count, e.step_key, e.queue_name, e.concurrency_key,
220-
e.batch_id, e.session_id, e.user_id, e.output_schema_name,
221-
e.otel_traceparent, e.otel_span_id, e.initial_state, e.final_state,
227+
SELECT e.id, e.workflow_id, e.status, e.payload, e.result, e.error,
228+
e.created_at, e.started_at, e.completed_at,
229+
e.deployment_id, e.parent_execution_id, e.root_execution_id,
230+
e.retry_count, e.step_key, e.queue_name, e.concurrency_key,
231+
e.batch_id, e.session_id, e.user_id, e.output_schema_name,
232+
e.otel_traceparent, e.otel_span_id, e.initial_state, e.final_state,
222233
e.claimed_at, e.queued_at, e.run_timeout_seconds, q.concurrency_limit
223234
FROM workflow_executions e
224-
INNER JOIN queues q
225-
ON q.name = e.queue_name
235+
INNER JOIN queues q
236+
ON q.name = e.queue_name
226237
AND q.deployment_id = e.deployment_id
227-
LEFT JOIN running_counts rc
228-
ON rc.queue_name = e.queue_name
238+
INNER JOIN available_deployments ad
239+
ON ad.current_deployment_id = e.deployment_id
240+
LEFT JOIN running_counts rc
241+
ON rc.queue_name = e.queue_name
229242
AND rc.deployment_id = e.deployment_id
230243
AND rc.concurrency_key = COALESCE(e.concurrency_key, '')
231244
WHERE e.status = 'queued'
232245
AND (
233246
q.concurrency_limit IS NULL
234247
OR COALESCE(rc.running_count, 0) < q.concurrency_limit
235248
)
236-
ORDER BY COALESCE(e.queued_at, e.created_at) ASC
237-
LIMIT 1
249+
ORDER BY COALESCE(e.queued_at, e.created_at) ASC
250+
LIMIT 1
238251
FOR UPDATE OF e SKIP LOCKED",
239252
)
240253
.fetch_optional(&mut *tx)

orchestrator/src/main.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ async fn main() -> anyhow::Result<()> {
5050
// In production: use fmt subscriber with stdout and env filter
5151
tracing_subscriber::fmt()
5252
.with_env_filter(
53-
EnvFilter::from_default_env()
54-
.add_directive(tracing::Level::INFO.into()),
53+
EnvFilter::from_default_env().add_directive(tracing::Level::INFO.into()),
5554
)
5655
.init();
5756
}

0 commit comments

Comments (0)