From 5ff72fc4e1188879d09cac8b96a89e5066d4c1e7 Mon Sep 17 00:00:00 2001 From: xiami762 <> Date: Mon, 18 May 2026 15:57:19 +0800 Subject: [PATCH] fix(logging): compact OpenAI request logs and demote high-volume INFO Replace per-message message_shapes with aggregate message_summary for each chat/stream request, and move plugin, tool, session, and compaction noise from INFO to DEBUG so long runs stay readable. Co-authored-by: Cursor --- flocks/plugin/loader.py | 6 +- flocks/provider/sdk/openai_base.py | 98 +++++++++--- flocks/session/lifecycle/compaction/policy.py | 2 +- flocks/session/runner.py | 4 +- flocks/session/session_loop.py | 2 +- flocks/tool/registry.py | 6 +- flocks/tool/tool_loader.py | 2 +- tests/cli/test_help_quiet.py | 3 + tests/provider/test_openai_base_provider.py | 79 +++++++++- tests/session/test_compaction_policy.py | 22 ++- tests/tool/test_logging_noise.py | 139 ++++++++++++++++++ 11 files changed, 329 insertions(+), 34 deletions(-) create mode 100644 tests/tool/test_logging_noise.py diff --git a/flocks/plugin/loader.py b/flocks/plugin/loader.py index 5e47b6cc9..de3792a77 100644 --- a/flocks/plugin/loader.py +++ b/flocks/plugin/loader.py @@ -234,7 +234,7 @@ def load_all( exclude_subdirs=ext.exclude_subdirs, ) if default_sources: - log.info( + log.debug( "plugin.scan", { "subdir": ext.subdir, @@ -253,7 +253,7 @@ def load_all( exclude_subdirs=ext.exclude_subdirs, ) if project_sources: - log.info( + log.debug( "plugin.project.scan", { "subdir": ext.subdir, @@ -415,7 +415,7 @@ def _load_yaml_source(cls, ext: ExtensionPoint, yaml_path: Path) -> None: items = cls._validate_and_dedup(ext, [item], str(yaml_path)) if items: ext.consumer(items, str(yaml_path)) - log.info( + log.debug( "plugin.yaml_dispatched", { "source": str(yaml_path), diff --git a/flocks/provider/sdk/openai_base.py b/flocks/provider/sdk/openai_base.py index 54ac3afc9..905d5779c 100644 --- a/flocks/provider/sdk/openai_base.py +++ b/flocks/provider/sdk/openai_base.py @@ -86,29 +86,85 @@ def _summarise_block(block: Any) -> Dict[str, Any]: return {"type": type(block).__name__} -def _summarise_messages(openai_messages: List[Any]) -> List[Dict[str, Any]]: - """Compute a redacted ``message_shapes`` for diagnostic logging. +def _summarise_messages(openai_messages: List[Any]) -> Dict[str, Any]: + """Compute a compact diagnostic summary for request logging. - See :func:`_summarise_block`. Used by both streaming and non-streaming - request paths so multimodal regressions surface uniformly in the log. + The previous ``message_shapes`` payload logged every message in the request, + which became very large on long-running sessions because the full history + was repeated on every model call. Keep the useful signal, but collapse it + into aggregate counters plus a short tail of the most recent messages. """ - out: List[Dict[str, Any]] = [] + role_counts: Dict[str, int] = {} + block_type_counts: Dict[str, int] = {} + skipped_types: Dict[str, int] = {} + tail: List[Dict[str, Any]] = [] + total_content_chars = 0 + max_content_chars = 0 + multimodal_messages = 0 + for m in openai_messages: if not isinstance(m, dict): - out.append({"type": type(m).__name__, "skipped": True}) + skipped_type = type(m).__name__ + skipped_types[skipped_type] = skipped_types.get(skipped_type, 0) + 1 continue + + role = str(m.get("role") or "unknown") + role_counts[role] = role_counts.get(role, 0) + 1 content = m.get("content") + tail_entry: Dict[str, Any] = {"role": role} + content_chars: Optional[int] = None + if isinstance(content, list): - out.append({ - "role": m.get("role"), - "blocks": [_summarise_block(b) for b in content], - }) + multimodal_messages += 1 + block_summaries = [_summarise_block(block) for block in content] + message_block_types: Dict[str, int] = {} + text_chars = 0 + image_url_chars = 0 + for summary in block_summaries: + block_type = str(summary.get("type") or "unknown") + block_type_counts[block_type] = block_type_counts.get(block_type, 0) + 1 + message_block_types[block_type] = message_block_types.get(block_type, 0) + 1 + text_chars += int(summary.get("text_chars") or 0) + image_url_chars += int(summary.get("url_chars") or 0) + + content_chars = text_chars + image_url_chars + tail_entry.update( + { + "content_kind": "blocks", + "block_count": len(content), + "block_types": message_block_types, + } + ) + if text_chars: + tail_entry["text_chars"] = text_chars + if image_url_chars: + tail_entry["image_url_chars"] = image_url_chars else: - out.append({ - "role": m.get("role"), - "content_chars": len(content) if isinstance(content, str) else None, - }) - return out + content_chars = len(content) if isinstance(content, str) else None + tail_entry["content_kind"] = "text" + + if content_chars is not None: + tail_entry["content_chars"] = content_chars + total_content_chars += content_chars + max_content_chars = max(max_content_chars, content_chars) + + tail.append(tail_entry) + if len(tail) > 6: + tail.pop(0) + + summary: Dict[str, Any] = { + "message_count": len(openai_messages), + "role_counts": role_counts, + "multimodal_messages": multimodal_messages, + "total_content_chars": total_content_chars, + "max_content_chars": max_content_chars, + "tail": tail, + } + if block_type_counts: + summary["block_type_counts"] = block_type_counts + if skipped_types: + summary["skipped_types"] = skipped_types + return summary def format_openai_content(content: Any) -> Any: @@ -694,8 +750,8 @@ async def chat( params["tools"] = kwargs["tools"] # Mirror ``chat_stream``'s diagnostic log so non-streaming multimodal - # regressions are equally visible. Never logs raw base64 — see - # ``_summarise_block``. + # regressions are equally visible. Keep INFO payloads compact because + # these request logs are emitted for every model call in long sessions. log.info("openai_base.chat.request", { "model": model_id, "thinking_enabled": bool(thinking), @@ -703,7 +759,7 @@ async def chat( "has_tools": bool(kwargs.get("tools")), "max_tokens": kwargs.get("max_tokens"), "has_temperature": "temperature" in params, - "message_shapes": _summarise_messages(openai_messages), + "message_summary": _summarise_messages(openai_messages), }) response = await client.chat.completions.create(**params) @@ -768,8 +824,8 @@ async def chat_stream( if kwargs.get("tools"): params["tools"] = kwargs["tools"] - # Inspect content shape so multimodal regressions surface in the log. - # We *never* log full base64 payloads — see ``_summarise_block``. + # Inspect request shape so multimodal regressions surface in the log + # without repeating the full message history on every turn. log.info("openai_base.stream.request", { "model": model_id, "thinking_enabled": bool(thinking), @@ -778,7 +834,7 @@ async def chat_stream( "max_tokens": kwargs.get("max_tokens"), "has_temperature": "temperature" in params, "include_usage": True, - "message_shapes": _summarise_messages(openai_messages), + "message_summary": _summarise_messages(openai_messages), }) try: diff --git a/flocks/session/lifecycle/compaction/policy.py b/flocks/session/lifecycle/compaction/policy.py index f046c1cd8..6b40feec0 100644 --- a/flocks/session/lifecycle/compaction/policy.py +++ b/flocks/session/lifecycle/compaction/policy.py @@ -227,7 +227,7 @@ def from_model( preserve_last=preserve_last, ) - _policy_log.info("compaction_policy.created", { + _policy_log.debug("compaction_policy.created", { "context_window": context_window, "max_output_tokens": max_output_tokens, "usable_context": usable, diff --git a/flocks/session/runner.py b/flocks/session/runner.py index 699a6557b..aa17121a5 100644 --- a/flocks/session/runner.py +++ b/flocks/session/runner.py @@ -2509,7 +2509,7 @@ def _build_llm_response_payload( log.debug("runner.hook.llm_after.error", {"error": str(hook_exc)}) raise - log.info("runner.stream.summary", { + log.debug("runner.stream.summary", { "total_chunks": chunk_counts["total"], "reasoning_chunks": chunk_counts["reasoning"], "text_chunks": chunk_counts["text"], @@ -2559,7 +2559,7 @@ def _build_llm_response_payload( log.warn("runner.stream.usage_update_failed", {"error": str(e)}) # Log summary - log.info("runner.stream.complete", { + log.debug("runner.stream.complete", { "text_length": len(content), "reasoning_length": len(reasoning), "tool_calls": len(processor.tool_calls), diff --git a/flocks/session/session_loop.py b/flocks/session/session_loop.py index 613b2a705..5962715f5 100644 --- a/flocks/session/session_loop.py +++ b/flocks/session/session_loop.py @@ -831,7 +831,7 @@ async def progress_callback(stage: str, data: dict) -> None: ctx.session.id, messages ) tokens_dict = {"input": estimated_tokens, "output": 0, "cache": {"read": 0, "write": 0}} - log.info("loop.tokens_estimated_from_messages", { + log.debug("loop.tokens_estimated_from_messages", { "session_id": ctx.session.id, "estimated_tokens": estimated_tokens, "message_count": len(messages), diff --git a/flocks/tool/registry.py b/flocks/tool/registry.py index 72994865a..496285173 100644 --- a/flocks/tool/registry.py +++ b/flocks/tool/registry.py @@ -626,7 +626,7 @@ def _bump_revision(cls, reason: str) -> None: Agent.invalidate_cache() except Exception as e: log.debug("tool.revision.agent_invalidate_failed", {"error": str(e)}) - log.info("tool.registry.revision.bumped", {"revision": cls._revision, "reason": reason}) + log.debug("tool.registry.revision.bumped", {"revision": cls._revision, "reason": reason}) @classmethod def register_function( @@ -958,7 +958,7 @@ def _sync_api_service_states(cls) -> None: p for p, svc in api_services.items() if not svc.get("enabled", False) ] - log.info("tool_registry.api_service_sync", { + log.debug("tool_registry.api_service_sync", { "disabled_tools": disabled_count, "disabled_providers": disabled_providers, }) @@ -1639,7 +1639,7 @@ def _do_refresh(self) -> None: def _run_refresh(self) -> None: try: ToolRegistry.refresh_plugin_tools() - log.info("tool.watcher.reloaded", {"reason": "plugin tool file changed on disk"}) + log.debug("tool.watcher.reloaded", {"reason": "plugin tool file changed on disk"}) except Exception as e: log.warn("tool.watcher.reload_failed", {"error": str(e)}) diff --git a/flocks/tool/tool_loader.py b/flocks/tool/tool_loader.py index 8df57f716..6afb581f1 100644 --- a/flocks/tool/tool_loader.py +++ b/flocks/tool/tool_loader.py @@ -615,7 +615,7 @@ def yaml_to_tool(raw: dict, yaml_path: Path) -> Tool: tool._provider_version = provider_version # type: ignore[attr-defined] tool._source = source or "yaml_plugin" # type: ignore[attr-defined] - log.info("tool.yaml.loaded", { + log.debug("tool.yaml.loaded", { "name": name, "service_id": service_id, "storage_key": storage_key, diff --git a/tests/cli/test_help_quiet.py b/tests/cli/test_help_quiet.py index 9fcf1b146..0b3e9ce2d 100644 --- a/tests/cli/test_help_quiet.py +++ b/tests/cli/test_help_quiet.py @@ -5,10 +5,13 @@ ROOT = Path(__file__).resolve().parents[2] NOISY_MARKERS = ( + "plugin.scan", "plugin.project.scan", "tool.yaml.loaded", "plugin.yaml_dispatched", "tool_registry.api_service_sync", + "tool.registry.revision.bumped", + "tool.watcher.reloaded", ) diff --git a/tests/provider/test_openai_base_provider.py b/tests/provider/test_openai_base_provider.py index b8d9bcabf..19384ec2a 100644 --- a/tests/provider/test_openai_base_provider.py +++ b/tests/provider/test_openai_base_provider.py @@ -8,9 +8,12 @@ 4. Provider with config-based models (defers to config merge) """ -import pytest from types import SimpleNamespace from unittest.mock import AsyncMock, Mock, patch, MagicMock + +import pytest + +import flocks.provider.sdk.openai_base as openai_base_module from flocks.provider.sdk.openai_base import OpenAIBaseProvider, extract_reasoning_content from flocks.provider.provider import ModelInfo, ModelCapabilities, ProviderConfig @@ -428,6 +431,80 @@ async def test_chat_passes_explicit_temperature(self): kwargs = create.await_args.kwargs assert kwargs["temperature"] == 1.0 + def test_summarise_messages_compacts_long_history(self): + summary = openai_base_module._summarise_messages( + [ + {"role": "system", "content": "a" * 10}, + {"role": "user", "content": "b" * 20}, + {"role": "assistant", "content": "c" * 30}, + {"role": "tool", "content": "d" * 40}, + {"role": "assistant", "content": "e" * 50}, + {"role": "user", "content": "f" * 60}, + { + "role": "user", + "content": [ + {"type": "text", "text": "hello"}, + { + "type": "image_url", + "image_url": {"url": "data:image/png;base64,abcd"}, + }, + ], + }, + "skipped", + ] + ) + + assert summary["message_count"] == 8 + assert summary["role_counts"] == { + "system": 1, + "user": 3, + "assistant": 2, + "tool": 1, + } + assert summary["multimodal_messages"] == 1 + assert summary["block_type_counts"] == {"text": 1, "image_url": 1} + assert summary["skipped_types"] == {"str": 1} + assert len(summary["tail"]) == 6 + assert summary["tail"][-1] == { + "role": "user", + "content_kind": "blocks", + "block_count": 2, + "block_types": {"text": 1, "image_url": 1}, + "text_chars": 5, + "image_url_chars": 26, + "content_chars": 31, + } + + @pytest.mark.asyncio + async def test_chat_logs_compact_message_summary(self): + provider, create = self._build_provider_with_client() + create.return_value = self._mock_chat_response() + info = Mock() + + from flocks.provider.provider import ChatMessage + + with patch.object(openai_base_module, "log", Mock(info=info)): + await provider.chat( + "kimi-k2.5", + [ + ChatMessage(role="system", content="system prompt"), + ChatMessage(role="user", content="hello"), + ChatMessage(role="assistant", content="world"), + ], + max_tokens=20, + ) + + event, payload = info.call_args.args + assert event == "openai_base.chat.request" + assert "message_summary" in payload + assert "message_shapes" not in payload + assert payload["message_summary"]["message_count"] == 3 + assert payload["message_summary"]["role_counts"] == { + "system": 1, + "user": 1, + "assistant": 1, + } + class TestExtractReasoningContent: """Regression: some proxies send stream chunks with ``delta is None``.""" diff --git a/tests/session/test_compaction_policy.py b/tests/session/test_compaction_policy.py index 7b0ecd173..ced34799c 100644 --- a/tests/session/test_compaction_policy.py +++ b/tests/session/test_compaction_policy.py @@ -6,9 +6,17 @@ all work correctly. """ +from unittest.mock import Mock + import pytest -from flocks.session.lifecycle.compaction import CompactionPolicy, ContextTier, _BOUNDS, _MIN_OVERFLOW_THRESHOLD +import flocks.session.lifecycle.compaction.policy as policy_module +from flocks.session.lifecycle.compaction import ( + CompactionPolicy, + ContextTier, + _BOUNDS, + _MIN_OVERFLOW_THRESHOLD, +) # --------------------------------------------------------------------------- @@ -46,6 +54,18 @@ def test_xlarge_tier(self): assert CompactionPolicy._classify_tier(1_000_000) == ContextTier.XLARGE +class TestPolicyLogging: + def test_policy_creation_uses_debug_log(self, monkeypatch: pytest.MonkeyPatch): + logger = Mock() + monkeypatch.setattr(policy_module, "_policy_log", logger) + + policy = CompactionPolicy.from_model(128_000, 16_384) + + assert policy.tier == ContextTier.LARGE + logger.debug.assert_called_once() + logger.info.assert_not_called() + + # --------------------------------------------------------------------------- # Model-specific policy computation # --------------------------------------------------------------------------- diff --git a/tests/tool/test_logging_noise.py b/tests/tool/test_logging_noise.py new file mode 100644 index 000000000..c3e198b2f --- /dev/null +++ b/tests/tool/test_logging_noise.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import Mock + +import flocks.plugin.loader as plugin_loader_module +import flocks.tool.registry as tool_registry_module +import flocks.tool.tool_loader as tool_loader_module +from flocks.plugin.loader import ExtensionPoint, PluginLoader +from flocks.tool.registry import ToolFileWatcher, ToolRegistry + + +def test_yaml_to_tool_logs_loaded_at_debug(monkeypatch) -> None: + logger = Mock() + monkeypatch.setattr(tool_loader_module, "log", logger) + monkeypatch.setattr(tool_loader_module, "_load_provider_config", lambda _path: None) + monkeypatch.setattr( + tool_loader_module, + "_merge_provider_defaults", + lambda raw, _provider_cfg: raw, + ) + monkeypatch.setattr( + tool_loader_module, + "_build_handler", + lambda _handler, _path: object(), + ) + + tool = tool_loader_module.yaml_to_tool( + { + "name": "demo_tool", + "description": "Demo tool", + "handler": {"type": "http", "method": "GET", "url": "https://example.com"}, + }, + Path("/tmp/demo_tool.yaml"), + ) + + assert tool.info.name == "demo_tool" + logger.debug.assert_called_once() + event, payload = logger.debug.call_args.args + assert event == "tool.yaml.loaded" + assert payload["name"] == "demo_tool" + logger.info.assert_not_called() + + +def test_plugin_loader_high_volume_logs_use_debug(tmp_path: Path, monkeypatch) -> None: + logger = Mock() + monkeypatch.setattr(plugin_loader_module, "log", logger) + + user_root = tmp_path / "user_plugins" + project_dir = tmp_path / "project" + user_tools_dir = user_root / "tools" + project_tools_dir = project_dir / ".flocks" / "plugins" / "tools" + user_tools_dir.mkdir(parents=True) + project_tools_dir.mkdir(parents=True) + (user_tools_dir / "user.yaml").write_text("name: user-tool\n", encoding="utf-8") + (project_tools_dir / "project.yaml").write_text("name: project-tool\n", encoding="utf-8") + + monkeypatch.setattr(PluginLoader, "_plugin_root", user_root) + PluginLoader.clear_extension_points() + consumed: list[dict[str, str]] = [] + PluginLoader.register_extension_point( + ExtensionPoint( + attr_name="TOOLS", + subdir="tools", + consumer=lambda items, _source: consumed.extend(items), + yaml_item_factory=lambda raw, path: { + "name": str(raw["name"]), + "path": str(path), + }, + ) + ) + + try: + PluginLoader.load_all(project_dir=project_dir) + finally: + PluginLoader.clear_extension_points() + + assert consumed == [ + {"name": "user-tool", "path": str(user_tools_dir / "user.yaml")}, + {"name": "project-tool", "path": str(project_tools_dir / "project.yaml")}, + ] + + debug_events = [call.args[0] for call in logger.debug.call_args_list] + assert "plugin.scan" in debug_events + assert "plugin.project.scan" in debug_events + assert debug_events.count("plugin.yaml_dispatched") == 2 + + info_events = [call.args[0] for call in logger.info.call_args_list] + assert "plugin.scan" not in info_events + assert "plugin.project.scan" not in info_events + assert "plugin.yaml_dispatched" not in info_events + + +def test_tool_registry_high_volume_logs_use_debug(monkeypatch) -> None: + logger = Mock() + monkeypatch.setattr(tool_registry_module, "log", logger) + monkeypatch.setattr("flocks.agent.registry.Agent.invalidate_cache", lambda: None) + monkeypatch.setattr(ToolRegistry, "_revision", 41) + + ToolRegistry._bump_revision("plugin_refresh") + + event, payload = logger.debug.call_args.args + assert event == "tool.registry.revision.bumped" + assert payload == {"revision": 42, "reason": "plugin_refresh"} + logger.info.assert_not_called() + + +def test_tool_registry_api_service_sync_logs_at_debug(monkeypatch) -> None: + logger = Mock() + monkeypatch.setattr(tool_registry_module, "log", logger) + tool = SimpleNamespace(info=SimpleNamespace(provider="svc", enabled=True)) + monkeypatch.setattr(ToolRegistry, "_tools", {"demo": tool}) + monkeypatch.setattr( + "flocks.config.config_writer.ConfigWriter.list_api_services_raw", + lambda: {"svc": {"enabled": False}}, + ) + + ToolRegistry._sync_api_service_states() + + assert tool.info.enabled is False + event, payload = logger.debug.call_args.args + assert event == "tool_registry.api_service_sync" + assert payload == {"disabled_tools": 1, "disabled_providers": ["svc"]} + logger.info.assert_not_called() + + +def test_tool_file_watcher_refresh_logs_at_debug(monkeypatch) -> None: + logger = Mock() + monkeypatch.setattr(tool_registry_module, "log", logger) + monkeypatch.setattr(ToolRegistry, "refresh_plugin_tools", lambda: []) + + watcher = ToolFileWatcher() + watcher._run_refresh() + + event, payload = logger.debug.call_args.args + assert event == "tool.watcher.reloaded" + assert payload == {"reason": "plugin tool file changed on disk"} + logger.info.assert_not_called()