diff --git a/src/audit/analyzers/mcp.rs b/src/audit/analyzers/mcp.rs index 9a08767a..df72a94c 100644 --- a/src/audit/analyzers/mcp.rs +++ b/src/audit/analyzers/mcp.rs @@ -46,136 +46,162 @@ pub async fn extract_mcp_failures( Ok(analyze_all(mcpg_logs_dir).await?.failures) } +/// Accumulated state built from MCP gateway log events. +#[derive(Default)] +struct EventAccumulators { + per_tool: BTreeMap<(String, String), ToolAccumulator>, + observed_servers: BTreeSet, + server_error_counts: BTreeMap, + failures: Vec, + saw_recognizable_event: bool, +} + +impl EventAccumulators { + fn on_tool_call(&mut self, value: &Value) { + self.saw_recognizable_event = true; + let server = extract_string_field(value, &["server", "mcp_server", "provider"]) + .unwrap_or_default(); + let tool = extract_string_field(value, &["tool", "name"]); + if !server.is_empty() { + self.observed_servers.insert(server.clone()); + } + if let Some(tool) = tool.filter(|t| !t.is_empty()) { + let entry = self.per_tool.entry((server, tool)).or_default(); + update_tool_sizes(entry, value); + entry.call_count += 1; + } + } + + fn on_tool_error(&mut self, value: Value) { + self.saw_recognizable_event = true; + let server = extract_string_field(&value, &["server", "mcp_server", "provider"]) + .unwrap_or_default(); + let tool = extract_string_field(&value, &["tool", "name"]); + if !server.is_empty() { + self.observed_servers.insert(server.clone()); + } + if let Some(tool_name) = tool.clone().filter(|t| !t.is_empty()) { + let entry = self.per_tool.entry((server, tool_name)).or_default(); + update_tool_sizes(entry, &value); + entry.error_count += 1; + } + self.failures.push(MCPFailureReport { + tool: tool.filter(|t| !t.is_empty()), + context: None, + reason: extract_stringish_field(&value, &["error"]), + timestamp: extract_string_field(&value, &["ts", "time", "timestamp", "@timestamp"]), + extra: value, + }); + } + + fn on_server_error(&mut self, value: Value) { + self.saw_recognizable_event = true; + let server = extract_string_field(&value, &["server", "mcp_server", "provider"]) + .unwrap_or_default(); + if !server.is_empty() { + self.observed_servers.insert(server.clone()); + *self.server_error_counts.entry(server).or_default() += 1; + } + self.failures.push(MCPFailureReport { + tool: None, + context: None, + reason: extract_stringish_field(&value, &["error"]), + timestamp: extract_string_field(&value, &["ts", "time", "timestamp", "@timestamp"]), + extra: value, + }); + } + + fn on_server_lifecycle(&mut self, value: &Value) { + self.saw_recognizable_event = true; + if let Some(server) = + extract_string_field(value, &["server", "mcp_server", "provider"]) + { + self.observed_servers.insert(server); + } + } + + fn process_event(&mut self, event_kind: &str, value: Value) { + match event_kind { + "tool_call" => self.on_tool_call(&value), + "tool_error" => self.on_tool_error(value), + "server_error" => self.on_server_error(value), + "server_start" | "server_stop" => self.on_server_lifecycle(&value), + _ => {} + } + } +} + async fn analyze_all(mcpg_logs_dir: &Path) -> Result { - match tokio::fs::metadata(mcpg_logs_dir).await { + if !ensure_mcpg_logs_dir_exists(mcpg_logs_dir).await? { + return Ok(AnalyzeAllResult::default()); + } + let file_paths = read_log_file_paths(mcpg_logs_dir).await?; + let mut acc = EventAccumulators::default(); + process_log_files(file_paths, &mut acc).await?; + if !acc.saw_recognizable_event { + return Ok(AnalyzeAllResult::default()); + } + Ok(AnalyzeAllResult { + tool_usage: Some(MCPToolUsageData { + tools: build_tool_summaries(&acc.per_tool), + }), + server_health: Some(MCPServerHealth { + servers: build_server_health_list( + acc.observed_servers, + &acc.per_tool, + acc.server_error_counts, + ), + }), + failures: acc.failures, + }) +} + +/// Returns `true` if the directory exists and is a directory, `false` if not found. +/// Errors on other OS errors or if the path is not a directory. +async fn ensure_mcpg_logs_dir_exists(dir: &Path) -> Result { + match tokio::fs::metadata(dir).await { Ok(metadata) => { anyhow::ensure!( metadata.is_dir(), "MCPG logs path is not a directory: {}", - mcpg_logs_dir.display() + dir.display() ); + Ok(true) } - Err(error) if error.kind() == ErrorKind::NotFound => return Ok(AnalyzeAllResult::default()), - Err(error) => { - return Err(error) - .with_context(|| format!("Failed to stat {}", mcpg_logs_dir.display())); - } + Err(error) if error.kind() == ErrorKind::NotFound => Ok(false), + Err(error) => Err(error).with_context(|| format!("Failed to stat {}", dir.display())), } +} - let file_paths = read_log_file_paths(mcpg_logs_dir).await?; - let mut saw_recognizable_event = false; - let mut per_tool = BTreeMap::<(String, String), ToolAccumulator>::new(); - let mut observed_servers = BTreeSet::::new(); - let mut server_error_counts = BTreeMap::::new(); - let mut failures = Vec::new(); - +/// Reads every log file in `file_paths` and dispatches each event line to `acc`. +async fn process_log_files(file_paths: Vec, acc: &mut EventAccumulators) -> Result<()> { for path in file_paths { let contents = tokio::fs::read_to_string(&path) .await .with_context(|| format!("Failed to read MCP gateway log {}", path.display()))?; - for line in contents.lines() { let trimmed = line.trim(); if trimmed.is_empty() { continue; } - let value: Value = match serde_json::from_str(trimmed) { Ok(value) => value, Err(_) => continue, }; - let Some(event_kind) = extract_string_field(&value, &["event", "kind", "type"]) .map(|kind| kind.to_ascii_lowercase()) else { continue; }; - - match event_kind.as_str() { - "tool_call" => { - saw_recognizable_event = true; - let server = - extract_string_field(&value, &["server", "mcp_server", "provider"]) - .unwrap_or_default(); - let tool = extract_string_field(&value, &["tool", "name"]); - - if !server.is_empty() { - observed_servers.insert(server.clone()); - } - - if let Some(tool) = tool.filter(|tool| !tool.is_empty()) { - let entry = per_tool.entry((server, tool)).or_default(); - update_tool_sizes(entry, &value); - entry.call_count += 1; - } - } - "tool_error" => { - saw_recognizable_event = true; - let server = - extract_string_field(&value, &["server", "mcp_server", "provider"]) - .unwrap_or_default(); - let tool = extract_string_field(&value, &["tool", "name"]); - - if !server.is_empty() { - observed_servers.insert(server.clone()); - } - - if let Some(tool_name) = tool.clone().filter(|tool| !tool.is_empty()) { - let entry = per_tool.entry((server, tool_name)).or_default(); - update_tool_sizes(entry, &value); - entry.error_count += 1; - } - - failures.push(MCPFailureReport { - tool: tool.filter(|tool| !tool.is_empty()), - context: None, - reason: extract_stringish_field(&value, &["error"]), - timestamp: extract_string_field( - &value, - &["ts", "time", "timestamp", "@timestamp"], - ), - extra: value, - }); - } - "server_error" => { - saw_recognizable_event = true; - let server = - extract_string_field(&value, &["server", "mcp_server", "provider"]) - .unwrap_or_default(); - - if !server.is_empty() { - observed_servers.insert(server.clone()); - *server_error_counts.entry(server).or_default() += 1; - } - - failures.push(MCPFailureReport { - tool: None, - context: None, - reason: extract_stringish_field(&value, &["error"]), - timestamp: extract_string_field( - &value, - &["ts", "time", "timestamp", "@timestamp"], - ), - extra: value, - }); - } - "server_start" | "server_stop" => { - saw_recognizable_event = true; - if let Some(server) = - extract_string_field(&value, &["server", "mcp_server", "provider"]) - { - observed_servers.insert(server); - } - } - _ => {} - } + acc.process_event(&event_kind, value); } } + Ok(()) +} - if !saw_recognizable_event { - return Ok(AnalyzeAllResult::default()); - } - +fn build_tool_summaries( + per_tool: &BTreeMap<(String, String), ToolAccumulator>, +) -> Vec { let mut tools: Vec = per_tool .iter() .map(|((server, tool), stats)| MCPToolSummary { @@ -192,7 +218,14 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result { .cmp(&left.call_count) .then_with(|| left.name.cmp(&right.name)) }); + tools +} +fn build_server_health_list( + observed_servers: BTreeSet, + per_tool: &BTreeMap<(String, String), ToolAccumulator>, + server_error_counts: BTreeMap, +) -> Vec { let mut server_rollups = BTreeMap::::new(); for server in observed_servers { server_rollups.insert( @@ -203,31 +236,28 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result { }, ); } - - for ((server, _tool), stats) in &per_tool { + for ((server, _tool), stats) in per_tool { if server.is_empty() { continue; } - let server_entry = server_rollups + let entry = server_rollups .entry(server.clone()) .or_insert_with(|| MCPServerStats { name: server.clone(), ..MCPServerStats::default() }); - server_entry.total_calls += stats.call_count; - server_entry.error_count += stats.error_count; + entry.total_calls += stats.call_count; + entry.error_count += stats.error_count; } - for (server, error_count) in server_error_counts { - let server_entry = server_rollups + let entry = server_rollups .entry(server.clone()) .or_insert_with(|| MCPServerStats { name: server, ..MCPServerStats::default() }); - server_entry.error_count += error_count; + entry.error_count += error_count; } - let mut servers: Vec = server_rollups .into_values() .map(|mut stats| { @@ -246,12 +276,7 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result { .cmp(&left.total_calls) .then_with(|| left.name.cmp(&right.name)) }); - - Ok(AnalyzeAllResult { - tool_usage: Some(MCPToolUsageData { tools }), - server_health: Some(MCPServerHealth { servers }), - failures, - }) + servers } async fn read_log_file_paths(dir: &Path) -> Result> {