Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 140 additions & 115 deletions src/audit/analyzers/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,136 +46,162 @@ pub async fn extract_mcp_failures(
Ok(analyze_all(mcpg_logs_dir).await?.failures)
}

/// Accumulated state built from MCP gateway log events.
#[derive(Default)]
struct EventAccumulators {
per_tool: BTreeMap<(String, String), ToolAccumulator>,
observed_servers: BTreeSet<String>,
server_error_counts: BTreeMap<String, u64>,
failures: Vec<MCPFailureReport>,
saw_recognizable_event: bool,
}

impl EventAccumulators {
fn on_tool_call(&mut self, value: &Value) {
self.saw_recognizable_event = true;
let server = extract_string_field(value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
let tool = extract_string_field(value, &["tool", "name"]);
if !server.is_empty() {
self.observed_servers.insert(server.clone());
}
if let Some(tool) = tool.filter(|t| !t.is_empty()) {
let entry = self.per_tool.entry((server, tool)).or_default();
update_tool_sizes(entry, value);
entry.call_count += 1;
}
}

fn on_tool_error(&mut self, value: Value) {
self.saw_recognizable_event = true;
let server = extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
let tool = extract_string_field(&value, &["tool", "name"]);
if !server.is_empty() {
self.observed_servers.insert(server.clone());
}
if let Some(tool_name) = tool.clone().filter(|t| !t.is_empty()) {
let entry = self.per_tool.entry((server, tool_name)).or_default();
update_tool_sizes(entry, &value);
entry.error_count += 1;
}
self.failures.push(MCPFailureReport {
tool: tool.filter(|t| !t.is_empty()),
context: None,
reason: extract_stringish_field(&value, &["error"]),
timestamp: extract_string_field(&value, &["ts", "time", "timestamp", "@timestamp"]),
extra: value,
});
}

fn on_server_error(&mut self, value: Value) {
self.saw_recognizable_event = true;
let server = extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
if !server.is_empty() {
self.observed_servers.insert(server.clone());
*self.server_error_counts.entry(server).or_default() += 1;
}
self.failures.push(MCPFailureReport {
tool: None,
context: None,
reason: extract_stringish_field(&value, &["error"]),
timestamp: extract_string_field(&value, &["ts", "time", "timestamp", "@timestamp"]),
extra: value,
});
}

fn on_server_lifecycle(&mut self, value: &Value) {
self.saw_recognizable_event = true;
if let Some(server) =
extract_string_field(value, &["server", "mcp_server", "provider"])
{
self.observed_servers.insert(server);
}
}

fn process_event(&mut self, event_kind: &str, value: Value) {
match event_kind {
"tool_call" => self.on_tool_call(&value),
"tool_error" => self.on_tool_error(value),
"server_error" => self.on_server_error(value),
"server_start" | "server_stop" => self.on_server_lifecycle(&value),
_ => {}
}
}
}

async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
match tokio::fs::metadata(mcpg_logs_dir).await {
if !ensure_mcpg_logs_dir_exists(mcpg_logs_dir).await? {
return Ok(AnalyzeAllResult::default());
}
let file_paths = read_log_file_paths(mcpg_logs_dir).await?;
let mut acc = EventAccumulators::default();
process_log_files(file_paths, &mut acc).await?;
if !acc.saw_recognizable_event {
return Ok(AnalyzeAllResult::default());
}
Ok(AnalyzeAllResult {
tool_usage: Some(MCPToolUsageData {
tools: build_tool_summaries(&acc.per_tool),
}),
server_health: Some(MCPServerHealth {
servers: build_server_health_list(
acc.observed_servers,
&acc.per_tool,
acc.server_error_counts,
),
}),
failures: acc.failures,
})
}

/// Checks whether `dir` exists and is a directory.
///
/// Returns `Ok(true)` when `dir` is an existing directory and `Ok(false)` when
/// the path does not exist.
///
/// # Errors
///
/// Fails when the path exists but is not a directory, or when the metadata
/// lookup fails for any reason other than `ErrorKind::NotFound`.
async fn ensure_mcpg_logs_dir_exists(dir: &Path) -> Result<bool> {
    match tokio::fs::metadata(dir).await {
        Ok(metadata) => {
            anyhow::ensure!(
                metadata.is_dir(),
                "MCPG logs path is not a directory: {}",
                dir.display()
            );
            Ok(true)
        }
        // A missing directory is an expected condition, not an error.
        Err(error) if error.kind() == ErrorKind::NotFound => Ok(false),
        Err(error) => Err(error).with_context(|| format!("Failed to stat {}", dir.display())),
    }
}

let file_paths = read_log_file_paths(mcpg_logs_dir).await?;
let mut saw_recognizable_event = false;
let mut per_tool = BTreeMap::<(String, String), ToolAccumulator>::new();
let mut observed_servers = BTreeSet::<String>::new();
let mut server_error_counts = BTreeMap::<String, u64>::new();
let mut failures = Vec::new();

/// Reads every log file in `file_paths` and dispatches each event line to `acc`.
async fn process_log_files(file_paths: Vec<PathBuf>, acc: &mut EventAccumulators) -> Result<()> {
for path in file_paths {
let contents = tokio::fs::read_to_string(&path)
.await
.with_context(|| format!("Failed to read MCP gateway log {}", path.display()))?;

for line in contents.lines() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}

let value: Value = match serde_json::from_str(trimmed) {
Ok(value) => value,
Err(_) => continue,
};

let Some(event_kind) = extract_string_field(&value, &["event", "kind", "type"])
.map(|kind| kind.to_ascii_lowercase())
else {
continue;
};

match event_kind.as_str() {
"tool_call" => {
saw_recognizable_event = true;
let server =
extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
let tool = extract_string_field(&value, &["tool", "name"]);

if !server.is_empty() {
observed_servers.insert(server.clone());
}

if let Some(tool) = tool.filter(|tool| !tool.is_empty()) {
let entry = per_tool.entry((server, tool)).or_default();
update_tool_sizes(entry, &value);
entry.call_count += 1;
}
}
"tool_error" => {
saw_recognizable_event = true;
let server =
extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
let tool = extract_string_field(&value, &["tool", "name"]);

if !server.is_empty() {
observed_servers.insert(server.clone());
}

if let Some(tool_name) = tool.clone().filter(|tool| !tool.is_empty()) {
let entry = per_tool.entry((server, tool_name)).or_default();
update_tool_sizes(entry, &value);
entry.error_count += 1;
}

failures.push(MCPFailureReport {
tool: tool.filter(|tool| !tool.is_empty()),
context: None,
reason: extract_stringish_field(&value, &["error"]),
timestamp: extract_string_field(
&value,
&["ts", "time", "timestamp", "@timestamp"],
),
extra: value,
});
}
"server_error" => {
saw_recognizable_event = true;
let server =
extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();

if !server.is_empty() {
observed_servers.insert(server.clone());
*server_error_counts.entry(server).or_default() += 1;
}

failures.push(MCPFailureReport {
tool: None,
context: None,
reason: extract_stringish_field(&value, &["error"]),
timestamp: extract_string_field(
&value,
&["ts", "time", "timestamp", "@timestamp"],
),
extra: value,
});
}
"server_start" | "server_stop" => {
saw_recognizable_event = true;
if let Some(server) =
extract_string_field(&value, &["server", "mcp_server", "provider"])
{
observed_servers.insert(server);
}
}
_ => {}
}
acc.process_event(&event_kind, value);
}
}
Ok(())
}

if !saw_recognizable_event {
return Ok(AnalyzeAllResult::default());
}

fn build_tool_summaries(
per_tool: &BTreeMap<(String, String), ToolAccumulator>,
) -> Vec<MCPToolSummary> {
let mut tools: Vec<MCPToolSummary> = per_tool
.iter()
.map(|((server, tool), stats)| MCPToolSummary {
Expand All @@ -192,7 +218,14 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
.cmp(&left.call_count)
.then_with(|| left.name.cmp(&right.name))
});
tools
}

fn build_server_health_list(
observed_servers: BTreeSet<String>,
per_tool: &BTreeMap<(String, String), ToolAccumulator>,
server_error_counts: BTreeMap<String, u64>,
) -> Vec<MCPServerStats> {
let mut server_rollups = BTreeMap::<String, MCPServerStats>::new();
for server in observed_servers {
server_rollups.insert(
Expand All @@ -203,31 +236,28 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
},
);
}

for ((server, _tool), stats) in &per_tool {
for ((server, _tool), stats) in per_tool {
if server.is_empty() {
continue;
}
let server_entry = server_rollups
let entry = server_rollups
.entry(server.clone())
.or_insert_with(|| MCPServerStats {
name: server.clone(),
..MCPServerStats::default()
});
server_entry.total_calls += stats.call_count;
server_entry.error_count += stats.error_count;
entry.total_calls += stats.call_count;
entry.error_count += stats.error_count;
}

for (server, error_count) in server_error_counts {
let server_entry = server_rollups
let entry = server_rollups
.entry(server.clone())
.or_insert_with(|| MCPServerStats {
name: server,
..MCPServerStats::default()
});
server_entry.error_count += error_count;
entry.error_count += error_count;
}

let mut servers: Vec<MCPServerStats> = server_rollups
.into_values()
.map(|mut stats| {
Expand All @@ -246,12 +276,7 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
.cmp(&left.total_calls)
.then_with(|| left.name.cmp(&right.name))
});

Ok(AnalyzeAllResult {
tool_usage: Some(MCPToolUsageData { tools }),
server_health: Some(MCPServerHealth { servers }),
failures,
})
servers
}

async fn read_log_file_paths(dir: &Path) -> Result<Vec<PathBuf>> {
Expand Down