Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions flocks/plugin/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def load_all(
exclude_subdirs=ext.exclude_subdirs,
)
if default_sources:
log.info(
log.debug(
"plugin.scan",
{
"subdir": ext.subdir,
Expand All @@ -253,7 +253,7 @@ def load_all(
exclude_subdirs=ext.exclude_subdirs,
)
if project_sources:
log.info(
log.debug(
"plugin.project.scan",
{
"subdir": ext.subdir,
Expand Down Expand Up @@ -415,7 +415,7 @@ def _load_yaml_source(cls, ext: ExtensionPoint, yaml_path: Path) -> None:
items = cls._validate_and_dedup(ext, [item], str(yaml_path))
if items:
ext.consumer(items, str(yaml_path))
log.info(
log.debug(
"plugin.yaml_dispatched",
{
"source": str(yaml_path),
Expand Down
98 changes: 77 additions & 21 deletions flocks/provider/sdk/openai_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,85 @@ def _summarise_block(block: Any) -> Dict[str, Any]:
return {"type": type(block).__name__}


def _summarise_messages(openai_messages: List[Any]) -> List[Dict[str, Any]]:
"""Compute a redacted ``message_shapes`` for diagnostic logging.
def _summarise_messages(openai_messages: List[Any]) -> Dict[str, Any]:
"""Compute a compact diagnostic summary for request logging.

See :func:`_summarise_block`. Used by both streaming and non-streaming
request paths so multimodal regressions surface uniformly in the log.
The previous ``message_shapes`` payload logged every message in the request,
which became very large on long-running sessions because the full history
was repeated on every model call. Keep the useful signal, but collapse it
into aggregate counters plus a short tail of the most recent messages.
"""
out: List[Dict[str, Any]] = []
role_counts: Dict[str, int] = {}
block_type_counts: Dict[str, int] = {}
skipped_types: Dict[str, int] = {}
tail: List[Dict[str, Any]] = []
total_content_chars = 0
max_content_chars = 0
multimodal_messages = 0

for m in openai_messages:
if not isinstance(m, dict):
out.append({"type": type(m).__name__, "skipped": True})
skipped_type = type(m).__name__
skipped_types[skipped_type] = skipped_types.get(skipped_type, 0) + 1
continue

role = str(m.get("role") or "unknown")
role_counts[role] = role_counts.get(role, 0) + 1
content = m.get("content")
tail_entry: Dict[str, Any] = {"role": role}
content_chars: Optional[int] = None

if isinstance(content, list):
out.append({
"role": m.get("role"),
"blocks": [_summarise_block(b) for b in content],
})
multimodal_messages += 1
block_summaries = [_summarise_block(block) for block in content]
message_block_types: Dict[str, int] = {}
text_chars = 0
image_url_chars = 0
for summary in block_summaries:
block_type = str(summary.get("type") or "unknown")
block_type_counts[block_type] = block_type_counts.get(block_type, 0) + 1
message_block_types[block_type] = message_block_types.get(block_type, 0) + 1
text_chars += int(summary.get("text_chars") or 0)
image_url_chars += int(summary.get("url_chars") or 0)

content_chars = text_chars + image_url_chars
tail_entry.update(
{
"content_kind": "blocks",
"block_count": len(content),
"block_types": message_block_types,
}
)
if text_chars:
tail_entry["text_chars"] = text_chars
if image_url_chars:
tail_entry["image_url_chars"] = image_url_chars
else:
out.append({
"role": m.get("role"),
"content_chars": len(content) if isinstance(content, str) else None,
})
return out
content_chars = len(content) if isinstance(content, str) else None
tail_entry["content_kind"] = "text"

if content_chars is not None:
tail_entry["content_chars"] = content_chars
total_content_chars += content_chars
max_content_chars = max(max_content_chars, content_chars)

tail.append(tail_entry)
if len(tail) > 6:
tail.pop(0)

summary: Dict[str, Any] = {
"message_count": len(openai_messages),
"role_counts": role_counts,
"multimodal_messages": multimodal_messages,
"total_content_chars": total_content_chars,
"max_content_chars": max_content_chars,
"tail": tail,
}
if block_type_counts:
summary["block_type_counts"] = block_type_counts
if skipped_types:
summary["skipped_types"] = skipped_types
return summary


def format_openai_content(content: Any) -> Any:
Expand Down Expand Up @@ -694,16 +750,16 @@ async def chat(
params["tools"] = kwargs["tools"]

# Mirror ``chat_stream``'s diagnostic log so non-streaming multimodal
# regressions are equally visible. Never logs raw base64 — see
# ``_summarise_block``.
# regressions are equally visible. Keep INFO payloads compact because
# these request logs are emitted for every model call in long sessions.
log.info("openai_base.chat.request", {
"model": model_id,
"thinking_enabled": bool(thinking),
"has_extra_body": "extra_body" in params,
"has_tools": bool(kwargs.get("tools")),
"max_tokens": kwargs.get("max_tokens"),
"has_temperature": "temperature" in params,
"message_shapes": _summarise_messages(openai_messages),
"message_summary": _summarise_messages(openai_messages),
})

response = await client.chat.completions.create(**params)
Expand Down Expand Up @@ -768,8 +824,8 @@ async def chat_stream(
if kwargs.get("tools"):
params["tools"] = kwargs["tools"]

# Inspect content shape so multimodal regressions surface in the log.
# We *never* log full base64 payloads — see ``_summarise_block``.
# Inspect request shape so multimodal regressions surface in the log
# without repeating the full message history on every turn.
log.info("openai_base.stream.request", {
"model": model_id,
"thinking_enabled": bool(thinking),
Expand All @@ -778,7 +834,7 @@ async def chat_stream(
"max_tokens": kwargs.get("max_tokens"),
"has_temperature": "temperature" in params,
"include_usage": True,
"message_shapes": _summarise_messages(openai_messages),
"message_summary": _summarise_messages(openai_messages),
})

try:
Expand Down
2 changes: 1 addition & 1 deletion flocks/session/lifecycle/compaction/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def from_model(
preserve_last=preserve_last,
)

_policy_log.info("compaction_policy.created", {
_policy_log.debug("compaction_policy.created", {
"context_window": context_window,
"max_output_tokens": max_output_tokens,
"usable_context": usable,
Expand Down
4 changes: 2 additions & 2 deletions flocks/session/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2509,7 +2509,7 @@ def _build_llm_response_payload(
log.debug("runner.hook.llm_after.error", {"error": str(hook_exc)})
raise

log.info("runner.stream.summary", {
log.debug("runner.stream.summary", {
"total_chunks": chunk_counts["total"],
"reasoning_chunks": chunk_counts["reasoning"],
"text_chunks": chunk_counts["text"],
Expand Down Expand Up @@ -2559,7 +2559,7 @@ def _build_llm_response_payload(
log.warn("runner.stream.usage_update_failed", {"error": str(e)})

# Log summary
log.info("runner.stream.complete", {
log.debug("runner.stream.complete", {
"text_length": len(content),
"reasoning_length": len(reasoning),
"tool_calls": len(processor.tool_calls),
Expand Down
2 changes: 1 addition & 1 deletion flocks/session/session_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ async def progress_callback(stage: str, data: dict) -> None:
ctx.session.id, messages
)
tokens_dict = {"input": estimated_tokens, "output": 0, "cache": {"read": 0, "write": 0}}
log.info("loop.tokens_estimated_from_messages", {
log.debug("loop.tokens_estimated_from_messages", {
"session_id": ctx.session.id,
"estimated_tokens": estimated_tokens,
"message_count": len(messages),
Expand Down
6 changes: 3 additions & 3 deletions flocks/tool/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ def _bump_revision(cls, reason: str) -> None:
Agent.invalidate_cache()
except Exception as e:
log.debug("tool.revision.agent_invalidate_failed", {"error": str(e)})
log.info("tool.registry.revision.bumped", {"revision": cls._revision, "reason": reason})
log.debug("tool.registry.revision.bumped", {"revision": cls._revision, "reason": reason})

@classmethod
def register_function(
Expand Down Expand Up @@ -958,7 +958,7 @@ def _sync_api_service_states(cls) -> None:
p for p, svc in api_services.items()
if not svc.get("enabled", False)
]
log.info("tool_registry.api_service_sync", {
log.debug("tool_registry.api_service_sync", {
"disabled_tools": disabled_count,
"disabled_providers": disabled_providers,
})
Expand Down Expand Up @@ -1639,7 +1639,7 @@ def _do_refresh(self) -> None:
def _run_refresh(self) -> None:
try:
ToolRegistry.refresh_plugin_tools()
log.info("tool.watcher.reloaded", {"reason": "plugin tool file changed on disk"})
log.debug("tool.watcher.reloaded", {"reason": "plugin tool file changed on disk"})
except Exception as e:
log.warn("tool.watcher.reload_failed", {"error": str(e)})

Expand Down
2 changes: 1 addition & 1 deletion flocks/tool/tool_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ def yaml_to_tool(raw: dict, yaml_path: Path) -> Tool:
tool._provider_version = provider_version # type: ignore[attr-defined]
tool._source = source or "yaml_plugin" # type: ignore[attr-defined]

log.info("tool.yaml.loaded", {
log.debug("tool.yaml.loaded", {
"name": name,
"service_id": service_id,
"storage_key": storage_key,
Expand Down
3 changes: 3 additions & 0 deletions tests/cli/test_help_quiet.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

ROOT = Path(__file__).resolve().parents[2]
NOISY_MARKERS = (
"plugin.scan",
"plugin.project.scan",
"tool.yaml.loaded",
"plugin.yaml_dispatched",
"tool_registry.api_service_sync",
"tool.registry.revision.bumped",
"tool.watcher.reloaded",
)


Expand Down
79 changes: 78 additions & 1 deletion tests/provider/test_openai_base_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
4. Provider with config-based models (defers to config merge)
"""

import pytest
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock, patch, MagicMock

import pytest

import flocks.provider.sdk.openai_base as openai_base_module
from flocks.provider.sdk.openai_base import OpenAIBaseProvider, extract_reasoning_content
from flocks.provider.provider import ModelInfo, ModelCapabilities, ProviderConfig

Expand Down Expand Up @@ -428,6 +431,80 @@ async def test_chat_passes_explicit_temperature(self):
kwargs = create.await_args.kwargs
assert kwargs["temperature"] == 1.0

def test_summarise_messages_compacts_long_history(self):
summary = openai_base_module._summarise_messages(
[
{"role": "system", "content": "a" * 10},
{"role": "user", "content": "b" * 20},
{"role": "assistant", "content": "c" * 30},
{"role": "tool", "content": "d" * 40},
{"role": "assistant", "content": "e" * 50},
{"role": "user", "content": "f" * 60},
{
"role": "user",
"content": [
{"type": "text", "text": "hello"},
{
"type": "image_url",
"image_url": {"url": "data:image/png;base64,abcd"},
},
],
},
"skipped",
]
)

assert summary["message_count"] == 8
assert summary["role_counts"] == {
"system": 1,
"user": 3,
"assistant": 2,
"tool": 1,
}
assert summary["multimodal_messages"] == 1
assert summary["block_type_counts"] == {"text": 1, "image_url": 1}
assert summary["skipped_types"] == {"str": 1}
assert len(summary["tail"]) == 6
assert summary["tail"][-1] == {
"role": "user",
"content_kind": "blocks",
"block_count": 2,
"block_types": {"text": 1, "image_url": 1},
"text_chars": 5,
"image_url_chars": 26,
"content_chars": 31,
}

@pytest.mark.asyncio
async def test_chat_logs_compact_message_summary(self):
provider, create = self._build_provider_with_client()
create.return_value = self._mock_chat_response()
info = Mock()

from flocks.provider.provider import ChatMessage

with patch.object(openai_base_module, "log", Mock(info=info)):
await provider.chat(
"kimi-k2.5",
[
ChatMessage(role="system", content="system prompt"),
ChatMessage(role="user", content="hello"),
ChatMessage(role="assistant", content="world"),
],
max_tokens=20,
)

event, payload = info.call_args.args
assert event == "openai_base.chat.request"
assert "message_summary" in payload
assert "message_shapes" not in payload
assert payload["message_summary"]["message_count"] == 3
assert payload["message_summary"]["role_counts"] == {
"system": 1,
"user": 1,
"assistant": 1,
}


class TestExtractReasoningContent:
"""Regression: some proxies send stream chunks with ``delta is None``."""
Expand Down
22 changes: 21 additions & 1 deletion tests/session/test_compaction_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
all work correctly.
"""

from unittest.mock import Mock

import pytest

from flocks.session.lifecycle.compaction import CompactionPolicy, ContextTier, _BOUNDS, _MIN_OVERFLOW_THRESHOLD
import flocks.session.lifecycle.compaction.policy as policy_module
from flocks.session.lifecycle.compaction import (
CompactionPolicy,
ContextTier,
_BOUNDS,
_MIN_OVERFLOW_THRESHOLD,
)


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -46,6 +54,18 @@ def test_xlarge_tier(self):
assert CompactionPolicy._classify_tier(1_000_000) == ContextTier.XLARGE


class TestPolicyLogging:
def test_policy_creation_uses_debug_log(self, monkeypatch: pytest.MonkeyPatch):
logger = Mock()
monkeypatch.setattr(policy_module, "_policy_log", logger)

policy = CompactionPolicy.from_model(128_000, 16_384)

assert policy.tier == ContextTier.LARGE
logger.debug.assert_called_once()
logger.info.assert_not_called()


# ---------------------------------------------------------------------------
# Model-specific policy computation
# ---------------------------------------------------------------------------
Expand Down
Loading