diff --git a/flocks/agent/registry.py b/flocks/agent/registry.py index 874470edf..e1643b541 100644 --- a/flocks/agent/registry.py +++ b/flocks/agent/registry.py @@ -579,6 +579,27 @@ def unregister(cls, name: str) -> bool: # --------------------------------------------------------------------------- +def _agent_event_should_reload(event: object) -> bool: + """Return True if a watchdog event should invalidate the agent cache. + + Mirrors ``flocks.tool.registry._tool_event_should_reload``: atomic-save + editors surface the real target via ``dest_path``, so we inspect both + endpoints before deciding to skip. + """ + candidates = [] + src = getattr(event, "src_path", "") or "" + if src: + candidates.append(src) + dest = getattr(event, "dest_path", "") or "" + if dest: + candidates.append(dest) + for path in candidates: + fname = os.path.basename(path) + if fname == "agent.yaml" or path.endswith(".md"): + return True + return False + + class AgentFileWatcher: """Watch plugin agent directories and auto-invalidate the Agent cache on change. @@ -621,13 +642,20 @@ def start(self) -> None: watcher = self + # Only react to actual content-mutation events. Without this guard the + # ``opened``/``closed``/``closed_no_write`` events that watchdog emits + # whenever any code (including the agent loader itself) reads + # ``agent.yaml`` / ``*.md`` would re-trigger cache invalidation on every + # access, causing a self-sustaining reload loop. + _RELOAD_EVENT_TYPES = frozenset({"modified", "created", "deleted", "moved"}) + class _Handler(FileSystemEventHandler): def on_any_event(self, event: FileSystemEvent) -> None: if event.is_directory: return - src = getattr(event, "src_path", "") or "" - fname = os.path.basename(src) - if fname == "agent.yaml" or src.endswith(".md"): + if getattr(event, "event_type", "") not in _RELOAD_EVENT_TYPES: + return + if _agent_event_should_reload(event): watcher._schedule_invalidate() handler = _Handler() diff --git a/flocks/agent/toolset.py b/flocks/agent/toolset.py index 5607f2683..20b1e4f16 100644 --- a/flocks/agent/toolset.py +++ b/flocks/agent/toolset.py @@ -47,7 +47,11 @@ def normalize_declared_tool_names( matches = [raw_name] if raw_name in available else [] if not matches: - log.warn("agent.toolset.tool_missing", {"tool": raw_name}) + # Built-in agent definitions (librarian, metis, …) declare optional + # tools such as ``lsp_*`` / ``ast_grep_search`` that ship in separate + # binaries; they are gracefully skipped when not installed. Treat + # this as informational only to avoid flooding operational logs. + log.debug("agent.toolset.tool_missing", {"tool": raw_name}) continue for match in matches: diff --git a/flocks/command/command_loader.py b/flocks/command/command_loader.py index 90ec5ee87..6f0b64b02 100644 --- a/flocks/command/command_loader.py +++ b/flocks/command/command_loader.py @@ -103,7 +103,7 @@ def discover_commands() -> Dict[str, CommandInfo]: try: from flocks.utils.compat import get_flocks_config_dir - opencode_global_dir = str(get_flocks_config_dir(binary="opencode") / "command") + flocks_global_dir = str(get_flocks_config_dir(binary="opencode") / "command") sources.append(("flocks-global", flocks_global_dir, "**/*.md")) except Exception as e: log.warn("command.flocks_dir.error", {"error": str(e)}) diff --git a/flocks/ingest/__init__.py b/flocks/ingest/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flocks/ingest/syslog/__init__.py b/flocks/ingest/syslog/__init__.py new file mode 100644 index 000000000..5e83880ce --- /dev/null +++ b/flocks/ingest/syslog/__init__.py @@ -0,0 +1,6 @@ +"""Syslog ingestion for workflow triggers (UDP/TCP listeners).""" + +from flocks.ingest.syslog.constants import WORKFLOW_SYSLOG_CONFIG_PREFIX +from flocks.ingest.syslog.manager import SyslogManager, default_manager + +__all__ = ["SyslogManager", "default_manager", "WORKFLOW_SYSLOG_CONFIG_PREFIX"] diff --git a/flocks/ingest/syslog/constants.py b/flocks/ingest/syslog/constants.py new file mode 100644 index 000000000..717b8d693 --- /dev/null +++ b/flocks/ingest/syslog/constants.py @@ -0,0 +1,3 @@ +"""Storage key prefix for per-workflow syslog config (must match server routes).""" + +WORKFLOW_SYSLOG_CONFIG_PREFIX = "workflow_syslog_config/" diff --git a/flocks/ingest/syslog/listener.py b/flocks/ingest/syslog/listener.py new file mode 100644 index 000000000..7212f372d --- /dev/null +++ b/flocks/ingest/syslog/listener.py @@ -0,0 +1,128 @@ +"""Asyncio UDP/TCP syslog listeners.""" + +from __future__ import annotations + +import asyncio +from typing import Awaitable, Callable, Union + +from flocks.ingest.syslog.parser import parse_syslog + +OnSyslogMessage = Callable[[dict], Union[None, Awaitable[None]]] + + +class SyslogUDPProtocol(asyncio.DatagramProtocol): + """Receive syslog datagrams and invoke async callback with parsed dict. + + The *on_message* callback is expected to be non-blocking (e.g. a queue + put_nowait). This protocol deliberately does NOT create unbounded asyncio + tasks on every datagram — the caller owns concurrency control. + """ + + def __init__( + self, + on_message: OnSyslogMessage, + format_hint: str, + ) -> None: + self._on_message = on_message + self._format_hint = format_hint + + def datagram_received(self, data: bytes, _addr) -> None: # noqa: ANN001 + text = data.decode("utf-8", errors="replace") + parsed = parse_syslog(text, self._format_hint) + try: + res = self._on_message(parsed) + # If the callback returns a coroutine (legacy path), schedule it + # but only once — do NOT create_task without bound. + if asyncio.iscoroutine(res): + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return + loop.create_task(self._safe_await(res)) + except Exception: + pass + + @staticmethod + async def _safe_await(coro) -> None: # noqa: ANN001 + try: + await coro + except Exception: + pass + + +async def run_udp_syslog_server( + host: str, + port: int, + format_hint: str, + on_message: OnSyslogMessage, + *, + abort_event: asyncio.Event, +) -> None: + loop = asyncio.get_running_loop() + transport, _protocol = await loop.create_datagram_endpoint( + lambda: SyslogUDPProtocol(on_message, format_hint), + local_addr=(host, port), + ) + try: + await abort_event.wait() + finally: + transport.close() + + +async def _handle_tcp_client( + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + format_hint: str, + on_message: OnSyslogMessage, +) -> None: + try: + while True: + line = await reader.readline() + if not line: + break + text = line.decode("utf-8", errors="replace").strip() + if not text: + continue + parsed = parse_syslog(text, format_hint) + try: + res = on_message(parsed) + if asyncio.iscoroutine(res): + await res + except Exception: + pass + finally: + try: + writer.close() + await writer.wait_closed() + except Exception: + pass + + +async def run_tcp_syslog_server( + host: str, + port: int, + format_hint: str, + on_message: OnSyslogMessage, + *, + abort_event: asyncio.Event, +) -> None: + async def handle_client( + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + await _handle_tcp_client(reader, writer, format_hint, on_message) + + server = await asyncio.start_server(handle_client, host, port) + serve_task: asyncio.Task[None] | None = None + try: + serve_task = asyncio.create_task(server.serve_forever()) + await abort_event.wait() + finally: + if serve_task and not serve_task.done(): + serve_task.cancel() + try: + await serve_task + except asyncio.CancelledError: + pass + server.close() + await server.wait_closed() diff --git a/flocks/ingest/syslog/manager.py b/flocks/ingest/syslog/manager.py new file mode 100644 index 000000000..79438dd1a --- /dev/null +++ b/flocks/ingest/syslog/manager.py @@ -0,0 +1,418 @@ +"""Lifecycle manager for syslog listeners → workflow runs.""" + +from __future__ import annotations + +import asyncio +import time +from typing import Any, Dict, List + +from flocks.storage.storage import Storage +from flocks.utils.log import Log +from flocks.workflow.execution_store import ( + create_execution_record, + record_execution_result, + resolve_execution_outcome, +) +from flocks.workflow.fs_store import read_workflow_from_fs +from flocks.workflow.runner import run_workflow + +from flocks.ingest.syslog.constants import WORKFLOW_SYSLOG_CONFIG_PREFIX +from flocks.ingest.syslog.listener import run_tcp_syslog_server, run_udp_syslog_server + +log = Log.create(service="syslog.manager") + +# Maximum concurrent workflow executions per workflow to avoid FD exhaustion and SQLite write contention +_MAX_CONCURRENT_EXECUTIONS = 8 +# Maximum number of buffered syslog messages per workflow; excess messages are dropped with a warning +_MAX_QUEUE_SIZE = 200 +# Maximum time we wait for the listener to either bind successfully or fail +# during ``restart_workflow``. Any value <0.5s makes the call too aggressive +# under busy event-loops; anything >5s would make the HTTP save endpoint feel +# hung when the user makes a typo. +_BIND_WAIT_TIMEOUT_S = 3.0 + + +class SyslogManager: + """One async listener task per workflow id (when enabled). + + The listener / consumer fan-out is built around bounded primitives so a + syslog flood cannot translate into unbounded asyncio.Task growth: + + * A bounded ``asyncio.Queue`` (``_MAX_QUEUE_SIZE``) absorbs spikes; the + listener uses ``put_nowait`` and drops excess messages with a warning. + * A fixed pool of ``_MAX_CONCURRENT_EXECUTIONS`` worker coroutines drains + the queue and runs ``_trigger_workflow`` serially per worker. This is + stronger than a per-task ``Semaphore``: the previous design called + ``create_task`` for every queued message and only awaited the semaphore + *inside* the task, which let pending coroutines accumulate without + bound while the queue was emptied immediately. + """ + + def __init__(self) -> None: + self._tasks: dict[str, asyncio.Task] = {} + self._abort_events: dict[str, asyncio.Event] = {} + # Per-workflow bounded message queue for backpressure + self._queues: dict[str, asyncio.Queue] = {} + # Per-workflow fixed worker pool draining the queue + self._worker_pools: dict[str, List[asyncio.Task]] = {} + # Per-workflow listener runtime status for the syslog-status API. + # Possible state values: "binding" | "listening" | "failed" | "stopped". + self._listener_status: dict[str, Dict[str, Any]] = {} + # Per-workflow event signalled when the listener has either bound + # successfully or failed. Used by ``restart_workflow`` so the HTTP + # save endpoint can report bind failures synchronously. + self._listener_ready: dict[str, asyncio.Event] = {} + + @staticmethod + def _config_key(workflow_id: str) -> str: + return f"{WORKFLOW_SYSLOG_CONFIG_PREFIX}{workflow_id}" + + async def start_all(self) -> None: + try: + keys = await Storage.list_keys(WORKFLOW_SYSLOG_CONFIG_PREFIX) + except Exception as exc: + log.warning("syslog.list_keys_failed", {"error": str(exc)}) + return + + for key in keys: + if not key.startswith(WORKFLOW_SYSLOG_CONFIG_PREFIX): + continue + workflow_id = key[len(WORKFLOW_SYSLOG_CONFIG_PREFIX) :] + if not workflow_id: + continue + try: + data = await Storage.read(key) + except Exception as exc: + log.warning("syslog.config_read_failed", {"key": key, "error": str(exc)}) + continue + if isinstance(data, dict) and data.get("enabled"): + await self.restart_workflow(workflow_id) + + async def stop_all(self) -> None: + for workflow_id in list(self._tasks.keys()): + await self.stop_workflow(workflow_id) + + def get_listener_status(self, workflow_id: str) -> Dict[str, Any]: + """Return a snapshot of the listener runtime state for ``workflow_id``. + + Result shape:: + + {"state": "binding|listening|failed|stopped", "error": "..." | None, + "host": "...", "port": 5140, "protocol": "udp|tcp", + "queueSize": 12, "queueCapacity": 200, "workerCount": 8} + """ + status = dict(self._listener_status.get(workflow_id) or {"state": "stopped"}) + q = self._queues.get(workflow_id) + if q is not None: + status["queueSize"] = q.qsize() + status["queueCapacity"] = q.maxsize + pool = self._worker_pools.get(workflow_id) + if pool is not None: + status["workerCount"] = sum(1 for t in pool if not t.done()) + return status + + async def stop_workflow(self, workflow_id: str) -> None: + ev = self._abort_events.pop(workflow_id, None) + if ev is not None: + ev.set() + task = self._tasks.pop(workflow_id, None) + if task is not None and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + # Cancel all worker pool tasks; pop first so callers observing a + # stopped listener see an empty pool immediately. + pool = self._worker_pools.pop(workflow_id, None) + if pool: + for w in pool: + if not w.done(): + w.cancel() + try: + await asyncio.wait_for( + asyncio.gather(*pool, return_exceptions=True), + timeout=5.0, + ) + except (asyncio.TimeoutError, asyncio.CancelledError): + pass + self._queues.pop(workflow_id, None) + self._listener_ready.pop(workflow_id, None) + if workflow_id in self._listener_status: + self._listener_status[workflow_id] = {"state": "stopped", "error": None} + + async def restart_workflow(self, workflow_id: str) -> Dict[str, Any]: + """Restart the listener and return its post-bind runtime status. + + This call blocks until the underlying socket either binds successfully, + the bind fails (OSError such as ``EADDRINUSE``), or + ``_BIND_WAIT_TIMEOUT_S`` elapses. Callers (e.g. the HTTP + ``save_syslog_config`` route) can therefore surface bind errors to the + user instead of silently leaving the listener in a failed state. + """ + await self.stop_workflow(workflow_id) + key = self._config_key(workflow_id) + try: + data = await Storage.read(key) + except Exception as exc: + log.warning("syslog.restart_read_failed", {"workflow_id": workflow_id, "error": str(exc)}) + return {"state": "failed", "error": str(exc)} + if not isinstance(data, dict) or not data.get("enabled"): + self._listener_status[workflow_id] = {"state": "stopped", "error": None} + return {"state": "stopped", "error": None} + + # Load and cache the workflow JSON once; avoids a disk read per message + wf_data = read_workflow_from_fs(workflow_id) + if not wf_data: + err = "workflow_not_found" + self._listener_status[workflow_id] = {"state": "failed", "error": err} + log.warning("syslog.workflow_not_found_on_start", {"workflow_id": workflow_id}) + return {"state": "failed", "error": err} + workflow_json = wf_data.get("workflowJson") + if not workflow_json: + err = "workflow_json_missing" + self._listener_status[workflow_id] = {"state": "failed", "error": err} + log.warning("syslog.workflow_json_missing_on_start", {"workflow_id": workflow_id}) + return {"state": "failed", "error": err} + + queue: asyncio.Queue = asyncio.Queue(maxsize=_MAX_QUEUE_SIZE) + self._queues[workflow_id] = queue + + abort = asyncio.Event() + self._abort_events[workflow_id] = abort + + ready = asyncio.Event() + self._listener_ready[workflow_id] = ready + + host = str(data.get("host") or "0.0.0.0") + port = int(data.get("port") or 5140) + protocol = str(data.get("protocol") or "udp").lower() + self._listener_status[workflow_id] = { + "state": "binding", + "error": None, + "host": host, + "port": port, + "protocol": protocol, + } + + input_key = str(data.get("inputKey") or "syslog_message") + + # Spin up a fixed worker pool: exactly _MAX_CONCURRENT_EXECUTIONS + # coroutines drain the queue. pending tasks cannot exceed this number, + # which is the actual backpressure invariant we want. + workers: List[asyncio.Task] = [] + for i in range(_MAX_CONCURRENT_EXECUTIONS): + workers.append( + asyncio.create_task( + self._worker_loop(workflow_id, workflow_json, input_key, queue, abort), + name=f"syslog-worker-{workflow_id}-{i}", + ) + ) + self._worker_pools[workflow_id] = workers + + task = asyncio.create_task( + self._listener_loop(workflow_id, data, queue, abort, ready), + name=f"syslog-{workflow_id}", + ) + self._tasks[workflow_id] = task + + # Wait briefly for the listener to bind (or fail) so the caller can + # decide whether to surface a 502/Conflict instead of pretending the + # listener is up. + try: + await asyncio.wait_for(ready.wait(), timeout=_BIND_WAIT_TIMEOUT_S) + except asyncio.TimeoutError: + # Listener hasn't reported bind result; treat as best-effort + # "scheduled" so we don't tear it down on slow boxes, but mark the + # state explicitly so the UI can show "pending". + current = self._listener_status.get(workflow_id) or {} + if current.get("state") == "binding": + self._listener_status[workflow_id] = { + **current, + "state": "binding", + "error": "bind_pending_timeout", + } + log.warning("syslog.bind_pending_timeout", {"workflow_id": workflow_id}) + + log.info("syslog.listener_scheduled", {"workflow_id": workflow_id}) + return self.get_listener_status(workflow_id) + + async def _listener_loop( + self, + workflow_id: str, + config: Dict[str, Any], + queue: asyncio.Queue, + abort: asyncio.Event, + ready: asyncio.Event, + ) -> None: + host = str(config.get("host") or "0.0.0.0") + port = int(config.get("port") or 5140) + protocol = str(config.get("protocol") or "udp").lower() + format_hint = str(config.get("format") or "auto") + + # NOTE: keep this callback synchronous so the UDP protocol layer can + # invoke it inline from datagram_received() without creating an + # asyncio task per packet. That preserves the queue-based backpressure. + def on_msg(parsed: dict) -> None: + try: + queue.put_nowait(parsed) + except asyncio.QueueFull: + log.warning("syslog.queue_full_dropped", { + "workflow_id": workflow_id, + "queue_size": queue.qsize(), + }) + + async def _bind_and_serve() -> None: + """Bind the socket synchronously then mark the listener ready. + + ``run_udp_syslog_server`` / ``run_tcp_syslog_server`` create the + endpoint at the top of their body and then await abort; we wrap + them with a tiny helper so we can flip the ``ready`` flag + *after* the bind has succeeded. Bind failures are caught by the + outer ``try`` below and reported back as ``state="failed"``. + """ + # We rely on the underlying asyncio APIs raising OSError before + # they yield control, so wrapping the call alone is enough. We + # additionally schedule a single-shot "mark ready" task that + # runs on the next event-loop tick — by which point the bind has + # either succeeded or raised. + mark_task = asyncio.create_task(_mark_ready_after_bind()) + try: + if protocol == "tcp": + await run_tcp_syslog_server(host, port, format_hint, on_msg, abort_event=abort) + else: + await run_udp_syslog_server(host, port, format_hint, on_msg, abort_event=abort) + finally: + if not mark_task.done(): + mark_task.cancel() + + async def _mark_ready_after_bind() -> None: + # Give the bind one event-loop tick to complete (or raise) so we + # don't claim "listening" before the socket actually exists. + await asyncio.sleep(0) + if not ready.is_set(): + self._listener_status[workflow_id] = { + "state": "listening", + "error": None, + "host": host, + "port": port, + "protocol": protocol, + } + ready.set() + + try: + await _bind_and_serve() + except asyncio.CancelledError: + raise + except OSError as exc: + self._listener_status[workflow_id] = { + "state": "failed", + "error": str(exc), + "host": host, + "port": port, + "protocol": protocol, + } + ready.set() + log.error( + "syslog.bind_failed", + {"workflow_id": workflow_id, "error": str(exc), "host": host, "port": port, "protocol": protocol}, + ) + except Exception as exc: + self._listener_status[workflow_id] = { + "state": "failed", + "error": str(exc), + "host": host, + "port": port, + "protocol": protocol, + } + ready.set() + log.error("syslog.listener_error", {"workflow_id": workflow_id, "error": str(exc)}) + + async def _worker_loop( + self, + workflow_id: str, + workflow_json: Any, + input_key: str, + queue: asyncio.Queue, + abort: asyncio.Event, + ) -> None: + """One worker drains the queue serially. + + The worker pool size is the *only* concurrency knob; we deliberately + do not spawn additional asyncio.Tasks per message so the total number + of in-flight workflow runs is exactly ``_MAX_CONCURRENT_EXECUTIONS``. + """ + while not abort.is_set(): + try: + msg = await asyncio.wait_for(queue.get(), timeout=0.5) + except asyncio.TimeoutError: + continue + except asyncio.CancelledError: + return + try: + await self._trigger_workflow(workflow_id, workflow_json, msg, input_key) + except asyncio.CancelledError: + return + except Exception as exc: + log.warning( + "syslog.worker_dispatch_failed", + {"workflow_id": workflow_id, "error": str(exc)}, + ) + + async def _trigger_workflow( + self, + workflow_id: str, + workflow_json: Any, + syslog_msg: dict, + input_key: str, + ) -> None: + inputs = {input_key: syslog_msg} + + exec_data = await create_execution_record( + workflow_id, + input_params={"_trigger": "syslog", **inputs}, + ) + exec_id = exec_data["id"] + start_time = time.time() + + try: + result = await asyncio.to_thread( + run_workflow, + workflow=workflow_json, + inputs=inputs, + trace=False, + ) + status, error_msg = resolve_execution_outcome(result) + duration = time.time() - start_time + exec_data.update({ + "status": status, + "outputResults": result.outputs if isinstance(result.outputs, dict) else {}, + "finishedAt": int(time.time() * 1000), + "duration": duration, + "errorMessage": error_msg, + "executionLog": list(result.history or []), + "currentNodeId": result.last_node_id, + "currentPhase": status, + "currentStepIndex": result.steps, + }) + except Exception as exc: + duration = time.time() - start_time + log.error( + "syslog.workflow_run_failed", + {"workflow_id": workflow_id, "exec_id": exec_id, "error": str(exc)}, + ) + exec_data.update({ + "status": "error", + "errorMessage": str(exc), + "finishedAt": int(time.time() * 1000), + "duration": duration, + "currentPhase": "error", + }) + finally: + try: + await record_execution_result(workflow_id, exec_id, exec_data) + except Exception as exc: + log.warning("syslog.exec_record_failed", {"exec_id": exec_id, "error": str(exc)}) + + +default_manager = SyslogManager() diff --git a/flocks/ingest/syslog/parser.py b/flocks/ingest/syslog/parser.py new file mode 100644 index 000000000..270891350 --- /dev/null +++ b/flocks/ingest/syslog/parser.py @@ -0,0 +1,229 @@ +"""Parse syslog lines (RFC 5424 and BSD / RFC 3164 style) without external deps.""" + +from __future__ import annotations + +import re +from datetime import datetime +from typing import Any, Dict, Optional + +_PRI_RE = re.compile(r"^<(\d{1,3})>") +# After stripping PRI: MMM DD hh:mm:ss hostname tag: msg +_RFC3164_REST_RE = re.compile( + r"^([A-Za-z]{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(\S+)\s*(.*)$", + re.DOTALL, +) +# Non-standard but common: ISO_TS HOSTNAME APP[PID]: msg (no RFC5424 version) +_ISO3164_REST_RE = re.compile( + r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:[+-]\d{2}:\d{2}|Z)?)" # ISO timestamp + r"\s+(\S+)" # hostname + r"\s+(\S+?)\s*:\s*" # app_name/tag: + r"([\s\S]*)$", # message + re.DOTALL, +) + + +def _pri_parts(pri: int) -> tuple[int, int]: + facility = pri >> 3 + severity = pri & 7 + return facility, severity + + +def _normalize_ts(ts: Optional[str]) -> str: + if not ts: + return "" + ts = ts.strip() + # RFC5424 full-date + if "T" in ts: + try: + # Zulu + if ts.endswith("Z"): + return datetime.fromisoformat(ts.replace("Z", "+00:00")).isoformat() + return datetime.fromisoformat(ts).isoformat() + except ValueError: + return ts + # RFC3164: Oct 11 22:14:15 (no year — use current year best-effort) + try: + now = datetime.now() + dt = datetime.strptime(f"{now.year} {ts}", "%Y %b %d %H:%M:%S") + return dt.isoformat() + except ValueError: + return ts + + +def parse_syslog(raw: str, format_hint: str = "auto") -> Dict[str, Any]: + """ + Parse one syslog payload into a dict suitable for workflow inputs. + + format_hint: "auto" | "rfc3164" | "rfc5424" + """ + text = raw.decode("utf-8", errors="replace") if isinstance(raw, (bytes, bytearray)) else raw + text = text.strip() + if not text: + return { + "raw": text, + "facility": 0, + "severity": 0, + "timestamp": "", + "hostname": "", + "app_name": "", + "message": "", + "format": "empty", + } + + m_pri = _PRI_RE.match(text) + if not m_pri: + return { + "raw": text, + "facility": 0, + "severity": 0, + "timestamp": "", + "hostname": "", + "app_name": "", + "message": text, + "format": "unparsed", + } + + pri = int(m_pri.group(1)) + facility, severity = _pri_parts(pri) + rest = text[m_pri.end() :] + + if format_hint == "rfc3164": + return _parse_rfc3164(rest, raw=text, facility=facility, severity=severity) + if format_hint == "rfc5424": + return _parse_rfc5424(rest, raw=text, facility=facility, severity=severity) + + # auto: RFC5424 if second token is a single digit version number + if rest and rest[0].isdigit(): + first_space = rest.find(" ") + if first_space > 0 and rest[:first_space].isdigit(): + return _parse_rfc5424(rest, raw=text, facility=facility, severity=severity) + # Non-standard: ISO_TS HOSTNAME APP[PID]: msg (no version number) + if first_space > 0 and "T" in rest[:first_space]: + m_iso = _ISO3164_REST_RE.match(rest) + if m_iso: + return _parse_iso3164(m_iso, raw=text, facility=facility, severity=severity) + + return _parse_rfc3164(rest, raw=text, facility=facility, severity=severity) + + +def _next_rfc5424_token(s: str) -> tuple[str, str]: + """Pop one syslog field from *s*; structured data may start with '['.""" + s = s.lstrip() + if not s: + return "", "" + if s[0] == "[": + depth = 0 + for j, c in enumerate(s): + if c == "[": + depth += 1 + elif c == "]": + depth -= 1 + if depth == 0: + return s[: j + 1], s[j + 1 :].lstrip() + return s, "" + sp = s.find(" ") + if sp == -1: + return s, "" + return s[:sp], s[sp + 1 :].lstrip() + + +def _parse_rfc5424( + rest: str, + *, + raw: str, + facility: int, + severity: int, +) -> Dict[str, Any]: + s = rest.lstrip() + if not s: + return _parse_rfc3164(rest, raw=raw, facility=facility, severity=severity) + + i = 0 + while i < len(s) and s[i].isdigit(): + i += 1 + version = s[:i].strip() + s = s[i:].lstrip() + if not version.isdigit(): + return _parse_rfc3164(rest, raw=raw, facility=facility, severity=severity) + + ts, s = _next_rfc5424_token(s) + hostname, s = _next_rfc5424_token(s) + app_name, s = _next_rfc5424_token(s) + _procid, s = _next_rfc5424_token(s) + _msgid, s = _next_rfc5424_token(s) + _sdata, s = _next_rfc5424_token(s) + msg = s.strip() + + return { + "raw": raw, + "facility": facility, + "severity": severity, + "timestamp": _normalize_ts(ts), + "hostname": hostname if hostname != "-" else "", + "app_name": app_name if app_name != "-" else "", + "message": msg, + "format": "rfc5424", + } + + +def _parse_iso3164( + m: "re.Match[str]", + *, + raw: str, + facility: int, + severity: int, +) -> Dict[str, Any]: + """Handle non-standard ISO_TS HOSTNAME APP[PID]: msg (no RFC5424 version).""" + return { + "raw": raw, + "facility": facility, + "severity": severity, + "timestamp": _normalize_ts(m.group(1)), + "hostname": m.group(2), + "app_name": m.group(3), + "message": m.group(4).strip(), + "format": "iso3164", + } + + +def _parse_rfc3164( + rest: str, + *, + raw: str, + facility: int, + severity: int, +) -> Dict[str, Any]: + m = _RFC3164_REST_RE.match(rest.strip()) + if m: + ts = m.group(1) + hostname = m.group(2) + remainder = (m.group(3) or "").strip() + app_name = "" + message = remainder + # TAG: message (tag is alphanumeric, often "sshd" or "su") + if remainder and ":" in remainder: + tag, _, body = remainder.partition(":") + if tag and " " not in tag and tag.isprintable(): + app_name = tag.strip() + message = body.strip() + return { + "raw": raw, + "facility": facility, + "severity": severity, + "timestamp": _normalize_ts(ts), + "hostname": hostname, + "app_name": app_name, + "message": message, + "format": "rfc3164", + } + + return { + "raw": raw, + "facility": facility, + "severity": severity, + "timestamp": "", + "hostname": "", + "app_name": "", + "message": rest.strip(), + "format": "rfc3164", + } diff --git a/flocks/mcp/server.py b/flocks/mcp/server.py index 65b511616..059ef372b 100644 --- a/flocks/mcp/server.py +++ b/flocks/mcp/server.py @@ -60,7 +60,11 @@ async def init(self) -> None: Servers that fail to connect will be retried in the background with exponential backoff. """ if self._initialized: - log.warn("mcp.already_initialized") + # ``MCP.init`` is invoked from both the global server lifespan and + # the per-instance bootstrap on startup. The guard above keeps the + # call idempotent, so this is informational only and should not + # surface as a warning in operational logs. + log.debug("mcp.already_initialized") return log.info("mcp.initializing") diff --git a/flocks/provider/provider.py b/flocks/provider/provider.py index f83ad7fe5..4537ca1a5 100644 --- a/flocks/provider/provider.py +++ b/flocks/provider/provider.py @@ -218,7 +218,7 @@ def _ensure_initialized(cls): ("github-copilot", "flocks.provider.sdk.github_copilot", "GitHubCopilotProvider"), ("github-copilot-enterprise", "flocks.provider.sdk.github_copilot", "GitHubCopilotEnterpriseProvider"), ("vercel", "flocks.provider.sdk.vercel", "VercelProvider"), - ("opencode", "flocks.provider.sdk.opencode", "FlocksCompatProvider"), + ("opencode", "flocks.provider.sdk.opencode", "OpenCodeProvider"), ("sap-ai-core", "flocks.provider.sdk.sap_ai_core", "SAPAICoreProvider"), ("cloudflare-ai-gateway", "flocks.provider.sdk.cloudflare_gateway", "CloudflareGatewayProvider"), # Added in Batch 7 - Final providers diff --git a/flocks/server/app.py b/flocks/server/app.py index 7387c9b21..f7dc8fb32 100644 --- a/flocks/server/app.py +++ b/flocks/server/app.py @@ -189,8 +189,12 @@ async def lifespan(app: FastAPI): # but we still skip loading users when the marker is already set # to avoid unnecessary DB + session scans on every startup. async def _migrate_legacy_sessions_to_admin() -> None: - marker = await Storage.get("auth:migration:legacy_session_owner_to_admin", dict) - if marker and marker.get("done"): + # ``Storage.get`` interprets a non-``None`` ``model`` argument as a + # Pydantic model and calls ``model_validate_json``. Passing the + # builtin ``dict`` type therefore raised ``AttributeError``; omit the + # model so the value is decoded with ``json.loads``. + marker = await Storage.get("auth:migration:legacy_session_owner_to_admin") + if isinstance(marker, dict) and marker.get("done"): return if not await AuthService.has_users(): return @@ -218,7 +222,11 @@ async def _migrate_legacy_sessions_to_admin() -> None: # Register built-in hooks if memory is enabled try: config = await Config.get() - if config.memory.enabled: + # ``config.memory`` may be ``None`` when the memory system is not + # configured at all; in that case there is nothing to register. + memory_cfg = getattr(config, "memory", None) + memory_enabled = bool(getattr(memory_cfg, "enabled", False)) if memory_cfg else False + if memory_enabled: from flocks.hooks.builtin import register_builtin_hooks await _run_startup_phase( log, @@ -374,6 +382,27 @@ async def _start_channel_gateway() -> None: except Exception as e: log.warning("channel.gateway.start_failed", {"error": str(e)}) + # Start syslog listeners for workflows with syslog enabled. + # Use a background task with a short delay so the main startup path is not + # blocked and to break the crash-restart loop where an immediate syslog + # flood would bring the server back down before it is fully ready. + try: + from flocks.ingest.syslog.manager import default_manager as default_syslog_manager + + async def _delayed_syslog_start() -> None: + # Wait for storage and tool registry to be fully initialised before + # resuming syslog listeners. + await asyncio.sleep(3) + try: + await default_syslog_manager.start_all() + log.info("syslog.manager.started") + except Exception as exc: + log.warning("syslog.manager.start_failed", {"error": str(exc)}) + + _schedule_startup_phase(app, log, "syslog.manager.start", _delayed_syslog_start) + except Exception as e: + log.warning("syslog.manager.start_failed", {"error": str(e)}) + try: from flocks.updater.updater import recover_upgrade_state @@ -437,6 +466,15 @@ async def _start_channel_gateway() -> None: except Exception as e: log.warning("channel.gateway.stop_failed", {"error": str(e)}) + # Stop syslog listeners + try: + from flocks.ingest.syslog.manager import default_manager as default_syslog_manager + + await default_syslog_manager.stop_all() + log.info("syslog.manager.stopped") + except Exception as e: + log.warning("syslog.manager.stop_failed", {"error": str(e)}) + # Stop Task Center try: from flocks.task.manager import TaskManager diff --git a/flocks/server/routes/workflow.py b/flocks/server/routes/workflow.py index d35d7cfe1..8f77ee3f7 100644 --- a/flocks/server/routes/workflow.py +++ b/flocks/server/routes/workflow.py @@ -44,6 +44,7 @@ read_workflow_from_fs as shared_read_workflow_from_fs, workflow_scan_dirs as _all_scan_dirs, ) +from flocks.ingest.syslog.constants import WORKFLOW_SYSLOG_CONFIG_PREFIX from flocks.workflow.execution_store import ( create_execution_record, normalize_execution_status as _normalize_execution_status, @@ -444,6 +445,10 @@ def _workflow_stats_key(workflow_id: str) -> str: return f"workflow/{workflow_id}/stats" +def _syslog_config_key(workflow_id: str) -> str: + return f"{WORKFLOW_SYSLOG_CONFIG_PREFIX}{workflow_id}" + + async def _run_workflow_execution_task( *, workflow_id: str, @@ -460,10 +465,24 @@ async def _run_workflow_execution_task( loop = asyncio.get_running_loop() def _write_progress(update_fields: Dict[str, Any]) -> None: + # Called from the workflow-engine worker thread on every step + # start/complete. Step events for a single execution are issued + # serially by the engine, so no extra lock is needed beyond the + # caller's invariant — but we must still tolerate transient + # ``Storage.read`` failures (e.g. SQLite contention) without + # corrupting ``current`` with a non-dict result. try: - current = asyncio.run_coroutine_threadsafe(Storage.read(exec_key), loop).result(timeout=5) + current = asyncio.run_coroutine_threadsafe( + Storage.read(exec_key), loop + ).result(timeout=5) + if not isinstance(current, dict): + # Execution record was trimmed mid-run or never persisted; + # rebuild a minimal payload so the write still goes through. + current = {"id": exec_id, "workflowId": workflow_id} current.update(update_fields) - asyncio.run_coroutine_threadsafe(Storage.write(exec_key, current), loop).result(timeout=5) + asyncio.run_coroutine_threadsafe( + Storage.write(exec_key, current), loop + ).result(timeout=5) except Exception as exc: log.warning("workflow.step_progress.write_failed", { "exec_id": exec_id, @@ -505,6 +524,11 @@ def _on_step_complete(step_result) -> None: duration = time.time() - start_time current_data = await Storage.read(exec_key) + if not isinstance(current_data, dict): + # Defensive: the execution record could be missing if it was + # trimmed/cleaned up mid-run. Rebuild a baseline so the final + # status write still succeeds rather than blowing up. + current_data = {"id": exec_id, "workflowId": workflow_id} status_value, error_message = _resolve_execution_outcome(result) current_data.update({ "outputResults": result.outputs, @@ -519,11 +543,6 @@ def _on_step_complete(step_result) -> None: "currentStepIndex": result.steps, }) - if status_value == "success": - await _update_workflow_stats(workflow_id, True, duration) - elif status_value in {"error", "timeout"}: - await _update_workflow_stats(workflow_id, False, duration) - await _record_execution_result(workflow_id, exec_id, current_data) log.info("workflow.executed", { "id": workflow_id, @@ -534,6 +553,8 @@ def _on_step_complete(step_result) -> None: except Exception as exc: duration = time.time() - start_time current_data = await Storage.read(exec_key) + if not isinstance(current_data, dict): + current_data = {"id": exec_id, "workflowId": workflow_id} current_data.update({ "status": "cancelled" if cancel_event.is_set() else "error", "finishedAt": int(time.time() * 1000), @@ -542,8 +563,6 @@ def _on_step_complete(step_result) -> None: "executionLog": list(step_history), "currentPhase": "cancelled" if cancel_event.is_set() else "error", }) - if current_data["status"] == "error": - await _update_workflow_stats(workflow_id, False, duration) await _record_execution_result(workflow_id, exec_id, current_data) log.error("workflow.execute.error", { "id": workflow_id, @@ -584,18 +603,6 @@ async def _get_workflow_stats(workflow_id: str) -> Dict[str, Any]: return dict(_DEFAULT_STATS) -async def _update_workflow_stats(workflow_id: str, success: bool, duration: float) -> None: - """Update workflow statistics""" - stats = await _get_workflow_stats(workflow_id) - stats["callCount"] += 1 - if success: - stats["successCount"] += 1 - else: - stats["errorCount"] += 1 - stats["totalRuntime"] += duration - await Storage.write(_workflow_stats_key(workflow_id), stats) - - # ============================================================================= # API Endpoints - Workflow CRUD # ============================================================================= @@ -808,6 +815,17 @@ async def delete_workflow(workflow_id: str): except Exception: pass + try: + from flocks.ingest.syslog.manager import default_manager as _syslog_default_manager + + await _syslog_default_manager.stop_workflow(workflow_id) + except Exception: + pass + try: + await Storage.remove(_syslog_config_key(workflow_id)) + except Storage.NotFoundError: + pass + log.info("workflow.deleted", {"id": workflow_id}) await publish_event("workflow.deleted", {"id": workflow_id}) return None @@ -858,13 +876,21 @@ async def run_workflow_endpoint(workflow_id: str, req: WorkflowRunRequest): exec_id=exec_id, cancel_event=cancel_event, tool_context=tool_context, - ) + ), + name=f"workflow-run-{exec_id}", ) _active_workflow_executions[exec_id] = ActiveWorkflowExecution( workflow_id=workflow_id, task=task, cancel_event=cancel_event, ) + # Guarantee cleanup of the registry entry even when the task is + # cancelled or fails before reaching its own ``finally`` block (e.g. + # if the event loop is shutting down). This prevents the ``Active*`` + # map from growing forever when tasks are abandoned. + def _cleanup_active(_t: asyncio.Task, _eid: str = exec_id) -> None: + _active_workflow_executions.pop(_eid, None) + task.add_done_callback(_cleanup_active) log.info("workflow.execution.started", { "id": workflow_id, @@ -1023,21 +1049,55 @@ async def workflow_center_stop(workflow_id: str): @router.post("/workflow-center/{workflow_id}/invoke") async def workflow_center_invoke(workflow_id: str, req: WorkflowCenterInvokeRequest): - """Proxy invoke request to active published workflow service.""" + """Proxy invoke request to active published workflow service. + + Also records execution stats (callCount / successCount / errorCount) so + that the UI invocation counter is updated for every published-service call, + not just agent-driven /run calls. + """ + started = time.time() + exec_data = await create_execution_record( + workflow_id, + input_params=req.inputs or {}, + ) + exec_id = str(exec_data["id"]) try: - return await invoke_published_workflow( + result = await invoke_published_workflow( workflow_id, inputs=req.inputs, timeout_s=req.timeout_s, request_id=req.request_id, ) - except WorkflowNotFoundError as e: - raise HTTPException(status_code=404, detail=str(e)) - except WorkflowNotPublishedError as e: + duration = time.time() - started + raw_status = result.get("status", "SUCCEEDED") if isinstance(result, dict) else "SUCCEEDED" + status_value = _normalize_execution_status(raw_status) + success = status_value == "success" + exec_data.update({ + "outputResults": result.get("outputs", {}) if isinstance(result, dict) else {}, + "status": status_value, + "finishedAt": int(time.time() * 1000), + "duration": duration, + "currentPhase": status_value, + }) + await _record_execution_result(workflow_id, exec_id, exec_data) + return result + except (WorkflowNotFoundError, WorkflowNotPublishedError) as e: + duration = time.time() - started + exec_data.update({"status": "error", "finishedAt": int(time.time() * 1000), + "duration": duration, "errorMessage": str(e)}) + await _record_execution_result(workflow_id, exec_id, exec_data) raise HTTPException(status_code=404, detail=str(e)) except WorkflowCenterError as e: + duration = time.time() - started + exec_data.update({"status": "error", "finishedAt": int(time.time() * 1000), + "duration": duration, "errorMessage": str(e)}) + await _record_execution_result(workflow_id, exec_id, exec_data) raise HTTPException(status_code=400, detail=str(e)) except Exception as e: + duration = time.time() - started + exec_data.update({"status": "error", "finishedAt": int(time.time() * 1000), + "duration": duration, "errorMessage": str(e)}) + await _record_execution_result(workflow_id, exec_id, exec_data) log.error("workflow.center.invoke.error", {"workflow_id": workflow_id, "error": str(e)}) raise HTTPException(status_code=500, detail=f"Failed to invoke workflow service: {str(e)}") @@ -1082,29 +1142,31 @@ async def get_workflow_history( ): """ Get workflow execution history - + Returns list of recent executions for this workflow. """ try: if not _read_workflow_from_fs(workflow_id): raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") - all_exec_keys = await Storage.list("workflow_execution/") + # 单次查询批量读取所有 execution 记录,避免 N 次单独 read 导致超长耗时 + all_entries = await Storage.list_entries("workflow_execution/") executions = [] - - for key in all_exec_keys: + for _key, exec_data in all_entries: try: - exec_data = await Storage.read(key) - if exec_data.get("workflowId") == workflow_id: - executions.append(WorkflowExecutionResponse(**exec_data)) + if not isinstance(exec_data, dict): + continue + if exec_data.get("workflowId") != workflow_id: + continue + executions.append(WorkflowExecutionResponse(**exec_data)) except Exception as e: - log.warning("workflow.history.skip", {"key": key, "error": str(e)}) + log.warning("workflow.history.skip", {"key": _key, "error": str(e)}) continue - + # Sort by start time (newest first) and limit executions.sort(key=lambda e: e.startedAt, reverse=True) executions = executions[:limit] - + log.info("workflow.history", {"id": workflow_id, "count": len(executions)}) return executions except HTTPException: @@ -1344,6 +1406,19 @@ class KafkaConfigRequest(BaseModel): outputTopic: Optional[str] = None +class SyslogConfigRequest(BaseModel): + """Per-workflow syslog listener configuration (experimental).""" + + model_config = ConfigDict(populate_by_name=True) + + enabled: bool = False + protocol: str = "udp" + host: str = "0.0.0.0" + port: int = 5140 + msg_format: str = Field("auto", alias="format") + input_key: str = Field("syslog_message", alias="inputKey") + + @router.post("/workflow/{workflow_id}/publish") async def publish_workflow_as_api(workflow_id: str): """ @@ -1518,6 +1593,81 @@ async def get_kafka_config(workflow_id: str): raise HTTPException(status_code=500, detail=f"Failed to get Kafka config: {str(e)}") +@router.post("/workflow/{workflow_id}/syslog-config") +async def save_syslog_config(workflow_id: str, req: SyslogConfigRequest): + """ + Save syslog listener configuration for a workflow. + + When ``enabled`` is true, this also (re)starts the UDP/TCP listener and + blocks until the underlying socket has either bound successfully or the + bind has failed (e.g. ``EADDRINUSE``, invalid host). Bind failures are + surfaced as ``409 Conflict`` so the UI can show an actionable error + instead of falsely claiming "Listening". + """ + try: + if not _read_workflow_from_fs(workflow_id): + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + + config = { + "workflowId": workflow_id, + "enabled": req.enabled, + "protocol": req.protocol, + "host": req.host, + "port": req.port, + "format": req.msg_format, + "inputKey": req.input_key, + "updatedAt": int(time.time() * 1000), + } + await Storage.write(_syslog_config_key(workflow_id), config) + + from flocks.ingest.syslog.manager import default_manager as _syslog_default_manager + + status = await _syslog_default_manager.restart_workflow(workflow_id) + state = (status or {}).get("state") + if req.enabled and state == "failed": + err = (status or {}).get("error") or "listener_bind_failed" + raise HTTPException( + status_code=409, + detail=f"Syslog listener failed to bind: {err}", + ) + return {"ok": True, "listener": status} + except HTTPException: + raise + except Exception as e: + log.error("workflow.syslog_config.save.error", {"id": workflow_id, "error": str(e)}) + raise HTTPException(status_code=500, detail=f"Failed to save syslog config: {str(e)}") + + +@router.get("/workflow/{workflow_id}/syslog-config") +async def get_syslog_config(workflow_id: str): + """Get saved syslog configuration for a workflow.""" + try: + config = await Storage.read(_syslog_config_key(workflow_id)) + return config + except Exception as e: + log.error("workflow.syslog_config.get.error", {"id": workflow_id, "error": str(e)}) + raise HTTPException(status_code=500, detail=f"Failed to get syslog config: {str(e)}") + + +@router.get("/workflow/{workflow_id}/syslog-status") +async def get_syslog_status(workflow_id: str): + """Return the *runtime* status of the syslog listener for a workflow. + + This reflects the actual bind state (binding/listening/failed/stopped) and + queue depth, so the UI can show whether a saved-but-not-yet-bound listener + is actually running. The persisted config (``/syslog-config``) only + captures *intent*, which is why the UI must consult this endpoint to + truthfully render "Listening". + """ + try: + from flocks.ingest.syslog.manager import default_manager as _syslog_default_manager + + return _syslog_default_manager.get_listener_status(workflow_id) + except Exception as e: + log.error("workflow.syslog_status.get.error", {"id": workflow_id, "error": str(e)}) + raise HTTPException(status_code=500, detail=f"Failed to get syslog status: {str(e)}") + + # ============================================================================= # API Endpoints - Run Single Node # ============================================================================= diff --git a/flocks/skill/skill.py b/flocks/skill/skill.py index 16242d3fc..003682d3d 100644 --- a/flocks/skill/skill.py +++ b/flocks/skill/skill.py @@ -447,6 +447,20 @@ def stop_watcher(cls) -> None: cls._watcher = None +def _skill_event_should_reload(event: object) -> bool: + """Return True if a watchdog event affects a ``SKILL.md`` file. + + Atomic-save flows rename a temp file onto the real ``SKILL.md``; we have + to consult both ``src_path`` and ``dest_path`` so the watcher reloads on + those renames as well. + """ + for attr in ("src_path", "dest_path"): + path = getattr(event, attr, "") or "" + if path.endswith("SKILL.md"): + return True + return False + + class SkillFileWatcher: """ Watches skill directories for SKILL.md changes and auto-invalidates @@ -482,12 +496,19 @@ def start(self) -> None: watcher = self + # Only react to actual content-mutation events. watchdog emits + # ``opened``/``closed``/``closed_no_write`` events whenever any code + # (including the skill loader itself) reads ``SKILL.md`` files, which + # would otherwise cause a self-sustaining cache-invalidation loop. + _RELOAD_EVENT_TYPES = frozenset({"modified", "created", "deleted", "moved"}) + class _Handler(FileSystemEventHandler): def on_any_event(self, event: FileSystemEvent): if event.is_directory: return - src = getattr(event, "src_path", "") or "" - if src.endswith("SKILL.md"): + if getattr(event, "event_type", "") not in _RELOAD_EVENT_TYPES: + return + if _skill_event_should_reload(event): watcher._schedule_clear() handler = _Handler() diff --git a/flocks/storage/storage.py b/flocks/storage/storage.py index 8573a1864..734883e06 100644 --- a/flocks/storage/storage.py +++ b/flocks/storage/storage.py @@ -447,10 +447,12 @@ async def get(cls, key: str, model: Optional[Type[T]] = None) -> Optional[T | An value_str, value_type = row - if model is not None: + if model is not None and hasattr(model, "model_validate_json"): return model.model_validate_json(value_str) - else: - return json.loads(value_str) + # Fall back to a plain JSON decode when no Pydantic model is supplied + # (or when callers accidentally pass a builtin container type such as + # ``dict``/``list``, which is not a Pydantic model). + return json.loads(value_str) @classmethod async def delete(cls, key: str) -> bool: @@ -538,7 +540,7 @@ async def list_entries( entries: List[Tuple[str, T | Any]] = [] for key, value_str in rows: - if model is not None: + if model is not None and hasattr(model, "model_validate_json"): value = model.model_validate_json(value_str) else: value = json.loads(value_str) diff --git a/flocks/tool/registry.py b/flocks/tool/registry.py index 423035043..c41f3b22c 100644 --- a/flocks/tool/registry.py +++ b/flocks/tool/registry.py @@ -1095,8 +1095,21 @@ def _consume_tools(items: list, source: str) -> None: for spec in items: # YAML factory produces Tool instances directly if isinstance(spec, Tool): - if spec.info.name in cls._tools: - log.warn("plugin.tool.duplicate", {"source": source, "name": spec.info.name}) + existing = cls._tools.get(spec.info.name) + if existing is not None: + # ``PluginLoader.load_all()`` is invoked by multiple + # subsystems (ToolRegistry, Agent registry, etc.). A + # re-scan that re-encounters the same plugin file is + # idempotent and should not produce a noisy warning; + # only flag genuine name collisions from a different + # source. + existing_source = getattr(existing.info, "source", None) + if existing_source not in (None, "plugin_yaml", "plugin_py"): + log.warn("plugin.tool.duplicate", { + "source": source, + "name": spec.info.name, + "existing_source": existing_source, + }) continue if spec.info.source is None: spec.info.source = "plugin_yaml" @@ -1115,8 +1128,18 @@ def _consume_tools(items: list, source: str) -> None: "spec_keys": list(spec.keys()), }) continue - if name in cls._tools: - log.warn("plugin.tool.duplicate", {"source": source, "name": name}) + existing = cls._tools.get(name) + if existing is not None: + # Idempotent re-scan: same plugin source discovered again + # via another ``PluginLoader.load_all()`` pass. Only warn + # on genuine cross-source collisions. + existing_source = getattr(existing.info, "source", None) + if existing_source not in (None, "plugin_yaml", "plugin_py"): + log.warn("plugin.tool.duplicate", { + "source": source, + "name": name, + "existing_source": existing_source, + }) continue if isinstance(handler, str): @@ -1472,6 +1495,42 @@ def _register_dynamic_tools(cls) -> None: # --------------------------------------------------------------------------- +def _tool_event_should_reload(event: object) -> bool: + """Return True if a watchdog filesystem event should trigger a plugin reload. + + Atomic-save editors (vim, VS Code "useAtomicSave", many GUI tools, …) + persist edits by writing a sibling temp file then ``rename`` ing it onto + the real target. watchdog surfaces this as a ``moved`` event whose + ``src_path`` is the throwaway temp filename and whose ``dest_path`` is the + real ``tool.yaml`` / ``*.py``. Filtering only by ``src_path`` (the + pre-fix behaviour) misses the real edit entirely, so we have to inspect + both endpoints. + + Exposed at module scope so it can be unit-tested without spinning up + ``watchdog.observers.Observer`` against a temp directory. + """ + candidate_paths: List[str] = [] + src = getattr(event, "src_path", "") or "" + if src: + candidate_paths.append(src) + dest = getattr(event, "dest_path", "") or "" + if dest: + candidate_paths.append(dest) + if not candidate_paths: + return False + + for path in candidate_paths: + if not (path.endswith(".yaml") or path.endswith(".py")): + continue + fname = os.path.basename(path) + # Ignore Python bytecode / temp / hidden files that get touched + # during normal imports but never carry plugin definitions. + if fname.startswith(".") or fname.startswith("_") or "/__pycache__/" in path: + continue + return True + return False + + class ToolFileWatcher: """Watch plugin tool directories and auto-reload plugin tools on change. @@ -1523,13 +1582,23 @@ def start(self) -> None: watcher = self + # Only react to events that change file CONTENT. watchdog also emits + # ``opened``/``closed``/``closed_no_write`` events whenever any process + # (including this one) reads a YAML/Python file, and ``refresh_plugin_tools`` + # itself opens every plugin tool file on every reload. Listening to those + # access events creates an infinite reload feedback loop where the watcher + # endlessly re-triggers itself every ~debounce-window seconds. + _RELOAD_EVENT_TYPES = frozenset({"modified", "created", "deleted", "moved"}) + class _Handler(FileSystemEventHandler): def on_any_event(self, event: FileSystemEvent) -> None: if event.is_directory: return - src = getattr(event, "src_path", "") or "" - if src.endswith(".yaml") or src.endswith(".py"): - watcher._schedule_refresh() + if getattr(event, "event_type", "") not in _RELOAD_EVENT_TYPES: + return + if not _tool_event_should_reload(event): + return + watcher._schedule_refresh() handler = _Handler() observer = Observer() diff --git a/flocks/tool/task/run_workflow.py b/flocks/tool/task/run_workflow.py index 433d6cb2b..542115c40 100644 --- a/flocks/tool/task/run_workflow.py +++ b/flocks/tool/task/run_workflow.py @@ -353,16 +353,39 @@ async def run_workflow_tool( workflow_source: Union[Dict[str, Any], Path] if isinstance(workflow, str): raw = workflow.strip() - # If it's a JSON string, try to parse it. + # Try to parse as JSON first (handles JSON-encoded dicts or strings). + parsed = None try: - workflow_source = json.loads(raw) + parsed = json.loads(raw) except json.JSONDecodeError: + pass + + if isinstance(parsed, dict): + # Valid workflow JSON object. + workflow_source = parsed + elif isinstance(parsed, str): + # json.loads decoded a JSON-encoded string, e.g. the AI double-encoded the + # path: workflow='"/path/to/workflow.json"' → parsed='/path/to/workflow.json'. + # Use the decoded string (no surrounding quotes) as the file path. + p = Path(parsed).expanduser() + if p.exists() and p.is_file(): + workflow_source = p + else: + return ToolResult( + success=False, + error=( + f"Workflow file not found: {parsed!r}. " + "Provide a valid workflow JSON file path or a workflow dict." + ) + ) + elif parsed is None: + # json.loads raised JSONDecodeError — raw is not JSON. + # First try to resolve as a registered workflow ID, then fall back to file path. existing_workflow = read_workflow_from_fs(raw) if existing_workflow is not None: workflow_source = existing_workflow["workflowJson"] raw = existing_workflow["id"] else: - # Otherwise treat it as a file path. p = Path(raw).expanduser() if p.exists() and p.is_file(): workflow_source = p @@ -374,6 +397,15 @@ async def run_workflow_tool( "or a valid workflow JSON file path." ) ) + else: + # json.loads returned list / int / bool — not a valid workflow parameter. + return ToolResult( + success=False, + error=( + f"Invalid workflow parameter: expected a workflow dict or a file path string, " + f"got JSON-decoded {type(parsed).__name__} ({parsed!r})." + ) + ) elif isinstance(workflow, dict): workflow_source = workflow else: @@ -382,12 +414,25 @@ async def run_workflow_tool( error=f"workflow must be a dictionary or string, got {type(workflow).__name__}" ) + # Sanity-check dict workflows: must have at least a `start` field so we + # surface a clear error instead of a confusing Pydantic validation message. + if isinstance(workflow_source, dict) and "start" not in workflow_source: + return ToolResult( + success=False, + error=( + "Invalid workflow definition: the `start` field is required. " + "Make sure you pass the workflow JSON (with `start`, `nodes`, `edges`) " + "as the `workflow` parameter, not the execution inputs." + ) + ) + # Request permission (workflow execution can run arbitrary code) if isinstance(workflow_source, dict): workflow_name = workflow_source.get("name", "unnamed workflow") # Use id if available, otherwise use name or generate a fallback workflow_id = workflow_source.get("id") or workflow_source.get("name") or "unknown" else: + # workflow_source is a Path object here; Path.name gives the filename. workflow_name = workflow_source.name workflow_id = str(workflow_source) diff --git a/flocks/workflow/execution_store.py b/flocks/workflow/execution_store.py index 04ffdc610..cf8bc3091 100644 --- a/flocks/workflow/execution_store.py +++ b/flocks/workflow/execution_store.py @@ -2,14 +2,86 @@ from __future__ import annotations +import asyncio import time import uuid from typing import Any, Dict, Optional from flocks.session.recorder import Recorder from flocks.storage.storage import Storage +from flocks.utils.log import Log from flocks.workflow.runner import RunWorkflowResult +log = Log.create(service="workflow.execution_store") + +# Maximum number of execution history records retained per workflow. +# Older records are pruned automatically to prevent a syslog flood from bloating Storage. +_MAX_EXECUTION_HISTORY_PER_WORKFLOW = 500 +# Trim is an O(N) scan over all workflow_execution rows; only run it every Nth +# call per workflow to amortise the cost under high syslog throughput. +_TRIM_CHECK_INTERVAL = 50 +_trim_counters: Dict[str, int] = {} + +# Per-workflow lock to serialize read-modify-write of stats. Concurrent +# executions of the same workflow (e.g. syslog-triggered runs with +# semaphore=8) would otherwise race on ``Storage.read → mutate → write`` +# and silently lose counter increments. +_stats_locks: Dict[str, asyncio.Lock] = {} + + +def _get_stats_lock(workflow_id: str) -> asyncio.Lock: + lock = _stats_locks.get(workflow_id) + if lock is None: + lock = asyncio.Lock() + _stats_locks[workflow_id] = lock + return lock + + +def _workflow_stats_key(workflow_id: str) -> str: + return f"workflow/{workflow_id}/stats" + + +_DEFAULT_STATS: Dict[str, Any] = { + "callCount": 0, + "successCount": 0, + "errorCount": 0, + "totalRuntime": 0.0, + "avgRuntime": 0.0, + "thumbsUp": 0, + "thumbsDown": 0, +} + + +async def _update_workflow_stats(workflow_id: str, success: bool, duration: float) -> None: + """Increment workflow call/success/error counters and update avgRuntime. + + Serialised per workflow to keep concurrent updates from clobbering each + other (read → mutate → write race). + """ + lock = _get_stats_lock(workflow_id) + async with lock: + try: + key = _workflow_stats_key(workflow_id) + try: + stats: Dict[str, Any] = await Storage.read(key) or dict(_DEFAULT_STATS) + except Exception: + stats = dict(_DEFAULT_STATS) + stats["callCount"] = stats.get("callCount", 0) + 1 + if success: + stats["successCount"] = stats.get("successCount", 0) + 1 + else: + stats["errorCount"] = stats.get("errorCount", 0) + 1 + total = stats.get("totalRuntime", 0.0) + duration + stats["totalRuntime"] = total + call_count = stats["callCount"] + stats["avgRuntime"] = (total / call_count) if call_count > 0 else 0.0 + await Storage.write(key, stats) + except Exception as exc: + log.warning("workflow.stats.update_failed", { + "workflow_id": workflow_id, + "error": str(exc), + }) + def workflow_execution_key(exec_id: str) -> str: """Return the storage key for one workflow execution.""" @@ -98,13 +170,84 @@ async def record_execution_result( exec_id: str, exec_data: Dict[str, Any], ) -> None: - """Persist the final execution record and audit trail.""" + """Persist the final execution record, audit trail, and workflow stats.""" await Storage.write(workflow_execution_key(exec_id), exec_data) + + # Update call/success/error counters so all trigger paths (HTTP, syslog, etc.) + # are reflected in the UI stats panel. + status = exec_data.get("status", "error") + success = status == "success" + duration = exec_data.get("duration") + if not isinstance(duration, (int, float)): + started_at = exec_data.get("startedAt", 0) + finished_at = exec_data.get("finishedAt", int(time.time() * 1000)) + duration = max(0.0, (finished_at - started_at) / 1000.0) + await _update_workflow_stats(workflow_id, success, float(duration)) + + # Recorder writes to its own SQLite tables and can be slow under load. + # Run it as a background task so the syslog/HTTP dispatcher can release the + # concurrency slot immediately instead of waiting on session-history I/O. try: - await Recorder.record_workflow_execution( - exec_id=exec_id, - workflow_id=workflow_id, - run_result=exec_data, - ) + async def _record_audit() -> None: + try: + await Recorder.record_workflow_execution( + exec_id=exec_id, + workflow_id=workflow_id, + run_result=exec_data, + ) + except Exception as exc: + log.debug("workflow.audit.record_failed", { + "exec_id": exec_id, + "error": str(exc), + }) + + asyncio.create_task(_record_audit(), name=f"audit-{exec_id}") + except RuntimeError: + # No running loop (e.g. unit tests) — best-effort sync fallback. + try: + await Recorder.record_workflow_execution( + exec_id=exec_id, + workflow_id=workflow_id, + run_result=exec_data, + ) + except Exception: + pass + + # Prune old execution records when the per-workflow limit is exceeded. + # Throttled by a per-workflow counter to amortise the O(N) storage scan. + try: + counter = _trim_counters.get(workflow_id, 0) + 1 + _trim_counters[workflow_id] = counter + if counter >= _TRIM_CHECK_INTERVAL: + _trim_counters[workflow_id] = 0 + # Run trim in the background as well; it scans all execution rows + # and we don't want to delay the caller. + try: + asyncio.create_task( + _trim_execution_history(workflow_id), + name=f"trim-{workflow_id}", + ) + except RuntimeError: + await _trim_execution_history(workflow_id) except Exception: pass + + +async def _trim_execution_history(workflow_id: str) -> None: + """Delete the oldest execution records once the per-workflow cap is exceeded.""" + all_entries = await Storage.list_entries("workflow_execution/") + wf_entries = [ + (key, data) + for key, data in all_entries + if isinstance(data, dict) and data.get("workflowId") == workflow_id + ] + if len(wf_entries) <= _MAX_EXECUTION_HISTORY_PER_WORKFLOW: + return + # Sort ascending by startedAt and remove the oldest excess records + wf_entries.sort(key=lambda kd: kd[1].get("startedAt", 0)) + excess = len(wf_entries) - _MAX_EXECUTION_HISTORY_PER_WORKFLOW + for key, _ in wf_entries[:excess]: + try: + await Storage.remove(key) + except Exception: + pass diff --git a/pyproject.toml b/pyproject.toml index f1c9493c8..c1ec4e3fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -83,6 +83,7 @@ dependencies = [ # browser "cdp-use>=1.4.5", "pillow>=12.2.0", + "datasketch>=1.10.0", ] [dependency-groups] diff --git a/tests/ingest/__init__.py b/tests/ingest/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/ingest/test_syslog_manager_backpressure.py b/tests/ingest/test_syslog_manager_backpressure.py new file mode 100644 index 000000000..7bc859b15 --- /dev/null +++ b/tests/ingest/test_syslog_manager_backpressure.py @@ -0,0 +1,155 @@ +"""Regression tests for the syslog → workflow backpressure pipeline. + +These tests exercise ``SyslogManager`` in isolation (no UDP/TCP sockets) by +driving the bounded queue directly. They verify two invariants that the +previous semaphore-based design did *not* guarantee: + +1. Under a sustained burst the number of in-flight workflow dispatches is + bounded by ``_MAX_CONCURRENT_EXECUTIONS`` — not by the number of messages + the listener has shoved into the queue. +2. The bounded queue itself rejects excess messages via ``QueueFull`` so the + listener can drop+log instead of growing the consumer's pending-task set. + +These tests deliberately *do not* rely on networking; the listener loop is +covered by a separate route-level test that exercises the bind failure path. +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from flocks.ingest.syslog import manager as syslog_manager + + +@pytest.mark.asyncio +async def test_worker_pool_bounds_in_flight_dispatches(monkeypatch: pytest.MonkeyPatch) -> None: + """The fixed worker pool must cap concurrent ``_trigger_workflow`` calls. + + We replace ``_trigger_workflow`` with an instrumented coroutine that + increments a counter on entry and asserts it never exceeds the worker + pool size before exiting. Then we feed N messages (much larger than the + pool) into the queue and let the workers drain them. + """ + + manager = syslog_manager.SyslogManager() + pool_size = syslog_manager._MAX_CONCURRENT_EXECUTIONS + + in_flight = 0 + max_in_flight = 0 + completed = 0 + lock = asyncio.Lock() + + async def _fake_trigger(workflow_id, workflow_json, msg, input_key): # noqa: ANN001 + nonlocal in_flight, max_in_flight, completed + async with lock: + in_flight += 1 + if in_flight > max_in_flight: + max_in_flight = in_flight + # Hold the worker briefly so a true concurrency violation would be + # observable; we cooperate with the event loop with a small sleep. + await asyncio.sleep(0.01) + async with lock: + in_flight -= 1 + completed += 1 + + monkeypatch.setattr(manager, "_trigger_workflow", _fake_trigger) + + workflow_id = "test-wf" + queue: asyncio.Queue = asyncio.Queue(maxsize=syslog_manager._MAX_QUEUE_SIZE) + abort = asyncio.Event() + + # Wire the manager up the same way ``restart_workflow`` would, minus the + # listener task itself (which would try to bind a real socket). + manager._queues[workflow_id] = queue + manager._abort_events[workflow_id] = abort + workers = [ + asyncio.create_task( + manager._worker_loop(workflow_id, {}, "syslog_message", queue, abort), + name=f"test-worker-{i}", + ) + for i in range(pool_size) + ] + manager._worker_pools[workflow_id] = workers + + # Burst-fill the queue with more work than the pool can do at once. + burst_size = pool_size * 6 + for i in range(burst_size): + queue.put_nowait({"_seq": i, "_trigger": "test"}) + + # Wait for the workers to drain the queue. + deadline = asyncio.get_event_loop().time() + 5.0 + while completed < burst_size and asyncio.get_event_loop().time() < deadline: + await asyncio.sleep(0.02) + + abort.set() + for w in workers: + w.cancel() + await asyncio.gather(*workers, return_exceptions=True) + + assert completed == burst_size, f"expected {burst_size} dispatches, got {completed}" + assert max_in_flight <= pool_size, ( + f"in-flight dispatches exceeded worker pool size: " + f"max_in_flight={max_in_flight}, pool_size={pool_size}" + ) + + +@pytest.mark.asyncio +async def test_bounded_queue_drops_excess_on_full() -> None: + """``put_nowait`` must raise ``QueueFull`` once capacity is reached. + + This is the contract the synchronous ``on_msg`` callback relies on; the + listener catches ``QueueFull`` and emits ``syslog.queue_full_dropped`` + instead of growing the queue unboundedly. + """ + + queue: asyncio.Queue = asyncio.Queue(maxsize=4) + for i in range(4): + queue.put_nowait({"_seq": i}) + with pytest.raises(asyncio.QueueFull): + queue.put_nowait({"_seq": 99}) + assert queue.qsize() == 4 + + +@pytest.mark.asyncio +async def test_stop_workflow_cancels_worker_pool() -> None: + """``stop_workflow`` must cancel and drain the worker pool cleanly. + + Leaking worker tasks would re-introduce the symptom the worker-pool + refactor was designed to prevent (orphan coroutines holding queue + references after the listener has stopped). + """ + + manager = syslog_manager.SyslogManager() + workflow_id = "test-wf-stop" + queue: asyncio.Queue = asyncio.Queue(maxsize=8) + abort = asyncio.Event() + manager._queues[workflow_id] = queue + manager._abort_events[workflow_id] = abort + manager._listener_status[workflow_id] = {"state": "listening", "error": None} + + async def _noop_trigger(*args, **kwargs): # noqa: ANN001, D401 + return None + + manager._trigger_workflow = _noop_trigger # type: ignore[assignment] + + workers = [ + asyncio.create_task( + manager._worker_loop(workflow_id, {}, "syslog_message", queue, abort), + name=f"stop-worker-{i}", + ) + for i in range(3) + ] + manager._worker_pools[workflow_id] = workers + + # Let workers loop once. + await asyncio.sleep(0.05) + + await manager.stop_workflow(workflow_id) + + for w in workers: + assert w.done(), "stop_workflow must terminate every worker in the pool" + assert workflow_id not in manager._worker_pools + assert workflow_id not in manager._queues + assert manager._listener_status[workflow_id]["state"] == "stopped" diff --git a/tests/ingest/test_syslog_manager_bind_failure.py b/tests/ingest/test_syslog_manager_bind_failure.py new file mode 100644 index 000000000..ecd621fcb --- /dev/null +++ b/tests/ingest/test_syslog_manager_bind_failure.py @@ -0,0 +1,106 @@ +"""Regression test for the bind-failure path of ``SyslogManager.restart_workflow``. + +The HTTP ``POST /api/workflow/{id}/syslog-config`` endpoint relies on +``restart_workflow`` synchronously reporting the listener's terminal state so +the route can return ``409 Conflict`` instead of falsely claiming success. + +We reproduce the failure by binding our own UDP socket on a chosen port and +then asking ``SyslogManager`` to start a listener for the same host/port; the +``OSError`` raised inside ``_listener_loop`` must surface as +``state == "failed"`` in the returned status. +""" + +from __future__ import annotations + +import asyncio +import socket + +import pytest + +from flocks.ingest.syslog import manager as syslog_manager + + +def _find_busy_udp_port() -> tuple[socket.socket, int]: + """Bind a UDP socket on a free port and return it (still bound).""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + return sock, port + + +@pytest.mark.asyncio +async def test_restart_workflow_reports_failure_on_port_conflict( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Restarting a listener on a busy port must yield state="failed".""" + busy_sock, busy_port = _find_busy_udp_port() + try: + workflow_id = "wf-bind-fail" + config = { + "workflowId": workflow_id, + "enabled": True, + "protocol": "udp", + "host": "127.0.0.1", + "port": busy_port, + "format": "auto", + "inputKey": "syslog_message", + } + + async def _fake_storage_read(key: str): # noqa: ANN001 + if key == syslog_manager.SyslogManager._config_key(workflow_id): + return config + return None + + def _fake_read_workflow_from_fs(wid: str): # noqa: ANN001 + return { + "id": wid, + "workflowJson": { + "start": "n1", + "nodes": [{"id": "n1", "type": "python", "code": "result = {'ok': True}"}], + "edges": [], + }, + } + + # Patch the *module-level* names ``manager.py`` looks up at call time. + monkeypatch.setattr(syslog_manager.Storage, "read", _fake_storage_read) + monkeypatch.setattr(syslog_manager, "read_workflow_from_fs", _fake_read_workflow_from_fs) + + manager = syslog_manager.SyslogManager() + try: + status = await manager.restart_workflow(workflow_id) + assert status["state"] == "failed", ( + f"expected state='failed' on busy port, got {status!r}" + ) + assert status.get("error"), "failed status must include an error message" + assert status["port"] == busy_port + finally: + await manager.stop_workflow(workflow_id) + finally: + busy_sock.close() + + +@pytest.mark.asyncio +async def test_restart_workflow_returns_stopped_when_disabled( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A saved-but-disabled config must report state="stopped".""" + workflow_id = "wf-disabled" + config = { + "workflowId": workflow_id, + "enabled": False, + "protocol": "udp", + "host": "127.0.0.1", + "port": 9999, + "format": "auto", + "inputKey": "syslog_message", + } + + async def _fake_storage_read(key: str): # noqa: ANN001 + return config + + monkeypatch.setattr(syslog_manager.Storage, "read", _fake_storage_read) + + manager = syslog_manager.SyslogManager() + status = await manager.restart_workflow(workflow_id) + assert status == {"state": "stopped", "error": None} + assert manager.get_listener_status(workflow_id) == {"state": "stopped", "error": None} diff --git a/tests/integration/test_alert_dedup_triage_stream.py b/tests/integration/test_alert_dedup_triage_stream.py new file mode 100644 index 000000000..b87706f7e --- /dev/null +++ b/tests/integration/test_alert_dedup_triage_stream.py @@ -0,0 +1,248 @@ +#!/usr/bin/env python3 +# NOTE: standalone manual integration test — not a pytest test, run directly with python3. +""" +手动集成测试工具:两阶段流式 pipeline(dedup → triage) + +逐条读取 ~/Downloads/tdp_logs.json,对每条告警: + 1) 调用 http_alert_dedup(POST /workflow-center/http_alert_dedup/invoke) + - 返回 unique_alerts 为空 -> 被过滤掉,跳过 triage + - 返回 unique_alerts[0].dedup_key_already_exists == True -> 跨批次重复,跳过 triage + - 否则 -> 视为"首次出现的可分析告警",转 step 2 + 2) 调用 tdp_alert_triage(POST /workflow-center//invoke) + - 把原始告警作为 alert_data 传入,触发 LLM 研判流水线(测绘/CVE/payload 并行) + +输出 JSONL 到 ~/.flocks/workspace/outputs//,每条记录包含: + {batch, alert_index, dedup: {...}, triage: {verdict, risk, title, report_path} | None, reason} +末尾追加一行 _summary 汇总。 + +用法: + python3 scripts/stream_pipeline_dedup_triage.py [--input FILE] [--limit N] [--delay SEC] + +如果只想跑一小批做端到端验证: + python3 scripts/stream_pipeline_dedup_triage.py --limit 3 --triage-limit 1 +""" + +import argparse +import json +import os +import sys +import time +import urllib.request +import urllib.error +from datetime import datetime +from pathlib import Path + +# ---------- API endpoints ---------- +# dedup: via main server proxy (records UI metrics) +DEDUP_URL = "http://127.0.0.1:8000/api/workflow-center/http_alert_dedup/invoke" +DEDUP_KEY = "Yw5WQxIL2bgDSL1RH0XO4yolu30GYrQ9bsfLHSmWVfk" + +# triage: call the published service directly (avoids 30 s proxy timeout) +TRIAGE_URL = "http://127.0.0.1:19001/invoke" +TRIAGE_KEY = "8e23f1ad036c4f73960925923d04e9a1edf8fcaf3d6b4461b5d2ced7e0956267" + +DEDUP_BASE_INPUTS = { + "source_log_type": "tdp", + "filter_enabled": True, + "dedup_enabled": True, + "threshold": 0.7, +} + + +def _post(url: str, api_key: str, payload: dict, timeout: int) -> tuple[dict, int]: + """POST JSON to a flocks /invoke endpoint; return (response_dict, elapsed_ms).""" + # Services use X-API-Key; main proxy uses Authorization: Bearer. + if "127.0.0.1:8000" in url: + auth_header = {"Authorization": f"Bearer {api_key}"} + else: + auth_header = {"X-API-Key": api_key} + req = urllib.request.Request( + url, + data=json.dumps(payload).encode(), + headers={"Content-Type": "application/json", **auth_header}, + method="POST", + ) + t0 = time.time() + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read()), round((time.time() - t0) * 1000) + except urllib.error.HTTPError as e: + body = e.read().decode(errors="replace")[:500] + return {"status": "FAILED", "error": f"HTTP {e.code}: {body}"}, round((time.time() - t0) * 1000) + except Exception as e: + return {"status": "FAILED", "error": str(e)}, round((time.time() - t0) * 1000) + + +def call_dedup(alert: dict, timeout: int) -> tuple[dict, int]: + payload = {"inputs": {**DEDUP_BASE_INPUTS, "alerts": [alert]}} + return _post(DEDUP_URL, DEDUP_KEY, payload, timeout) + + +def call_triage(alert: dict, timeout: int) -> tuple[dict, int]: + """Call triage service directly on port 19001 — no proxy timeout.""" + payload = {"inputs": {"alert_data": alert}} + return _post(TRIAGE_URL, TRIAGE_KEY, payload, timeout) + + +def default_output_path() -> Path: + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + out_dir = Path.home() / ".flocks" / "workspace" / "outputs" / datetime.now().strftime("%Y-%m-%d") + out_dir.mkdir(parents=True, exist_ok=True) + return out_dir / f"{ts}_pipeline_dedup_triage.jsonl" + + +def main() -> None: + p = argparse.ArgumentParser(description="Streaming pipeline: dedup -> triage") + p.add_argument("--input", default=str(Path.home() / "Downloads" / "tdp_logs.json"), + help="Input JSON file (top-level list)") + p.add_argument("--limit", type=int, default=0, + help="Process only the first N alerts (0 = all)") + p.add_argument("--triage-limit", type=int, default=0, + help="Stop after triggering N successful triage runs (0 = unlimited). " + "Useful to avoid burning LLM credits during smoke tests.") + p.add_argument("--delay", type=float, default=0.0, + help="Delay (seconds) between alerts") + p.add_argument("--dedup-timeout", type=int, default=60) + p.add_argument("--triage-timeout", type=int, default=600) + p.add_argument("--output", default=None, help="Output JSONL path") + args = p.parse_args() + + src = Path(args.input).expanduser() + if not src.exists(): + print(f"[ERROR] input not found: {src}", file=sys.stderr) + sys.exit(1) + + with open(src, "r", encoding="utf-8") as f: + records = json.load(f) + if isinstance(records, dict): + records = records.get("data", records.get("alerts", records.get("logs", []))) + if not isinstance(records, list): + print("[ERROR] expected top-level list", file=sys.stderr) + sys.exit(1) + + if args.limit > 0: + records = records[: args.limit] + + out_path = Path(args.output).expanduser() if args.output else default_output_path() + out_path.parent.mkdir(parents=True, exist_ok=True) + + total = len(records) + print(f"[stream] input: {src} count={total}") + print(f"[stream] output: {out_path}") + print(f"[stream] dedup: {DEDUP_URL}") + print(f"[stream] triage: {TRIAGE_URL}") + print("-" * 80) + + summary = { + "total_input": total, + "dedup_success": 0, + "dedup_failed": 0, + "filtered_out": 0, + "duplicate_skipped": 0, + "triage_invoked": 0, + "triage_success": 0, + "triage_failed": 0, + "verdict_counts": {}, + "started_at": datetime.now().isoformat(), + } + + with open(out_path, "w", encoding="utf-8") as f_out: + for i, alert in enumerate(records): + entry = {"alert_index": i, "alert_id": alert.get("id") or alert.get("uuid"), + "threat_name": (alert.get("threat") or {}).get("name", ""), + "src_ip": alert.get("attacker"), "dst_ip": alert.get("victim")} + + # ---------- step 1: dedup ---------- + dr, dms = call_dedup(alert, args.dedup_timeout) + ds = dr.get("status", "UNKNOWN") + if ds != "SUCCEEDED": + summary["dedup_failed"] += 1 + entry["dedup"] = {"status": ds, "elapsed_ms": dms, "error": dr.get("error", "")[:300]} + entry["reason"] = "dedup_failed" + entry["triage"] = None + f_out.write(json.dumps(entry, ensure_ascii=False) + "\n"); f_out.flush() + print(f" [{i+1:3d}/{total}] ✗ dedup FAILED ({dms}ms) {dr.get('error','')[:80]}") + continue + + summary["dedup_success"] += 1 + outs = dr.get("outputs", {}) + stats = outs.get("stats", {}) + ua = outs.get("unique_alerts", []) + entry["dedup"] = {"status": ds, "elapsed_ms": dms, + "filter_removed": stats.get("filter_removed_count", 0), + "after_filter": stats.get("after_filter_count", 0), + "unique_alerts": len(ua), + "lsh_clusters": stats.get("lsh_total_clusters"), + "lsh_dedup_keys": stats.get("lsh_total_dedup_keys")} + + if not ua: + summary["filtered_out"] += 1 + entry["reason"] = "filtered_out" + entry["triage"] = None + f_out.write(json.dumps(entry, ensure_ascii=False) + "\n"); f_out.flush() + print(f" [{i+1:3d}/{total}] - dedup OK ({dms:>4d}ms) filtered_out (kept=0)") + continue + + already = bool(ua[0].get("dedup_key_already_exists")) + entry["dedup"]["dedup_key"] = ua[0].get("dedup_key") + entry["dedup"]["dedup_key_already_exists"] = already + + if already: + summary["duplicate_skipped"] += 1 + entry["reason"] = "duplicate_skipped" + entry["triage"] = None + f_out.write(json.dumps(entry, ensure_ascii=False) + "\n"); f_out.flush() + print(f" [{i+1:3d}/{total}] - dedup OK ({dms:>4d}ms) duplicate (key={ua[0].get('dedup_key','')[:8]})") + continue + + # ---------- step 2: triage (only first-seen unique alerts) ---------- + if args.triage_limit > 0 and summary["triage_success"] >= args.triage_limit: + entry["reason"] = "triage_limit_reached" + entry["triage"] = None + f_out.write(json.dumps(entry, ensure_ascii=False) + "\n"); f_out.flush() + print(f" [{i+1:3d}/{total}] - dedup OK ({dms:>4d}ms) triage limit reached, skip") + continue + + summary["triage_invoked"] += 1 + tr, tms = call_triage(alert, args.triage_timeout) + ts_ = tr.get("status", "UNKNOWN") + if ts_ != "SUCCEEDED": + summary["triage_failed"] += 1 + entry["reason"] = "triage_failed" + entry["triage"] = {"status": ts_, "elapsed_ms": tms, "error": tr.get("error", "")[:300]} + f_out.write(json.dumps(entry, ensure_ascii=False) + "\n"); f_out.flush() + print(f" [{i+1:3d}/{total}] ✗ dedup OK + triage FAILED ({dms}+{tms}ms) {tr.get('error','')[:80]}") + continue + + summary["triage_success"] += 1 + tout = tr.get("outputs", {}) + verdict = tout.get("attack_verdict", "unknown") + summary["verdict_counts"][verdict] = summary["verdict_counts"].get(verdict, 0) + 1 + entry["reason"] = "triage_done" + entry["triage"] = {"status": ts_, "elapsed_ms": tms, + "attack_verdict": verdict, + "risk_level": tout.get("risk_level"), + "report_title": tout.get("report_title"), + "report_path": tout.get("report_path")} + f_out.write(json.dumps(entry, ensure_ascii=False) + "\n"); f_out.flush() + print(f" [{i+1:3d}/{total}] ✓ dedup OK + triage OK ({dms}+{tms}ms) " + f"verdict={verdict} title={(tout.get('report_title') or '')[:30]}") + + if args.delay > 0: + time.sleep(args.delay) + + summary["finished_at"] = datetime.now().isoformat() + f_out.write(json.dumps({"_summary": summary}, ensure_ascii=False) + "\n") + + print("-" * 80) + print(f"[done] dedup_success / failed : {summary['dedup_success']} / {summary['dedup_failed']}") + print(f"[done] filtered_out : {summary['filtered_out']}") + print(f"[done] duplicate_skipped : {summary['duplicate_skipped']}") + print(f"[done] triage_invoked : {summary['triage_invoked']}") + print(f"[done] triage_success / failed : {summary['triage_success']} / {summary['triage_failed']}") + print(f"[done] verdict_counts : {summary['verdict_counts']}") + print(f"[done] output : {out_path}") + + +if __name__ == "__main__": + main() diff --git a/tests/integration/test_http_alert_dedup_stream.py b/tests/integration/test_http_alert_dedup_stream.py new file mode 100755 index 000000000..2cb239888 --- /dev/null +++ b/tests/integration/test_http_alert_dedup_stream.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python3 +# NOTE: standalone manual integration test — not a pytest test, run directly with python3. +""" +手动集成测试工具:流式模拟脚本,逐条读取 tdp_logs.json,逐条 POST 到 http_alert_dedup 的 +/invoke 接口,汇总去重结果写入 output 文件。需要 flocks 服务运行,http_alert_dedup 工作流已发布。 + +用法: + python3 scripts/stream_tdp_invoke.py [--input FILE] [--batch-size N] [--delay SEC] [--output FILE] + +默认: + --input ~/Downloads/tdp_logs.json + --batch-size 1 每次发送的告警条数(1 = 严格逐条) + --delay 0.0 每批次之间的间隔秒数(模拟流速) + --output ~/.flocks/workspace/outputs/_tdp_invoke.jsonl +""" + +import argparse +import json +import os +import sys +import time +import urllib.request +import urllib.error +from datetime import datetime +from pathlib import Path + +API_URL = "http://127.0.0.1:8000/api/workflow-center/http_alert_dedup/invoke" +API_KEY = "Yw5WQxIL2bgDSL1RH0XO4yolu30GYrQ9bsfLHSmWVfk" +WORKFLOW_INPUTS_BASE = { + "source_log_type": "tdp", + "filter_enabled": True, + "dedup_enabled": True, + "threshold": 0.7, +} + + +def post_invoke(alerts: list) -> dict: + payload = json.dumps({"inputs": {**WORKFLOW_INPUTS_BASE, "alerts": alerts}}).encode() + req = urllib.request.Request( + API_URL, + data=payload, + headers={ + "Content-Type": "application/json", + "Authorization": f"Bearer {API_KEY}", + }, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=60) as resp: + return json.loads(resp.read()) + except urllib.error.HTTPError as e: + body = e.read().decode(errors="replace") + return {"error": f"HTTP {e.code}: {body}", "status": "FAILED"} + except Exception as e: + return {"error": str(e), "status": "FAILED"} + + +def default_output_path() -> Path: + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + out_dir = Path.home() / ".flocks" / "workspace" / "outputs" / datetime.now().strftime("%Y-%m-%d") + out_dir.mkdir(parents=True, exist_ok=True) + return out_dir / f"{ts}_tdp_invoke.jsonl" + + +def main(): + parser = argparse.ArgumentParser(description="流式模拟:逐条将 TDP 告警发送至 /invoke") + parser.add_argument("--input", default=str(Path.home() / "Downloads" / "tdp_logs.json"), + help="输入 JSON 文件路径(list 格式)") + parser.add_argument("--batch-size", type=int, default=1, + help="每次请求发送的告警条数,默认 1(逐条流式)") + parser.add_argument("--delay", type=float, default=0.0, + help="每批之间等待秒数,默认 0(无延迟)") + parser.add_argument("--output", default=None, + help="输出 JSONL 文件路径,默认写入 ~/.flocks/workspace/outputs/") + args = parser.parse_args() + + input_path = Path(args.input).expanduser() + if not input_path.exists(): + print(f"[ERROR] 文件不存在: {input_path}", file=sys.stderr) + sys.exit(1) + + with open(input_path, "r", encoding="utf-8") as f: + records = json.load(f) + + if isinstance(records, dict): + records = records.get("data", records.get("alerts", records.get("logs", []))) + if not isinstance(records, list): + print("[ERROR] 文件格式错误:期望顶层为 JSON 数组", file=sys.stderr) + sys.exit(1) + + total = len(records) + batch_size = max(1, args.batch_size) + output_path = Path(args.output).expanduser() if args.output else default_output_path() + output_path.parent.mkdir(parents=True, exist_ok=True) + + print(f"[stream] 输入: {input_path} 共 {total} 条") + print(f"[stream] batch_size={batch_size} delay={args.delay}s") + print(f"[stream] 输出: {output_path}") + print(f"[stream] API: {API_URL}") + print("-" * 60) + + summary = { + "total_input": total, + "total_batches": 0, + "total_unique": 0, + "total_deduped": 0, + "total_filtered_out": 0, + "failed_batches": 0, + "started_at": datetime.now().isoformat(), + } + + with open(output_path, "w", encoding="utf-8") as out_f: + batch_idx = 0 + for start in range(0, total, batch_size): + batch = records[start: start + batch_size] + batch_idx += 1 + t0 = time.time() + result = post_invoke(batch) + elapsed = round(time.time() - t0, 3) + + status = result.get("status", "UNKNOWN") + outputs = result.get("outputs", {}) + stats = outputs.get("stats", {}) + + log_entry = { + "batch": batch_idx, + "record_start": start, + "record_end": start + len(batch) - 1, + "status": status, + "elapsed_ms": round(elapsed * 1000), + "unique_alerts": len(outputs.get("unique_alerts", [])), + "deduped_alerts": len(outputs.get("deduped_alerts", [])), + "stats": stats, + "error": result.get("error"), + "dedup_summary": outputs.get("dedup_summary", ""), + } + out_f.write(json.dumps(log_entry, ensure_ascii=False) + "\n") + out_f.flush() + + summary["total_batches"] += 1 + if status == "SUCCEEDED": + summary["total_unique"] += log_entry["unique_alerts"] + summary["total_deduped"] += log_entry["deduped_alerts"] + summary["total_filtered_out"] += stats.get("filter_removed_count", 0) + else: + summary["failed_batches"] += 1 + + progress = f"{start + len(batch)}/{total}" + indicator = "✓" if status == "SUCCEEDED" else "✗" + print( + f" {indicator} batch {batch_idx:4d} [{progress:>9s}] " + f"unique={log_entry['unique_alerts']:3d} " + f"deduped={log_entry['deduped_alerts']:3d} " + f"{elapsed*1000:.0f}ms" + + (f" ERR: {result.get('error','')[:60]}" if status != "SUCCEEDED" else "") + ) + + if args.delay > 0 and start + batch_size < total: + time.sleep(args.delay) + + summary["finished_at"] = datetime.now().isoformat() + + print("-" * 60) + print(f"[done] 批次总数: {summary['total_batches']}") + print(f"[done] 失败批次: {summary['failed_batches']}") + print(f"[done] 累计输入: {summary['total_input']}") + print(f"[done] 累计过滤掉: {summary['total_filtered_out']}") + print(f"[done] 累计去重后: {summary['total_unique']}") + print(f"[done] 累计去重前: {summary['total_deduped']}") + print(f"[done] 输出文件: {output_path}") + + # 末尾追加一行汇总 + with open(output_path, "a", encoding="utf-8") as out_f: + out_f.write(json.dumps({"_summary": summary}, ensure_ascii=False) + "\n") + + +if __name__ == "__main__": + main() diff --git a/tests/tool/test_watcher_atomic_save.py b/tests/tool/test_watcher_atomic_save.py new file mode 100644 index 000000000..e88c6a053 --- /dev/null +++ b/tests/tool/test_watcher_atomic_save.py @@ -0,0 +1,107 @@ +"""Regression tests for atomic-save handling in plugin/agent/skill watchers. + +Atomic-save editors persist edits by writing a sibling temp file and then +``rename``-ing it onto the real target. watchdog surfaces this as a +``FileMovedEvent`` whose ``src_path`` is the temp filename and whose +``dest_path`` is the actual ``tool.yaml`` / ``agent.yaml`` / ``SKILL.md``. + +These tests pin down the contract enforced by the three module-level +predicates that the watcher event handlers delegate to. +""" + +from __future__ import annotations + +from types import SimpleNamespace + +from flocks.tool.registry import _tool_event_should_reload +from flocks.agent.registry import _agent_event_should_reload +from flocks.skill.skill import _skill_event_should_reload + + +def _move_event(src: str, dest: str) -> SimpleNamespace: + return SimpleNamespace(event_type="moved", src_path=src, dest_path=dest, is_directory=False) + + +def _modify_event(path: str) -> SimpleNamespace: + return SimpleNamespace(event_type="modified", src_path=path, dest_path="", is_directory=False) + + +# --------------------------------------------------------------------------- +# Tool watcher predicate +# --------------------------------------------------------------------------- + + +def test_tool_watcher_accepts_dest_path_on_atomic_save() -> None: + """A rename of ```` -> ``tool.yaml`` must trigger a reload.""" + evt = _move_event( + src="/repo/.flocks/plugins/tools/api/foo/.tool.yaml.swp", + dest="/repo/.flocks/plugins/tools/api/foo/tool.yaml", + ) + assert _tool_event_should_reload(evt) is True + + +def test_tool_watcher_accepts_python_atomic_save() -> None: + evt = _move_event( + src="/repo/.flocks/plugins/tools/python/foo/.tool.py.4321~", + dest="/repo/.flocks/plugins/tools/python/foo/tool.py", + ) + assert _tool_event_should_reload(evt) is True + + +def test_tool_watcher_rejects_irrelevant_paths() -> None: + assert _tool_event_should_reload(_modify_event("/repo/.flocks/plugins/tools/api/foo/README")) is False + assert _tool_event_should_reload(_modify_event("/repo/.flocks/plugins/tools/api/foo/__pycache__/x.py")) is False + assert _tool_event_should_reload(_modify_event("/repo/.flocks/plugins/tools/api/foo/.hidden.yaml")) is False + assert _tool_event_should_reload(_modify_event("/repo/.flocks/plugins/tools/api/foo/_tmp.yaml")) is False + + +def test_tool_watcher_accepts_direct_modify_on_yaml() -> None: + evt = _modify_event("/repo/.flocks/plugins/tools/api/foo/tool.yaml") + assert _tool_event_should_reload(evt) is True + + +# --------------------------------------------------------------------------- +# Agent watcher predicate +# --------------------------------------------------------------------------- + + +def test_agent_watcher_accepts_dest_path_on_atomic_save() -> None: + evt = _move_event( + src="/repo/.flocks/plugins/agents/foo/.agent.yaml.swp", + dest="/repo/.flocks/plugins/agents/foo/agent.yaml", + ) + assert _agent_event_should_reload(evt) is True + + +def test_agent_watcher_accepts_md_via_dest_path() -> None: + evt = _move_event( + src="/repo/.flocks/plugins/agents/foo/.AGENT.md.tmp", + dest="/repo/.flocks/plugins/agents/foo/AGENT.md", + ) + assert _agent_event_should_reload(evt) is True + + +def test_agent_watcher_rejects_unrelated_paths() -> None: + evt = _modify_event("/repo/.flocks/plugins/agents/foo/README.txt") + assert _agent_event_should_reload(evt) is False + + +# --------------------------------------------------------------------------- +# Skill watcher predicate +# --------------------------------------------------------------------------- + + +def test_skill_watcher_accepts_skill_md_via_dest_path() -> None: + evt = _move_event( + src="/repo/.flocks/plugins/skills/foo/.SKILL.md.swap", + dest="/repo/.flocks/plugins/skills/foo/SKILL.md", + ) + assert _skill_event_should_reload(evt) is True + + +def test_skill_watcher_accepts_direct_modify() -> None: + assert _skill_event_should_reload(_modify_event("/repo/.flocks/plugins/skills/foo/SKILL.md")) is True + + +def test_skill_watcher_rejects_non_skill_files() -> None: + assert _skill_event_should_reload(_modify_event("/repo/.flocks/plugins/skills/foo/notes.md")) is False diff --git a/uv.lock b/uv.lock index 4fe1bdb5d..bb04c80f7 100644 --- a/uv.lock +++ b/uv.lock @@ -384,6 +384,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/48/ef/0c2f4a8e31018a986949d34a01115dd057bf536905dca38897bacd21fac3/cryptography-46.0.5-cp38-abi3-win_amd64.whl", hash = "sha256:556e106ee01aa13484ce9b0239bca667be5004efb0aabbed28d353df86445595", size = 3467050, upload-time = "2026-02-10T19:18:18.899Z" }, ] +[[package]] +name = "datasketch" +version = "1.10.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, + { name = "scipy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8d/73/8e9014887f9fca2d785777a0a6186813e4fc7faa24f05fc88c6420624891/datasketch-1.10.0.tar.gz", hash = "sha256:d23aea80ce4c40790ca7a40795659848be92ecc43db80942be26f21e81d24714", size = 91699, upload-time = "2026-04-17T23:06:56.388Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ed/e7/a94668082e078099eb0161635649510aa887690767b779fffe4bdc479913/datasketch-1.10.0-py3-none-any.whl", hash = "sha256:303dd90cda0948a21abba3aaefc9f8528fa12b8204edc5e1ae8b1d7b750234e7", size = 99914, upload-time = "2026-04-17T23:06:54.39Z" }, +] + [[package]] name = "decorator" version = "5.2.1" @@ -508,6 +521,7 @@ dependencies = [ { name = "claude" }, { name = "click" }, { name = "croniter" }, + { name = "datasketch" }, { name = "defusedxml" }, { name = "dingtalk-stream" }, { name = "fastapi" }, @@ -573,6 +587,7 @@ requires-dist = [ { name = "claude", specifier = ">=0.4.11" }, { name = "click", specifier = ">=8.1.7" }, { name = "croniter", specifier = ">=6.0.0" }, + { name = "datasketch", specifier = ">=1.10.0" }, { name = "defusedxml", specifier = ">=0.7.1" }, { name = "dingtalk-stream", specifier = ">=0.20" }, { name = "fastapi", specifier = ">=0.109.0" }, @@ -2005,6 +2020,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8f/e8/726643a3ea68c727da31570bde48c7a10f1aa60eddd628d94078fec586ff/ruff-0.15.7-py3-none-win_arm64.whl", hash = "sha256:18e8d73f1c3fdf27931497972250340f92e8c861722161a9caeb89a58ead6ed2", size = 11023304, upload-time = "2026-03-19T16:26:51.669Z" }, ] +[[package]] +name = "scipy" +version = "1.17.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/7a/97/5a3609c4f8d58b039179648e62dd220f89864f56f7357f5d4f45c29eb2cc/scipy-1.17.1.tar.gz", hash = "sha256:95d8e012d8cb8816c226aef832200b1d45109ed4464303e997c5b13122b297c0", size = 30573822, upload-time = "2026-02-23T00:26:24.851Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/35/48/b992b488d6f299dbe3f11a20b24d3dda3d46f1a635ede1c46b5b17a7b163/scipy-1.17.1-cp312-cp312-macosx_10_14_x86_64.whl", hash = "sha256:35c3a56d2ef83efc372eaec584314bd0ef2e2f0d2adb21c55e6ad5b344c0dcb8", size = 31610954, upload-time = "2026-02-23T00:17:49.855Z" }, + { url = "https://files.pythonhosted.org/packages/b2/02/cf107b01494c19dc100f1d0b7ac3cc08666e96ba2d64db7626066cee895e/scipy-1.17.1-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:fcb310ddb270a06114bb64bbe53c94926b943f5b7f0842194d585c65eb4edd76", size = 28172662, upload-time = "2026-02-23T00:18:01.64Z" }, + { url = "https://files.pythonhosted.org/packages/cf/a9/599c28631bad314d219cf9ffd40e985b24d603fc8a2f4ccc5ae8419a535b/scipy-1.17.1-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:cc90d2e9c7e5c7f1a482c9875007c095c3194b1cfedca3c2f3291cdc2bc7c086", size = 20344366, upload-time = "2026-02-23T00:18:12.015Z" }, + { url = "https://files.pythonhosted.org/packages/35/f5/906eda513271c8deb5af284e5ef0206d17a96239af79f9fa0aebfe0e36b4/scipy-1.17.1-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:c80be5ede8f3f8eded4eff73cc99a25c388ce98e555b17d31da05287015ffa5b", size = 22704017, upload-time = "2026-02-23T00:18:21.502Z" }, + { url = "https://files.pythonhosted.org/packages/da/34/16f10e3042d2f1d6b66e0428308ab52224b6a23049cb2f5c1756f713815f/scipy-1.17.1-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:e19ebea31758fac5893a2ac360fedd00116cbb7628e650842a6691ba7ca28a21", size = 32927842, upload-time = "2026-02-23T00:18:35.367Z" }, + { url = "https://files.pythonhosted.org/packages/01/8e/1e35281b8ab6d5d72ebe9911edcdffa3f36b04ed9d51dec6dd140396e220/scipy-1.17.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:02ae3b274fde71c5e92ac4d54bc06c42d80e399fec704383dcd99b301df37458", size = 35235890, upload-time = "2026-02-23T00:18:49.188Z" }, + { url = "https://files.pythonhosted.org/packages/c5/5c/9d7f4c88bea6e0d5a4f1bc0506a53a00e9fcb198de372bfe4d3652cef482/scipy-1.17.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8a604bae87c6195d8b1045eddece0514d041604b14f2727bbc2b3020172045eb", size = 35003557, upload-time = "2026-02-23T00:18:54.74Z" }, + { url = "https://files.pythonhosted.org/packages/65/94/7698add8f276dbab7a9de9fb6b0e02fc13ee61d51c7c3f85ac28b65e1239/scipy-1.17.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:f590cd684941912d10becc07325a3eeb77886fe981415660d9265c4c418d0bea", size = 37625856, upload-time = "2026-02-23T00:19:00.307Z" }, + { url = "https://files.pythonhosted.org/packages/a2/84/dc08d77fbf3d87d3ee27f6a0c6dcce1de5829a64f2eae85a0ecc1f0daa73/scipy-1.17.1-cp312-cp312-win_amd64.whl", hash = "sha256:41b71f4a3a4cab9d366cd9065b288efc4d4f3c0b37a91a8e0947fb5bd7f31d87", size = 36549682, upload-time = "2026-02-23T00:19:07.67Z" }, + { url = "https://files.pythonhosted.org/packages/bc/98/fe9ae9ffb3b54b62559f52dedaebe204b408db8109a8c66fdd04869e6424/scipy-1.17.1-cp312-cp312-win_arm64.whl", hash = "sha256:f4115102802df98b2b0db3cce5cb9b92572633a1197c77b7553e5203f284a5b3", size = 24547340, upload-time = "2026-02-23T00:19:12.024Z" }, +] + [[package]] name = "shellingham" version = "1.5.4" diff --git a/webui/src/api/workflow.ts b/webui/src/api/workflow.ts index 8899c2aa0..e73b32507 100644 --- a/webui/src/api/workflow.ts +++ b/webui/src/api/workflow.ts @@ -147,6 +147,30 @@ export interface WorkflowService { containerName?: string; } +/** Saved syslog listener config (per workflow). */ +export interface SyslogConfig { + workflowId?: string; + enabled?: boolean; + protocol?: string; + host?: string; + port?: number; + format?: string; + inputKey?: string; + updatedAt?: number; +} + +/** Runtime state of the syslog listener (independent from saved config). */ +export interface SyslogListenerStatus { + state: 'binding' | 'listening' | 'failed' | 'stopped'; + error?: string | null; + host?: string; + port?: number; + protocol?: string; + queueSize?: number; + queueCapacity?: number; + workerCount?: number; +} + export const workflowAPI = { list: (params?: { category?: string; status?: string; excludeId?: string }) => client.get('/api/workflow', { params }), @@ -239,6 +263,25 @@ export const workflowAPI = { outputTopic?: string; } | null>(`/api/workflow/${id}/kafka-config`), + saveSyslogConfig: (id: string, config: { + enabled?: boolean; + protocol?: string; + host?: string; + port?: number; + format?: string; + inputKey?: string; + }) => + client.post<{ ok: boolean; listener?: SyslogListenerStatus }>( + `/api/workflow/${id}/syslog-config`, + config, + ), + + getSyslogConfig: (id: string) => + client.get(`/api/workflow/${id}/syslog-config`), + + getSyslogStatus: (id: string) => + client.get(`/api/workflow/${id}/syslog-status`), + runNode: (id: string, data: { nodeId: string; inputs?: Record }) => client.post(`/api/workflow/${id}/run-node`, { node_id: data.nodeId, inputs: data.inputs ?? {} }), diff --git a/webui/src/locales/en-US/workflow.json b/webui/src/locales/en-US/workflow.json index 6bffeb0c8..303565d56 100644 --- a/webui/src/locales/en-US/workflow.json +++ b/webui/src/locales/en-US/workflow.json @@ -58,6 +58,7 @@ "tabOverview": "Overview", "tabChat": "AI Edit", "tabRun": "Run", + "tabIntegration": "Integration", "renderError": "Component render error", "deleteWorkflow": "Delete Workflow", "deleteConfirmTitle": "Delete Workflow", @@ -204,6 +205,16 @@ "savedConfig": "Saved", "saveConfig": "Save Configuration", "kafkaHint": "Kafka integration is under development, configuration will take effect in a future version", + "syslogSection": "Syslog input", + "syslogExperimental": "(Experimental)", + "syslogEnabled": "Enable listener", + "syslogProtocol": "Protocol", + "syslogHost": "Bind address", + "syslogPort": "Port", + "syslogFormat": "Parse format", + "syslogInputKey": "Inputs key", + "syslogHint": "When enabled, Flocks listens for syslog on the given address/port and passes parsed payloads to workflow inputs (default key: syslog_message).", + "syslogActive": "Listening", "historySection": "Execution History", "noHistory": "No execution records", "noOutput": "No output data", diff --git a/webui/src/locales/zh-CN/workflow.json b/webui/src/locales/zh-CN/workflow.json index 74e852323..2524d9925 100644 --- a/webui/src/locales/zh-CN/workflow.json +++ b/webui/src/locales/zh-CN/workflow.json @@ -58,6 +58,7 @@ "tabOverview": "概览", "tabChat": "AI 编辑", "tabRun": "运行", + "tabIntegration": "集成", "renderError": "组件渲染出错", "deleteWorkflow": "删除工作流", "deleteConfirmTitle": "删除工作流", @@ -204,6 +205,16 @@ "savedConfig": "已保存", "saveConfig": "保存配置", "kafkaHint": "Kafka 集成功能开发中,配置将在后续版本生效", + "syslogSection": "Syslog 输入", + "syslogExperimental": "(实验性)", + "syslogEnabled": "启用监听", + "syslogProtocol": "协议", + "syslogHost": "监听地址", + "syslogPort": "端口", + "syslogFormat": "解析格式", + "syslogInputKey": "Inputs 键名", + "syslogHint": "开启后 Flocks 在指定地址/端口接收 syslog,解析结果写入工作流 inputs(默认键名 syslog_message)。", + "syslogActive": "监听中", "historySection": "执行历史", "noHistory": "暂无执行记录", "noOutput": "无输出数据", diff --git a/webui/src/pages/WorkflowDetail/RightPanel.tsx b/webui/src/pages/WorkflowDetail/RightPanel.tsx index 8c0db61ab..dd6b371e9 100644 --- a/webui/src/pages/WorkflowDetail/RightPanel.tsx +++ b/webui/src/pages/WorkflowDetail/RightPanel.tsx @@ -6,6 +6,7 @@ import { useConfirm } from '@/components/common/ConfirmDialog'; import OverviewTab from './tabs/OverviewTab'; import ChatTab from './tabs/ChatTab'; import RunTab from './tabs/RunTab'; +import IntegrationTab from './tabs/IntegrationTab'; // ───────────────────────────────────────────── // Error boundary helpers @@ -59,7 +60,7 @@ class TabErrorBoundary extends Component< // RightPanel // ───────────────────────────────────────────── -type TabId = 'chat' | 'overview' | 'run'; +type TabId = 'chat' | 'overview' | 'run' | 'integration'; interface RightPanelProps { workflow: Workflow; @@ -107,9 +108,10 @@ export default function RightPanel({ }; const TABS: { id: TabId; label: string }[] = [ - { id: 'overview', label: t('detail.rightPanel.tabOverview') }, - { id: 'chat', label: t('detail.rightPanel.tabChat') }, - { id: 'run', label: t('detail.rightPanel.tabRun') }, + { id: 'overview', label: t('detail.rightPanel.tabOverview') }, + { id: 'chat', label: t('detail.rightPanel.tabChat') }, + { id: 'run', label: t('detail.rightPanel.tabRun') }, + { id: 'integration', label: t('detail.rightPanel.tabIntegration') }, ]; return ( @@ -158,6 +160,11 @@ export default function RightPanel({ /> )} + {activeTab === 'integration' && ( + + + + )} {/* 底部删除按钮 */} diff --git a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx new file mode 100644 index 000000000..71e8666a7 --- /dev/null +++ b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx @@ -0,0 +1,534 @@ +import { useState, useEffect, useCallback } from 'react'; +import { + Loader2, Globe, StopCircle, Check, ChevronDown, ChevronRight, + AlertCircle, Wifi, Server, +} from 'lucide-react'; +import { useTranslation } from 'react-i18next'; +import { + workflowAPI, + Workflow, + WorkflowService, + SyslogListenerStatus, +} from '@/api/workflow'; +import CopyButton from '@/components/common/CopyButton'; +import WorkflowStatusBadge from '@/components/common/WorkflowStatusBadge'; +import { extractErrorMessage } from '@/utils/error'; + +export interface IntegrationTabProps { + workflow: Workflow; +} + +// ───────────────────────────────────────────── +// 共享 SectionHeader +// ───────────────────────────────────────────── +function SectionHeader({ + title, + expanded, + onToggle, + badge, +}: { + title: string; + expanded: boolean; + onToggle: () => void; + badge?: React.ReactNode; +}) { + return ( + + ); +} + +// ───────────────────────────────────────────── +// 发布为 API +// ───────────────────────────────────────────── +function PublishSection({ workflowId }: { workflowId: string }) { + const { t } = useTranslation('workflow'); + const [expanded, setExpanded] = useState(true); + const [service, setService] = useState(null); + const [loadingService, setLoadingService] = useState(true); + const [publishing, setPublishing] = useState(false); + const [stopping, setStopping] = useState(false); + const [error, setError] = useState(''); + const [apiKeyVisible, setApiKeyVisible] = useState(false); + + const fetchService = useCallback(async () => { + try { + const res = await workflowAPI.getService(workflowId); + setService(res.data); + } catch { + setService(null); + } finally { + setLoadingService(false); + } + }, [workflowId]); + + useEffect(() => { + fetchService(); + }, [fetchService]); + + const handlePublish = async () => { + setError(''); + setPublishing(true); + try { + const res = await workflowAPI.publish(workflowId); + setService(res.data); + } catch (err: unknown) { + setError(extractErrorMessage(err, t('detail.run.publishFailed'))); + } finally { + setPublishing(false); + } + }; + + const handleUnpublish = async () => { + setError(''); + setStopping(true); + try { + await workflowAPI.unpublish(workflowId); + await fetchService(); + } catch (err: unknown) { + setError(extractErrorMessage(err, t('detail.run.stopFailed'))); + } finally { + setStopping(false); + } + }; + + const maskedKey = (key?: string) => { + if (!key) return '***'; + return apiKeyVisible ? key : `${key.slice(0, 4)}${'*'.repeat(Math.max(0, key.length - 8))}${key.slice(-4)}`; + }; + + const badge = service && ; + + return ( +
+ setExpanded(v => !v)} + badge={badge} + /> + {expanded && ( +
+ {loadingService ? ( +
+ +
+ ) : service && service.status !== 'stopped' ? ( +
+
+ +
+ {service.invokeUrl ?? ''} + +
+
+
+ +
+ + {maskedKey(service.apiKey)} + + + +
+
+
+ +
+
{`curl -X POST ${service.invokeUrl ?? ''} \\
+  -H "Content-Type: application/json" \\
+  -H "X-API-Key: ${service.apiKey ?? ''}" \\
+  -d '{"inputs": {}}'`}
+
+ +
+
+
+ +
+ ) : ( +
+

+ {t('detail.run.publishDesc')} +

+ + {publishing && ( +

{t('detail.run.dockerStarting')}

+ )} +
+ )} + {error && ( +
+ + {error} +
+ )} +
+ )} +
+ ); +} + +// ───────────────────────────────────────────── +// Kafka 配置 +// ───────────────────────────────────────────── +function KafkaSection({ workflowId }: { workflowId: string }) { + const { t } = useTranslation('workflow'); + const [expanded, setExpanded] = useState(false); + const [saving, setSaving] = useState(false); + const [saved, setSaved] = useState(false); + const [inputBroker, setInputBroker] = useState(''); + const [inputTopic, setInputTopic] = useState(''); + const [inputGroupId, setInputGroupId] = useState(''); + const [outputBroker, setOutputBroker] = useState(''); + const [outputTopic, setOutputTopic] = useState(''); + + useEffect(() => { + workflowAPI.getKafkaConfig(workflowId).then(res => { + if (res.data) { + setInputBroker(res.data.inputBroker || ''); + setInputTopic(res.data.inputTopic || ''); + setInputGroupId(res.data.inputGroupId || ''); + setOutputBroker(res.data.outputBroker || ''); + setOutputTopic(res.data.outputTopic || ''); + } + }).catch(() => {}); + }, [workflowId]); + + const handleSave = async () => { + setSaving(true); + setSaved(false); + try { + await workflowAPI.saveKafkaConfig(workflowId, { + inputBroker, inputTopic, inputGroupId, outputBroker, outputTopic, + }); + setSaved(true); + setTimeout(() => setSaved(false), 2000); + } catch { + // ignore - stub endpoint may return 501 + } finally { + setSaving(false); + } + }; + + const inputField = (label: string, value: string, onChange: (v: string) => void, placeholder: string) => ( +
+ + onChange(e.target.value)} + placeholder={placeholder} + className="w-full text-xs border border-gray-200 rounded-lg px-3 py-1.5 focus:outline-none focus:ring-1 focus:ring-red-500" + /> +
+ ); + + return ( +
+ setExpanded(v => !v)} + badge={{t('detail.run.kafkaExperimental')}} + /> + {expanded && ( +
+
+

+ {t('detail.run.inputConfig')} +

+ {inputField('Broker', inputBroker, setInputBroker, 'localhost:9092')} + {inputField('Topic', inputTopic, setInputTopic, 'workflow-input')} + {inputField('Consumer Group', inputGroupId, setInputGroupId, 'flocks-consumer')} +
+
+

+ {t('detail.run.outputConfig')} +

+ {inputField('Broker', outputBroker, setOutputBroker, 'localhost:9092')} + {inputField('Topic', outputTopic, setOutputTopic, 'workflow-output')} +
+ +

{t('detail.run.kafkaHint')}

+
+ )} +
+ ); +} + +// ───────────────────────────────────────────── +// Syslog 配置 +// ───────────────────────────────────────────── +function SyslogSection({ workflowId }: { workflowId: string }) { + const { t } = useTranslation('workflow'); + const [expanded, setExpanded] = useState(false); + const [saving, setSaving] = useState(false); + const [saved, setSaved] = useState(false); + const [enabled, setEnabled] = useState(false); + const [protocol, setProtocol] = useState('udp'); + const [host, setHost] = useState('0.0.0.0'); + const [port, setPort] = useState('5140'); + const [format, setFormat] = useState('auto'); + const [inputKey, setInputKey] = useState('syslog_message'); + // Runtime listener state (independent from saved config) — only this should + // drive the "Listening" indicator, otherwise a bind failure leaves the UI + // falsely showing the listener as active. + const [listener, setListener] = useState(null); + const [saveError, setSaveError] = useState(''); + + const refreshStatus = useCallback(async () => { + try { + const res = await workflowAPI.getSyslogStatus(workflowId); + setListener(res.data); + } catch { + // ignore — older backend / transient failure: UI will show "unknown" + } + }, [workflowId]); + + const isListening = listener?.state === 'listening'; + const isBinding = listener?.state === 'binding'; + const isFailed = listener?.state === 'failed'; + + // 摘要行:仅当后端真正报告 listening 时才显示绿色 active + let summaryBadge: React.ReactNode; + if (isListening) { + summaryBadge = ( + + {(listener?.protocol || protocol).toUpperCase()} {listener?.host || host}:{listener?.port ?? port} + {' · '}{t('detail.run.syslogActive')} + + ); + } else if (enabled && isBinding) { + summaryBadge = ( + + {protocol.toUpperCase()} {host}:{port} · binding… + + ); + } else if (enabled && isFailed) { + summaryBadge = ( + + {protocol.toUpperCase()} {host}:{port} · {listener?.error || 'failed'} + + ); + } else { + summaryBadge = ( + {t('detail.run.syslogExperimental')} + ); + } + + useEffect(() => { + workflowAPI.getSyslogConfig(workflowId).then(res => { + if (res.data) { + setEnabled(!!res.data.enabled); + setProtocol(res.data.protocol || 'udp'); + setHost(res.data.host || '0.0.0.0'); + setPort(String(res.data.port ?? 5140)); + setFormat(res.data.format || 'auto'); + setInputKey(res.data.inputKey || 'syslog_message'); + } + }).catch(() => {}); + refreshStatus(); + }, [workflowId, refreshStatus]); + + // While "binding" we poll briefly so the UI converges on the real state + // without forcing the user to refresh. + useEffect(() => { + if (!isBinding) return; + const handle = window.setInterval(() => { + refreshStatus(); + }, 1500); + return () => window.clearInterval(handle); + }, [isBinding, refreshStatus]); + + const handleSave = async () => { + setSaving(true); + setSaved(false); + setSaveError(''); + try { + const res = await workflowAPI.saveSyslogConfig(workflowId, { + enabled, + protocol, + host, + port: Number.parseInt(port, 10) || 5140, + format, + inputKey, + }); + if (res.data?.listener) { + setListener(res.data.listener); + } else { + refreshStatus(); + } + setSaved(true); + setTimeout(() => setSaved(false), 2000); + } catch (err: unknown) { + setSaveError(extractErrorMessage(err, t('detail.run.savingConfig'))); + refreshStatus(); + } finally { + setSaving(false); + } + }; + + const inputField = (label: string, value: string, onChange: (v: string) => void, placeholder: string) => ( +
+ + onChange(e.target.value)} + placeholder={placeholder} + className="w-full text-xs border border-gray-200 rounded-lg px-3 py-1.5 focus:outline-none focus:ring-1 focus:ring-red-500" + /> +
+ ); + + return ( +
+ setExpanded(v => !v)} + badge={summaryBadge} + /> + {expanded && ( +
+
+ setEnabled(e.target.checked)} + className="rounded border-gray-300 text-red-600 focus:ring-red-500" + /> + +
+
+

+ {t('detail.run.inputConfig')} +

+
+ + +
+ {inputField(t('detail.run.syslogHost'), host, setHost, '0.0.0.0')} + {inputField(t('detail.run.syslogPort'), port, setPort, '5140')} +
+ + +
+ {inputField(t('detail.run.syslogInputKey'), inputKey, setInputKey, 'syslog_message')} +
+ + {saveError && ( +
+ + {saveError} +
+ )} + {enabled && isFailed && !saveError && ( +
+ + + Listener failed to bind: {listener?.error || 'unknown error'} + +
+ )} + {enabled && isListening && typeof listener?.queueSize === 'number' && ( +

+ queue {listener.queueSize}/{listener.queueCapacity ?? '?'} · workers {listener.workerCount ?? '?'} +

+ )} +

{t('detail.run.syslogHint')}

+
+ )} +
+ ); +} + +// ───────────────────────────────────────────── +// 主组件 +// ───────────────────────────────────────────── +export default function IntegrationTab({ workflow }: IntegrationTabProps) { + return ( +
+ + + +
+ ); +} diff --git a/webui/src/pages/WorkflowDetail/tabs/RunTab.test.tsx b/webui/src/pages/WorkflowDetail/tabs/RunTab.test.tsx index 82b47e28b..8709af1e7 100644 --- a/webui/src/pages/WorkflowDetail/tabs/RunTab.test.tsx +++ b/webui/src/pages/WorkflowDetail/tabs/RunTab.test.tsx @@ -17,6 +17,8 @@ const { workflowAPI } = vi.hoisted(() => ({ unpublish: vi.fn(), getKafkaConfig: vi.fn(), saveKafkaConfig: vi.fn(), + getSyslogConfig: vi.fn(), + saveSyslogConfig: vi.fn(), getHistory: vi.fn(), }, })); @@ -74,6 +76,15 @@ vi.mock('react-i18next', () => ({ 'detail.run.savedConfig': '已保存', 'detail.run.saveConfig': '保存配置', 'detail.run.kafkaHint': 'hint', + 'detail.run.syslogSection': 'Syslog', + 'detail.run.syslogExperimental': '实验性', + 'detail.run.syslogEnabled': '启用', + 'detail.run.syslogProtocol': '协议', + 'detail.run.syslogHost': '地址', + 'detail.run.syslogPort': '端口', + 'detail.run.syslogFormat': '格式', + 'detail.run.syslogInputKey': '键名', + 'detail.run.syslogHint': 'syslog hint', 'detail.run.historySection': '执行历史', 'detail.run.noHistory': '暂无执行记录', 'detail.run.noOutput': '无输出数据', @@ -130,6 +141,7 @@ describe('RunTab', () => { workflowAPI.saveSampleInputs.mockResolvedValue({ data: { ok: true } }); workflowAPI.getService.mockResolvedValue({ data: null }); workflowAPI.getKafkaConfig.mockResolvedValue({ data: null }); + workflowAPI.getSyslogConfig.mockResolvedValue({ data: null }); workflowAPI.getHistory.mockResolvedValue({ data: [] }); workflowAPI.run.mockResolvedValue({ data: { diff --git a/webui/src/pages/WorkflowDetail/tabs/RunTab.tsx b/webui/src/pages/WorkflowDetail/tabs/RunTab.tsx index 0852c0ff0..b1f35cbb3 100644 --- a/webui/src/pages/WorkflowDetail/tabs/RunTab.tsx +++ b/webui/src/pages/WorkflowDetail/tabs/RunTab.tsx @@ -1,14 +1,13 @@ import { useState, useEffect, useCallback, useRef } from 'react'; import { - Loader2, ChevronDown, ChevronRight, Globe, StopCircle, - Check, Clock, CheckCircle, XCircle, AlertCircle, Wifi, FlaskConical, + Loader2, ChevronDown, ChevronRight, StopCircle, + Clock, CheckCircle, XCircle, AlertCircle, FlaskConical, } from 'lucide-react'; import { useTranslation } from 'react-i18next'; import { workflowAPI, Workflow, WorkflowExecution, - WorkflowService, WorkflowJSON, } from '@/api/workflow'; import CopyButton from '@/components/common/CopyButton'; @@ -484,252 +483,6 @@ function TestSection({ ); } -// ───────────────────────────────────────────── -// 区块2:发布为 API -// ───────────────────────────────────────────── -function PublishSection({ workflowId }: { workflowId: string }) { - const { t } = useTranslation('workflow'); - const [expanded, setExpanded] = useState(true); - const [service, setService] = useState(null); - const [loadingService, setLoadingService] = useState(true); - const [publishing, setPublishing] = useState(false); - const [stopping, setStopping] = useState(false); - const [error, setError] = useState(''); - const [apiKeyVisible, setApiKeyVisible] = useState(false); - - const fetchService = useCallback(async () => { - try { - const res = await workflowAPI.getService(workflowId); - setService(res.data); - } catch { - setService(null); - } finally { - setLoadingService(false); - } - }, [workflowId]); - - useEffect(() => { - fetchService(); - }, [fetchService]); - - const handlePublish = async () => { - setError(''); - setPublishing(true); - try { - const res = await workflowAPI.publish(workflowId); - setService(res.data); - } catch (err: unknown) { - setError(extractErrorMessage(err, t('detail.run.publishFailed'))); - } finally { - setPublishing(false); - } - }; - - const handleUnpublish = async () => { - setError(''); - setStopping(true); - try { - await workflowAPI.unpublish(workflowId); - await fetchService(); - } catch (err: unknown) { - setError(extractErrorMessage(err, t('detail.run.stopFailed'))); - } finally { - setStopping(false); - } - }; - - const maskedKey = (key?: string) => { - if (!key) return '***'; - return apiKeyVisible ? key : `${key.slice(0, 4)}${'*'.repeat(Math.max(0, key.length - 8))}${key.slice(-4)}`; - }; - - const badge = service && ( - - ); - - return ( -
- setExpanded(v => !v)} badge={badge} /> - {expanded && ( -
- {loadingService ? ( -
- -
- ) : service && service.status !== 'stopped' ? ( -
-
- -
- {service.invokeUrl ?? ''} - -
-
-
- -
- - {maskedKey(service.apiKey)} - - - -
-
-
- -
-
{`curl -X POST ${service.invokeUrl ?? ''} \\
-  -H "Content-Type: application/json" \\
-  -H "X-API-Key: ${service.apiKey ?? ''}" \\
-  -d '{"inputs": {}}'`}
-
- -
-
-
- -
- ) : ( -
-

- {t('detail.run.publishDesc')} -

- - {publishing && ( -

{t('detail.run.dockerStarting')}

- )} -
- )} - {error && ( -
- - {error} -
- )} -
- )} -
- ); -} - -// ───────────────────────────────────────────── -// 区块3:Kafka 配置 -// ───────────────────────────────────────────── -function KafkaSection({ workflowId }: { workflowId: string }) { - const { t } = useTranslation('workflow'); - const [expanded, setExpanded] = useState(false); - const [saving, setSaving] = useState(false); - const [saved, setSaved] = useState(false); - const [inputBroker, setInputBroker] = useState(''); - const [inputTopic, setInputTopic] = useState(''); - const [inputGroupId, setInputGroupId] = useState(''); - const [outputBroker, setOutputBroker] = useState(''); - const [outputTopic, setOutputTopic] = useState(''); - - useEffect(() => { - workflowAPI.getKafkaConfig(workflowId).then(res => { - if (res.data) { - setInputBroker(res.data.inputBroker || ''); - setInputTopic(res.data.inputTopic || ''); - setInputGroupId(res.data.inputGroupId || ''); - setOutputBroker(res.data.outputBroker || ''); - setOutputTopic(res.data.outputTopic || ''); - } - }).catch(() => {}); - }, [workflowId]); - - const handleSave = async () => { - setSaving(true); - setSaved(false); - try { - await workflowAPI.saveKafkaConfig(workflowId, { - inputBroker, inputTopic, inputGroupId, outputBroker, outputTopic, - }); - setSaved(true); - setTimeout(() => setSaved(false), 2000); - } catch { - // ignore - stub endpoint may return 501 - } finally { - setSaving(false); - } - }; - - const inputField = (label: string, value: string, onChange: (v: string) => void, placeholder: string) => ( -
- - onChange(e.target.value)} - placeholder={placeholder} - className="w-full text-xs border border-gray-200 rounded-lg px-3 py-1.5 focus:outline-none focus:ring-1 focus:ring-red-500" - /> -
- ); - - return ( -
- setExpanded(v => !v)} - badge={{t('detail.run.kafkaExperimental')}} - /> - {expanded && ( -
-
-

- {t('detail.run.inputConfig')} -

- {inputField('Broker', inputBroker, setInputBroker, 'localhost:9092')} - {inputField('Topic', inputTopic, setInputTopic, 'workflow-input')} - {inputField('Consumer Group', inputGroupId, setInputGroupId, 'flocks-consumer')} -
-
-

- {t('detail.run.outputConfig')} -

- {inputField('Broker', outputBroker, setOutputBroker, 'localhost:9092')} - {inputField('Topic', outputTopic, setOutputTopic, 'workflow-output')} -
- -

{t('detail.run.kafkaHint')}

-
- )} -
- ); -} - // ───────────────────────────────────────────── // 单步详情组件 // ───────────────────────────────────────────── @@ -1036,8 +789,6 @@ export default function RunTab({ onExecutionChange={onLatestExecutionChange} onExecutionSettled={onExecutionSettled} /> - -