Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
90 changes: 90 additions & 0 deletions dojo/tools/api_wazuh/api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import requests
import logging
import json

logging.basicConfig(level=logging.WARNING)
"""
Step 1: Obtain a Wazuh JWT token and export it to an env var
$ TOKEN=$(curl -u <user>:<password> -k -X POST "https://localhost:55000/security/user/authenticate?raw=true")

Step 2: Increase the JWT token expiration time to 3 months (default is 900 seconds)
$ curl -k -X PUT "https://localhost:55000/security/config" -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" -d '{"auth_token_exp_timeout":7889231}'

Note: After changing the expiration time, all previously issued tokens will be revoked.

Step 3: Obtain a new JWT token with the updated expiration time.
$ curl -u <user>:<password> -k -X POST "https://localhost:55000/security/user/authenticate?raw=true"
"""


class WazuhAPI:
def __init__(self, tool_config):
self.base_url = tool_config.url
self.headers = {}

extras = tool_config.extras
if extras and isinstance(extras, str):
try:
extras = json.loads(extras)
except json.JSONDecodeError:
raise ValueError(f"Failed to decode extras field as JSON: {extras}")
elif extras and not isinstance(extras, dict):
raise ValueError(f"Unexpected type for extras field: {type(extras)}")

self.token = tool_config.api_key # Add Wazuh JWT token as tool API key
self.verify_ssl = (
extras.get("verify_ssl", True) if extras else True
) # Default to True, but can be overridden in the tool configuration extras with {"verify_ssl": false}

self.headers["Authorization"] = f"Bearer {self.token}"

def get_agents_in_group(self, group_name):
endpoint = f"{self.base_url}/groups/{group_name}/agents"
response = requests.get(endpoint, headers=self.headers, verify=self.verify_ssl)
response.raise_for_status()

if response.ok:
return response.json()["data"]["affected_items"]
else:
logging.warning(
f"Failed to retrieve agents for group {group_name}. Status code: {response.status_code}, Detail: {response.text}"
)
return []

def get_vulnerabilities_for_agent(self, agent_id):
endpoint = f"{self.base_url}/vulnerability/{agent_id}"
response = requests.get(endpoint, headers=self.headers, verify=self.verify_ssl)
if response.status_code == 200:
return response.json()
else:
logging.warning(
f"Failed to retrieve vulnerabilities for agent {agent_id}. Status code: {response.status_code}, Detail: {response.text}"
)
return None

def get_vulnerable_agents(self, GROUP_1, GROUP_2):
group1_agents = self.get_agents_in_group(GROUP_1)
group2_agents = self.get_agents_in_group(GROUP_2)

group1_agents_data = {agent["id"]: agent["ip"] for agent in group1_agents}
group2_ids = set(agent["id"] for agent in group2_agents)

common_ids = set(group1_agents_data.keys()).intersection(group2_ids)

vulnerabilities_list = []

for agent_id in common_ids:
vulnerabilities = self.get_vulnerabilities_for_agent(agent_id)
if vulnerabilities:
filtered_vulnerabilities = []
for vulnerability in vulnerabilities.get("data", {}).get(
"affected_items", []
):
if vulnerability.get("condition") != "Package unfixed":
vulnerability["agent_ip"] = group1_agents_data[agent_id]
filtered_vulnerabilities.append(vulnerability)
if filtered_vulnerabilities:
vulnerabilities["data"]["affected_items"] = filtered_vulnerabilities
vulnerabilities_list.append(vulnerabilities)

return vulnerabilities_list
45 changes: 45 additions & 0 deletions dojo/tools/api_wazuh/importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from django.core.exceptions import ValidationError
from dojo.models import Product_API_Scan_Configuration
from .api_client import WazuhAPI


class WazuhApiImporter(object):
config_id = "Wazuh API"

def get_findings(self, test):
client, config = self.prepare_client(test)
GROUP_1 = config.service_key_1
GROUP_2 = config.service_key_2
return client.get_vulnerable_agents(GROUP_1, GROUP_2)

def prepare_client(self, test):
product = test.engagement.product
if test.api_scan_configuration:
config = test.api_scan_configuration
if config.product != product:
raise ValidationError(
f'API Scan Configuration for "{self.config_id}" and Product do not match. '
f'Product: "{product.name}" ({product.id}), config.product: "{config.product.name}" ({config.product.id})'
)
else:
configs = Product_API_Scan_Configuration.objects.filter(
product=product,
tool_configuration__tool_type__name=self.config_id,
)
if configs.count() == 1:
config = configs.first()
elif configs.count() > 1:
raise ValidationError(
"More than one Product API Scan Configuration has been configured, but none of them has been "
"chosen. Please specify at Test which one should be used. "
f'Product: "{product.name}" ({product.id})'
)
else:
raise ValidationError(
"There are no API Scan Configurations for this Product. Please add at least one API Scan "
f'Configuration for "{self.config_id}" to this Product. '
f'Product: "{product.name}" ({product.id})'
)

tool_config = config.tool_configuration
return WazuhAPI(tool_config), config
101 changes: 101 additions & 0 deletions dojo/tools/api_wazuh/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import json
import hashlib
from dojo.models import Finding, Endpoint
from .importer import (
WazuhApiImporter,
) # Importing the WazuhApiImporter from importer.py

SCAN_TYPE_ID = "Wazuh API"


class ApiWazuhParser(object):
"""
Import from Wazuh API
"""

def get_scan_types(self):
return [SCAN_TYPE_ID]

def get_label_for_scan_types(self, scan_type):
return SCAN_TYPE_ID

def get_description_for_scan_types(self, scan_type):
return "Wazuh findings can be directly imported using the Wazuh API."

def requires_file(self, scan_type):
return False # Since we're interacting with the API, no file is required

def requires_tool_type(self, scan_type):
return SCAN_TYPE_ID # This parser is specifically for the Wazuh API

def api_scan_configuration_hint(self):
return "Please ensure the correct API endpoint and API key (JWT token) are configured for Wazuh."

def get_findings(self, file, test):
# If a file is not provided, fetch data from the Wazuh API
if file is None:
data = WazuhApiImporter().get_findings(
test
) # Adapted to use WazuhApiImporter
else:
data = json.load(file)

if not data:
return []

# Detect duplications
dupes = dict()

# Loop through each element in the list
for entry in data:
vulnerabilities = entry.get("data", {}).get("affected_items", [])
for item in vulnerabilities:
if (
item["condition"] != "Package unfixed"
and item["severity"] != "Untriaged"
):
id = item.get("cve")
package_name = item.get("name")
package_version = item.get("version")
description = item.get("condition")
severity = item.get("severity").capitalize()
agent_ip = item.get("agent_ip")
links = item.get("external_references")
cvssv3_score = item.get("cvss3_score")
publish_date = item.get("published")

if links:
references = "\n".join(links)
else:
references = None

title = item.get("title") + " (version: " + package_version + ")"
dupe_key = title + id + agent_ip + package_name + package_version
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
mitigation="mitigation",
references=references,
static_finding=True,
component_name=package_name,
component_version=package_version,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
)
if id and id.startswith("CVE"):
find.unsaved_vulnerability_ids = [id]
if agent_ip:
find.unsaved_endpoints = [Endpoint(host=agent_ip)]
dupes[dupe_key] = find

return list(dupes.values())
4 changes: 1 addition & 3 deletions dojo/tools/wazuh/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ def get_findings(self, filename, test):
else:
active = False
links = item.get("external_references")
title = (
item.get("title") + " (version: " + package_version + ")"
)
title = item.get("title") + " (version: " + package_version + ")"
severity = item.get("severity", "info").capitalize()
if links:
references = ""
Expand Down
120 changes: 120 additions & 0 deletions dojo/tools/wazuh/wazuh-vulns-extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import requests
from requests.auth import HTTPBasicAuth
import json
import urllib3

# Suppress InsecureRequestWarning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Wazuh
BASE_URL = "https://<wazuh-ip-or-url>:55000"
AUTH_URL = f"{BASE_URL}/security/user/authenticate?raw=true"
HEADERS = {}

# Basic authentication creds
USERNAME = "<wazuh-api-user>"
PASSWORD = "<wazuh-api-pass>"
Comment thread
This conversation was marked as resolved.

"""
Using Two Groups for Wazuh Agent Queries:

- Provides precise targeting by intersecting two specific groups, even if agents belong to multiple groups.
- Balances between broad and narrow agent selections.
- If narrow targeting isn't desired, set both group variables to the same name for a wider selection.
- Additionally, it appends an 'agent_ip' field for every vulnerability. This is later processed by DefectDojo to create endpoints and correlate each vulnerability to a specific agent, enhancing traceability and accountability.

Note: This approach refines the vulnerability reporting process by correlating agents with vulnerabilities more efficiently.
"""


GROUP_1 = "<group1-name>"
GROUP_2 = "<group2-name>"
Comment thread
This conversation was marked as resolved.

# Authenticate and set token


def authenticate():
response = requests.get(
AUTH_URL, auth=HTTPBasicAuth(USERNAME, PASSWORD), verify=False
Comment thread
This conversation was marked as resolved.
)
if response.status_code == 200:
Comment thread
This conversation was marked as resolved.
token = response.text
HEADERS["Authorization"] = f"Bearer {token}"
else:
raise ValueError(
f"Failed to authenticate. Status code: {response.status_code}, Detail: {response.text}"
)


# Retrieve agents for a specific group


def get_agents_in_group(group_name):
endpoint = f"{BASE_URL}/groups/{group_name}/agents"
response = requests.get(endpoint, headers=HEADERS, verify=False)
if response.status_code == 200:
Comment thread
This conversation was marked as resolved.
return response.json()["data"]["affected_items"]
else:
print(
Comment thread
This conversation was marked as resolved.
f"Failed to retrieve agents for group {group_name}. Status code: {response.status_code}, Detail: {response.text}"
)
return []


# Retrieve vulnerabilities for a specific agent


def get_vulnerabilities_for_agent(agent_id):
endpoint = f"{BASE_URL}/vulnerability/{agent_id}"
response = requests.get(endpoint, headers=HEADERS, verify=False)
if response.status_code == 200:
return response.json()
else:
print(
Comment thread
This conversation was marked as resolved.
f"Failed to retrieve vulnerabilities for agent {agent_id}. Status code: {response.status_code}, Detail: {response.text}"
)
return None


# Main function


def main():
authenticate()

group1_agents = get_agents_in_group(GROUP_1)
group2_agents = get_agents_in_group(GROUP_2)

# Extract the agent IDs and IPs from the response for each group
group1_agents_data = {agent["id"]: agent["ip"] for agent in group1_agents}
group2_ids = set(agent["id"] for agent in group2_agents)

# Find the intersection of the two sets
common_ids = set(group1_agents_data.keys()).intersection(group2_ids)

vulnerabilities_list = []

# Loop through each agent_id and get its vulnerabilities
for agent_id in common_ids:
vulnerabilities = get_vulnerabilities_for_agent(agent_id)
if vulnerabilities:
filtered_vulnerabilities = []
# Extend the vulnerabilities with agent_ip field
for vulnerability in vulnerabilities.get("data", {}).get(
"affected_items", []
):
# Skip the vulnerability if its condition is "Package unfixed"
if vulnerability.get("condition") != "Package unfixed":
vulnerability["agent_ip"] = group1_agents_data[agent_id]
filtered_vulnerabilities.append(vulnerability)
if filtered_vulnerabilities:
vulnerabilities["data"]["affected_items"] = filtered_vulnerabilities
vulnerabilities_list.append(vulnerabilities)

# Write the filtered vulnerabilities to a JSON file
with open("vulnerabilities.json", "w") as f:
json.dump(vulnerabilities_list, f, indent=4)


if __name__ == "__main__":
main()
Comment thread
This conversation was marked as resolved.