Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions flocks/server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,24 @@ async def _delayed_kafka_start() -> None:
except Exception as e:
log.warning("kafka.manager.start_failed", {"error": str(e)})

# Start workflow pollers for workflows with poller enabled.
# Mirrors Kafka/syslog startup so persistent slow-path workflows resume
# automatically without delaying server readiness.
try:
from flocks.workflow.poller_manager import default_manager as default_poller_manager

async def _delayed_poller_start() -> None:
await asyncio.sleep(3)
try:
await default_poller_manager.start_all()
log.info("workflow.poller.started")
except Exception as exc:
log.warning("workflow.poller.start_failed", {"error": str(exc)})

_schedule_startup_phase(app, log, "workflow.poller.start", _delayed_poller_start)
except Exception as e:
log.warning("workflow.poller.start_failed", {"error": str(e)})

try:
from flocks.updater.updater import recover_upgrade_state

Expand Down
85 changes: 85 additions & 0 deletions flocks/server/routes/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,16 @@ class KafkaConfigRequest(BaseModel):
outputTopic: Optional[str] = None


class WorkflowPollerConfigRequest(BaseModel):
"""Per-workflow background poller configuration."""

enabled: bool = False
intervalSeconds: int = Field(30, ge=1)
timeoutSeconds: int = Field(7200, ge=1)
noOverlap: bool = True
inputs: Dict[str, Any] = Field(default_factory=dict)


class SyslogConfigRequest(BaseModel):
"""Per-workflow syslog listener configuration (experimental)."""

Expand Down Expand Up @@ -1657,6 +1667,81 @@ async def get_kafka_status(workflow_id: str):
raise HTTPException(status_code=500, detail=f"Failed to get Kafka status: {str(e)}")


@router.post("/workflow/{workflow_id}/poller-config")
async def save_workflow_poller_config(workflow_id: str, req: WorkflowPollerConfigRequest):
"""Save background poller configuration for a workflow."""
try:
if not _read_workflow_from_fs(workflow_id):
raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}")

config = {
"workflowId": workflow_id,
"enabled": req.enabled,
"intervalSeconds": req.intervalSeconds,
"timeoutSeconds": req.timeoutSeconds,
"noOverlap": req.noOverlap,
"inputs": req.inputs,
"updatedAt": int(time.time() * 1000),
}
await Storage.write(f"workflow_poller_config/{workflow_id}", config)

from flocks.workflow.poller_manager import default_manager as _poller_default_manager

poller_status = await _poller_default_manager.restart_workflow(workflow_id)
if req.enabled and (poller_status or {}).get("state") == "failed":
err = (poller_status or {}).get("error") or "poller_start_failed"
raise HTTPException(
status_code=409,
detail=f"Workflow poller failed to start: {err}",
)
return {"ok": True, "status": poller_status}
except HTTPException:
raise
except Exception as e:
log.error("workflow.poller_config.save.error", {"id": workflow_id, "error": str(e)})
raise HTTPException(status_code=500, detail=f"Failed to save poller config: {str(e)}")


@router.get("/workflow/{workflow_id}/poller-config")
async def get_workflow_poller_config(workflow_id: str):
"""Get saved poller configuration for a workflow."""
try:
return await Storage.read(f"workflow_poller_config/{workflow_id}")
except Exception as e:
log.error("workflow.poller_config.get.error", {"id": workflow_id, "error": str(e)})
raise HTTPException(status_code=500, detail=f"Failed to get poller config: {str(e)}")


@router.get("/workflow/{workflow_id}/poller-status")
async def get_workflow_poller_status(workflow_id: str):
"""Return the runtime status of a workflow poller."""
try:
from flocks.workflow.poller_manager import default_manager as _poller_default_manager

return _poller_default_manager.get_status(workflow_id)
except Exception as e:
log.error("workflow.poller_status.get.error", {"id": workflow_id, "error": str(e)})
raise HTTPException(status_code=500, detail=f"Failed to get poller status: {str(e)}")


@router.post("/workflow/{workflow_id}/poller-run-once")
async def run_workflow_poller_once(workflow_id: str):
"""Trigger one immediate poller execution for a workflow."""
try:
if not _read_workflow_from_fs(workflow_id):
raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}")

from flocks.workflow.poller_manager import default_manager as _poller_default_manager

poller_status = await _poller_default_manager.run_once(workflow_id)
return {"ok": True, "status": poller_status}
except HTTPException:
raise
except Exception as e:
log.error("workflow.poller_run_once.error", {"id": workflow_id, "error": str(e)})
raise HTTPException(status_code=500, detail=f"Failed to run workflow poller once: {str(e)}")


@router.post("/workflow/{workflow_id}/syslog-config")
async def save_syslog_config(workflow_id: str, req: SyslogConfigRequest):
"""
Expand Down
Loading