diff --git a/internal/api/contract/agents.go b/internal/api/contract/agents.go index 291de4eea..d362121f6 100644 --- a/internal/api/contract/agents.go +++ b/internal/api/contract/agents.go @@ -298,6 +298,11 @@ type AgentTaskReleaseRequest struct { Reason string `json:"reason,omitempty"` } +// AgentTaskBlockRequest parks the caller session's claimed task run in needs_attention. +type AgentTaskBlockRequest struct { + Reason string `json:"reason,omitempty"` +} + // CoordinationMessageMetadataPayload carries typed task/run correlation for channel messages. type CoordinationMessageMetadataPayload struct { TaskID string `json:"task_id"` diff --git a/internal/api/core/agent_tasks.go b/internal/api/core/agent_tasks.go index f3ddece4d..f812e35eb 100644 --- a/internal/api/core/agent_tasks.go +++ b/internal/api/core/agent_tasks.go @@ -21,6 +21,7 @@ const ( agentTaskActionComplete = "agent.task.complete" agentTaskActionFail = "agent.task.fail" agentTaskActionRelease = "agent.task.release" + agentTaskActionBlock = "agent.task.block" ) type agentSoulClaimLocker interface { @@ -217,6 +218,41 @@ func (h *BaseHandlers) AgentTaskRelease(c *gin.Context) { c.JSON(http.StatusOK, contract.AgentTaskLeaseResponse{Lease: AgentTaskLeasePayloadFromRun(run, nil)}) } +// AgentTaskBlock parks one claimed task run in needs_attention. 
+func (h *BaseHandlers) AgentTaskBlock(c *gin.Context) {
+	manager, caller, runID, ok := h.agentTaskLeaseMutationSetup(c, agentTaskActionBlock)
+	if !ok { // setup reported !ok; nothing further is done here
+		return
+	}
+
+	var req contract.AgentTaskBlockRequest
+	if err := c.ShouldBindJSON(&req); err != nil { // a body that fails to bind is answered with 400
+		h.respondError(
+			c,
+			http.StatusBadRequest,
+			NewTaskValidationError(fmt.Errorf("%s: decode agent task block request: %w", h.transportName(), err)),
+		)
+		return
+	}
+
+	handle, err := h.lookupAgentTaskLease(c.Request.Context(), manager, caller, runID) // lease handle for this caller and run
+	if err != nil {
+		h.respondError(c, statusForAgentTaskError(err), err)
+		return
+	}
+	run, err := manager.BlockRunLease(c.Request.Context(), taskpkg.LeaseBlock{
+		RunID:      runID,
+		ClaimToken: handle.ClaimToken, // token comes from the looked-up handle; the request body only carries Reason
+		Reason:     req.Reason,        // forwarded as received; NOTE(review): not trimmed or required here - confirm the manager validates it
+	}, caller.Actor)
+	if err != nil {
+		h.respondError(c, statusForAgentTaskError(err), err)
+		return
+	}
+
+	c.JSON(http.StatusOK, contract.AgentTaskLeaseResponse{Lease: AgentTaskLeasePayloadFromRun(run, nil)}) // 200 with the lease payload built from the returned run
+}
+
 // AgentTaskComplete completes one claimed task run after token verification.
func (h *BaseHandlers) AgentTaskComplete(c *gin.Context) { manager, caller, runID, ok := h.agentTaskLeaseMutationSetup(c, agentTaskActionComplete) diff --git a/internal/api/testutil/task_stub.go b/internal/api/testutil/task_stub.go index 681b128f3..913ac6bd4 100644 --- a/internal/api/testutil/task_stub.go +++ b/internal/api/testutil/task_stub.go @@ -140,6 +140,7 @@ type StubTaskManager struct { AttachRunSessionFn func(context.Context, string, string, taskpkg.ActorContext) (*taskpkg.Run, error) HeartbeatRunLeaseFn func(context.Context, taskpkg.LeaseHeartbeat, taskpkg.ActorContext) (*taskpkg.Run, error) ReleaseRunLeaseFn func(context.Context, taskpkg.LeaseRelease, taskpkg.ActorContext) (*taskpkg.Run, error) + BlockRunLeaseFn func(context.Context, taskpkg.LeaseBlock, taskpkg.ActorContext) (*taskpkg.Run, error) ForceReleaseRunFn forceReleaseRunFunc ForceFailRunFn forceFailRunFunc RetryRunFn retryRunFunc @@ -546,6 +547,17 @@ func (s StubTaskManager) ReleaseRunLease( return nil, taskpkg.ErrTaskRunNotFound } +func (s StubTaskManager) BlockRunLease( + ctx context.Context, + block taskpkg.LeaseBlock, + actor taskpkg.ActorContext, +) (*taskpkg.Run, error) { + if s.BlockRunLeaseFn != nil { + return s.BlockRunLeaseFn(ctx, block, actor) + } + return nil, taskpkg.ErrTaskRunNotFound +} + func (s StubTaskManager) ForceReleaseRun( ctx context.Context, runID string, diff --git a/internal/api/udsapi/handlers_test.go b/internal/api/udsapi/handlers_test.go index afe5b4b47..5e858a43c 100644 --- a/internal/api/udsapi/handlers_test.go +++ b/internal/api/udsapi/handlers_test.go @@ -352,6 +352,7 @@ func TestRegisterRoutesCoversTechSpecEndpoints(t *testing.T) { "POST /api/agent/channels/reply", "POST /api/agent/soul/validate", "POST /api/agent/spawn", + "POST /api/agent/tasks/:run_id/block", "POST /api/agent/tasks/:run_id/complete", "POST /api/agent/tasks/:run_id/fail", "POST /api/agent/tasks/:run_id/heartbeat", @@ -840,6 +841,7 @@ func TestRegisterTaskRoutesUseSharedHandlerBindings(t 
*testing.T) { "GET /api/agent/me": "AgentMe", "POST /api/agent/channels/:channel/send": "AgentChannelSend", "POST /api/agent/channels/reply": "AgentChannelReply", + "POST /api/agent/tasks/:run_id/block": "AgentTaskBlock", "POST /api/agent/tasks/:run_id/complete": "AgentTaskComplete", "POST /api/agent/tasks/:run_id/fail": "AgentTaskFail", "POST /api/agent/tasks/:run_id/heartbeat": "AgentTaskHeartbeat", diff --git a/internal/api/udsapi/routes.go b/internal/api/udsapi/routes.go index ff0641009..055c770db 100644 --- a/internal/api/udsapi/routes.go +++ b/internal/api/udsapi/routes.go @@ -176,6 +176,7 @@ func registerAgentKernelRoutes(api gin.IRouter, handlers *Handlers) { tasks.POST("/:run_id/complete", handlers.AgentTaskComplete) tasks.POST("/:run_id/fail", handlers.AgentTaskFail) tasks.POST("/:run_id/release", handlers.AgentTaskRelease) + tasks.POST("/:run_id/block", handlers.AgentTaskBlock) } } } diff --git a/internal/cli/client.go b/internal/cli/client.go index eb32e081f..73e0a45ce 100644 --- a/internal/cli/client.go +++ b/internal/cli/client.go @@ -482,6 +482,12 @@ type DaemonClient interface { request AgentTaskFailRequest, credentials agentidentity.Credentials, ) (AgentTaskLeaseRecord, error) + AgentTaskBlock( + ctx context.Context, + runID string, + request AgentTaskBlockRequest, + credentials agentidentity.Credentials, + ) (AgentTaskLeaseRecord, error) AgentTaskRelease( ctx context.Context, runID string, @@ -1059,6 +1065,9 @@ type AgentTaskCompleteRequest = contract.AgentTaskCompleteRequest // AgentTaskFailRequest captures one agent lease failure request. type AgentTaskFailRequest = contract.AgentTaskFailRequest +// AgentTaskBlockRequest captures one agent lease block request. +type AgentTaskBlockRequest = contract.AgentTaskBlockRequest + // AgentTaskReleaseRequest captures one agent lease release request. 
type AgentTaskReleaseRequest = contract.AgentTaskReleaseRequest @@ -4778,6 +4787,15 @@ func (c *unixSocketClient) AgentTaskFail( return c.agentTaskLeaseAction(ctx, strings.TrimSpace(runID), "fail", request, credentials) } +func (c *unixSocketClient) AgentTaskBlock( + ctx context.Context, + runID string, + request AgentTaskBlockRequest, + credentials agentidentity.Credentials, +) (AgentTaskLeaseRecord, error) { + return c.agentTaskLeaseAction(ctx, strings.TrimSpace(runID), "block", request, credentials) +} + func (c *unixSocketClient) AgentTaskRelease( ctx context.Context, runID string, diff --git a/internal/cli/helpers_test.go b/internal/cli/helpers_test.go index 893bc4dee..b44196102 100644 --- a/internal/cli/helpers_test.go +++ b/internal/cli/helpers_test.go @@ -295,6 +295,7 @@ type stubClient struct { agentTaskHeartbeatFn func(context.Context, string, AgentTaskHeartbeatRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) agentTaskCompleteFn func(context.Context, string, AgentTaskCompleteRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) agentTaskFailFn func(context.Context, string, AgentTaskFailRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) + agentTaskBlockFn func(context.Context, string, AgentTaskBlockRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) agentTaskReleaseFn func(context.Context, string, AgentTaskReleaseRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) } @@ -2641,6 +2642,18 @@ func (s *stubClient) AgentTaskFail( return AgentTaskLeaseRecord{}, errors.New("unexpected AgentTaskFail call") } +func (s *stubClient) AgentTaskBlock( + ctx context.Context, + runID string, + request AgentTaskBlockRequest, + credentials agentidentity.Credentials, +) (AgentTaskLeaseRecord, error) { + if s.agentTaskBlockFn != nil { + return s.agentTaskBlockFn(ctx, runID, request, credentials) + } + return AgentTaskLeaseRecord{}, errors.New("unexpected AgentTaskBlock call") +} + func (s *stubClient) 
AgentTaskRelease( ctx context.Context, runID string, diff --git a/internal/cli/task.go b/internal/cli/task.go index 82a61d759..17526a91a 100644 --- a/internal/cli/task.go +++ b/internal/cli/task.go @@ -230,6 +230,7 @@ func newTaskCommand(deps commandDeps) *cobra.Command { cmd.AddCommand(newTaskHeartbeatCommand(deps)) cmd.AddCommand(newTaskCompleteCommand(deps)) cmd.AddCommand(newTaskFailCommand(deps)) + cmd.AddCommand(newTaskBlockCommand(deps)) cmd.AddCommand(newTaskReleaseCommand(deps)) cmd.AddCommand(newTaskRetryCommand(deps)) cmd.AddCommand(newTaskRecoverCommand(deps)) @@ -1441,6 +1442,50 @@ func newTaskFailCommand(deps commandDeps) *cobra.Command { return cmd } +func newTaskBlockCommand(deps commandDeps) *cobra.Command { + var reason string + + cmd := &cobra.Command{ + Use: "block ", + Short: "Park a claimed task run in needs_attention for human input", + Args: cobra.ExactArgs(1), + Example: ` # Park the current session's claimed run with a specific blocker + agh task block run-123 --reason "Need product approval for the destructive migration"`, + RunE: func(cmd *cobra.Command, args []string) error { + runID, err := requiredAgentTaskRunID(args[0]) + if err != nil { + return err + } + request := AgentTaskBlockRequest{Reason: strings.TrimSpace(reason)} + if request.Reason == "" { + return errors.New("cli: --reason is required") + } + + client, err := clientFromDeps(deps) + if err != nil { + return err + } + credentials, err := requireAgentCommandIdentity( + cmd.Context(), + deps, + client, + agentActionCLI("task.block"), + ) + if err != nil { + return err + } + record, err := client.AgentTaskBlock(cmd.Context(), runID, request, credentials) + if err != nil { + return err + } + return writeCommandOutput(cmd, agentTaskLeaseBundle(record)) + }, + } + cmd.Flags().StringVar(&reason, "reason", "", "Specific human-facing blocker or question") + mustMarkFlagRequired(cmd, "reason") + return cmd +} + type taskFailCommandFlags struct { reason string errorMessage string diff 
--git a/internal/cli/task_test.go b/internal/cli/task_test.go index b6b00c0bc..ec8f3972e 100644 --- a/internal/cli/task_test.go +++ b/internal/cli/task_test.go @@ -880,6 +880,38 @@ func TestAgentTaskCommandsMapLeaseRequests(t *testing.T) { } }, }, + { + name: "Should map task block request", + args: []string{ + "task", + "block", + "run-1", + "--reason", + "Need approval for the production migration", + "-o", + "json", + }, + fn: func(t *testing.T) *stubClient { + t.Helper() + return &stubClient{ + agentTaskBlockFn: func( + ctx context.Context, + runID string, + request AgentTaskBlockRequest, + credentials agentidentity.Credentials, + ) (AgentTaskLeaseRecord, error) { + if ctx == nil { + t.Fatal("AgentTaskBlock context is nil") + } + assertAgentCredentials(t, credentials) + if runID != "run-1" || request.Reason != "Need approval for the production migration" { + t.Fatalf("block runID=%q request=%#v, want run-1 reason", runID, request) + } + return agentTaskLeaseRecord(taskpkg.TaskRunStatusNeedsAttention), nil + }, + } + }, + }, } { t.Run(tt.name, func(t *testing.T) { t.Parallel() @@ -1115,6 +1147,11 @@ func TestAgentTaskCommandsValidateBeforeAgentCalls(t *testing.T) { args: []string{"task", "next", "--priority-min", "-1", "-o", "json"}, wantErr: "--priority-min must be zero or positive", }, + { + name: "Should reject block without reason", + args: []string{"task", "block", "run-1", "-o", "json"}, + wantErr: `required flag(s) "reason" not set`, + }, { name: "Should reject invalid result json", args: []string{ @@ -1183,6 +1220,10 @@ func TestAgentTaskCommandsValidateBeforeAgentCalls(t *testing.T) { t.Fatal("AgentTaskFail should not be called for local validation errors") return AgentTaskLeaseRecord{}, nil }, + agentTaskBlockFn: func(context.Context, string, AgentTaskBlockRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) { + t.Fatal("AgentTaskBlock should not be called for local validation errors") + return AgentTaskLeaseRecord{}, nil + }, 
agentTaskReleaseFn: func(context.Context, string, AgentTaskReleaseRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) { t.Fatal("AgentTaskRelease should not be called for local validation errors") return AgentTaskLeaseRecord{}, nil diff --git a/internal/coordinator/coordinator.go b/internal/coordinator/coordinator.go index c5704c95f..951e972c0 100644 --- a/internal/coordinator/coordinator.go +++ b/internal/coordinator/coordinator.go @@ -57,6 +57,7 @@ var ( toolspkg.ToolIDTaskRunComplete.String(), toolspkg.ToolIDTaskRunFail.String(), toolspkg.ToolIDTaskRunRelease.String(), + toolspkg.ToolIDTaskRunBlock.String(), toolspkg.ToolIDTaskCreate.String(), } ) @@ -244,7 +245,7 @@ func PromptOverlay(input PromptInput) string { b.WriteString("\nUse public AGH agent APIs only:\n") b.WriteString("- `agh me context` for the Situation Surface.\n") b.WriteString("- `agh task create` to persist follow-up task intent.\n") - b.WriteString("- `agh task next|heartbeat|complete|fail|release` for task ownership and terminal status.\n") + b.WriteString("- `agh task next|heartbeat|complete|fail|release|block` for task ownership and terminal status.\n") b.WriteString("- `agh ch list|recv|send|reply` for operational worker communication.\n") b.WriteString("- `agh spawn` for bounded worker delegation.\n") b.WriteString("\nCreating a task only records follow-up intent. 
The current coordinator run is the active ") diff --git a/internal/coordinator/coordinator_test.go b/internal/coordinator/coordinator_test.go index 5a17accdc..0fb8c7eb7 100644 --- a/internal/coordinator/coordinator_test.go +++ b/internal/coordinator/coordinator_test.go @@ -267,7 +267,7 @@ func TestPromptOverlayUsesPublicAPIsAndRunChannel(t *testing.T) { for _, required := range []string{ "agh me context", "agh task create", - "agh task next|heartbeat|complete|fail|release", + "agh task next|heartbeat|complete|fail|release|block", "agh ch list|recv|send|reply", "agh spawn", "The current coordinator run is the active execution boundary", diff --git a/internal/daemon/daemon_test.go b/internal/daemon/daemon_test.go index 5a901856b..cc6d1a86a 100644 --- a/internal/daemon/daemon_test.go +++ b/internal/daemon/daemon_test.go @@ -6184,6 +6184,13 @@ func (r *recordingRegistry) ReleaseRunLease( return taskpkg.Run{}, taskpkg.ErrTaskRunNotFound } +func (r *recordingRegistry) BlockRunLease( + context.Context, + taskpkg.LeaseBlock, +) (taskpkg.Run, error) { + return taskpkg.Run{}, taskpkg.ErrTaskRunNotFound +} + func (r *recordingRegistry) ForceReleaseTaskRun( context.Context, taskpkg.ForceReleaseRunMutation, diff --git a/internal/daemon/native_tools.go b/internal/daemon/native_tools.go index 6eb28b05a..ae7c682cc 100644 --- a/internal/daemon/native_tools.go +++ b/internal/daemon/native_tools.go @@ -916,6 +916,10 @@ func (n *daemonNativeTools) autonomyToolBindings( call: n.autonomyRelease, availability: availability, }, + toolspkg.ToolIDTaskRunBlock: { + call: n.autonomyBlock, + availability: availability, + }, toolspkg.ToolIDTaskRunReviewSubmit: { call: n.submitRunReview, availability: n.submitRunReviewAvailability, @@ -2763,6 +2767,43 @@ func (n *daemonNativeTools) autonomyRelease( return structuredResult(map[string]any{nativeToolsLeaseKey: lease}, fmt.Sprintf("released %s", lease.RunID)) } +func (n *daemonNativeTools) autonomyBlock( + ctx context.Context, + scope 
toolspkg.Scope, + req toolspkg.CallRequest, +) (toolspkg.ToolResult, error) { + var input autonomyBlockInput + if err := decodeNativeInput(req, &input); err != nil { + return toolspkg.ToolResult{}, err + } + actor, sessionID, err := autonomyActorContext(req.ToolID, scope) + if err != nil { + return toolspkg.ToolResult{}, err + } + runID, err := requiredNativeString(req.ToolID, "run_id", input.RunID) + if err != nil { + return toolspkg.ToolResult{}, err + } + reason, err := requiredNativeString(req.ToolID, "reason", input.Reason) + if err != nil { + return toolspkg.ToolResult{}, err + } + handle, err := n.lookupAutonomyLease(ctx, req.ToolID, sessionID, runID) + if err != nil { + return toolspkg.ToolResult{}, err + } + run, err := n.deps.Tasks.BlockRunLease(ctx, taskpkg.LeaseBlock{ + RunID: runID, + ClaimToken: handle.ClaimToken, + Reason: reason, + }, actor) + if err != nil { + return toolspkg.ToolResult{}, nativeAutonomyToolError(req.ToolID, err) + } + lease := core.AgentTaskLeasePayloadFromRun(run, nil) + return structuredResult(map[string]any{nativeToolsLeaseKey: lease}, fmt.Sprintf("blocked %s", lease.RunID)) +} + func (n *daemonNativeTools) skillsFor( ctx context.Context, scope toolspkg.Scope, @@ -3568,6 +3609,11 @@ type autonomyReleaseInput struct { Reason string `json:"reason,omitempty"` } +type autonomyBlockInput struct { + RunID string `json:"run_id"` + Reason string `json:"reason"` +} + func decodeNativeInput(req toolspkg.CallRequest, dst any) error { raw := req.Input if len(bytes.TrimSpace(raw)) == 0 { diff --git a/internal/daemon/native_tools_test.go b/internal/daemon/native_tools_test.go index 352136be1..d87f34de7 100644 --- a/internal/daemon/native_tools_test.go +++ b/internal/daemon/native_tools_test.go @@ -2123,6 +2123,11 @@ func TestDaemonNativeTools(t *testing.T) { toolID: toolspkg.ToolIDTaskRunRelease, input: json.RawMessage(`{"run_id":"run-1","reason":"handoff"}`), }, + { + name: "Should block with internal lease token", + toolID: 
toolspkg.ToolIDTaskRunBlock, + input: json.RawMessage(`{"run_id":"run-1","reason":"blocked_on_human"}`), + }, } { t.Run(tt.name, func(t *testing.T) { result, err := registry.Call( @@ -2139,12 +2144,32 @@ func TestDaemonNativeTools(t *testing.T) { } if tasks.lastCompletion.ClaimToken != rawToken || tasks.lastFailure.ClaimToken != rawToken || - tasks.lastRelease.ClaimToken != rawToken { + tasks.lastRelease.ClaimToken != rawToken || + tasks.lastBlock.ClaimToken != rawToken { t.Fatalf( - "terminal/release tokens = %q/%q/%q, want internal token", + "terminal/release/block tokens = %q/%q/%q/%q, want internal token", tasks.lastCompletion.ClaimToken, tasks.lastFailure.ClaimToken, tasks.lastRelease.ClaimToken, + tasks.lastBlock.ClaimToken, + ) + } + + lookupCallsBeforeInvalidBlock := tasks.lookupCalls + _, err = registry.Call( + t.Context(), + scope, + toolspkg.CallRequest{ + ToolID: toolspkg.ToolIDTaskRunBlock, + Input: json.RawMessage(`{"run_id":"run-1","reason":" "}`), + }, + ) + requireToolReason(t, err, toolspkg.ErrToolInvalidInput, toolspkg.ReasonSchemaInvalid) + if tasks.lookupCalls != lookupCallsBeforeInvalidBlock { + t.Fatalf( + "lookup calls after invalid block reason = %d, want %d", + tasks.lookupCalls, + lookupCallsBeforeInvalidBlock, ) } }) @@ -6406,6 +6431,9 @@ type nativeTaskManager struct { releaseCalls int lastRelease taskpkg.LeaseRelease releaseErr error + blockCalls int + lastBlock taskpkg.LeaseBlock + blockErr error lookupReviewCalls int lastReviewSessionID string reviewBinding taskpkg.RunReviewBinding @@ -6657,6 +6685,20 @@ func (m *nativeTaskManager) ReleaseRunLease( return &run, nil } +func (m *nativeTaskManager) BlockRunLease( + _ context.Context, + block taskpkg.LeaseBlock, + _ taskpkg.ActorContext, +) (*taskpkg.Run, error) { + m.blockCalls++ + m.lastBlock = block + if m.blockErr != nil { + return nil, m.blockErr + } + run := nativeLeaseRun(block.RunID, taskpkg.TaskRunStatusNeedsAttention, m.lookupHandle) + return &run, nil +} + func (m 
*nativeTaskManager) LookupRunReviewForSession( _ context.Context, sessionID string, @@ -6769,6 +6811,7 @@ func (m *nativeTaskManager) totalCalls() int { m.completeCalls + m.failCalls + m.releaseCalls + + m.blockCalls + m.lookupReviewCalls + m.requestReviewCalls + m.getReviewCalls + @@ -7088,6 +7131,14 @@ func (unsupportedNativeTaskManager) ReleaseRunLease( return nil, errUnexpectedNativeTaskCall } +func (unsupportedNativeTaskManager) BlockRunLease( + context.Context, + taskpkg.LeaseBlock, + taskpkg.ActorContext, +) (*taskpkg.Run, error) { + return nil, errUnexpectedNativeTaskCall +} + func (unsupportedNativeTaskManager) ForceReleaseRun( context.Context, string, diff --git a/internal/daemon/task_runtime.go b/internal/daemon/task_runtime.go index 87144f9f8..6e332439a 100644 --- a/internal/daemon/task_runtime.go +++ b/internal/daemon/task_runtime.go @@ -15,6 +15,7 @@ import ( "github.com/compozy/agh/internal/session" "github.com/compozy/agh/internal/store" taskpkg "github.com/compozy/agh/internal/task" + aghworkspace "github.com/compozy/agh/internal/workspace" ) const ( @@ -35,6 +36,10 @@ type taskStore interface { taskpkg.Store } +type taskWorkspaceGetter interface { + GetWorkspace(ctx context.Context, id string) (aghworkspace.Workspace, error) +} + type taskRuntime struct { manager *taskpkg.Service store taskStore @@ -490,6 +495,21 @@ func taskManagerOptions( }), taskpkg.WithStarvationAge(scheduler.MinQueuedAge), } + if workspaceStore, ok := store.(taskWorkspaceGetter); ok { + options = append(options, taskpkg.WithCompletionContractRootResolver( + func(ctx context.Context, taskRecord taskpkg.Task, _ taskpkg.Run) (string, error) { + workspaceID := strings.TrimSpace(taskRecord.WorkspaceID) + if workspaceID == "" { + return "", fmt.Errorf("workspace_id is required") + } + workspaceRecord, err := workspaceStore.GetWorkspace(ctx, workspaceID) + if err != nil { + return "", err + } + return workspaceRecord.RootDir, nil + }, + )) + } if hooks != nil { options = 
append(options, taskpkg.WithTaskRunHooks(hooks))
 	}
diff --git a/internal/store/globaldb/global_db_task_claim.go b/internal/store/globaldb/global_db_task_claim.go
index 514bf2fcf..d074fe33b 100644
--- a/internal/store/globaldb/global_db_task_claim.go
+++ b/internal/store/globaldb/global_db_task_claim.go
@@ -193,6 +193,39 @@ func (g *GlobalDB) ReleaseRunLease(ctx context.Context, release taskpkg.LeaseRel
 	return updated, nil
 }
 
+// BlockRunLease parks an active task-run lease in needs_attention after token verification.
+func (g *GlobalDB) BlockRunLease(ctx context.Context, block taskpkg.LeaseBlock) (taskpkg.Run, error) {
+	if err := g.checkReady(ctx, "block task run lease"); err != nil {
+		return taskpkg.Run{}, err
+	}
+	normalized, err := block.Normalize(g.now()) // g.now() is handed to Normalize; normalized.Now feeds the lease check below
+	if err != nil {
+		return taskpkg.Run{}, err
+	}
+
+	var updated taskpkg.Run // assigned inside the callback, returned only when the callback succeeds
+	if err := g.withTaskImmediateTransaction(ctx, "block task run lease", func(exec taskSQLExecutor) error {
+		current, err := g.getTaskRunWithExecutor(ctx, exec, normalized.RunID)
+		if err != nil {
+			return err
+		}
+		if err := requireCurrentRunLease(current, normalized.ClaimToken, normalized.Now); err != nil { // lease check runs before any write
+			return err
+		}
+		if err := blockLeasedRun(ctx, exec, current.ID, normalized.Reason); err != nil {
+			return err
+		}
+		if err := clearTaskCurrentRunProjection(ctx, exec, current.TaskID, current.ID); err != nil {
+			return err
+		}
+		updated, err = g.getTaskRunWithExecutor(ctx, exec, current.ID) // re-read through the same executor after the update
+		return err
+	}); err != nil {
+		return taskpkg.Run{}, err
+	}
+	return updated, nil
+}
+
 // CompleteRunLease marks one claimed run complete after token verification.
func (g *GlobalDB) CompleteRunLease(ctx context.Context, completion taskpkg.LeaseCompletion) (taskpkg.Run, error) { if err := g.checkReady(ctx, "complete task run lease"); err != nil { @@ -817,6 +850,24 @@ func requeueLeasedRun(ctx context.Context, exec taskSQLExecutor, runID string) e return requireRowsAffected(result, taskpkg.ErrTaskRunNotFound, runID, "task run lease") } +func blockLeasedRun(ctx context.Context, exec taskSQLExecutor, runID string, reason string) error { + result, err := exec.ExecContext( + ctx, + `UPDATE task_runs + SET status = ?, claimed_by_kind = NULL, claimed_by_ref = NULL, session_id = NULL, + claim_token = NULL, claim_token_hash = NULL, lease_until = NULL, heartbeat_at = NULL, + claimed_at = NULL, ended_at = NULL, error = ?, result_json = NULL + WHERE id = ?`, + string(taskpkg.TaskRunStatusNeedsAttention), + strings.TrimSpace(reason), + runID, + ) + if err != nil { + return fmt.Errorf("store: block task run lease %q: %w", runID, err) + } + return requireRowsAffected(result, taskpkg.ErrTaskRunNotFound, runID, "task run lease") +} + func expiredLeaseRunIDs( ctx context.Context, exec taskSQLExecutor, diff --git a/internal/store/globaldb/global_db_task_claim_test.go b/internal/store/globaldb/global_db_task_claim_test.go index 445dd349c..4c24b27ba 100644 --- a/internal/store/globaldb/global_db_task_claim_test.go +++ b/internal/store/globaldb/global_db_task_claim_test.go @@ -838,6 +838,73 @@ func TestGlobalDBClaimLeaseLifecycleFencing(t *testing.T) { } } +func TestGlobalDBBlockRunLeaseParksNeedsAttention(t *testing.T) { + globalDB := openTestGlobalDB(t) + ctx := testutil.Context(t) + now := time.Date(2026, 4, 26, 13, 0, 0, 0, time.UTC) + taskRecord := taskRecordForTest("task-block-lease") + taskRecord.Status = taskpkg.TaskStatusReady + if err := globalDB.CreateTask(ctx, taskRecord); err != nil { + t.Fatalf("CreateTask() error = %v", err) + } + run := taskRunForTest("run-block-lease", taskRecord.ID) + if err := globalDB.CreateTaskRun(ctx, run); 
err != nil { + t.Fatalf("CreateTaskRun() error = %v", err) + } + claim, err := globalDB.ClaimNextRun(ctx, taskpkg.ClaimCriteria{ + Scope: taskpkg.ScopeGlobal, + ClaimerSessionID: "sess-block-lease", + LeaseDuration: time.Minute, + Now: now, + }) + if err != nil { + t.Fatalf("ClaimNextRun() error = %v", err) + } + blocked, err := globalDB.BlockRunLease(ctx, taskpkg.LeaseBlock{ + RunID: claim.Run.ID, + ClaimToken: claim.ClaimToken, + Reason: "blocked_on_human", + Now: now.Add(10 * time.Second), + }) + if err != nil { + t.Fatalf("BlockRunLease() error = %v", err) + } + if got, want := blocked.Status, taskpkg.TaskRunStatusNeedsAttention; got != want { + t.Fatalf("blocked.Status = %q, want %q", got, want) + } + if blocked.ClaimTokenHash != "" || + blocked.SessionID != "" || + blocked.ClaimedBy != nil || + !blocked.LeaseUntil.IsZero() { + t.Fatalf( + "blocked ownership fields = hash %q session %q claimed_by %#v lease %v, want cleared", + blocked.ClaimTokenHash, + blocked.SessionID, + blocked.ClaimedBy, + blocked.LeaseUntil, + ) + } + if got, want := blocked.Error, "blocked_on_human"; got != want { + t.Fatalf("blocked.Error = %q, want %q", got, want) + } + var rawClaimToken sql.NullString + if err := globalDB.db.QueryRowContext(ctx, `SELECT claim_token FROM task_runs WHERE id = ?`, blocked.ID). 
+		Scan(&rawClaimToken); err != nil {
+		t.Fatalf("query blocked claim_token error = %v", err)
+	}
+	if rawClaimToken.Valid {
+		t.Fatalf("blocked stored raw claim_token = %q, want NULL", rawClaimToken.String)
+	}
+	if _, err := globalDB.ClaimNextRun(ctx, taskpkg.ClaimCriteria{
+		Scope:            taskpkg.ScopeGlobal,
+		ClaimerSessionID: "sess-other",
+		LeaseDuration:    time.Minute,
+		Now:              now.Add(20 * time.Second),
+	}); !errors.Is(err, taskpkg.ErrNoClaimableRun) {
+		t.Fatalf("ClaimNextRun(after block) error = %v, want %v", err, taskpkg.ErrNoClaimableRun)
+	}
+}
+
 func TestGlobalDBRecoverExpiredRunLeasesThenClaim(t *testing.T) {
 	globalDB := openTestGlobalDB(t)
 	ctx := testutil.Context(t)
diff --git a/internal/task/completion_contract.go b/internal/task/completion_contract.go
new file mode 100644
index 000000000..c630d010f
--- /dev/null
+++ b/internal/task/completion_contract.go
@@ -0,0 +1,251 @@
+package task
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+)
+
+// completionContractMetadataKey is the metadata key under which a completion contract is stored.
+const completionContractMetadataKey = "completion_contract"
+
+// completionContract lists the artifact paths that validateCompletionContract
+// requires to exist as non-directory files under the contract root.
+type completionContract struct {
+	RequiredArtifacts []completionRequiredArtifact `json:"required_artifacts,omitempty"`
+	RequiredPaths     []string                     `json:"required_paths,omitempty"`
+	MissingPolicy     string                       `json:"missing_policy,omitempty"`
+}
+
+// completionRequiredArtifact is the object form of one required_artifacts entry.
+type completionRequiredArtifact struct {
+	Path string `json:"path"`
+}
+
+// validateCompletionContract checks every completion contract found in the task
+// and run metadata. Each required path is resolved under the contract root and
+// stat'ed; paths that are absent or are directories are reported together in one
+// ErrValidation error. A nil receiver or an absence of contracts yields nil.
+func (m *Service) validateCompletionContract(ctx context.Context, taskRecord Task, run Run) error {
+	if m == nil {
+		return nil
+	}
+	contracts, err := completionContractsFromMetadata(taskRecord.Metadata, run.Metadata)
+	if err != nil {
+		return err
+	}
+	if len(contracts) == 0 {
+		return nil
+	}
+	var missing []string
+	for _, contract := range contracts {
+		if err := validateCompletionMissingPolicy(contract.MissingPolicy); err != nil {
+			return err
+		}
+		required := contract.RequiredArtifactPaths()
+		for _, rawPath := range required {
+			resolved, err := m.resolveCompletionArtifactPath(ctx, taskRecord, run, rawPath)
+			if err != nil {
+				return err
+			}
+			info, err := os.Stat(resolved)
+			if err != nil {
+				if os.IsNotExist(err) {
+					missing = append(missing, rawPath)
+					continue
+				}
+				return fmt.Errorf("%w: completion contract artifact %q: %w", ErrValidation, rawPath, err)
+			}
+			if info.IsDir() {
+				missing = append(missing, rawPath)
+			}
+		}
+	}
+	if len(missing) > 0 {
+		return fmt.Errorf(
+			"%w: task run %q completion contract missing required artifact(s): %s",
+			ErrValidation,
+			run.ID,
+			strings.Join(missing, ", "),
+		)
+	}
+	return nil
+}
+
+// resolveCompletionArtifactPath joins one contract path onto the root returned
+// by m.contractRoot. Empty, absolute, claim-token-bearing ("agh_claim_"), and
+// root-escaping ("." or leading "..") paths are rejected with ErrValidation.
+func (m *Service) resolveCompletionArtifactPath(
+	ctx context.Context,
+	taskRecord Task,
+	run Run,
+	rawPath string,
+) (string, error) {
+	trimmed := strings.TrimSpace(rawPath)
+	if trimmed == "" {
+		return "", fmt.Errorf("%w: completion contract artifact path is required", ErrValidation)
+	}
+	if strings.Contains(trimmed, "agh_claim_") {
+		return "", fmt.Errorf("%w: completion contract artifact path must not embed a claim token", ErrValidation)
+	}
+	if filepath.IsAbs(trimmed) {
+		return "", fmt.Errorf(
+			"%w: completion contract artifact %q must be relative to the workspace root",
+			ErrValidation,
+			trimmed,
+		)
+	}
+	if m.contractRoot == nil {
+		return "", fmt.Errorf(
+			"%w: completion contract artifact %q is relative but no workspace root resolver is configured",
+			ErrValidation,
+			trimmed,
+		)
+	}
+	root, err := m.contractRoot(ctx, taskRecord, run)
+	if err != nil {
+		return "", fmt.Errorf("%w: resolve completion contract root: %w", ErrValidation, err)
+	}
+	root = strings.TrimSpace(root)
+	if root == "" {
+		return "", fmt.Errorf(
+			"%w: completion contract root is required for relative artifact %q",
+			ErrValidation,
+			trimmed,
+		)
+	}
+	cleanRel := filepath.Clean(trimmed)
+	if cleanRel == "." || strings.HasPrefix(cleanRel, ".."+string(filepath.Separator)) || cleanRel == ".." {
+		return "", fmt.Errorf(
+			"%w: completion contract artifact %q must stay under workspace root",
+			ErrValidation,
+			trimmed,
+		)
+	}
+	return filepath.Join(root, cleanRel), nil
+}
+
+// completionContractsFromMetadata decodes the completion_contract entry of each
+// non-empty metadata document. Contracts that name no required paths are
+// dropped from the result.
+func completionContractsFromMetadata(rawValues ...json.RawMessage) ([]completionContract, error) {
+	var contracts []completionContract
+	for _, raw := range rawValues {
+		if len(raw) == 0 || strings.TrimSpace(string(raw)) == "" {
+			continue
+		}
+		var envelope map[string]json.RawMessage
+		if err := json.Unmarshal(raw, &envelope); err != nil {
+			return nil, fmt.Errorf("%w: decode completion contract metadata: %w", ErrValidation, err)
+		}
+		payload, ok := envelope[completionContractMetadataKey]
+		if !ok || len(payload) == 0 || string(payload) == "null" {
+			continue
+		}
+		contract, err := decodeCompletionContract(payload)
+		if err != nil {
+			return nil, err
+		}
+		if len(contract.RequiredArtifactPaths()) > 0 {
+			contracts = append(contracts, contract)
+		}
+	}
+	return contracts, nil
+}
+
+// decodeCompletionContract decodes one completion_contract payload.
+//
+// required_artifacts is kept as raw JSON and decoded separately because it may
+// be an array of {"path": ...} objects or an array of plain path strings.
+// Decoding the string form directly into []completionRequiredArtifact makes
+// json.Unmarshal return a type error, which would reject the whole contract
+// before the string fallback could run.
+func decodeCompletionContract(raw json.RawMessage) (completionContract, error) {
+	var wire struct {
+		RequiredArtifacts json.RawMessage `json:"required_artifacts"`
+		RequiredPaths     []string        `json:"required_paths"`
+		MissingPolicy     string          `json:"missing_policy"`
+	}
+	if err := json.Unmarshal(raw, &wire); err != nil {
+		return completionContract{}, fmt.Errorf("%w: decode completion contract: %w", ErrValidation, err)
+	}
+	contract := completionContract{RequiredPaths: wire.RequiredPaths, MissingPolicy: wire.MissingPolicy}
+	if len(wire.RequiredArtifacts) > 0 {
+		artifacts, err := decodeCompletionRequiredArtifacts(wire.RequiredArtifacts)
+		if err != nil {
+			return completionContract{}, err
+		}
+		contract.RequiredArtifacts = artifacts
+	}
+	return contract, nil
+}
+
+// decodeCompletionRequiredArtifacts accepts required_artifacts either as an
+// array of {"path": ...} objects or as an array of path strings.
+func decodeCompletionRequiredArtifacts(raw json.RawMessage) ([]completionRequiredArtifact, error) {
+	var asObjects []completionRequiredArtifact
+	if err := json.Unmarshal(raw, &asObjects); err == nil {
+		return asObjects, nil
+	}
+	var asStrings []string
+	if err := json.Unmarshal(raw, &asStrings); err == nil {
+		artifacts := make([]completionRequiredArtifact, 0, len(asStrings))
+		for _, path := range asStrings {
+			artifacts = append(artifacts, completionRequiredArtifact{Path: path})
+		}
+		return artifacts, nil
+	}
+	return nil, fmt.Errorf("%w: completion_contract.required_artifacts must be an array", ErrValidation)
+}
+
+// RequiredArtifactPaths returns the trimmed, non-empty paths from
+// RequiredArtifacts followed by those from RequiredPaths.
+func (c completionContract) RequiredArtifactPaths() []string {
+	paths := make([]string, 0, len(c.RequiredArtifacts)+len(c.RequiredPaths))
+	for _, artifact := range c.RequiredArtifacts {
+		if trimmed := strings.TrimSpace(artifact.Path); trimmed != "" {
+			paths = append(paths, trimmed)
+		}
+	}
+	for _, path := range c.RequiredPaths {
+		if trimmed := strings.TrimSpace(path); trimmed != "" {
+			paths = append(paths, trimmed)
+		}
+	}
+	return paths
+}
+
+// validateCompletionMissingPolicy accepts only an empty policy or "reject".
+func validateCompletionMissingPolicy(policy string) error {
+	switch strings.TrimSpace(policy) {
+	case "", "reject":
+		return nil
+	default:
+		return fmt.Errorf("%w: completion_contract.missing_policy must be reject", ErrValidation)
+	}
+}
+
+// validateActiveLeasePreconditions verifies rawToken against the run's claim
+// token hash, then requires an actively leased status (claimed, starting, or
+// running) and a lease that has not expired at now.
+func validateActiveLeasePreconditions(run Run, rawToken string, now time.Time) error {
+	if strings.TrimSpace(run.ClaimTokenHash) == "" {
+		return fmt.Errorf("%w: task run %q has no current claim token hash", ErrInvalidClaimToken, run.ID)
+	}
+	if !VerifyClaimToken(rawToken, run.ClaimTokenHash) {
+		return fmt.Errorf("%w: task run %q token mismatch", ErrInvalidClaimToken, run.ID)
+	}
+	switch run.Status.Normalize() {
+	case TaskRunStatusClaimed, TaskRunStatusStarting, TaskRunStatusRunning:
+	default:
+		return fmt.Errorf("%w: task run %q is not actively leased", ErrInvalidStatusTransition, run.ID)
+	}
+	if run.LeaseUntil.IsZero() || !run.LeaseUntil.After(now.UTC()) {
+		return fmt.Errorf("%w: task run %q lease expired", ErrLeaseExpired, run.ID)
+	}
+	return nil
+}
diff --git a/internal/task/interfaces.go b/internal/task/interfaces.go
index 0e7b24e34..d17f26eb3 100644
--- a/internal/task/interfaces.go
+++ 
b/internal/task/interfaces.go @@ -53,6 +53,7 @@ type Manager interface { AttachRunSession(ctx context.Context, runID string, sessionID string, actor ActorContext) (*Run, error) HeartbeatRunLease(ctx context.Context, heartbeat LeaseHeartbeat, actor ActorContext) (*Run, error) ReleaseRunLease(ctx context.Context, release LeaseRelease, actor ActorContext) (*Run, error) + BlockRunLease(ctx context.Context, block LeaseBlock, actor ActorContext) (*Run, error) ForceReleaseRun(ctx context.Context, runID string, release ForceReleaseRun, actor ActorContext) (*Run, error) ForceFailRun(ctx context.Context, runID string, failure ForceFailRun, actor ActorContext) (*Run, error) RetryRun(ctx context.Context, runID string, retry RetryRunRequest, actor ActorContext) (*RetryRunResult, error) @@ -139,6 +140,7 @@ type RunStore interface { ClaimNextRun(ctx context.Context, criteria ClaimCriteria) (ClaimResult, error) HeartbeatRunLease(ctx context.Context, heartbeat LeaseHeartbeat) (Run, error) ReleaseRunLease(ctx context.Context, release LeaseRelease) (Run, error) + BlockRunLease(ctx context.Context, block LeaseBlock) (Run, error) CompleteRunLease(ctx context.Context, completion LeaseCompletion) (Run, error) FailRunLease(ctx context.Context, failure LeaseFailure) (Run, error) ForceReleaseTaskRun(ctx context.Context, release ForceReleaseRunMutation) (ForceRunMutationResult, error) diff --git a/internal/task/interfaces_integration_test.go b/internal/task/interfaces_integration_test.go index 1fa9cb475..eaf3d96ff 100644 --- a/internal/task/interfaces_integration_test.go +++ b/internal/task/interfaces_integration_test.go @@ -79,6 +79,10 @@ func (fakeStore) ReleaseRunLease(context.Context, taskpkg.LeaseRelease) (taskpkg return taskpkg.Run{}, nil } +func (fakeStore) BlockRunLease(context.Context, taskpkg.LeaseBlock) (taskpkg.Run, error) { + return taskpkg.Run{}, nil +} + func (fakeStore) CompleteRunLease(context.Context, taskpkg.LeaseCompletion) (taskpkg.Run, error) { return taskpkg.Run{}, nil 
} diff --git a/internal/task/lease.go b/internal/task/lease.go index 26663d2f5..40dbd95c0 100644 --- a/internal/task/lease.go +++ b/internal/task/lease.go @@ -104,6 +104,14 @@ type LeaseRelease struct { Now time.Time `json:"now"` } +// LeaseBlock captures a token-fenced request to park an active lease in needs_attention. +type LeaseBlock struct { + RunID string `json:"run_id"` + ClaimToken string `json:"claim_token"` + Reason string `json:"reason,omitempty"` + Now time.Time `json:"now"` +} + // LeaseCompletion captures a token-fenced successful terminal transition. type LeaseCompletion struct { RunID string `json:"run_id"` @@ -346,6 +354,30 @@ func (r LeaseRelease) Validate(path string) error { return validateLeaseRunToken(r.RunID, r.ClaimToken, path) } +// Normalize returns a validated block request with default time applied. +func (b LeaseBlock) Normalize(defaultNow time.Time) (LeaseBlock, error) { + normalized := b + normalized.RunID = strings.TrimSpace(normalized.RunID) + normalized.ClaimToken = strings.TrimSpace(normalized.ClaimToken) + normalized.Reason = strings.TrimSpace(normalized.Reason) + normalized.Now = normalizeLeaseNow(normalized.Now, defaultNow) + if err := normalized.Validate("lease_block"); err != nil { + return LeaseBlock{}, err + } + return normalized, nil +} + +// Validate reports whether the block request is internally consistent. +func (b LeaseBlock) Validate(path string) error { + if err := validateLeaseRunToken(b.RunID, b.ClaimToken, path); err != nil { + return err + } + if strings.Contains(b.Reason, "agh_claim_") { + return fmt.Errorf("%w: %s must not embed a claim token", ErrValidation, nestedPath(path, "reason")) + } + return nil +} + // Normalize returns a validated structural session lease release request. 
func (r SessionLeaseRelease) Normalize(defaultNow time.Time) (SessionLeaseRelease, error) { normalized := r diff --git a/internal/task/lease_manager.go b/internal/task/lease_manager.go index 20d44f836..bd8dd51e9 100644 --- a/internal/task/lease_manager.go +++ b/internal/task/lease_manager.go @@ -124,6 +124,45 @@ func (m *Service) ReleaseRunLease( return &run, nil } +// BlockRunLease parks one active task-run lease in needs_attention after token verification. +func (m *Service) BlockRunLease( + ctx context.Context, + block LeaseBlock, + actor ActorContext, +) (*Run, error) { + if err := requireWriteAuthority(actor); err != nil { + return nil, err + } + normalized, err := block.Normalize(m.now().UTC()) + if err != nil { + return nil, err + } + previous, taskRecord, err := m.loadRunWithTask(ctx, normalized.RunID) + if err != nil { + return nil, err + } + if err := validateActiveLeasePreconditions(previous, normalized.ClaimToken, normalized.Now); err != nil { + return nil, err + } + run, err := m.store.BlockRunLease(ctx, normalized) + if err != nil { + return nil, err + } + if _, err := m.reconcileTaskCascade(ctx, taskRecord.ID); err != nil { + return nil, err + } + if err := m.recordTaskEvent(ctx, run.TaskID, run.ID, taskEventRunNeedsAttention, actor, runNeedsAttentionPayload{ + PreviousStatus: previous.Status, + Status: run.Status, + Diagnostic: normalized.Reason, + QueuedAt: run.QueuedAt, + CoordinationChannel: run.CoordinationChannelID, + }); err != nil { + return nil, err + } + return &run, nil +} + // ReleaseSessionRunLeases structurally releases every active task-run lease // bound to one session without requiring the raw claim token. This is reserved // for daemon-owned runtime cleanup paths such as safe-spawn reaping. 
@@ -189,6 +228,16 @@ func (m *Service) CompleteRunLease( if err != nil { return nil, err } + current, taskRecord, err := m.loadRunWithTask(ctx, normalized.RunID) + if err != nil { + return nil, err + } + if err := validateActiveLeasePreconditions(current, normalized.ClaimToken, normalized.Now); err != nil { + return nil, err + } + if err := m.validateCompletionContract(ctx, taskRecord, current); err != nil { + return nil, err + } run, err := m.store.CompleteRunLease(ctx, normalized) if err != nil { return nil, err diff --git a/internal/task/lease_test.go b/internal/task/lease_test.go index 9febc01e4..14807fc30 100644 --- a/internal/task/lease_test.go +++ b/internal/task/lease_test.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "errors" + "os" + "strconv" "strings" "testing" "time" @@ -517,6 +519,200 @@ func TestManagerClaimNextRunRequiresWriteAuthority(t *testing.T) { } } +func TestManagerBlockRunLeaseParksRunNeedsAttention(t *testing.T) { + t.Parallel() + + manager := newTaskManagerForTest(t, newInMemoryManagerStore()) + operator := validActorContext() + agent := validActorContext() + agent.Actor = ActorIdentity{Kind: ActorKindAgentSession, Ref: "sess-block"} + agent.Origin = Origin{Kind: OriginKindAgentSession, Ref: "worker"} + + taskRecord, err := manager.CreateTask(context.Background(), CreateTask{ + Scope: ScopeGlobal, + Title: "Human blocked task", + }, operator) + if err != nil { + t.Fatalf("CreateTask() error = %v", err) + } + if _, err := manager.EnqueueRun(context.Background(), EnqueueRun{TaskID: taskRecord.ID}, operator); err != nil { + t.Fatalf("EnqueueRun() error = %v", err) + } + now := time.Now().UTC() + claim, err := manager.ClaimNextRun(context.Background(), ClaimCriteria{ + Scope: ScopeGlobal, + ClaimerSessionID: "sess-block", + LeaseDuration: time.Minute, + Now: now, + }, agent) + if err != nil { + t.Fatalf("ClaimNextRun() error = %v", err) + } + + blocked, err := manager.BlockRunLease(context.Background(), LeaseBlock{ + RunID: 
claim.Run.ID, + ClaimToken: claim.ClaimToken, + Reason: "blocked_on_human: Figma OAuth required", + Now: now.Add(10 * time.Second), + }, agent) + if err != nil { + t.Fatalf("BlockRunLease() error = %v", err) + } + if got, want := blocked.Status, TaskRunStatusNeedsAttention; got != want { + t.Fatalf("blocked.Status = %q, want %q", got, want) + } + if blocked.ClaimTokenHash != "" || + blocked.SessionID != "" || + blocked.ClaimedBy != nil || + !blocked.LeaseUntil.IsZero() { + t.Fatalf( + "blocked ownership fields = hash %q session %q claimed_by %#v lease %v, want cleared", + blocked.ClaimTokenHash, + blocked.SessionID, + blocked.ClaimedBy, + blocked.LeaseUntil, + ) + } + view, err := manager.GetTask(context.Background(), taskRecord.ID, operator) + if err != nil { + t.Fatalf("GetTask() error = %v", err) + } + if got, want := view.Task.Status, TaskStatusBlocked; got != want { + t.Fatalf("task status after block = %q, want %q", got, want) + } + if _, err := manager.ClaimNextRun(context.Background(), ClaimCriteria{ + Scope: ScopeGlobal, + ClaimerSessionID: "sess-other", + LeaseDuration: time.Minute, + Now: now.Add(20 * time.Second), + }, agent); !errors.Is(err, ErrNoClaimableRun) { + t.Fatalf("ClaimNextRun(after block) error = %v, want %v", err, ErrNoClaimableRun) + } +} + +func TestManagerCompleteRunLeaseRequiresCompletionContractArtifacts(t *testing.T) { + t.Parallel() + + root := t.TempDir() + manager := newTaskManagerForTestWithOptions( + t, + newInMemoryManagerStore(), + WithCompletionContractRootResolver(func(context.Context, Task, Run) (string, error) { + return root, nil + }), + ) + operator := validActorContext() + agent := validActorContext() + agent.Actor = ActorIdentity{Kind: ActorKindAgentSession, Ref: "sess-contract"} + agent.Origin = Origin{Kind: OriginKindAgentSession, Ref: "worker"} + + taskRecord, err := manager.CreateTask(context.Background(), CreateTask{ + Scope: ScopeGlobal, + Title: "Receipt gated task", + Metadata: json.RawMessage( + 
`{"completion_contract":{"required_artifacts":[{"path":"receipts/phase.yaml"}]}}`, + ), + }, operator) + if err != nil { + t.Fatalf("CreateTask() error = %v", err) + } + if _, err := manager.EnqueueRun(context.Background(), EnqueueRun{TaskID: taskRecord.ID}, operator); err != nil { + t.Fatalf("EnqueueRun() error = %v", err) + } + now := time.Now().UTC() + claim, err := manager.ClaimNextRun(context.Background(), ClaimCriteria{ + Scope: ScopeGlobal, + ClaimerSessionID: "sess-contract", + LeaseDuration: time.Minute, + Now: now, + }, agent) + if err != nil { + t.Fatalf("ClaimNextRun() error = %v", err) + } + _, err = manager.CompleteRunLease(context.Background(), LeaseCompletion{ + RunID: claim.Run.ID, + ClaimToken: claim.ClaimToken, + Result: RunResult{Value: json.RawMessage(`{"ok":true}`)}, + Now: now.Add(10 * time.Second), + }, agent) + if err == nil || !errors.Is(err, ErrValidation) { + t.Fatalf("CompleteRunLease(missing receipt) error = %v, want %v", err, ErrValidation) + } + + if err := os.MkdirAll(root+"/receipts", 0o755); err != nil { + t.Fatalf("MkdirAll(receipts) error = %v", err) + } + if err := os.WriteFile(root+"/receipts/phase.yaml", []byte("status: completed\n"), 0o644); err != nil { + t.Fatalf("WriteFile(receipt) error = %v", err) + } + completed, err := manager.CompleteRunLease(context.Background(), LeaseCompletion{ + RunID: claim.Run.ID, + ClaimToken: claim.ClaimToken, + Result: RunResult{Value: json.RawMessage(`{"ok":true}`)}, + Now: now.Add(20 * time.Second), + }, agent) + if err != nil { + t.Fatalf("CompleteRunLease(with receipt) error = %v", err) + } + if got, want := completed.Status, TaskRunStatusCompleted; got != want { + t.Fatalf("completed.Status = %q, want %q", got, want) + } +} + +func TestManagerCompleteRunLeaseRejectsAbsoluteCompletionContractArtifacts(t *testing.T) { + t.Parallel() + + root := t.TempDir() + absoluteReceipt := root + "/receipt.yaml" + if err := os.WriteFile(absoluteReceipt, []byte("status: completed\n"), 0o644); err != 
nil { + t.Fatalf("WriteFile(receipt) error = %v", err) + } + manager := newTaskManagerForTestWithOptions( + t, + newInMemoryManagerStore(), + WithCompletionContractRootResolver(func(context.Context, Task, Run) (string, error) { + return root, nil + }), + ) + operator := validActorContext() + agent := validActorContext() + agent.Actor = ActorIdentity{Kind: ActorKindAgentSession, Ref: "sess-contract"} + agent.Origin = Origin{Kind: OriginKindAgentSession, Ref: "worker"} + + taskRecord, err := manager.CreateTask(context.Background(), CreateTask{ + Scope: ScopeGlobal, + Title: "Absolute receipt task", + Metadata: json.RawMessage( + `{"completion_contract":{"required_artifacts":[{"path":` + strconv.Quote(absoluteReceipt) + `}]}}`, + ), + }, operator) + if err != nil { + t.Fatalf("CreateTask() error = %v", err) + } + if _, err := manager.EnqueueRun(context.Background(), EnqueueRun{TaskID: taskRecord.ID}, operator); err != nil { + t.Fatalf("EnqueueRun() error = %v", err) + } + now := time.Now().UTC() + claim, err := manager.ClaimNextRun(context.Background(), ClaimCriteria{ + Scope: ScopeGlobal, + ClaimerSessionID: "sess-contract", + LeaseDuration: time.Minute, + Now: now, + }, agent) + if err != nil { + t.Fatalf("ClaimNextRun() error = %v", err) + } + _, err = manager.CompleteRunLease(context.Background(), LeaseCompletion{ + RunID: claim.Run.ID, + ClaimToken: claim.ClaimToken, + Result: RunResult{Value: json.RawMessage(`{"ok":true}`)}, + Now: now.Add(10 * time.Second), + }, agent) + if err == nil || !errors.Is(err, ErrValidation) { + t.Fatalf("CompleteRunLease(absolute receipt) error = %v, want %v", err, ErrValidation) + } +} + func TestManagerReleaseSessionRunLeasesRequeuesActiveRunsStructurally(t *testing.T) { t.Parallel() diff --git a/internal/task/manager.go b/internal/task/manager.go index ee4d2d749..a9ecb71b1 100644 --- a/internal/task/manager.go +++ b/internal/task/manager.go @@ -70,6 +70,10 @@ const ( // Option customizes Service construction. 
type Option func(*managerOptions) +// CompletionContractRootResolver resolves the filesystem root used for relative +// completion-contract artifact paths. +type CompletionContractRootResolver func(ctx context.Context, task Task, run Run) (string, error) + type managerOptions struct { store Store sessions SessionExecutor @@ -81,6 +85,7 @@ type managerOptions struct { channelValidator func(string) error profileValidation ExecutionProfileValidationOptions forceRecovery ForceRecoveryOptions + contractRoot CompletionContractRootResolver now func() time.Time newID func(prefix string) string cancelGracePeriod time.Duration @@ -100,6 +105,7 @@ type Service struct { channelValidator func(string) error profileValidation ExecutionProfileValidationOptions forceRecovery ForceRecoveryOptions + contractRoot CompletionContractRootResolver now func() time.Time newID func(prefix string) string cancelGracePeriod time.Duration @@ -186,6 +192,14 @@ func WithForceRecoveryOptions(options ForceRecoveryOptions) Option { } } +// WithCompletionContractRootResolver injects workspace-root resolution for +// relative completion-contract artifact paths. +func WithCompletionContractRootResolver(resolver CompletionContractRootResolver) Option { + return func(opts *managerOptions) { + opts.contractRoot = resolver + } +} + // WithManagerNow overrides the manager clock for deterministic tests. 
func WithManagerNow(now func() time.Time) Option { return func(opts *managerOptions) { @@ -254,6 +268,7 @@ func NewManager(opts ...Option) (*Service, error) { channelValidator: options.channelValidator, profileValidation: options.profileValidation, forceRecovery: normalizeForceRecoveryOptions(options.forceRecovery), + contractRoot: options.contractRoot, now: options.now, newID: options.newID, cancelGracePeriod: options.cancelGracePeriod, @@ -1979,6 +1994,9 @@ func (m *Service) CompleteRun( if err := requireRunTransition(run, TaskRunStatusCompleted); err != nil { return nil, err } + if err := m.validateCompletionContract(ctx, taskRecord, run); err != nil { + return nil, err + } run.Status = TaskRunStatusCompleted run.Result = cloneRawJSON(normalizedResult.Value) @@ -2821,6 +2839,39 @@ func taskStatusFromSnapshot(currentStatus Status, unresolvedDependencies bool, r return taskStatusFromPolicySnapshot(currentStatus, unresolvedDependencies, false, false, runs) } +type taskRunPolicySnapshot struct { + hasQueuedOrClaimed bool + hasNeedsAttention bool + hasInProgress bool + hasLatestTerminal bool + latestTerminal Run + latestNeedsAttention Run +} + +func summarizeTaskRunPolicySnapshot(runs []Run) taskRunPolicySnapshot { + var snapshot taskRunPolicySnapshot + for idx := range runs { + run := runs[idx] + switch run.Status.Normalize() { + case TaskRunStatusStarting, TaskRunStatusRunning: + snapshot.hasInProgress = true + case TaskRunStatusQueued, TaskRunStatusClaimed: + snapshot.hasQueuedOrClaimed = true + case TaskRunStatusNeedsAttention: + snapshot.hasNeedsAttention = true + if snapshot.latestNeedsAttention.ID == "" || runComesAfter(run, snapshot.latestNeedsAttention) { + snapshot.latestNeedsAttention = run + } + case TaskRunStatusCompleted, TaskRunStatusFailed, TaskRunStatusCanceled: + if !snapshot.hasLatestTerminal || runComesAfter(run, snapshot.latestTerminal) { + snapshot.latestTerminal = run + snapshot.hasLatestTerminal = true + } + } + } + return snapshot +} + func 
taskStatusFromPolicySnapshot( currentStatus Status, unresolvedDependencies bool, @@ -2834,33 +2885,25 @@ func taskStatusFromPolicySnapshot( } runnableBlocked := unresolvedDependencies || approvalBlocked - hasQueuedOrClaimed := false - var latestTerminal Run - hasLatestTerminal := false - for idx := range runs { - run := runs[idx] - switch run.Status.Normalize() { - case TaskRunStatusStarting, TaskRunStatusRunning: - return TaskStatusInProgress - case TaskRunStatusQueued, TaskRunStatusClaimed: - hasQueuedOrClaimed = true - case TaskRunStatusCompleted, TaskRunStatusFailed, TaskRunStatusCanceled: - if !hasLatestTerminal || runComesAfter(run, latestTerminal) { - latestTerminal = run - hasLatestTerminal = true - } - } + snapshot := summarizeTaskRunPolicySnapshot(runs) + if snapshot.hasInProgress { + return TaskStatusInProgress + } + + if snapshot.hasNeedsAttention && + (!snapshot.hasLatestTerminal || runComesAfter(snapshot.latestNeedsAttention, snapshot.latestTerminal)) { + return TaskStatusBlocked } - if hasQueuedOrClaimed { + if snapshot.hasQueuedOrClaimed { if runnableBlocked { return TaskStatusBlocked } return TaskStatusReady } - if hasLatestTerminal { - switch latestTerminal.Status.Normalize() { + if snapshot.hasLatestTerminal { + switch snapshot.latestTerminal.Status.Normalize() { case TaskRunStatusCompleted: return TaskStatusCompleted case TaskRunStatusFailed: diff --git a/internal/task/manager_test.go b/internal/task/manager_test.go index df97eb04f..0051383ab 100644 --- a/internal/task/manager_test.go +++ b/internal/task/manager_test.go @@ -967,6 +967,30 @@ func (s *inMemoryManagerStore) ReleaseRunLease(_ context.Context, release LeaseR return cloneTaskRun(run), nil } +func (s *inMemoryManagerStore) BlockRunLease(_ context.Context, block LeaseBlock) (Run, error) { + normalized, err := block.Normalize(time.Now().UTC()) + if err != nil { + return Run{}, err + } + run, err := s.requireCurrentTestLease(normalized.RunID, normalized.ClaimToken, normalized.Now) + if 
err != nil { + return Run{}, err + } + run.Status = TaskRunStatusNeedsAttention + run.ClaimedBy = nil + run.SessionID = "" + run.ClaimToken = "" + run.ClaimTokenHash = "" + run.LeaseUntil = time.Time{} + run.HeartbeatAt = time.Time{} + run.ClaimedAt = time.Time{} + run.EndedAt = time.Time{} + run.Error = strings.TrimSpace(normalized.Reason) + run.Result = nil + s.runs[run.ID] = cloneTaskRun(run) + return cloneTaskRun(run), nil +} + func (s *inMemoryManagerStore) CompleteRunLease(_ context.Context, completion LeaseCompletion) (Run, error) { normalized, err := completion.Normalize(time.Now().UTC()) if err != nil { @@ -5795,6 +5819,33 @@ func TestTaskStatusFromSnapshot(t *testing.T) { }, want: TaskStatusCompleted, }, + { + name: "older needs_attention run does not mask latest terminal run", + currentStatus: TaskStatusBlocked, + runs: []Run{ + {ID: "run-1", Status: TaskRunStatusNeedsAttention, Attempt: 1, QueuedAt: base}, + {ID: "run-2", Status: TaskRunStatusCompleted, Attempt: 2, QueuedAt: base.Add(time.Second)}, + }, + want: TaskStatusCompleted, + }, + { + name: "latest needs_attention run blocks older terminal run", + currentStatus: TaskStatusReady, + runs: []Run{ + {ID: "run-1", Status: TaskRunStatusCompleted, Attempt: 1, QueuedAt: base}, + {ID: "run-2", Status: TaskRunStatusNeedsAttention, Attempt: 2, QueuedAt: base.Add(time.Second)}, + }, + want: TaskStatusBlocked, + }, + { + name: "needs_attention run parks task ahead of queued sibling", + currentStatus: TaskStatusReady, + runs: []Run{ + {ID: "run-1", Status: TaskRunStatusQueued, Attempt: 1, QueuedAt: base}, + {ID: "run-2", Status: TaskRunStatusNeedsAttention, Attempt: 2, QueuedAt: base.Add(time.Second)}, + }, + want: TaskStatusBlocked, + }, { name: "no runs with unresolved dependency is blocked", currentStatus: TaskStatusReady, diff --git a/internal/task/types.go b/internal/task/types.go index 6ac5d4831..4c345597a 100644 --- a/internal/task/types.go +++ b/internal/task/types.go @@ -766,6 +766,15 @@ type 
RecoverRunMutation struct { QueuedAt time.Time `json:"queued_at"` } +// BlockRunLeaseMutation captures one token-fenced transition from an active +// lease into needs_attention when the run is blocked on external input. +type BlockRunLeaseMutation struct { + RunID string `json:"run_id"` + ClaimToken string `json:"claim_token"` + Reason string `json:"reason,omitempty"` + Now time.Time `json:"now"` +} + // RunStarvation is the durable per-run escalation budget the scheduler advances each cycle a // claimable run stays queued past the starvation threshold. It survives daemon restart so the // tier ladder resumes from the persisted count rather than restarting on every Rebuild. diff --git a/internal/tools/builtin/autonomy.go b/internal/tools/builtin/autonomy.go index 7590b1368..171adeba3 100644 --- a/internal/tools/builtin/autonomy.go +++ b/internal/tools/builtin/autonomy.go @@ -79,6 +79,20 @@ var autonomyTools = []toolspkg.Descriptor{ []string{autonomyAutonomyKey, tasksTasksKey, autonomyLeasesKey}, []string{"release task run", "handoff work"}, ), + nativeDescriptor( + toolspkg.ToolIDTaskRunBlock, + "task_run_block", + "Task Run Block", + "Park the caller session's active task-run lease in needs_attention.", + autonomyBlockInputSchema, + toolspkg.RiskMutating, + false, + false, + false, + []toolspkg.ToolsetID{toolspkg.ToolsetIDAutonomy}, + []string{autonomyAutonomyKey, tasksTasksKey, autonomyLeasesKey}, + []string{"block task run", "needs attention", "human blocked"}, + ), nativeDescriptor( toolspkg.ToolIDTaskRunReviewSubmit, "submit_run_review", @@ -151,6 +165,16 @@ const autonomyReleaseInputSchema = `{ "additionalProperties":false }` +const autonomyBlockInputSchema = `{ + "type":"object", + "required":["run_id","reason"], + "properties":{ + "run_id":{"type":"string"}, + "reason":{"type":"string"} + }, + "additionalProperties":false +}` + const autonomySubmitRunReviewInputSchema = `{ "type":"object", 
"required":["review_id","run_id","outcome","confidence","reason","missing_work","next_round_guidance","delivery_id"], diff --git a/internal/tools/builtin/builtin_test.go b/internal/tools/builtin/builtin_test.go index eb849f6a8..810b0aef4 100644 --- a/internal/tools/builtin/builtin_test.go +++ b/internal/tools/builtin/builtin_test.go @@ -120,6 +120,7 @@ func TestBuiltinNativeDescriptors(t *testing.T) { toolspkg.ToolIDTaskRunComplete, toolspkg.ToolIDTaskRunFail, toolspkg.ToolIDTaskRunRelease, + toolspkg.ToolIDTaskRunBlock, toolspkg.ToolIDTaskRunReviewSubmit, toolspkg.ToolIDConfigShow, toolspkg.ToolIDConfigList, @@ -525,6 +526,7 @@ func TestBuiltinNativeDescriptors(t *testing.T) { ) requireDescriptorRisk(t, descriptors[toolspkg.ToolIDTaskRunFail], toolspkg.RiskMutating, false, false, false) requireDescriptorRisk(t, descriptors[toolspkg.ToolIDTaskRunRelease], toolspkg.RiskMutating, false, false, false) + requireDescriptorRisk(t, descriptors[toolspkg.ToolIDTaskRunBlock], toolspkg.RiskMutating, false, false, false) requireDescriptorRisk( t, descriptors[toolspkg.ToolIDTaskRunReviewSubmit], @@ -829,6 +831,7 @@ func TestBuiltinToolsetCatalog(t *testing.T) { t.Fatalf("Expand(autonomy) error = %v", err) } if want := []toolspkg.ToolID{ + toolspkg.ToolIDTaskRunBlock, toolspkg.ToolIDTaskRunClaimNext, toolspkg.ToolIDTaskRunComplete, toolspkg.ToolIDTaskRunFail, diff --git a/internal/tools/builtin/testdata/native-tool-catalog.json b/internal/tools/builtin/testdata/native-tool-catalog.json index 403c09aec..1d18e55bf 100644 --- a/internal/tools/builtin/testdata/native-tool-catalog.json +++ b/internal/tools/builtin/testdata/native-tool-catalog.json @@ -1459,6 +1459,17 @@ "input_schema_digest": "431e26a88d577077723820e7389f30b8b2847662b288660765f727c5d85a2a37", "output_schema_digest": "a2c799262a3ce3c19ef5cdd983bf3d12b43ab3c426227091b909dcb7054738c0" }, + { + "tool_id": "agh__task_run_block", + "native_name": "task_run_block", + "toolsets": ["agh__autonomy"], + "risk": "mutating", + 
"read_only": false, + "destructive": false, + "open_world": false, + "input_schema_digest": "eb7f5aa18f88eb3c936c794640646b6c24dc72c0d3af6f65c44f4efccec768e9", + "output_schema_digest": "a2c799262a3ce3c19ef5cdd983bf3d12b43ab3c426227091b909dcb7054738c0" + }, { "tool_id": "agh__task_run_claim_next", "native_name": "task_run_claim_next", diff --git a/internal/tools/builtin/toolsets.go b/internal/tools/builtin/toolsets.go index 85ca4afb2..cc6db3472 100644 --- a/internal/tools/builtin/toolsets.go +++ b/internal/tools/builtin/toolsets.go @@ -164,6 +164,7 @@ var builtinToolsets = []toolspkg.Toolset{ toolspkg.ToolIDTaskRunComplete.String(), toolspkg.ToolIDTaskRunFail.String(), toolspkg.ToolIDTaskRunRelease.String(), + toolspkg.ToolIDTaskRunBlock.String(), toolspkg.ToolIDTaskRunReviewSubmit.String(), }, }, diff --git a/internal/tools/builtin_ids.go b/internal/tools/builtin_ids.go index 9d96c1ccc..f398ff642 100644 --- a/internal/tools/builtin_ids.go +++ b/internal/tools/builtin_ids.go @@ -196,6 +196,8 @@ const ( ToolIDTaskRunFail ToolID = "agh__task_run_fail" // ToolIDTaskRunRelease releases the caller session's active run lease. ToolIDTaskRunRelease ToolID = "agh__task_run_release" + // ToolIDTaskRunBlock parks the caller session's active run lease in needs_attention. + ToolIDTaskRunBlock ToolID = "agh__task_run_block" // ToolIDTaskRunReviewSubmit submits the caller session's bound task-run review verdict. ToolIDTaskRunReviewSubmit ToolID = "agh__task_run_review_submit" // ToolIDConfigShow shows the redacted effective config. diff --git a/packages/site/content/runtime/core/configuration/config-toml.mdx b/packages/site/content/runtime/core/configuration/config-toml.mdx index 060489d42..c6e408154 100644 --- a/packages/site/content/runtime/core/configuration/config-toml.mdx +++ b/packages/site/content/runtime/core/configuration/config-toml.mdx @@ -1240,15 +1240,15 @@ run-enqueue APIs are the execution boundary. 
Coordinator launch only applies to workspace-scoped coordinated runs with a stable `coordination_channel_id`. Global runs and intent-only tasks do not start coordinators in the MVP. -| Field | Type | Default | Valid values | Description | -| ----------------------------------- | -------- | ------------- | ----------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `enabled` | boolean | `false` | `true` or `false` | Allows coordinator bootstrap after a coordinated workspace run is enqueued. | -| `agent_name` | string | `coordinator` | Non-empty when enabled. | Agent definition used for managed coordinator sessions. | -| `provider` | string | empty | Empty or a configured provider key. | Optional provider override for the coordinator agent. | -| `model` | string | empty | Empty or provider-supported model string. | Optional model override. If set, `provider` or provider default resolution must exist. | -| `default_ttl` | duration | `2h` | `1m` through `24h`. | Lifetime assigned to managed coordinator sessions. | -| `max_children` | integer | `5` | `1` through `5`. | Maximum safe-spawn children a coordinator may hold at once. | -| `max_active_sessions_per_workspace` | integer | `5` | Positive integer. | Caps concurrent autonomy-managed sessions (coordinator + spawned workers) per workspace. Coordinator uniqueness is enforced by the daemon singleton check. 
| +| Field | Type | Default | Valid values | Description | +| ----------------------------------- | -------- | ------------- | ----------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `enabled` | boolean | `false` | `true` or `false` | Allows coordinator bootstrap after a coordinated workspace run is enqueued. | +| `agent_name` | string | `coordinator` | Non-empty when enabled. | Agent definition used for managed coordinator sessions. | +| `provider` | string | empty | Empty or a configured provider key. | Optional provider override for the coordinator agent. | +| `model` | string | empty | Empty or provider-supported model string. | Optional model override. If set, `provider` or provider default resolution must exist. | +| `default_ttl` | duration | `2h` | `1m` through `24h`. | Lifetime assigned to managed coordinator sessions. | +| `max_children` | integer | `5` | `1` through `5`. | Maximum safe-spawn children a coordinator may hold at once. | +| `max_active_sessions_per_workspace` | integer | `5` | Positive integer. | Caps concurrent autonomy-managed sessions (coordinator + spawned workers) per workspace. Coordinator uniqueness is enforced by the daemon singleton check. | Provider and model resolution is intentionally narrow. The daemon applies: