Skip to content

Commit 9e93240

Browse files
committed
feature: Approval UI page
1 parent faef610 commit 9e93240

File tree

15 files changed

+701
-86
lines changed

15 files changed

+701
-86
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
use axum::{
2+
extract::{Path, State},
3+
http::StatusCode,
4+
Json,
5+
};
6+
use serde::{Deserialize, Serialize};
7+
use std::sync::Arc;
8+
use uuid::Uuid;
9+
10+
use crate::AppState;
11+
12+
#[derive(Serialize)]
13+
pub struct ApprovalResponse {
14+
pub execution_id: String,
15+
pub step_key: String,
16+
pub status: String,
17+
pub data: Option<serde_json::Value>,
18+
}
19+
20+
#[derive(Deserialize)]
21+
pub struct SubmitApprovalRequest {
22+
pub data: serde_json::Value,
23+
}
24+
25+
/// Get approval data for a suspended execution step.
26+
/// The execution_id in the URL is the root execution ID.
27+
pub async fn get_approval(
28+
State(state): State<Arc<AppState>>,
29+
Path((execution_id, step_key)): Path<(String, String)>,
30+
) -> Result<Json<ApprovalResponse>, StatusCode> {
31+
let execution_id_uuid = Uuid::parse_str(&execution_id).map_err(|_| StatusCode::BAD_REQUEST)?;
32+
33+
// Get project_id from execution and set RLS
34+
let project_id = state
35+
.db
36+
.get_project_id_from_execution(&execution_id_uuid)
37+
.await
38+
.map_err(|e| {
39+
tracing::error!("Failed to get project_id from execution: {}", e);
40+
StatusCode::NOT_FOUND
41+
})?;
42+
43+
state
44+
.db
45+
.set_project_id(&project_id, false)
46+
.await
47+
.map_err(|e| {
48+
tracing::error!("Failed to set project_id: {}", e);
49+
StatusCode::INTERNAL_SERVER_ERROR
50+
})?;
51+
52+
let execution = state
53+
.db
54+
.get_execution(&execution_id_uuid)
55+
.await
56+
.map_err(|_| StatusCode::NOT_FOUND)?;
57+
58+
// If not waiting, return status without data so UI can show "already handled"
59+
if execution.status != "waiting" {
60+
return Ok(Json(ApprovalResponse {
61+
execution_id,
62+
step_key,
63+
status: execution.status,
64+
data: None,
65+
}));
66+
}
67+
68+
// Construct event topic using this execution's workflow_id (this is the root)
69+
let topic = format!("workflow/{}/{}", execution.workflow_id, execution_id_uuid);
70+
71+
let events = state
72+
.db
73+
.get_events(&topic, &project_id, None, None, 100)
74+
.await
75+
.map_err(|e| {
76+
tracing::error!("Failed to get events for topic {}: {}", topic, e);
77+
StatusCode::INTERNAL_SERVER_ERROR
78+
})?;
79+
80+
// Find the suspend event matching this step_key
81+
let suspend_event_type = format!("suspend_{}", step_key);
82+
let suspend_data = events
83+
.iter()
84+
.rev() // Most recent first
85+
.find(|e| {
86+
e.event_type
87+
.as_ref()
88+
.map(|t| t == &suspend_event_type)
89+
.unwrap_or(false)
90+
})
91+
.map(|e| e.data.clone());
92+
93+
Ok(Json(ApprovalResponse {
94+
execution_id,
95+
step_key,
96+
status: execution.status,
97+
data: suspend_data,
98+
}))
99+
}
100+
101+
/// Submit approval response for a suspended execution step (unauthenticated).
102+
/// The execution_id in the URL is the root execution ID.
103+
pub async fn submit_approval(
104+
State(state): State<Arc<AppState>>,
105+
Path((execution_id, step_key)): Path<(String, String)>,
106+
Json(req): Json<SubmitApprovalRequest>,
107+
) -> Result<StatusCode, StatusCode> {
108+
let execution_id_uuid = Uuid::parse_str(&execution_id).map_err(|_| StatusCode::BAD_REQUEST)?;
109+
110+
// Get project_id from execution and set RLS
111+
let project_id = state
112+
.db
113+
.get_project_id_from_execution(&execution_id_uuid)
114+
.await
115+
.map_err(|e| {
116+
tracing::error!("Failed to get project_id from execution: {}", e);
117+
StatusCode::NOT_FOUND
118+
})?;
119+
120+
state
121+
.db
122+
.set_project_id(&project_id, false)
123+
.await
124+
.map_err(|e| {
125+
tracing::error!("Failed to set project_id: {}", e);
126+
StatusCode::INTERNAL_SERVER_ERROR
127+
})?;
128+
129+
// Verify execution is still waiting
130+
let execution = state
131+
.db
132+
.get_execution(&execution_id_uuid)
133+
.await
134+
.map_err(|_| StatusCode::NOT_FOUND)?;
135+
136+
if execution.status != "waiting" {
137+
return Err(StatusCode::CONFLICT);
138+
}
139+
140+
// Publish resume event
141+
let topic = format!("workflow/{}/{}", execution.workflow_id, execution_id_uuid);
142+
let event_type = format!("resume_{}", step_key);
143+
let events: Vec<(Option<String>, serde_json::Value, Option<Uuid>, i32)> =
144+
vec![(Some(event_type), req.data, None, 0)];
145+
146+
state
147+
.db
148+
.publish_events_batch(topic, events, None, None, &project_id)
149+
.await
150+
.map_err(|e| {
151+
tracing::error!(
152+
"Failed to publish resume event for execution {}: {}",
153+
execution_id_uuid,
154+
e
155+
);
156+
StatusCode::INTERNAL_SERVER_ERROR
157+
})?;
158+
159+
Ok(StatusCode::OK)
160+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub mod handlers;
2+
3+
pub use handlers::*;

orchestrator/src/api/auth/middleware.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,4 +115,5 @@ fn should_skip_auth(path: &str) -> bool {
115115
"/health" | "/api/v1/auth/signup" | "/api/v1/auth/signin" | "/api/v1/auth/oauth-signin"
116116
) || path.starts_with("/docs")
117117
|| path.starts_with("/api-docs")
118+
|| path.starts_with("/api/v1/approvals")
118119
}

orchestrator/src/api/executions/handlers.rs

Lines changed: 0 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,6 @@ pub struct FailExecutionRequest {
178178
final_state: Option<serde_json::Value>,
179179
}
180180

181-
/// Request to resume a waiting execution
182-
#[derive(Deserialize, ToSchema)]
183-
pub struct ResumeExecutionRequest {
184-
/// Step key that's waiting
185-
pub step_key: String,
186-
/// Data to resume with
187-
pub data: serde_json::Value,
188-
}
189-
190181
#[derive(Deserialize)]
191182
pub struct StoreStepOutputRequest {
192183
step_key: String,
@@ -1156,74 +1147,6 @@ pub async fn confirm_cancellation(
11561147
Ok(StatusCode::OK)
11571148
}
11581149

1159-
/// Resume a waiting execution
1160-
#[utoipa::path(
1161-
post,
1162-
path = "/api/v1/executions/{execution_id}/resume",
1163-
tag = "Executions",
1164-
request_body = ResumeExecutionRequest,
1165-
params(
1166-
("execution_id" = String, Path, description = "Execution ID to resume")
1167-
),
1168-
responses(
1169-
(status = 200, description = "Execution resumed"),
1170-
(status = 400, description = "Invalid execution ID"),
1171-
(status = 500, description = "Internal server error")
1172-
),
1173-
security(
1174-
("bearer_auth" = []),
1175-
("cookie_auth" = [])
1176-
)
1177-
)]
1178-
pub async fn resume_execution(
1179-
State(state): State<Arc<AppState>>,
1180-
Path(execution_id): Path<String>,
1181-
Json(req): Json<ResumeExecutionRequest>,
1182-
) -> Result<StatusCode, StatusCode> {
1183-
let execution_id_uuid = Uuid::parse_str(&execution_id).map_err(|e| {
1184-
tracing::error!("Invalid execution_id format: {} - {}", execution_id, e);
1185-
StatusCode::BAD_REQUEST
1186-
})?;
1187-
1188-
let project_id = state
1189-
.db
1190-
.get_project_id_from_execution(&execution_id_uuid)
1191-
.await
1192-
.map_err(|e| {
1193-
tracing::error!("Failed to get project_id from execution: {}", e);
1194-
StatusCode::INTERNAL_SERVER_ERROR
1195-
})?;
1196-
1197-
state
1198-
.db
1199-
.set_project_id(&project_id, false)
1200-
.await
1201-
.map_err(|e| {
1202-
tracing::error!("Failed to set project_id: {}", e);
1203-
StatusCode::INTERNAL_SERVER_ERROR
1204-
})?;
1205-
1206-
let topic = format!("{}/{}/resume", req.step_key, execution_id_uuid);
1207-
1208-
let events: Vec<(Option<String>, serde_json::Value, Option<Uuid>, i32)> =
1209-
vec![(Some("resume".to_string()), req.data, None, 0)];
1210-
1211-
state
1212-
.db
1213-
.publish_events_batch(topic, events, None, None, &project_id)
1214-
.await
1215-
.map_err(|e| {
1216-
tracing::error!(
1217-
"Failed to publish resume event for execution {}: {}",
1218-
execution_id_uuid,
1219-
e
1220-
);
1221-
StatusCode::INTERNAL_SERVER_ERROR
1222-
})?;
1223-
1224-
Ok(StatusCode::OK)
1225-
}
1226-
12271150
pub async fn store_step_output(
12281151
State(state): State<Arc<AppState>>,
12291152
headers: HeaderMap,

orchestrator/src/api/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
pub mod agents;
22
pub mod api_keys;
3+
pub mod approvals;
34
pub mod auth;
45
pub mod common;
56
pub mod deployments;

orchestrator/src/main.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ pub struct AppState {
6565
api::executions::handlers::submit_workflows,
6666
api::executions::handlers::get_execution,
6767
api::executions::handlers::cancel_execution,
68-
api::executions::handlers::resume_execution,
6968
// Traces
7069
api::traces::handlers::get_traces,
7170
api::traces::handlers::get_trace_by_id,
@@ -116,7 +115,6 @@ pub struct AppState {
116115
api::executions::handlers::SubmitWorkflowsResponse,
117116
api::executions::handlers::ExecutionResponse,
118117
api::executions::handlers::CancelExecutionResponse,
119-
api::executions::handlers::ResumeExecutionRequest,
120118
// Events
121119
api::events::handlers::EventData,
122120
api::events::handlers::PublishEventRequest,
@@ -268,7 +266,8 @@ async fn main() -> anyhow::Result<()> {
268266
let db_bg = Database::new(pool_bg);
269267

270268
// Check if local mode can be enabled (only allowed for localhost bind addresses)
271-
let bind_address = std::env::var("BIND_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8080".to_string());
269+
let bind_address =
270+
std::env::var("POLOS_BIND_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8080".to_string());
272271

273272
let local_mode_requested = std::env::var("POLOS_LOCAL_MODE")
274273
.unwrap_or_else(|_| "False".to_string())
@@ -833,10 +832,6 @@ async fn main() -> anyhow::Result<()> {
833832
"/api/v1/executions/:execution_id/cancel",
834833
post(api::executions::cancel_execution),
835834
)
836-
.route(
837-
"/api/v1/executions/:execution_id/resume",
838-
post(api::executions::resume_execution),
839-
)
840835
.route(
841836
"/internal/executions/:execution_id/confirm-cancellation",
842837
post(api::executions::confirm_cancellation),
@@ -970,6 +965,15 @@ async fn main() -> anyhow::Result<()> {
970965
"/api/v1/conversation/:conversation_id/get",
971966
get(api::state::get_conversation_history),
972967
)
968+
// Approval endpoints
969+
.route(
970+
"/api/v1/approvals/:execution_id/:step_key",
971+
get(api::approvals::get_approval),
972+
)
973+
.route(
974+
"/api/v1/approvals/:execution_id/:step_key/submit",
975+
post(api::approvals::submit_approval),
976+
)
973977
.layer(middleware::from_fn_with_state(
974978
state.clone(),
975979
api::auth::middleware::authenticate_api_v1_middleware,
@@ -1028,7 +1032,8 @@ async fn main() -> anyhow::Result<()> {
10281032
})
10291033
.with_state(state);
10301034

1031-
let bind_address = std::env::var("BIND_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8080".to_string());
1035+
let bind_address =
1036+
std::env::var("POLOS_BIND_ADDRESS").unwrap_or_else(|_| "0.0.0.0:8080".to_string());
10321037
let listener = tokio::net::TcpListener::bind(&bind_address).await?;
10331038
tracing::info!("Orchestrator listening on {}", listener.local_addr()?);
10341039

sdk/python/polos/core/step.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,10 @@ async def suspend(
677677
topic = f"workflow/{self.ctx.root_workflow_id}/{self.ctx.root_execution_id}"
678678
# Publish suspend event
679679
client = get_client_or_raise()
680+
# Inject approval URL into event data so consumers can link to it
681+
approval_url = f"{client.api_url}/approve/{self.ctx.root_execution_id}/{step_key}"
682+
if isinstance(serialized_data, dict):
683+
serialized_data["_approval_url"] = approval_url
680684
await batch_publish(
681685
client=client,
682686
topic=topic,

sdk/typescript/src/runtime/executor.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ function createMockOrchestratorClient(): OrchestratorClient {
1919
})),
2020
batchInvokeWorkflows: mock.fn(async () => ({ executions: [] })),
2121
publishEvent: mock.fn(async () => ({ sequence_ids: [1] })),
22+
getApiUrl: mock.fn(() => 'http://localhost:8080'),
2223
setWaiting: mock.fn(async () => undefined),
2324
getExecution: mock.fn(async () => ({
2425
execution_id: 'exec-1',

sdk/typescript/src/runtime/executor.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -850,12 +850,17 @@ function createOrchestratorStepHelper(
850850

851851
// Publish suspend event on workflow topic
852852
const topic = `workflow/${execCtx.rootWorkflowId}/${execCtx.rootExecutionId}`;
853+
const approvalUrl = `${orchestratorClient.getApiUrl()}/approve/${execCtx.rootExecutionId}/${key}`;
854+
const eventData =
855+
options?.data != null && typeof options.data === 'object' && !Array.isArray(options.data)
856+
? { ...(options.data as Record<string, unknown>), _approval_url: approvalUrl }
857+
: { _original: options?.data, _approval_url: approvalUrl };
853858
await orchestratorClient.publishEvent({
854859
topic,
855860
events: [
856861
{
857862
eventType: `suspend_${key}`,
858-
data: options?.data,
863+
data: eventData,
859864
},
860865
],
861866
executionId: execCtx.executionId,

sdk/typescript/src/runtime/orchestrator-client.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ export class OrchestratorClient {
8383
this.maxRetries = config.maxRetries ?? 3;
8484
}
8585

86+
/**
87+
* Get the API URL.
88+
*/
89+
getApiUrl(): string {
90+
return this.apiUrl;
91+
}
92+
8693
/**
8794
* Get the project ID.
8895
*/

0 commit comments

Comments
 (0)