From 7b1bb8a7864a2add1df654823bdb563e91b5b757 Mon Sep 17 00:00:00 2001 From: xiami762 <> Date: Mon, 1 Jun 2026 15:36:16 +0800 Subject: [PATCH 1/3] feat(workflow): add generic background poller with API and WebUI Introduce WorkflowPollerManager for scheduled workflow runs, REST endpoints for config/status/run-once, server startup integration, and Integration tab controls with tests. Co-authored-by: Cursor --- flocks/server/app.py | 18 + flocks/server/routes/workflow.py | 85 ++++ flocks/workflow/poller_manager.py | 427 ++++++++++++++++++ .../routes/test_workflow_poller_routes.py | 133 ++++++ tests/workflow/test_poller_manager.py | 185 ++++++++ webui/src/api/workflow.ts | 48 ++ webui/src/locales/en-US/workflow.json | 24 + webui/src/locales/zh-CN/workflow.json | 24 + .../tabs/IntegrationTab.test.tsx | 135 +++++- .../WorkflowDetail/tabs/IntegrationTab.tsx | 353 +++++++++++++++ 10 files changed, 1431 insertions(+), 1 deletion(-) create mode 100644 flocks/workflow/poller_manager.py create mode 100644 tests/server/routes/test_workflow_poller_routes.py create mode 100644 tests/workflow/test_poller_manager.py diff --git a/flocks/server/app.py b/flocks/server/app.py index b77d00f08..d12c78852 100644 --- a/flocks/server/app.py +++ b/flocks/server/app.py @@ -430,6 +430,24 @@ async def _delayed_kafka_start() -> None: except Exception as e: log.warning("kafka.manager.start_failed", {"error": str(e)}) + # Start workflow pollers for workflows with poller enabled. + # Mirrors Kafka/syslog startup so persistent slow-path workflows resume + # automatically without delaying server readiness. + try: + from flocks.workflow.poller_manager import default_manager as default_poller_manager + + async def _delayed_poller_start() -> None: + await asyncio.sleep(3) + try: + await default_poller_manager.start_all() + log.info("workflow.poller.started") + except Exception as exc: + log.warning("workflow.poller.start_failed", {"error": str(exc)}) + + _schedule_startup_phase(app, log, "workflow.poller.start", _delayed_poller_start) + except Exception as e: + log.warning("workflow.poller.start_failed", {"error": str(e)}) + try: from flocks.updater.updater import recover_upgrade_state diff --git a/flocks/server/routes/workflow.py b/flocks/server/routes/workflow.py index f286b7400..0f098383e 100644 --- a/flocks/server/routes/workflow.py +++ b/flocks/server/routes/workflow.py @@ -1416,6 +1416,16 @@ class KafkaConfigRequest(BaseModel): outputTopic: Optional[str] = None +class WorkflowPollerConfigRequest(BaseModel): + """Per-workflow background poller configuration.""" + + enabled: bool = False + intervalSeconds: int = Field(30, ge=1) + timeoutSeconds: int = Field(7200, ge=1) + noOverlap: bool = True + inputs: Dict[str, Any] = Field(default_factory=dict) + + class SyslogConfigRequest(BaseModel): """Per-workflow syslog listener configuration (experimental).""" @@ -1657,6 +1667,81 @@ async def get_kafka_status(workflow_id: str): raise HTTPException(status_code=500, detail=f"Failed to get Kafka status: {str(e)}") +@router.post("/workflow/{workflow_id}/poller-config") +async def save_workflow_poller_config(workflow_id: str, req: WorkflowPollerConfigRequest): + """Save background poller configuration for a workflow.""" + try: + if not _read_workflow_from_fs(workflow_id): + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + + config = { + "workflowId": workflow_id, + "enabled": req.enabled, + "intervalSeconds": req.intervalSeconds, + "timeoutSeconds": req.timeoutSeconds, + "noOverlap": req.noOverlap, + "inputs": req.inputs, + "updatedAt": int(time.time() * 1000), + } + await Storage.write(f"workflow_poller_config/{workflow_id}", config) + + from flocks.workflow.poller_manager import default_manager as _poller_default_manager + + poller_status = await _poller_default_manager.restart_workflow(workflow_id) + if req.enabled and (poller_status or {}).get("state") == "failed": + err = (poller_status or {}).get("error") or "poller_start_failed" + raise HTTPException( + status_code=409, + detail=f"Workflow poller failed to start: {err}", + ) + return {"ok": True, "status": poller_status} + except HTTPException: + raise + except Exception as e: + log.error("workflow.poller_config.save.error", {"id": workflow_id, "error": str(e)}) + raise HTTPException(status_code=500, detail=f"Failed to save poller config: {str(e)}") + + +@router.get("/workflow/{workflow_id}/poller-config") +async def get_workflow_poller_config(workflow_id: str): + """Get saved poller configuration for a workflow.""" + try: + return await Storage.read(f"workflow_poller_config/{workflow_id}") + except Exception as e: + log.error("workflow.poller_config.get.error", {"id": workflow_id, "error": str(e)}) + raise HTTPException(status_code=500, detail=f"Failed to get poller config: {str(e)}") + + +@router.get("/workflow/{workflow_id}/poller-status") +async def get_workflow_poller_status(workflow_id: str): + """Return the runtime status of a workflow poller.""" + try: + from flocks.workflow.poller_manager import default_manager as _poller_default_manager + + return _poller_default_manager.get_status(workflow_id) + except Exception as e: + log.error("workflow.poller_status.get.error", {"id": workflow_id, "error": str(e)}) + raise HTTPException(status_code=500, detail=f"Failed to get poller status: {str(e)}") + + +@router.post("/workflow/{workflow_id}/poller-run-once") +async def run_workflow_poller_once(workflow_id: str): + """Trigger one immediate poller execution for a workflow.""" + try: + if not _read_workflow_from_fs(workflow_id): + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + + from flocks.workflow.poller_manager import default_manager as _poller_default_manager + + poller_status = await _poller_default_manager.run_once(workflow_id) + return {"ok": True, "status": poller_status} + except HTTPException: + raise + except Exception as e: + log.error("workflow.poller_run_once.error", {"id": workflow_id, "error": str(e)}) + raise HTTPException(status_code=500, detail=f"Failed to run workflow poller once: {str(e)}") + + @router.post("/workflow/{workflow_id}/syslog-config") async def save_syslog_config(workflow_id: str, req: SyslogConfigRequest): """ diff --git a/flocks/workflow/poller_manager.py b/flocks/workflow/poller_manager.py new file mode 100644 index 000000000..38a1d1dea --- /dev/null +++ b/flocks/workflow/poller_manager.py @@ -0,0 +1,427 @@ +"""Lifecycle manager for workflow pollers. + +This mirrors the Kafka/syslog managers: one background poller task per workflow +id that periodically triggers ``run_workflow`` with configured inputs. +""" + +from __future__ import annotations + +import asyncio +import threading +import time +import uuid +from datetime import datetime +from typing import Any, Dict + +from flocks.storage.storage import Storage +from flocks.utils.log import Log +from flocks.workflow.fs_store import read_workflow_from_fs +from flocks.workflow.runner import RunWorkflowResult, run_workflow + +WORKFLOW_POLLER_CONFIG_PREFIX = "workflow_poller_config/" +DEFAULT_INTERVAL_SECONDS = 30 +DEFAULT_TIMEOUT_SECONDS = 7200 + +log = Log.create(service="workflow.poller") + + +def _now_ms() -> int: + return int(time.time() * 1000) + + +def _today_string() -> str: + return datetime.now().strftime("%Y-%m-%d") + + +class WorkflowPollerManager: + """Manage one background poller loop per workflow id.""" + + def __init__(self) -> None: + self._tasks: dict[str, asyncio.Task[Any]] = {} + self._abort_events: dict[str, asyncio.Event] = {} + self._run_tasks: dict[str, set[asyncio.Task[Any]]] = {} + self._run_cancel_events: dict[str, set[threading.Event]] = {} + self._status: dict[str, Dict[str, Any]] = {} + + @staticmethod + def _config_key(workflow_id: str) -> str: + return f"{WORKFLOW_POLLER_CONFIG_PREFIX}{workflow_id}" + + def _normalize_config(self, workflow_id: str, data: Any) -> Dict[str, Any]: + raw = data if isinstance(data, dict) else {} + interval_seconds = int(raw.get("intervalSeconds") or DEFAULT_INTERVAL_SECONDS) + timeout_seconds = int(raw.get("timeoutSeconds") or DEFAULT_TIMEOUT_SECONDS) + inputs = raw.get("inputs") if isinstance(raw.get("inputs"), dict) else {} + return { + "workflowId": workflow_id, + "enabled": bool(raw.get("enabled")), + "intervalSeconds": max(1, interval_seconds), + "timeoutSeconds": max(1, timeout_seconds), + "noOverlap": bool(raw.get("noOverlap", True)), + "inputs": dict(inputs), + "updatedAt": raw.get("updatedAt"), + } + + def _cleanup_done_runs(self, workflow_id: str) -> int: + tasks = self._run_tasks.get(workflow_id) + if not tasks: + return 0 + active_tasks = {task for task in tasks if not task.done()} + if active_tasks: + self._run_tasks[workflow_id] = active_tasks + return len(active_tasks) + self._run_tasks.pop(workflow_id, None) + return 0 + + def _register_run_task(self, workflow_id: str, task: asyncio.Task[Any]) -> None: + task_set = self._run_tasks.setdefault(workflow_id, set()) + task_set.add(task) + + def _discard(done_task: asyncio.Task[Any]) -> None: + tasks = self._run_tasks.get(workflow_id) + if tasks is not None: + tasks.discard(done_task) + if not tasks: + self._run_tasks.pop(workflow_id, None) + + task.add_done_callback(_discard) + + def _build_inputs(self, config: Dict[str, Any]) -> Dict[str, Any]: + inputs = dict(config.get("inputs") or {}) + if not str(inputs.get("input_date") or "").strip(): + inputs["input_date"] = _today_string() + inputs["_trigger"] = "poller" + inputs["_poller_run_id"] = f"poller-{_now_ms()}-{uuid.uuid4().hex[:8]}" + return inputs + + def _summarize_outputs(self, outputs: Any) -> Dict[str, Any]: + if not isinstance(outputs, dict): + return {} + + summary: Dict[str, Any] = {} + load_stats = outputs.get("load_stats") + if isinstance(load_stats, dict) and isinstance(load_stats.get("record_count"), int): + summary["selectedCount"] = load_stats["record_count"] + + if isinstance(outputs.get("processed_mark_count"), int): + summary["processedMarkCount"] = outputs["processed_mark_count"] + + if isinstance(outputs.get("kafka_message_count"), int): + summary["kafkaMessageCount"] = outputs["kafka_message_count"] + + channel_status = outputs.get("channel_notify_status") + if channel_status is not None: + summary["channelNotifyStatus"] = channel_status + + return summary + + def _base_status(self, workflow_id: str) -> Dict[str, Any]: + return { + "workflowId": workflow_id, + "state": "stopped", + "error": None, + "activeRuns": 0, + "lastRunAt": None, + "lastStatus": None, + "lastError": None, + "lastDurationMs": None, + "selectedCount": None, + "processedMarkCount": None, + "channelNotifyStatus": None, + "kafkaMessageCount": None, + "nextRunAt": None, + "lastRunId": None, + } + + def get_status(self, workflow_id: str) -> Dict[str, Any]: + status = dict(self._base_status(workflow_id)) + status.update(self._status.get(workflow_id) or {}) + status["activeRuns"] = self._cleanup_done_runs(workflow_id) + if workflow_id not in self._tasks and status.get("state") == "running": + status["state"] = "stopped" + status["nextRunAt"] = None + return status + + async def start_all(self) -> None: + try: + keys = await Storage.list_keys(WORKFLOW_POLLER_CONFIG_PREFIX) + except Exception as exc: + log.warning("poller.list_keys_failed", {"error": str(exc)}) + return + + for key in keys: + if not key.startswith(WORKFLOW_POLLER_CONFIG_PREFIX): + continue + workflow_id = key[len(WORKFLOW_POLLER_CONFIG_PREFIX):] + if not workflow_id: + continue + try: + data = await Storage.read(key) + except Exception as exc: + log.warning("poller.config_read_failed", {"key": key, "error": str(exc)}) + continue + if isinstance(data, dict) and data.get("enabled"): + await self.restart_workflow(workflow_id) + + async def stop_all(self) -> None: + for workflow_id in list(self._tasks.keys()): + await self.stop_workflow(workflow_id) + + async def stop_workflow(self, workflow_id: str) -> None: + abort_event = self._abort_events.get(workflow_id) + if abort_event is not None: + abort_event.set() + + for cancel_event in self._run_cancel_events.pop(workflow_id, set()): + cancel_event.set() + + task = self._tasks.pop(workflow_id, None) + if task is not None and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + except Exception: + pass + + run_tasks = list(self._run_tasks.pop(workflow_id, set())) + for run_task in run_tasks: + if not run_task.done(): + run_task.cancel() + if run_tasks: + try: + await asyncio.wait_for( + asyncio.gather(*run_tasks, return_exceptions=True), + timeout=1.0, + ) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + + self._abort_events.pop(workflow_id, None) + current = self._status.get(workflow_id) or self._base_status(workflow_id) + current["state"] = "stopped" + current["error"] = None + current["nextRunAt"] = None + current["activeRuns"] = 0 + self._status[workflow_id] = current + + async def restart_workflow(self, workflow_id: str) -> Dict[str, Any]: + await self.stop_workflow(workflow_id) + try: + stored = await Storage.read(self._config_key(workflow_id)) + except Exception as exc: + log.warning("poller.restart_read_failed", {"workflow_id": workflow_id, "error": str(exc)}) + return {"workflowId": workflow_id, "state": "failed", "error": str(exc)} + + config = self._normalize_config(workflow_id, stored) + if not config.get("enabled"): + self._status[workflow_id] = { + **self._base_status(workflow_id), + "workflowId": workflow_id, + "state": "stopped", + "error": None, + } + return self.get_status(workflow_id) + + wf_data = read_workflow_from_fs(workflow_id) + if not wf_data: + err = "workflow_not_found" + self._status[workflow_id] = { + **self.get_status(workflow_id), + "workflowId": workflow_id, + "state": "failed", + "error": err, + } + return self.get_status(workflow_id) + + workflow_json = wf_data.get("workflowJson") + if not workflow_json: + err = "workflow_json_missing" + self._status[workflow_id] = { + **self.get_status(workflow_id), + "workflowId": workflow_id, + "state": "failed", + "error": err, + } + return self.get_status(workflow_id) + + abort_event = asyncio.Event() + self._abort_events[workflow_id] = abort_event + self._status[workflow_id] = { + **self.get_status(workflow_id), + "workflowId": workflow_id, + "state": "running", + "error": None, + "enabled": True, + "intervalSeconds": config["intervalSeconds"], + "timeoutSeconds": config["timeoutSeconds"], + "noOverlap": config["noOverlap"], + "nextRunAt": _now_ms(), + } + task = asyncio.create_task( + self._poller_loop(workflow_id, workflow_json, config, abort_event), + name=f"workflow-poller-{workflow_id}", + ) + self._tasks[workflow_id] = task + return self.get_status(workflow_id) + + async def run_once(self, workflow_id: str) -> Dict[str, Any]: + try: + stored = await Storage.read(self._config_key(workflow_id)) + except Exception as exc: + log.warning("poller.run_once_read_failed", {"workflow_id": workflow_id, "error": str(exc)}) + current = self.get_status(workflow_id) + current["lastStatus"] = "failed" + current["lastError"] = str(exc) + return current + + config = self._normalize_config(workflow_id, stored) + wf_data = read_workflow_from_fs(workflow_id) + if not wf_data: + current = self.get_status(workflow_id) + current["state"] = "failed" if workflow_id in self._tasks else current.get("state", "stopped") + current["lastStatus"] = "failed" + current["lastError"] = "workflow_not_found" + self._status[workflow_id] = current + return self.get_status(workflow_id) + + workflow_json = wf_data.get("workflowJson") + if not workflow_json: + current = self.get_status(workflow_id) + current["state"] = "failed" if workflow_id in self._tasks else current.get("state", "stopped") + current["lastStatus"] = "failed" + current["lastError"] = "workflow_json_missing" + self._status[workflow_id] = current + return self.get_status(workflow_id) + + return await self._execute_run(workflow_id, workflow_json, config) + + async def _poller_loop( + self, + workflow_id: str, + workflow_json: Dict[str, Any], + config: Dict[str, Any], + abort_event: asyncio.Event, + ) -> None: + interval_seconds = config["intervalSeconds"] + try: + while not abort_event.is_set(): + await self._schedule_run(workflow_id, workflow_json, config) + next_run_at = _now_ms() + interval_seconds * 1000 + current = self._status.get(workflow_id) or self._base_status(workflow_id) + current["nextRunAt"] = next_run_at + current["activeRuns"] = self._cleanup_done_runs(workflow_id) + self._status[workflow_id] = current + try: + await asyncio.wait_for(abort_event.wait(), timeout=interval_seconds) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + raise + except Exception as exc: + current = self._status.get(workflow_id) or self._base_status(workflow_id) + current["state"] = "failed" + current["error"] = str(exc) + current["nextRunAt"] = None + self._status[workflow_id] = current + log.warning("poller.loop_failed", {"workflow_id": workflow_id, "error": str(exc)}) + finally: + if workflow_id in self._tasks and self._tasks.get(workflow_id) is asyncio.current_task(): + current = self._status.get(workflow_id) or self._base_status(workflow_id) + if current.get("state") != "failed": + current["state"] = "stopped" + current["error"] = None + current["nextRunAt"] = None + current["activeRuns"] = self._cleanup_done_runs(workflow_id) + self._status[workflow_id] = current + + async def _schedule_run( + self, + workflow_id: str, + workflow_json: Dict[str, Any], + config: Dict[str, Any], + ) -> None: + active_runs = self._cleanup_done_runs(workflow_id) + if config.get("noOverlap", True) and active_runs > 0: + current = self._status.get(workflow_id) or self._base_status(workflow_id) + current["lastStatus"] = "skipped" + current["lastError"] = "previous_run_still_active" + current["activeRuns"] = active_runs + self._status[workflow_id] = current + return + + run_task = asyncio.create_task( + self._execute_run(workflow_id, workflow_json, config), + name=f"workflow-poller-run-{workflow_id}", + ) + self._register_run_task(workflow_id, run_task) + + async def _execute_run( + self, + workflow_id: str, + workflow_json: Dict[str, Any], + config: Dict[str, Any], + ) -> Dict[str, Any]: + started_at_ms = _now_ms() + cancel_event = threading.Event() + cancel_events = self._run_cancel_events.setdefault(workflow_id, set()) + cancel_events.add(cancel_event) + current = self._status.get(workflow_id) or self._base_status(workflow_id) + current["lastRunAt"] = started_at_ms + current["activeRuns"] = self._cleanup_done_runs(workflow_id) + self._status[workflow_id] = current + + try: + result = await asyncio.to_thread( + run_workflow, + workflow=workflow_json, + inputs=self._build_inputs(config), + timeout_s=config["timeoutSeconds"], + trace=False, + cancel=cancel_event.is_set, + ) + if not isinstance(result, RunWorkflowResult): + result = RunWorkflowResult(status="failed", error="invalid_run_result") + duration_ms = _now_ms() - started_at_ms + summary = self._summarize_outputs(result.outputs) + current = self._status.get(workflow_id) or self._base_status(workflow_id) + current.update(summary) + current["lastRunAt"] = started_at_ms + current["lastDurationMs"] = duration_ms + current["lastRunId"] = result.run_id + current["lastStatus"] = result.status + current["lastError"] = result.error + current["activeRuns"] = self._cleanup_done_runs(workflow_id) + if workflow_id in self._tasks and current.get("state") != "failed": + current["state"] = "running" + current["error"] = None + self._status[workflow_id] = current + except Exception as exc: + duration_ms = _now_ms() - started_at_ms + current = self._status.get(workflow_id) or self._base_status(workflow_id) + current["lastRunAt"] = started_at_ms + current["lastDurationMs"] = duration_ms + current["lastStatus"] = "failed" + current["lastError"] = str(exc) + current["activeRuns"] = self._cleanup_done_runs(workflow_id) + if workflow_id in self._tasks and current.get("state") != "failed": + current["state"] = "running" + current["error"] = None + self._status[workflow_id] = current + log.warning("poller.run_failed", {"workflow_id": workflow_id, "error": str(exc)}) + finally: + cancel_events.discard(cancel_event) + if not cancel_events: + self._run_cancel_events.pop(workflow_id, None) + current = self._status.get(workflow_id) or self._base_status(workflow_id) + current["activeRuns"] = self._cleanup_done_runs(workflow_id) + if workflow_id not in self._tasks and current.get("state") == "running": + current["state"] = "stopped" + current["nextRunAt"] = None + self._status[workflow_id] = current + + return self.get_status(workflow_id) + + +default_manager = WorkflowPollerManager() diff --git a/tests/server/routes/test_workflow_poller_routes.py b/tests/server/routes/test_workflow_poller_routes.py new file mode 100644 index 000000000..9baf2d2dc --- /dev/null +++ b/tests/server/routes/test_workflow_poller_routes.py @@ -0,0 +1,133 @@ +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any + +import pytest +from httpx import AsyncClient + +from flocks.server.routes import workflow as workflow_routes + + +@pytest.mark.asyncio +async def test_save_poller_config_restarts_manager( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + writes: list[tuple[str, dict[str, Any]]] = [] + + async def _fake_write(key: Any, value: dict[str, Any]) -> None: + writes.append((key, value)) + + async def _fake_restart(workflow_id: str) -> dict[str, Any]: + assert workflow_id == "wf-1" + return {"workflowId": workflow_id, "state": "running", "lastStatus": None} + + monkeypatch.setattr( + workflow_routes, + "_read_workflow_from_fs", + lambda workflow_id: {"workflowJson": {"start": "n1", "nodes": [], "edges": []}} if workflow_id == "wf-1" else None, + ) + monkeypatch.setattr(workflow_routes.Storage, "write", _fake_write) + monkeypatch.setattr( + "flocks.workflow.poller_manager.default_manager", + SimpleNamespace(restart_workflow=_fake_restart), + ) + + response = await client.post( + "/api/workflow/wf-1/poller-config", + json={ + "enabled": True, + "intervalSeconds": 45, + "timeoutSeconds": 3600, + "noOverlap": True, + "inputs": {"persist_triage_output": True}, + }, + ) + + assert response.status_code == 200, response.text + poller_writes = [(key, value) for key, value in writes if key == "workflow_poller_config/wf-1"] + assert poller_writes + key, payload = poller_writes[0] + assert key == "workflow_poller_config/wf-1" + assert payload["enabled"] is True + assert payload["intervalSeconds"] == 45 + assert payload["timeoutSeconds"] == 3600 + assert payload["inputs"] == {"persist_triage_output": True} + + +@pytest.mark.asyncio +async def test_get_poller_config_returns_saved_data( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + async def _fake_read(_key: Any, *_args: Any, **_kwargs: Any) -> dict[str, Any] | None: + if _key != "workflow_poller_config/wf-1": + return None + return { + "workflowId": "wf-1", + "enabled": True, + "intervalSeconds": 30, + "timeoutSeconds": 7200, + "noOverlap": True, + "inputs": {"dedup_source_workflow_name": "stream_alert_denoise_gt_fast"}, + } + + monkeypatch.setattr(workflow_routes.Storage, "read", _fake_read) + + response = await client.get("/api/workflow/wf-1/poller-config") + assert response.status_code == 200, response.text + assert response.json()["workflowId"] == "wf-1" + assert response.json()["intervalSeconds"] == 30 + + +@pytest.mark.asyncio +async def test_get_poller_status_returns_runtime_snapshot( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + "flocks.workflow.poller_manager.default_manager", + SimpleNamespace( + get_status=lambda workflow_id: { + "workflowId": workflow_id, + "state": "running", + "lastStatus": "success", + "selectedCount": 12, + }, + ), + ) + + response = await client.get("/api/workflow/wf-1/poller-status") + assert response.status_code == 200, response.text + assert response.json()["state"] == "running" + assert response.json()["selectedCount"] == 12 + + +@pytest.mark.asyncio +async def test_run_poller_once_returns_latest_status( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + async def _fake_run_once(workflow_id: str) -> dict[str, Any]: + return { + "workflowId": workflow_id, + "state": "stopped", + "lastStatus": "success", + "selectedCount": 5, + } + + monkeypatch.setattr( + workflow_routes, + "_read_workflow_from_fs", + lambda workflow_id: {"workflowJson": {"start": "n1", "nodes": [], "edges": []}} if workflow_id == "wf-1" else None, + ) + monkeypatch.setattr( + "flocks.workflow.poller_manager.default_manager", + SimpleNamespace(run_once=_fake_run_once), + ) + + response = await client.post("/api/workflow/wf-1/poller-run-once") + assert response.status_code == 200, response.text + assert response.json()["status"]["lastStatus"] == "success" + assert response.json()["status"]["selectedCount"] == 5 diff --git a/tests/workflow/test_poller_manager.py b/tests/workflow/test_poller_manager.py new file mode 100644 index 000000000..d6b9f92ac --- /dev/null +++ b/tests/workflow/test_poller_manager.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from flocks.workflow import poller_manager +from flocks.workflow.runner import RunWorkflowResult + + +@pytest.mark.asyncio +async def test_restart_disabled_config_reports_stopped(monkeypatch: pytest.MonkeyPatch) -> None: + manager = poller_manager.WorkflowPollerManager() + + async def _fake_read(_key: str) -> dict[str, Any]: + return {"enabled": False} + + monkeypatch.setattr(poller_manager.Storage, "read", _fake_read) + + status = await manager.restart_workflow("wf-disabled") + assert status["state"] == "stopped" + assert status["error"] is None + + +@pytest.mark.asyncio +async def test_restart_missing_workflow_reports_failed(monkeypatch: pytest.MonkeyPatch) -> None: + manager = poller_manager.WorkflowPollerManager() + + async def _fake_read(_key: str) -> dict[str, Any]: + return {"enabled": True, "intervalSeconds": 30} + + monkeypatch.setattr(poller_manager.Storage, "read", _fake_read) + monkeypatch.setattr(poller_manager, "read_workflow_from_fs", lambda _workflow_id: None) + + status = await manager.restart_workflow("wf-missing") + assert status["state"] == "failed" + assert status["error"] == "workflow_not_found" + + +@pytest.mark.asyncio +async def test_run_once_injects_dynamic_inputs_and_summary(monkeypatch: pytest.MonkeyPatch) -> None: + manager = poller_manager.WorkflowPollerManager() + captured_inputs: dict[str, Any] = {} + + async def _fake_read(_key: str) -> dict[str, Any]: + return { + "enabled": False, + "timeoutSeconds": 9, + "inputs": {"dedup_source_workflow_name": "stream_alert_denoise_gt_fast"}, + } + + def _fake_run_workflow(*, workflow: Any, inputs: dict[str, Any], timeout_s: int, trace: bool, cancel): # noqa: ANN001 + captured_inputs.update(inputs) + assert workflow == {"start": "n1", "nodes": [], "edges": []} + assert timeout_s == 9 + assert trace is False + assert cancel() is False + return RunWorkflowResult( + status="success", + run_id="run-1", + outputs={ + "load_stats": {"record_count": 7}, + "processed_mark_count": 3, + "channel_notify_status": "sent", + }, + ) + + monkeypatch.setattr(poller_manager.Storage, "read", _fake_read) + monkeypatch.setattr( + poller_manager, + "read_workflow_from_fs", + lambda _workflow_id: {"workflowJson": {"start": "n1", "nodes": [], "edges": []}}, + ) + monkeypatch.setattr(poller_manager, "run_workflow", _fake_run_workflow) + + status = await manager.run_once("wf-run-once") + + assert status["lastStatus"] == "success" + assert status["selectedCount"] == 7 + assert status["processedMarkCount"] == 3 + assert status["channelNotifyStatus"] == "sent" + assert status["state"] == "stopped" + assert captured_inputs["dedup_source_workflow_name"] == "stream_alert_denoise_gt_fast" + assert captured_inputs["input_date"] + assert captured_inputs["_trigger"] == "poller" + assert captured_inputs["_poller_run_id"].startswith("poller-") + + +@pytest.mark.asyncio +async def test_no_overlap_skips_when_previous_run_is_still_active( + monkeypatch: pytest.MonkeyPatch, +) -> None: + manager = poller_manager.WorkflowPollerManager() + threading_event = asyncio.Event() + + config = { + "enabled": True, + "intervalSeconds": 1, + "timeoutSeconds": 5, + "noOverlap": True, + "inputs": {}, + } + + def _fake_run_workflow(*, workflow: Any, inputs: dict[str, Any], timeout_s: int, trace: bool, cancel): # noqa: ANN001 + _ = workflow, inputs, timeout_s, trace, cancel + # Keep the run active until the test releases it so a second tick skips. + asyncio.run(asyncio.wait_for(threading_event.wait(), timeout=2.0)) + return RunWorkflowResult(status="success", outputs={"load_stats": {"record_count": 1}}) + + monkeypatch.setattr(poller_manager, "run_workflow", _fake_run_workflow) + monkeypatch.setattr( + poller_manager, + "read_workflow_from_fs", + lambda _workflow_id: {"workflowJson": {"start": "n1", "nodes": [], "edges": []}}, + ) + + await manager._schedule_run("wf-overlap", {"start": "n1", "nodes": [], "edges": []}, config) + await asyncio.sleep(0.02) + await manager._schedule_run("wf-overlap", {"start": "n1", "nodes": [], "edges": []}, config) + status = manager.get_status("wf-overlap") + + threading_event.set() + await asyncio.sleep(0.02) + + assert status["lastStatus"] == "skipped" + assert status["lastError"] == "previous_run_still_active" + + +@pytest.mark.asyncio +async def test_start_all_only_restarts_enabled_configs(monkeypatch: pytest.MonkeyPatch) -> None: + manager = poller_manager.WorkflowPollerManager() + restarted: list[str] = [] + + async def _fake_list_keys(_prefix: str) -> list[str]: + return [ + "workflow_poller_config/wf-enabled", + "workflow_poller_config/wf-disabled", + ] + + async def _fake_read(key: str) -> dict[str, Any]: + return {"enabled": key.endswith("wf-enabled")} + + async def _fake_restart(workflow_id: str) -> dict[str, Any]: + restarted.append(workflow_id) + return {"workflowId": workflow_id, "state": "running"} + + monkeypatch.setattr(poller_manager.Storage, "list_keys", _fake_list_keys) + monkeypatch.setattr(poller_manager.Storage, "read", _fake_read) + monkeypatch.setattr(manager, "restart_workflow", _fake_restart) + + await manager.start_all() + assert restarted == ["wf-enabled"] + + +@pytest.mark.asyncio +async def test_restart_workflow_replaces_existing_task(monkeypatch: pytest.MonkeyPatch) -> None: + manager = poller_manager.WorkflowPollerManager() + config = {"enabled": True, "intervalSeconds": 30, "timeoutSeconds": 10, "noOverlap": True, "inputs": {}} + + async def _fake_read(_key: str) -> dict[str, Any]: + return config + + async def _fake_loop(*args, **kwargs) -> None: # noqa: ANN002, ANN003 + await asyncio.sleep(60) + + monkeypatch.setattr(poller_manager.Storage, "read", _fake_read) + monkeypatch.setattr( + poller_manager, + "read_workflow_from_fs", + lambda _workflow_id: {"workflowJson": {"start": "n1", "nodes": [], "edges": []}}, + ) + monkeypatch.setattr(manager, "_poller_loop", _fake_loop) + + first = await manager.restart_workflow("wf-restart") + first_task = manager._tasks["wf-restart"] + second = await manager.restart_workflow("wf-restart") + second_task = manager._tasks["wf-restart"] + + assert first["state"] == "running" + assert second["state"] == "running" + assert first_task is not second_task + assert first_task.cancelled() or first_task.done() + + await manager.stop_workflow("wf-restart") diff --git a/webui/src/api/workflow.ts b/webui/src/api/workflow.ts index 90bb7e224..b4588bddc 100644 --- a/webui/src/api/workflow.ts +++ b/webui/src/api/workflow.ts @@ -200,6 +200,37 @@ export interface KafkaConsumerStatus { workerCount?: number; } +export interface WorkflowPollerConfig { + workflowId?: string; + enabled?: boolean; + intervalSeconds?: number; + timeoutSeconds?: number; + noOverlap?: boolean; + inputs?: Record; + updatedAt?: number; +} + +export interface WorkflowPollerStatus { + workflowId?: string; + state: 'running' | 'stopped' | 'failed'; + error?: string | null; + enabled?: boolean; + intervalSeconds?: number; + timeoutSeconds?: number; + noOverlap?: boolean; + activeRuns?: number; + lastRunAt?: number | null; + lastRunId?: string | null; + lastStatus?: string | null; + lastError?: string | null; + lastDurationMs?: number | null; + selectedCount?: number | null; + processedMarkCount?: number | null; + channelNotifyStatus?: string | null; + kafkaMessageCount?: number | null; + nextRunAt?: number | null; +} + export const workflowAPI = { list: (params?: { category?: string; status?: string; excludeId?: string }) => client.get('/api/workflow', { params }), @@ -296,6 +327,23 @@ export const workflowAPI = { getKafkaStatus: (id: string) => client.get(`/api/workflow/${id}/kafka-status`), + savePollerConfig: (id: string, config: WorkflowPollerConfig) => + client.post<{ ok: boolean; status?: WorkflowPollerStatus }>( + `/api/workflow/${id}/poller-config`, + config, + ), + + getPollerConfig: (id: string) => + client.get(`/api/workflow/${id}/poller-config`), + + getPollerStatus: (id: string) => + client.get(`/api/workflow/${id}/poller-status`), + + runPollerOnce: (id: string) => + client.post<{ ok: boolean; status?: WorkflowPollerStatus }>( + `/api/workflow/${id}/poller-run-once`, + ), + saveSyslogConfig: (id: string, config: { enabled?: boolean; protocol?: string; diff --git a/webui/src/locales/en-US/workflow.json b/webui/src/locales/en-US/workflow.json index 98ea16f78..cb009db29 100644 --- a/webui/src/locales/en-US/workflow.json +++ b/webui/src/locales/en-US/workflow.json @@ -229,6 +229,30 @@ "savedConfig": "Saved", "saveConfig": "Save Configuration", "kafkaHint": "When enabled, Flocks consumes messages from the given broker/topic and passes them to workflow inputs (default key: kafka_message); if an output broker/topic is set, successful run results are produced back to that topic.", + "pollerSection": "Workflow Poller", + "pollerEnabled": "Enable poller", + "pollerNoOverlap": "No overlap", + "pollerInterval": "Interval (seconds)", + "pollerTimeout": "Timeout (seconds)", + "pollerInputs": "Inputs JSON", + "pollerInputsJsonError": "Inputs must be a valid JSON object", + "pollerInputsHint": "By default this prefills from the current workflow's Sample Inputs when available. You can also enter any inputs JSON that matches this workflow.", + "pollerRunOnce": "Run once now", + "pollerRunningOnce": "Running once...", + "pollerRunOnceFailed": "Failed to run poller once", + "pollerStatus": "Poller status", + "pollerRunning": "Running", + "pollerEnabledIdle": "Enabled, waiting for next tick", + "pollerFailed": "Poller failed", + "pollerLastRunAt": "Last run", + "pollerNextRunAt": "Next run", + "pollerLastStatus": "Last status", + "pollerLastDuration": "Last duration", + "pollerSelectedCount": "Selected count", + "pollerActiveRuns": "Active runs", + "pollerProcessedMarkCount": "Processed mark count", + "pollerChannelStatus": "Channel notify", + "pollerHint": "This is a generic background polling capability for any workflow. The poller only schedules repeated runs; idempotency, state caches, and input semantics remain defined by the workflow itself.", "syslogSection": "Syslog input", "syslogExperimental": "(Experimental)", "syslogEnabled": "Enable listener", diff --git a/webui/src/locales/zh-CN/workflow.json b/webui/src/locales/zh-CN/workflow.json index a6e425f52..1e33de843 100644 --- a/webui/src/locales/zh-CN/workflow.json +++ b/webui/src/locales/zh-CN/workflow.json @@ -229,6 +229,30 @@ "savedConfig": "已保存", "saveConfig": "保存配置", "kafkaHint": "开启后 Flocks 从指定 Broker/Topic 消费消息,消息内容写入工作流 inputs(默认键名 kafka_message);如配置输出 Broker/Topic,执行成功的结果会写回该 Topic。", + "pollerSection": "Workflow Poller", + "pollerEnabled": "启用轮询服务", + "pollerNoOverlap": "禁止重叠执行", + "pollerInterval": "轮询间隔(秒)", + "pollerTimeout": "执行超时(秒)", + "pollerInputs": "Inputs JSON", + "pollerInputsJsonError": "Inputs 必须是合法的 JSON 对象", + "pollerInputsHint": "默认会优先读取当前 workflow 的 Sample Inputs;你也可以按该 workflow 的实际 inputs 手动填写。", + "pollerRunOnce": "立即执行一轮", + "pollerRunningOnce": "执行中...", + "pollerRunOnceFailed": "立即执行失败", + "pollerStatus": "轮询状态", + "pollerRunning": "运行中", + "pollerEnabledIdle": "已启用,等待下一轮", + "pollerFailed": "轮询器异常", + "pollerLastRunAt": "上次执行", + "pollerNextRunAt": "下次执行", + "pollerLastStatus": "最近结果", + "pollerLastDuration": "最近耗时", + "pollerSelectedCount": "本轮选中数量", + "pollerActiveRuns": "活跃执行数", + "pollerProcessedMarkCount": "processed 标记数", + "pollerChannelStatus": "通道通知状态", + "pollerHint": "这是针对所有 workflow 的通用后台轮询能力。poller 只负责按周期触发执行;具体幂等、状态缓存和输入语义仍由 workflow 自身定义。", "syslogSection": "Syslog 输入", "syslogExperimental": "(实验性)", "syslogEnabled": "启用监听", diff --git a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.test.tsx b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.test.tsx index e0f9dfb00..94b0eb104 100644 --- a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.test.tsx +++ b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.test.tsx @@ -1,5 +1,5 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; -import { render, screen, waitFor } from '@testing-library/react'; +import { fireEvent, render, screen, waitFor } from '@testing-library/react'; import userEvent from '@testing-library/user-event'; import IntegrationTab from './IntegrationTab'; @@ -11,6 +11,11 @@ const { workflowAPI } = vi.hoisted(() => ({ getKafkaConfig: vi.fn(), saveKafkaConfig: vi.fn(), getKafkaStatus: vi.fn(), + getPollerConfig: vi.fn(), + savePollerConfig: vi.fn(), + getPollerStatus: vi.fn(), + runPollerOnce: vi.fn(), + getSampleInputs: vi.fn(), getSyslogConfig: vi.fn(), saveSyslogConfig: vi.fn(), getSyslogStatus: vi.fn(), @@ -57,6 +62,30 @@ vi.mock('react-i18next', () => ({ 'detail.run.savedConfig': '已保存', 'detail.run.saveConfig': '保存配置', 'detail.run.kafkaHint': 'hint', + 'detail.run.pollerSection': 'Workflow Poller', + 'detail.run.pollerEnabled': '启用轮询服务', + 'detail.run.pollerNoOverlap': '禁止重叠执行', + 'detail.run.pollerInterval': '轮询间隔(秒)', + 'detail.run.pollerTimeout': '执行超时(秒)', + 'detail.run.pollerInputs': 'Inputs JSON', + 'detail.run.pollerInputsJsonError': 'Inputs 必须是合法的 JSON 对象', + 'detail.run.pollerInputsHint': 'poller inputs hint', + 'detail.run.pollerRunOnce': '立即执行一轮', + 'detail.run.pollerRunningOnce': '执行中...', + 'detail.run.pollerRunOnceFailed': '立即执行失败', + 'detail.run.pollerStatus': '轮询状态', + 'detail.run.pollerRunning': '运行中', + 'detail.run.pollerEnabledIdle': '已启用,等待下一轮', + 'detail.run.pollerFailed': '轮询器异常', + 'detail.run.pollerLastRunAt': '上次执行', + 'detail.run.pollerNextRunAt': '下次执行', + 'detail.run.pollerLastStatus': '最近结果', + 'detail.run.pollerLastDuration': '最近耗时', + 'detail.run.pollerSelectedCount': '本轮选中数量', + 'detail.run.pollerActiveRuns': '活跃执行数', + 'detail.run.pollerProcessedMarkCount': 'processed 标记数', + 'detail.run.pollerChannelStatus': '通道通知状态', + 'detail.run.pollerHint': 'poller hint', 'detail.run.syslogSection': 'Syslog', 'detail.run.syslogExperimental': '实验性', 'detail.run.syslogEnabled': '启用监听', @@ -98,6 +127,11 @@ describe('IntegrationTab Kafka config', () => { workflowAPI.getKafkaConfig.mockResolvedValue({ data: null }); workflowAPI.getKafkaStatus.mockResolvedValue({ data: { state: 'stopped', error: null } }); workflowAPI.saveKafkaConfig.mockResolvedValue({ data: { ok: true, consumer: { state: 'stopped', error: null } } }); + workflowAPI.getPollerConfig.mockResolvedValue({ data: null }); + workflowAPI.getPollerStatus.mockResolvedValue({ data: { state: 'stopped', error: null } }); + workflowAPI.savePollerConfig.mockResolvedValue({ data: { ok: true, status: { state: 'running', lastStatus: null } } }); + workflowAPI.runPollerOnce.mockResolvedValue({ data: { ok: true, status: { state: 'stopped', lastStatus: 'success' } } }); + workflowAPI.getSampleInputs.mockResolvedValue({ data: { sampleInputs: {} } }); workflowAPI.getSyslogConfig.mockResolvedValue({ data: null }); workflowAPI.getSyslogStatus.mockResolvedValue({ data: { state: 'stopped', error: null } }); }); @@ -132,4 +166,103 @@ describe('IntegrationTab Kafka config', () => { }); }); }); + + it('renders poller status badge when runtime is running', async () => { + workflowAPI.getPollerStatus.mockResolvedValue({ + data: { + state: 'running', + lastStatus: 'success', + selectedCount: 12, + activeRuns: 1, + }, + }); + + render(); + await userEvent.setup().click(await screen.findByRole('button', { name: /Workflow Poller/ })); + + expect(await screen.findByText('运行中')).toBeInTheDocument(); + expect(screen.getByText(/本轮选中数量: 12/)).toBeInTheDocument(); + }); + + it('saves poller config from the integration tab', async () => { + const user = userEvent.setup(); + workflowAPI.getSampleInputs.mockResolvedValue({ + data: { + sampleInputs: { + _comment: 'for display only', + _comment_dispose: 'dispose note', + severity: 'high', + notify: true, + }, + }, + }); + render(); + + await user.click(await screen.findByRole('button', { name: /Workflow Poller/ })); + await user.click(screen.getByLabelText('启用轮询服务')); + const intervalInput = screen.getByLabelText('轮询间隔(秒)'); + await user.clear(intervalInput); + await user.type(intervalInput, '45'); + await user.click(screen.getByRole('button', { name: '保存配置' })); + + await waitFor(() => { + expect(workflowAPI.savePollerConfig).toHaveBeenCalledWith('wf-1', { + enabled: true, + intervalSeconds: 45, + timeoutSeconds: 7200, + noOverlap: true, + inputs: { + severity: 'high', + notify: true, + }, + }); + }); + }); + + it('prefills poller inputs from current workflow sample inputs', async () => { + workflowAPI.getSampleInputs.mockResolvedValue({ + data: { + sampleInputs: { + _comment: 'ignore me', + _comment_cache: 'cache note', + eventType: 'alert', + source: 'demo', + }, + }, + }); + + render(); + await userEvent.setup().click(await screen.findByRole('button', { name: /Workflow Poller/ })); + + const textarea = await screen.findByLabelText('Inputs JSON'); + expect(textarea).toHaveValue(`{ + "eventType": "alert", + "source": "demo" +}`); + }); + + it('blocks saving poller config when inputs json is invalid', async () => { + const user = userEvent.setup(); + render(); + + await user.click(await screen.findByRole('button', { name: /Workflow Poller/ })); + const textarea = screen.getByLabelText('Inputs JSON'); + fireEvent.change(textarea, { target: { value: '{"broken": ' } }); + await user.click(screen.getByRole('button', { name: '保存配置' })); + + expect(await screen.findByText('Inputs 必须是合法的 JSON 对象')).toBeInTheDocument(); + expect(workflowAPI.savePollerConfig).not.toHaveBeenCalled(); + }); + + it('runs poller once from the integration tab', async () => { + const user = userEvent.setup(); + render(); + + await user.click(await screen.findByRole('button', { name: /Workflow Poller/ })); + await user.click(screen.getByRole('button', { name: '立即执行一轮' })); + + await waitFor(() => { + expect(workflowAPI.runPollerOnce).toHaveBeenCalledWith('wf-1'); + }); + }); }); diff --git a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx index 291d74baf..f4ec73382 100644 --- a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx +++ b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx @@ -11,6 +11,7 @@ import { WorkflowServiceDriver, SyslogListenerStatus, KafkaConsumerStatus, + WorkflowPollerStatus, } from '@/api/workflow'; import CopyButton from '@/components/common/CopyButton'; import WorkflowStatusBadge from '@/components/common/WorkflowStatusBadge'; @@ -52,6 +53,40 @@ function SectionHeader({ ); } +const DEFAULT_POLLER_INPUTS_TEXT = JSON.stringify({}, null, 2); + +function stripExecutionOnlyComments(value: unknown): unknown { + if (Array.isArray(value)) { + return value.map(stripExecutionOnlyComments); + } + if (!value || typeof value !== 'object') { + return value; + } + return Object.fromEntries( + Object.entries(value) + .filter(([key]) => !key.startsWith('_comment')) + .map(([key, nestedValue]) => [key, stripExecutionOnlyComments(nestedValue)]), + ); +} + +function stringifyPollerInputs(value: unknown): string { + if (!value || typeof value !== 'object' || Array.isArray(value)) { + return DEFAULT_POLLER_INPUTS_TEXT; + } + return JSON.stringify(stripExecutionOnlyComments(value), null, 2); +} + +function formatTimestamp(ts?: number | null): string { + if (!ts) return '-'; + return new Date(ts).toLocaleString(); +} + +function formatDuration(ms?: number | null): string { + if (typeof ms !== 'number') return '-'; + if (ms < 1000) return `${ms} ms`; + return `${(ms / 1000).toFixed(1)} s`; +} + // ───────────────────────────────────────────── // 发布为 API // ───────────────────────────────────────────── @@ -473,6 +508,323 @@ function KafkaSection({ workflowId }: { workflowId: string }) { ); } +function PollerSection({ workflowId }: { workflowId: string }) { + const { t } = useTranslation('workflow'); + const [expanded, setExpanded] = useState(false); + const [saving, setSaving] = useState(false); + const [saved, setSaved] = useState(false); + const [runningOnce, setRunningOnce] = useState(false); + const [enabled, setEnabled] = useState(false); + const [intervalSeconds, setIntervalSeconds] = useState('30'); + const [timeoutSeconds, setTimeoutSeconds] = useState('7200'); + const [noOverlap, setNoOverlap] = useState(true); + const [inputsText, setInputsText] = useState(DEFAULT_POLLER_INPUTS_TEXT); + const [jsonError, setJsonError] = useState(''); + const [saveError, setSaveError] = useState(''); + const [poller, setPoller] = useState(null); + + const refreshStatus = useCallback(async () => { + try { + const res = await workflowAPI.getPollerStatus(workflowId); + setPoller(res.data); + } catch { + // ignore transient backend errors + } + }, [workflowId]); + + useEffect(() => { + let cancelled = false; + + const loadPollerConfig = async () => { + let sampleInputs: Record = {}; + try { + const sampleRes = await workflowAPI.getSampleInputs(workflowId); + if (sampleRes.data?.sampleInputs && typeof sampleRes.data.sampleInputs === 'object' && !Array.isArray(sampleRes.data.sampleInputs)) { + sampleInputs = sampleRes.data.sampleInputs; + } + } catch { + sampleInputs = {}; + } + + try { + const res = await workflowAPI.getPollerConfig(workflowId); + if (cancelled) return; + if (res.data) { + setEnabled(!!res.data.enabled); + setIntervalSeconds(String(res.data.intervalSeconds ?? 30)); + setTimeoutSeconds(String(res.data.timeoutSeconds ?? 7200)); + setNoOverlap(res.data.noOverlap ?? true); + const configuredInputs = ( + res.data.inputs + && typeof res.data.inputs === 'object' + && !Array.isArray(res.data.inputs) + && Object.keys(res.data.inputs).length > 0 + ) + ? res.data.inputs + : sampleInputs; + setInputsText(stringifyPollerInputs(configuredInputs)); + return; + } + setInputsText(stringifyPollerInputs(sampleInputs)); + } catch { + if (!cancelled) { + setInputsText(stringifyPollerInputs(sampleInputs)); + } + } + }; + + loadPollerConfig(); + refreshStatus(); + return () => { + cancelled = true; + }; + }, [workflowId, refreshStatus]); + + useEffect(() => { + if (poller?.state !== 'running') return; + const handle = window.setInterval(() => { + refreshStatus(); + }, 3000); + return () => window.clearInterval(handle); + }, [poller?.state, refreshStatus]); + + const validateInputs = (): Record | null => { + try { + const parsed = JSON.parse(inputsText); + if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { + setJsonError(t('detail.run.pollerInputsJsonError')); + return null; + } + setJsonError(''); + return stripExecutionOnlyComments(parsed) as Record; + } catch { + setJsonError(t('detail.run.pollerInputsJsonError')); + return null; + } + }; + + const handleSave = async () => { + const parsedInputs = validateInputs(); + if (!parsedInputs) return; + + setSaving(true); + setSaved(false); + setSaveError(''); + try { + const res = await workflowAPI.savePollerConfig(workflowId, { + enabled, + intervalSeconds: Math.max(1, Number.parseInt(intervalSeconds, 10) || 30), + timeoutSeconds: Math.max(1, Number.parseInt(timeoutSeconds, 10) || 7200), + noOverlap, + inputs: parsedInputs, + }); + if (res.data?.status) { + setPoller(res.data.status); + } else { + refreshStatus(); + } + setSaved(true); + setTimeout(() => setSaved(false), 2000); + } catch (err: unknown) { + setSaveError(extractErrorMessage(err, t('detail.run.savingConfig'))); + refreshStatus(); + } finally { + setSaving(false); + } + }; + + const handleRunOnce = async () => { + setRunningOnce(true); + setSaveError(''); + try { + const res = await workflowAPI.runPollerOnce(workflowId); + if (res.data?.status) { + setPoller(res.data.status); + } else { + refreshStatus(); + } + } catch (err: unknown) { + setSaveError(extractErrorMessage(err, t('detail.run.pollerRunOnceFailed'))); + } finally { + setRunningOnce(false); + } + }; + + let summaryBadge: React.ReactNode; + if (poller?.state === 'running') { + summaryBadge = ( + + {t('detail.run.pollerRunning')} + + ); + } else if (poller?.state === 'failed') { + summaryBadge = ( + + {poller.error || t('detail.run.pollerFailed')} + + ); + } else if (enabled) { + summaryBadge = ( + + {t('detail.run.pollerEnabledIdle')} + + ); + } else { + summaryBadge = null; + } + + const statusBadgeClass = poller?.state === 'running' + ? 'bg-green-50 text-green-700 border-green-200' + : poller?.state === 'failed' + ? 'bg-red-50 text-red-700 border-red-200' + : 'bg-gray-50 text-gray-600 border-gray-200'; + + return ( +
+ setExpanded(v => !v)} + badge={summaryBadge} + /> + {expanded && ( +
+
+
+ + setEnabled(e.target.checked)} + className="rounded border-gray-300 text-red-600 focus:ring-red-500" + /> +
+
+ + setNoOverlap(e.target.checked)} + className="rounded border-gray-300 text-red-600 focus:ring-red-500" + /> +
+
+ +
+
+ + setIntervalSeconds(e.target.value)} + className="w-full text-xs border border-gray-200 rounded-lg px-3 py-1.5 focus:outline-none focus:ring-1 focus:ring-red-500" + /> +
+
+ + setTimeoutSeconds(e.target.value)} + className="w-full text-xs border border-gray-200 rounded-lg px-3 py-1.5 focus:outline-none focus:ring-1 focus:ring-red-500" + /> +
+
+ +
+ +