Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/api/contract/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,11 @@ type AgentTaskReleaseRequest struct {
Reason string `json:"reason,omitempty"`
}

// AgentTaskBlockRequest parks the caller session's claimed task run in needs_attention.
type AgentTaskBlockRequest struct {
Reason string `json:"reason,omitempty"`
}

// CoordinationMessageMetadataPayload carries typed task/run correlation for channel messages.
type CoordinationMessageMetadataPayload struct {
TaskID string `json:"task_id"`
Expand Down
36 changes: 36 additions & 0 deletions internal/api/core/agent_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
agentTaskActionComplete = "agent.task.complete"
agentTaskActionFail = "agent.task.fail"
agentTaskActionRelease = "agent.task.release"
agentTaskActionBlock = "agent.task.block"
)

type agentSoulClaimLocker interface {
Expand Down Expand Up @@ -217,6 +218,41 @@ func (h *BaseHandlers) AgentTaskRelease(c *gin.Context) {
c.JSON(http.StatusOK, contract.AgentTaskLeaseResponse{Lease: AgentTaskLeasePayloadFromRun(run, nil)})
}

// AgentTaskBlock parks one claimed task run in needs_attention.
func (h *BaseHandlers) AgentTaskBlock(c *gin.Context) {
manager, caller, runID, ok := h.agentTaskLeaseMutationSetup(c, agentTaskActionBlock)
if !ok {
Comment thread
richardfogaca marked this conversation as resolved.
return
}

var req contract.AgentTaskBlockRequest
if err := c.ShouldBindJSON(&req); err != nil {
h.respondError(
c,
http.StatusBadRequest,
NewTaskValidationError(fmt.Errorf("%s: decode agent task block request: %w", h.transportName(), err)),
)
return
}

handle, err := h.lookupAgentTaskLease(c.Request.Context(), manager, caller, runID)
if err != nil {
h.respondError(c, statusForAgentTaskError(err), err)
return
}
run, err := manager.BlockRunLease(c.Request.Context(), taskpkg.LeaseBlock{
RunID: runID,
ClaimToken: handle.ClaimToken,
Reason: req.Reason,
}, caller.Actor)
if err != nil {
h.respondError(c, statusForAgentTaskError(err), err)
return
}

c.JSON(http.StatusOK, contract.AgentTaskLeaseResponse{Lease: AgentTaskLeasePayloadFromRun(run, nil)})
}

// AgentTaskComplete completes one claimed task run after token verification.
func (h *BaseHandlers) AgentTaskComplete(c *gin.Context) {
manager, caller, runID, ok := h.agentTaskLeaseMutationSetup(c, agentTaskActionComplete)
Expand Down
12 changes: 12 additions & 0 deletions internal/api/testutil/task_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type StubTaskManager struct {
AttachRunSessionFn func(context.Context, string, string, taskpkg.ActorContext) (*taskpkg.Run, error)
HeartbeatRunLeaseFn func(context.Context, taskpkg.LeaseHeartbeat, taskpkg.ActorContext) (*taskpkg.Run, error)
ReleaseRunLeaseFn func(context.Context, taskpkg.LeaseRelease, taskpkg.ActorContext) (*taskpkg.Run, error)
BlockRunLeaseFn func(context.Context, taskpkg.LeaseBlock, taskpkg.ActorContext) (*taskpkg.Run, error)
ForceReleaseRunFn forceReleaseRunFunc
ForceFailRunFn forceFailRunFunc
RetryRunFn retryRunFunc
Expand Down Expand Up @@ -546,6 +547,17 @@ func (s StubTaskManager) ReleaseRunLease(
return nil, taskpkg.ErrTaskRunNotFound
}

func (s StubTaskManager) BlockRunLease(
ctx context.Context,
block taskpkg.LeaseBlock,
actor taskpkg.ActorContext,
) (*taskpkg.Run, error) {
if s.BlockRunLeaseFn != nil {
return s.BlockRunLeaseFn(ctx, block, actor)
}
return nil, taskpkg.ErrTaskRunNotFound
}

func (s StubTaskManager) ForceReleaseRun(
ctx context.Context,
runID string,
Expand Down
2 changes: 2 additions & 0 deletions internal/api/udsapi/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ func TestRegisterRoutesCoversTechSpecEndpoints(t *testing.T) {
"POST /api/agent/channels/reply",
"POST /api/agent/soul/validate",
"POST /api/agent/spawn",
"POST /api/agent/tasks/:run_id/block",
"POST /api/agent/tasks/:run_id/complete",
"POST /api/agent/tasks/:run_id/fail",
"POST /api/agent/tasks/:run_id/heartbeat",
Expand Down Expand Up @@ -840,6 +841,7 @@ func TestRegisterTaskRoutesUseSharedHandlerBindings(t *testing.T) {
"GET /api/agent/me": "AgentMe",
"POST /api/agent/channels/:channel/send": "AgentChannelSend",
"POST /api/agent/channels/reply": "AgentChannelReply",
"POST /api/agent/tasks/:run_id/block": "AgentTaskBlock",
"POST /api/agent/tasks/:run_id/complete": "AgentTaskComplete",
"POST /api/agent/tasks/:run_id/fail": "AgentTaskFail",
"POST /api/agent/tasks/:run_id/heartbeat": "AgentTaskHeartbeat",
Expand Down
1 change: 1 addition & 0 deletions internal/api/udsapi/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func registerAgentKernelRoutes(api gin.IRouter, handlers *Handlers) {
tasks.POST("/:run_id/complete", handlers.AgentTaskComplete)
tasks.POST("/:run_id/fail", handlers.AgentTaskFail)
tasks.POST("/:run_id/release", handlers.AgentTaskRelease)
tasks.POST("/:run_id/block", handlers.AgentTaskBlock)
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions internal/cli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,12 @@ type DaemonClient interface {
request AgentTaskFailRequest,
credentials agentidentity.Credentials,
) (AgentTaskLeaseRecord, error)
AgentTaskBlock(
ctx context.Context,
runID string,
request AgentTaskBlockRequest,
credentials agentidentity.Credentials,
) (AgentTaskLeaseRecord, error)
AgentTaskRelease(
ctx context.Context,
runID string,
Expand Down Expand Up @@ -1059,6 +1065,9 @@ type AgentTaskCompleteRequest = contract.AgentTaskCompleteRequest
// AgentTaskFailRequest captures one agent lease failure request.
type AgentTaskFailRequest = contract.AgentTaskFailRequest

// AgentTaskBlockRequest captures one agent lease block request.
type AgentTaskBlockRequest = contract.AgentTaskBlockRequest

// AgentTaskReleaseRequest captures one agent lease release request.
type AgentTaskReleaseRequest = contract.AgentTaskReleaseRequest

Expand Down Expand Up @@ -4778,6 +4787,15 @@ func (c *unixSocketClient) AgentTaskFail(
return c.agentTaskLeaseAction(ctx, strings.TrimSpace(runID), "fail", request, credentials)
}

func (c *unixSocketClient) AgentTaskBlock(
ctx context.Context,
runID string,
request AgentTaskBlockRequest,
credentials agentidentity.Credentials,
) (AgentTaskLeaseRecord, error) {
return c.agentTaskLeaseAction(ctx, strings.TrimSpace(runID), "block", request, credentials)
}

func (c *unixSocketClient) AgentTaskRelease(
ctx context.Context,
runID string,
Expand Down
13 changes: 13 additions & 0 deletions internal/cli/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ type stubClient struct {
agentTaskHeartbeatFn func(context.Context, string, AgentTaskHeartbeatRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error)
agentTaskCompleteFn func(context.Context, string, AgentTaskCompleteRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error)
agentTaskFailFn func(context.Context, string, AgentTaskFailRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error)
agentTaskBlockFn func(context.Context, string, AgentTaskBlockRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error)
agentTaskReleaseFn func(context.Context, string, AgentTaskReleaseRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error)
}

Expand Down Expand Up @@ -2641,6 +2642,18 @@ func (s *stubClient) AgentTaskFail(
return AgentTaskLeaseRecord{}, errors.New("unexpected AgentTaskFail call")
}

func (s *stubClient) AgentTaskBlock(
ctx context.Context,
runID string,
request AgentTaskBlockRequest,
credentials agentidentity.Credentials,
) (AgentTaskLeaseRecord, error) {
if s.agentTaskBlockFn != nil {
return s.agentTaskBlockFn(ctx, runID, request, credentials)
}
return AgentTaskLeaseRecord{}, errors.New("unexpected AgentTaskBlock call")
}

func (s *stubClient) AgentTaskRelease(
ctx context.Context,
runID string,
Expand Down
45 changes: 45 additions & 0 deletions internal/cli/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func newTaskCommand(deps commandDeps) *cobra.Command {
cmd.AddCommand(newTaskHeartbeatCommand(deps))
cmd.AddCommand(newTaskCompleteCommand(deps))
cmd.AddCommand(newTaskFailCommand(deps))
cmd.AddCommand(newTaskBlockCommand(deps))
cmd.AddCommand(newTaskReleaseCommand(deps))
cmd.AddCommand(newTaskRetryCommand(deps))
cmd.AddCommand(newTaskRecoverCommand(deps))
Expand Down Expand Up @@ -1441,6 +1442,50 @@ func newTaskFailCommand(deps commandDeps) *cobra.Command {
return cmd
}

func newTaskBlockCommand(deps commandDeps) *cobra.Command {
var reason string

cmd := &cobra.Command{
Use: "block <run-id>",
Short: "Park a claimed task run in needs_attention for human input",
Args: cobra.ExactArgs(1),
Example: ` # Park the current session's claimed run with a specific blocker
agh task block run-123 --reason "Need product approval for the destructive migration"`,
RunE: func(cmd *cobra.Command, args []string) error {
runID, err := requiredAgentTaskRunID(args[0])
if err != nil {
return err
}
request := AgentTaskBlockRequest{Reason: strings.TrimSpace(reason)}
if request.Reason == "" {
return errors.New("cli: --reason is required")
}

client, err := clientFromDeps(deps)
if err != nil {
return err
}
credentials, err := requireAgentCommandIdentity(
cmd.Context(),
deps,
client,
agentActionCLI("task.block"),
)
if err != nil {
return err
}
record, err := client.AgentTaskBlock(cmd.Context(), runID, request, credentials)
if err != nil {
return err
}
return writeCommandOutput(cmd, agentTaskLeaseBundle(record))
},
}
cmd.Flags().StringVar(&reason, "reason", "", "Specific human-facing blocker or question")
mustMarkFlagRequired(cmd, "reason")
return cmd
}

type taskFailCommandFlags struct {
reason string
errorMessage string
Expand Down
41 changes: 41 additions & 0 deletions internal/cli/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,38 @@ func TestAgentTaskCommandsMapLeaseRequests(t *testing.T) {
}
},
},
{
name: "Should map task block request",
args: []string{
"task",
"block",
"run-1",
"--reason",
"Need approval for the production migration",
"-o",
"json",
},
fn: func(t *testing.T) *stubClient {
t.Helper()
return &stubClient{
agentTaskBlockFn: func(
ctx context.Context,
runID string,
request AgentTaskBlockRequest,
credentials agentidentity.Credentials,
) (AgentTaskLeaseRecord, error) {
if ctx == nil {
t.Fatal("AgentTaskBlock context is nil")
}
assertAgentCredentials(t, credentials)
if runID != "run-1" || request.Reason != "Need approval for the production migration" {
t.Fatalf("block runID=%q request=%#v, want run-1 reason", runID, request)
}
return agentTaskLeaseRecord(taskpkg.TaskRunStatusNeedsAttention), nil
},
}
},
},
} {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -1115,6 +1147,11 @@ func TestAgentTaskCommandsValidateBeforeAgentCalls(t *testing.T) {
args: []string{"task", "next", "--priority-min", "-1", "-o", "json"},
wantErr: "--priority-min must be zero or positive",
},
{
name: "Should reject block without reason",
args: []string{"task", "block", "run-1", "-o", "json"},
wantErr: `required flag(s) "reason" not set`,
},
{
name: "Should reject invalid result json",
args: []string{
Expand Down Expand Up @@ -1183,6 +1220,10 @@ func TestAgentTaskCommandsValidateBeforeAgentCalls(t *testing.T) {
t.Fatal("AgentTaskFail should not be called for local validation errors")
return AgentTaskLeaseRecord{}, nil
},
agentTaskBlockFn: func(context.Context, string, AgentTaskBlockRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) {
t.Fatal("AgentTaskBlock should not be called for local validation errors")
return AgentTaskLeaseRecord{}, nil
},
agentTaskReleaseFn: func(context.Context, string, AgentTaskReleaseRequest, agentidentity.Credentials) (AgentTaskLeaseRecord, error) {
t.Fatal("AgentTaskRelease should not be called for local validation errors")
return AgentTaskLeaseRecord{}, nil
Expand Down
3 changes: 2 additions & 1 deletion internal/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
toolspkg.ToolIDTaskRunComplete.String(),
toolspkg.ToolIDTaskRunFail.String(),
toolspkg.ToolIDTaskRunRelease.String(),
toolspkg.ToolIDTaskRunBlock.String(),
Comment thread
richardfogaca marked this conversation as resolved.
toolspkg.ToolIDTaskCreate.String(),
}
)
Expand Down Expand Up @@ -244,7 +245,7 @@ func PromptOverlay(input PromptInput) string {
b.WriteString("\nUse public AGH agent APIs only:\n")
b.WriteString("- `agh me context` for the Situation Surface.\n")
b.WriteString("- `agh task create` to persist follow-up task intent.\n")
b.WriteString("- `agh task next|heartbeat|complete|fail|release` for task ownership and terminal status.\n")
b.WriteString("- `agh task next|heartbeat|complete|fail|release|block` for task ownership and terminal status.\n")
b.WriteString("- `agh ch list|recv|send|reply` for operational worker communication.\n")
b.WriteString("- `agh spawn` for bounded worker delegation.\n")
b.WriteString("\nCreating a task only records follow-up intent. The current coordinator run is the active ")
Expand Down
2 changes: 1 addition & 1 deletion internal/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func TestPromptOverlayUsesPublicAPIsAndRunChannel(t *testing.T) {
for _, required := range []string{
"agh me context",
"agh task create",
"agh task next|heartbeat|complete|fail|release",
"agh task next|heartbeat|complete|fail|release|block",
"agh ch list|recv|send|reply",
"agh spawn",
"The current coordinator run is the active execution boundary",
Expand Down
7 changes: 7 additions & 0 deletions internal/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6184,6 +6184,13 @@ func (r *recordingRegistry) ReleaseRunLease(
return taskpkg.Run{}, taskpkg.ErrTaskRunNotFound
}

func (r *recordingRegistry) BlockRunLease(
context.Context,
taskpkg.LeaseBlock,
) (taskpkg.Run, error) {
return taskpkg.Run{}, taskpkg.ErrTaskRunNotFound
}

func (r *recordingRegistry) ForceReleaseTaskRun(
context.Context,
taskpkg.ForceReleaseRunMutation,
Expand Down
Loading