diff --git a/dojo/tools/api_wazuh/__init__.py b/dojo/tools/api_wazuh/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/tools/api_wazuh/api_client.py b/dojo/tools/api_wazuh/api_client.py new file mode 100644 index 00000000000..cae9ac3b803 --- /dev/null +++ b/dojo/tools/api_wazuh/api_client.py @@ -0,0 +1,90 @@ +import requests +import logging +import json + +logging.basicConfig(level=logging.WARNING) +""" +Step 1: Obtain a Wazuh JWT token and export it to an env var +$ TOKEN=$(curl -u : -k -X POST "https://localhost:55000/security/user/authenticate?raw=true") + +Step 2: Increase the JWT token expiration time to 3 months (default is 900 seconds) +$ curl -k -X PUT "https://localhost:55000/security/config" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" -d '{"auth_token_exp_timeout":7889231}' + +Note: After changing the expiration time, all previously issued tokens will be revoked. + +Step 3: Obtain a new JWT token with the updated expiration time. +$ curl -u : -k -X POST "https://localhost:55000/security/user/authenticate?raw=true" +""" + + +class WazuhAPI: + def __init__(self, tool_config): + self.base_url = tool_config.url + self.headers = {} + + extras = tool_config.extras + if extras and isinstance(extras, str): + try: + extras = json.loads(extras) + except json.JSONDecodeError: + raise ValueError(f"Failed to decode extras field as JSON: {extras}") + elif extras and not isinstance(extras, dict): + raise ValueError(f"Unexpected type for extras field: {type(extras)}") + + self.token = tool_config.api_key # Add Wazuh JWT token as tool API key + self.verify_ssl = ( + extras.get("verify_ssl", True) if extras else True + ) # Default to True, but can be overridden in the tool configuration extras with {"verify_ssl": false} + + self.headers["Authorization"] = f"Bearer {self.token}" + + def get_agents_in_group(self, group_name): + endpoint = f"{self.base_url}/groups/{group_name}/agents" + response = requests.get(endpoint, headers=self.headers, verify=self.verify_ssl) + response.raise_for_status() + + if response.ok: + return response.json()["data"]["affected_items"] + else: + logging.warning( + f"Failed to retrieve agents for group {group_name}. Status code: {response.status_code}, Detail: {response.text}" + ) + return [] + + def get_vulnerabilities_for_agent(self, agent_id): + endpoint = f"{self.base_url}/vulnerability/{agent_id}" + response = requests.get(endpoint, headers=self.headers, verify=self.verify_ssl) + if response.status_code == 200: + return response.json() + else: + logging.warning( + f"Failed to retrieve vulnerabilities for agent {agent_id}. Status code: {response.status_code}, Detail: {response.text}" + ) + return None + + def get_vulnerable_agents(self, GROUP_1, GROUP_2): + group1_agents = self.get_agents_in_group(GROUP_1) + group2_agents = self.get_agents_in_group(GROUP_2) + + group1_agents_data = {agent["id"]: agent["ip"] for agent in group1_agents} + group2_ids = set(agent["id"] for agent in group2_agents) + + common_ids = set(group1_agents_data.keys()).intersection(group2_ids) + + vulnerabilities_list = [] + + for agent_id in common_ids: + vulnerabilities = self.get_vulnerabilities_for_agent(agent_id) + if vulnerabilities: + filtered_vulnerabilities = [] + for vulnerability in vulnerabilities.get("data", {}).get( + "affected_items", [] + ): + if vulnerability.get("condition") != "Package unfixed": + vulnerability["agent_ip"] = group1_agents_data[agent_id] + filtered_vulnerabilities.append(vulnerability) + if filtered_vulnerabilities: + vulnerabilities["data"]["affected_items"] = filtered_vulnerabilities + vulnerabilities_list.append(vulnerabilities) + + return vulnerabilities_list diff --git a/dojo/tools/api_wazuh/importer.py b/dojo/tools/api_wazuh/importer.py new file mode 100644 index 00000000000..c2511c104b0 --- /dev/null +++ b/dojo/tools/api_wazuh/importer.py @@ -0,0 +1,45 @@ +from django.core.exceptions import ValidationError +from dojo.models import Product_API_Scan_Configuration +from .api_client import WazuhAPI + + +class WazuhApiImporter(object): + config_id = "Wazuh API" + + def get_findings(self, test): + client, config = self.prepare_client(test) + GROUP_1 = config.service_key_1 + GROUP_2 = config.service_key_2 + return client.get_vulnerable_agents(GROUP_1, GROUP_2) + + def prepare_client(self, test): + product = test.engagement.product + if test.api_scan_configuration: + config = test.api_scan_configuration + if config.product != product: + raise ValidationError( + f'API Scan Configuration for "{self.config_id}" and Product do not match. ' + f'Product: "{product.name}" ({product.id}), config.product: "{config.product.name}" ({config.product.id})' + ) + else: + configs = Product_API_Scan_Configuration.objects.filter( + product=product, + tool_configuration__tool_type__name=self.config_id, + ) + if configs.count() == 1: + config = configs.first() + elif configs.count() > 1: + raise ValidationError( + "More than one Product API Scan Configuration has been configured, but none of them has been " + "chosen. Please specify at Test which one should be used. " + f'Product: "{product.name}" ({product.id})' + ) + else: + raise ValidationError( + "There are no API Scan Configurations for this Product. Please add at least one API Scan " + f'Configuration for "{self.config_id}" to this Product. ' + f'Product: "{product.name}" ({product.id})' + ) + + tool_config = config.tool_configuration + return WazuhAPI(tool_config), config diff --git a/dojo/tools/api_wazuh/parser.py b/dojo/tools/api_wazuh/parser.py new file mode 100644 index 00000000000..952b1b6ffa4 --- /dev/null +++ b/dojo/tools/api_wazuh/parser.py @@ -0,0 +1,101 @@ +import json +import hashlib +from dojo.models import Finding, Endpoint +from .importer import ( + WazuhApiImporter, +) # Importing the WazuhApiImporter from importer.py + +SCAN_TYPE_ID = "Wazuh API" + + +class ApiWazuhParser(object): + """ + Import from Wazuh API + """ + + def get_scan_types(self): + return [SCAN_TYPE_ID] + + def get_label_for_scan_types(self, scan_type): + return SCAN_TYPE_ID + + def get_description_for_scan_types(self, scan_type): + return "Wazuh findings can be directly imported using the Wazuh API." + + def requires_file(self, scan_type): + return False # Since we're interacting with the API, no file is required + + def requires_tool_type(self, scan_type): + return SCAN_TYPE_ID # This parser is specifically for the Wazuh API + + def api_scan_configuration_hint(self): + return "Please ensure the correct API endpoint and API key (JWT token) are configured for Wazuh." + + def get_findings(self, file, test): + # If a file is not provided, fetch data from the Wazuh API + if file is None: + data = WazuhApiImporter().get_findings( + test + ) # Adapted to use WazuhApiImporter + else: + data = json.load(file) + + if not data: + return [] + + # Detect duplications + dupes = dict() + + # Loop through each element in the list + for entry in data: + vulnerabilities = entry.get("data", {}).get("affected_items", []) + for item in vulnerabilities: + if ( + item["condition"] != "Package unfixed" + and item["severity"] != "Untriaged" + ): + id = item.get("cve") + package_name = item.get("name") + package_version = item.get("version") + description = item.get("condition") + severity = item.get("severity").capitalize() + agent_ip = item.get("agent_ip") + links = item.get("external_references") + cvssv3_score = item.get("cvss3_score") + publish_date = item.get("published") + + if links: + references = "\n".join(links) + else: + references = None + + title = item.get("title") + " (version: " + package_version + ")" + dupe_key = title + id + agent_ip + package_name + package_version + dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest() + + if dupe_key in dupes: + find = dupes[dupe_key] + else: + dupes[dupe_key] = True + + find = Finding( + title=title, + test=test, + description=description, + severity=severity, + mitigation="mitigation", + references=references, + static_finding=True, + component_name=package_name, + component_version=package_version, + cvssv3_score=cvssv3_score, + publish_date=publish_date, + unique_id_from_tool=dupe_key, + ) + if id and id.startswith("CVE"): + find.unsaved_vulnerability_ids = [id] + if agent_ip: + find.unsaved_endpoints = [Endpoint(host=agent_ip)] + dupes[dupe_key] = find + + return list(dupes.values()) diff --git a/dojo/tools/wazuh/parser.py b/dojo/tools/wazuh/parser.py index b1ea19d836b..a626ab60aae 100644 --- a/dojo/tools/wazuh/parser.py +++ b/dojo/tools/wazuh/parser.py @@ -49,9 +49,7 @@ def get_findings(self, filename, test): else: active = False links = item.get("external_references") - title = ( - item.get("title") + " (version: " + package_version + ")" - ) + title = item.get("title") + " (version: " + package_version + ")" severity = item.get("severity", "info").capitalize() if links: references = "" diff --git a/dojo/tools/wazuh/wazuh-vulns-extractor.py b/dojo/tools/wazuh/wazuh-vulns-extractor.py new file mode 100644 index 00000000000..acbed54eb90 --- /dev/null +++ b/dojo/tools/wazuh/wazuh-vulns-extractor.py @@ -0,0 +1,120 @@ +import requests +from requests.auth import HTTPBasicAuth +import json +import urllib3 + +# Suppress InsecureRequestWarning +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +# Wazuh +BASE_URL = "https://:55000" +AUTH_URL = f"{BASE_URL}/security/user/authenticate?raw=true" +HEADERS = {} + +# Basic authentication creds +USERNAME = "" +PASSWORD = "" + +""" +Using Two Groups for Wazuh Agent Queries: + +- Provides precise targeting by intersecting two specific groups, even if agents belong to multiple groups. +- Balances between broad and narrow agent selections. +- If narrow targeting isn't desired, set both group variables to the same name for a wider selection. +- Additionally, it appends an 'agent_ip' field for every vulnerability. This is later processed by DefectDojo to create endpoints and correlate each vulnerability to a specific agent, enhancing traceability and accountability. + +Note: This approach refines the vulnerability reporting process by correlating agents with vulnerabilities more efficiently. +""" + + +GROUP_1 = "" +GROUP_2 = "" + +# Authenticate and set token + + +def authenticate(): + response = requests.get( + AUTH_URL, auth=HTTPBasicAuth(USERNAME, PASSWORD), verify=False + ) + if response.status_code == 200: + token = response.text + HEADERS["Authorization"] = f"Bearer {token}" + else: + raise ValueError( + f"Failed to authenticate. Status code: {response.status_code}, Detail: {response.text}" + ) + + +# Retrieve agents for a specific group + + +def get_agents_in_group(group_name): + endpoint = f"{BASE_URL}/groups/{group_name}/agents" + response = requests.get(endpoint, headers=HEADERS, verify=False) + if response.status_code == 200: + return response.json()["data"]["affected_items"] + else: + print( + f"Failed to retrieve agents for group {group_name}. Status code: {response.status_code}, Detail: {response.text}" + ) + return [] + + +# Retrieve vulnerabilities for a specific agent + + +def get_vulnerabilities_for_agent(agent_id): + endpoint = f"{BASE_URL}/vulnerability/{agent_id}" + response = requests.get(endpoint, headers=HEADERS, verify=False) + if response.status_code == 200: + return response.json() + else: + print( + f"Failed to retrieve vulnerabilities for agent {agent_id}. Status code: {response.status_code}, Detail: {response.text}" + ) + return None + + +# Main function + + +def main(): + authenticate() + + group1_agents = get_agents_in_group(GROUP_1) + group2_agents = get_agents_in_group(GROUP_2) + + # Extract the agent IDs and IPs from the response for each group + group1_agents_data = {agent["id"]: agent["ip"] for agent in group1_agents} + group2_ids = set(agent["id"] for agent in group2_agents) + + # Find the intersection of the two sets + common_ids = set(group1_agents_data.keys()).intersection(group2_ids) + + vulnerabilities_list = [] + + # Loop through each agent_id and get its vulnerabilities + for agent_id in common_ids: + vulnerabilities = get_vulnerabilities_for_agent(agent_id) + if vulnerabilities: + filtered_vulnerabilities = [] + # Extend the vulnerabilities with agent_ip field + for vulnerability in vulnerabilities.get("data", {}).get( + "affected_items", [] + ): + # Skip the vulnerability if its condition is "Package unfixed" + if vulnerability.get("condition") != "Package unfixed": + vulnerability["agent_ip"] = group1_agents_data[agent_id] + filtered_vulnerabilities.append(vulnerability) + if filtered_vulnerabilities: + vulnerabilities["data"]["affected_items"] = filtered_vulnerabilities + vulnerabilities_list.append(vulnerabilities) + + # Write the filtered vulnerabilities to a JSON file + with open("vulnerabilities.json", "w") as f: + json.dump(vulnerabilities_list, f, indent=4) + + +if __name__ == "__main__": + main()