From 3c5905b759fcc376d0c3f2af058c4aae9e33ca7f Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Fri, 1 Mar 2024 09:34:13 +0100 Subject: [PATCH 1/5] :hammer: restructure openvas parser --- dojo/tools/openvas/csv_parser.py | 3 +++ dojo/tools/openvas/parser.py | 2 ++ dojo/tools/openvas/xml_parser.py | 3 +++ 3 files changed, 8 insertions(+) create mode 100644 dojo/tools/openvas/csv_parser.py create mode 100644 dojo/tools/openvas/xml_parser.py diff --git a/dojo/tools/openvas/csv_parser.py b/dojo/tools/openvas/csv_parser.py new file mode 100644 index 00000000000..f4328298f8c --- /dev/null +++ b/dojo/tools/openvas/csv_parser.py @@ -0,0 +1,3 @@ +class OpenVASCSVParser(object): + def __init__(self) -> None: + pass \ No newline at end of file diff --git a/dojo/tools/openvas/parser.py b/dojo/tools/openvas/parser.py index 9a8c9b79ad4..54b51ff6c4e 100755 --- a/dojo/tools/openvas/parser.py +++ b/dojo/tools/openvas/parser.py @@ -5,6 +5,8 @@ from xml.dom import NamespaceErr from defusedxml import ElementTree as ET from dojo.models import Finding, Endpoint +from dojo.tools.openvas.csv_parser import OpenVASCSVParser +from dojo.tools.openvas.xml_parser import OpenVASXMLParser class ColumnMappingStrategy(object): diff --git a/dojo/tools/openvas/xml_parser.py b/dojo/tools/openvas/xml_parser.py new file mode 100644 index 00000000000..489c3606af8 --- /dev/null +++ b/dojo/tools/openvas/xml_parser.py @@ -0,0 +1,3 @@ +class OpenVASXMLParser(object): + def __init__(self) -> None: + pass \ No newline at end of file From 20def1d125c7656fa873c2ec1fad72f569a2e51f Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Fri, 1 Mar 2024 09:41:03 +0100 Subject: [PATCH 2/5] adapt csv parser --- dojo/tools/openvas/csv_parser.py | 281 ++++++++++++++++++++++++++++++- dojo/tools/openvas/parser.py | 278 +----------------------------- dojo/tools/openvas/xml_parser.py | 9 + 3 files changed, 290 insertions(+), 278 deletions(-) diff --git a/dojo/tools/openvas/csv_parser.py b/dojo/tools/openvas/csv_parser.py index f4328298f8c..6a81ad3444c 100644 --- a/dojo/tools/openvas/csv_parser.py +++ b/dojo/tools/openvas/csv_parser.py @@ -1,3 +1,280 @@ +import csv +import hashlib +import io +from dateutil.parser import parse +from xml.dom import NamespaceErr +from defusedxml import ElementTree as ET +from dojo.models import Finding, Endpoint + +class ColumnMappingStrategy(object): + mapped_column = None + + def __init__(self): + self.successor = None + + def map_column_value(self, finding, column_value): + pass + + @staticmethod + def evaluate_bool_value(column_value): + if column_value.lower() == "true": + return True + elif column_value.lower() == "false": + return False + else: + return None + + def process_column(self, column_name, column_value, finding): + if ( + column_name.lower() == self.mapped_column + and column_value is not None + ): + self.map_column_value(finding, column_value) + elif self.successor is not None: + self.successor.process_column(column_name, column_value, finding) + + +class DateColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "timestamp" + super(DateColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.date = parse(column_value).date() + + +class TitleColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "nvt name" + super(TitleColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.title = column_value + + +class CweColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "cweid" + super(CweColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + if column_value.isdigit(): + finding.cwe = int(column_value) + + +class PortColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "port" + super(PortColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + if column_value.isdigit(): + finding.unsaved_endpoints[0].port = int(column_value) + + +class ProtocolColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "port protocol" + super(ProtocolColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + if column_value: # do not store empty protocol + finding.unsaved_endpoints[0].protocol = column_value + + +class IpColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "ip" + super(IpColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + if not finding.unsaved_endpoints[ + 0 + ].host: # process only if host is not already defined (by field hostname) + finding.unsaved_endpoints[0].host = column_value + + +class HostnameColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "hostname" + super(HostnameColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + if column_value: # do not override IP if hostname is empty + finding.unsaved_endpoints[0].host = column_value + + +class SeverityColumnMappingStrategy(ColumnMappingStrategy): + @staticmethod + def is_valid_severity(severity): + valid_severity = ("Info", "Low", "Medium", "High", "Critical") + return severity in valid_severity + + def __init__(self): + self.mapped_column = "severity" + super(SeverityColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + if self.is_valid_severity(column_value): + finding.severity = column_value + else: + finding.severity = "Info" + + +class DescriptionColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "summary" + super(DescriptionColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.description = column_value + + +class MitigationColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "solution" + super(MitigationColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.mitigation = column_value + + +class ImpactColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "vulnerability insight" + super(ImpactColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.impact = column_value + + +class ReferencesColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "specific result" + super(ReferencesColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.references = column_value + + +class ActiveColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "active" + super(ActiveColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.active = self.evaluate_bool_value(column_value) + + +class VerifiedColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "verified" + super(VerifiedColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.verified = self.evaluate_bool_value(column_value) + + +class FalsePositiveColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "falsepositive" + super(FalsePositiveColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.false_p = self.evaluate_bool_value(column_value) + + +class DuplicateColumnMappingStrategy(ColumnMappingStrategy): + def __init__(self): + self.mapped_column = "duplicate" + super(DuplicateColumnMappingStrategy, self).__init__() + + def map_column_value(self, finding, column_value): + finding.duplicate = self.evaluate_bool_value(column_value) + class OpenVASCSVParser(object): - def __init__(self) -> None: - pass \ No newline at end of file + def create_chain(self): + date_column_strategy = DateColumnMappingStrategy() + title_column_strategy = TitleColumnMappingStrategy() + cwe_column_strategy = CweColumnMappingStrategy() + ip_column_strategy = IpColumnMappingStrategy() + hostname_column_strategy = HostnameColumnMappingStrategy() + severity_column_strategy = SeverityColumnMappingStrategy() + description_column_strategy = DescriptionColumnMappingStrategy() + mitigation_column_strategy = MitigationColumnMappingStrategy() + impact_column_strategy = ImpactColumnMappingStrategy() + references_column_strategy = ReferencesColumnMappingStrategy() + active_column_strategy = ActiveColumnMappingStrategy() + verified_column_strategy = VerifiedColumnMappingStrategy() + false_positive_strategy = FalsePositiveColumnMappingStrategy() + duplicate_strategy = DuplicateColumnMappingStrategy() + port_strategy = PortColumnMappingStrategy() + protocol_strategy = ProtocolColumnMappingStrategy() + + port_strategy.successor = protocol_strategy + duplicate_strategy.successor = port_strategy + false_positive_strategy.successor = duplicate_strategy + verified_column_strategy.successor = false_positive_strategy + active_column_strategy.successor = verified_column_strategy + references_column_strategy.successor = active_column_strategy + impact_column_strategy.successor = references_column_strategy + mitigation_column_strategy.successor = impact_column_strategy + description_column_strategy.successor = mitigation_column_strategy + severity_column_strategy.successor = description_column_strategy + ip_column_strategy.successor = severity_column_strategy + hostname_column_strategy.successor = ip_column_strategy + cwe_column_strategy.successor = hostname_column_strategy + title_column_strategy.successor = cwe_column_strategy + date_column_strategy.successor = title_column_strategy + return date_column_strategy + + def get_findings(self, filename, test): + column_names = dict() + dupes = dict() + chain = self.create_chain() + + content = filename.read() + if isinstance(content, bytes): + content = content.decode("utf-8") + reader = csv.reader(io.StringIO(content), delimiter=",", quotechar='"') + + row_number = 0 + for row in reader: + finding = Finding(test=test) + finding.unsaved_endpoints = [Endpoint()] + + if row_number == 0: + column_names = self.read_column_names(row) + row_number += 1 + continue + + column_number = 0 + for column in row: + chain.process_column( + column_names[column_number], column, finding + ) + column_number += 1 + + if finding is not None and row_number > 0: + if finding.title is None: + finding.title = "" + if finding.description is None: + finding.description = "" + + key = hashlib.sha256( + ( + str(finding.unsaved_endpoints[0]) + + "|" + + finding.severity + + "|" + + finding.title + + "|" + + finding.description + ).encode("utf-8") + ).hexdigest() + + if key not in dupes: + dupes[key] = finding + + row_number += 1 + return list(dupes.values()) diff --git a/dojo/tools/openvas/parser.py b/dojo/tools/openvas/parser.py index 54b51ff6c4e..586efdc5541 100755 --- a/dojo/tools/openvas/parser.py +++ b/dojo/tools/openvas/parser.py @@ -1,238 +1,12 @@ -import csv -import hashlib -import io -from dateutil.parser import parse -from xml.dom import NamespaceErr -from defusedxml import ElementTree as ET -from dojo.models import Finding, Endpoint from dojo.tools.openvas.csv_parser import OpenVASCSVParser from dojo.tools.openvas.xml_parser import OpenVASXMLParser -class ColumnMappingStrategy(object): - mapped_column = None - def __init__(self): - self.successor = None - - def map_column_value(self, finding, column_value): - pass - - @staticmethod - def evaluate_bool_value(column_value): - if column_value.lower() == "true": - return True - elif column_value.lower() == "false": - return False - else: - return None - - def process_column(self, column_name, column_value, finding): - if ( - column_name.lower() == self.mapped_column - and column_value is not None - ): - self.map_column_value(finding, column_value) - elif self.successor is not None: - self.successor.process_column(column_name, column_value, finding) - - -class DateColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "timestamp" - super(DateColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.date = parse(column_value).date() - - -class TitleColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "nvt name" - super(TitleColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.title = column_value - - -class CweColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "cweid" - super(CweColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - if column_value.isdigit(): - finding.cwe = int(column_value) - - -class PortColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "port" - super(PortColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - if column_value.isdigit(): - finding.unsaved_endpoints[0].port = int(column_value) - - -class ProtocolColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "port protocol" - super(ProtocolColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - if column_value: # do not store empty protocol - finding.unsaved_endpoints[0].protocol = column_value - - -class IpColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "ip" - super(IpColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - if not finding.unsaved_endpoints[ - 0 - ].host: # process only if host is not already defined (by field hostname) - finding.unsaved_endpoints[0].host = column_value - - -class HostnameColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "hostname" - super(HostnameColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - if column_value: # do not override IP if hostname is empty - finding.unsaved_endpoints[0].host = column_value - - -class SeverityColumnMappingStrategy(ColumnMappingStrategy): - @staticmethod - def is_valid_severity(severity): - valid_severity = ("Info", "Low", "Medium", "High", "Critical") - return severity in valid_severity - - def __init__(self): - self.mapped_column = "severity" - super(SeverityColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - if self.is_valid_severity(column_value): - finding.severity = column_value - else: - finding.severity = "Info" - - -class DescriptionColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "summary" - super(DescriptionColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.description = column_value - - -class MitigationColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "solution" - super(MitigationColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.mitigation = column_value - - -class ImpactColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "vulnerability insight" - super(ImpactColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.impact = column_value - - -class ReferencesColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "specific result" - super(ReferencesColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.references = column_value - - -class ActiveColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "active" - super(ActiveColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.active = self.evaluate_bool_value(column_value) - - -class VerifiedColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "verified" - super(VerifiedColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.verified = self.evaluate_bool_value(column_value) - - -class FalsePositiveColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "falsepositive" - super(FalsePositiveColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.false_p = self.evaluate_bool_value(column_value) - - -class DuplicateColumnMappingStrategy(ColumnMappingStrategy): - def __init__(self): - self.mapped_column = "duplicate" - super(DuplicateColumnMappingStrategy, self).__init__() - - def map_column_value(self, finding, column_value): - finding.duplicate = self.evaluate_bool_value(column_value) class OpenVASParser(object): - def create_chain(self): - date_column_strategy = DateColumnMappingStrategy() - title_column_strategy = TitleColumnMappingStrategy() - cwe_column_strategy = CweColumnMappingStrategy() - ip_column_strategy = IpColumnMappingStrategy() - hostname_column_strategy = HostnameColumnMappingStrategy() - severity_column_strategy = SeverityColumnMappingStrategy() - description_column_strategy = DescriptionColumnMappingStrategy() - mitigation_column_strategy = MitigationColumnMappingStrategy() - impact_column_strategy = ImpactColumnMappingStrategy() - references_column_strategy = ReferencesColumnMappingStrategy() - active_column_strategy = ActiveColumnMappingStrategy() - verified_column_strategy = VerifiedColumnMappingStrategy() - false_positive_strategy = FalsePositiveColumnMappingStrategy() - duplicate_strategy = DuplicateColumnMappingStrategy() - port_strategy = PortColumnMappingStrategy() - protocol_strategy = ProtocolColumnMappingStrategy() - - port_strategy.successor = protocol_strategy - duplicate_strategy.successor = port_strategy - false_positive_strategy.successor = duplicate_strategy - verified_column_strategy.successor = false_positive_strategy - active_column_strategy.successor = verified_column_strategy - references_column_strategy.successor = active_column_strategy - impact_column_strategy.successor = references_column_strategy - mitigation_column_strategy.successor = impact_column_strategy - description_column_strategy.successor = mitigation_column_strategy - severity_column_strategy.successor = description_column_strategy - ip_column_strategy.successor = severity_column_strategy - hostname_column_strategy.successor = ip_column_strategy - cwe_column_strategy.successor = hostname_column_strategy - title_column_strategy.successor = cwe_column_strategy - date_column_strategy.successor = title_column_strategy - - return date_column_strategy - + def read_column_names(self, row): column_names = dict() index = 0 @@ -265,55 +39,7 @@ def convert_cvss_score(self, raw_value): def get_findings(self, filename, test): if str(filename.name).endswith('.csv'): - column_names = dict() - dupes = dict() - chain = self.create_chain() - - content = filename.read() - if isinstance(content, bytes): - content = content.decode("utf-8") - reader = csv.reader(io.StringIO(content), delimiter=",", quotechar='"') - - row_number = 0 - for row in reader: - finding = Finding(test=test) - finding.unsaved_endpoints = [Endpoint()] - - if row_number == 0: - column_names = self.read_column_names(row) - row_number += 1 - continue - - column_number = 0 - for column in row: - chain.process_column( - column_names[column_number], column, finding - ) - column_number += 1 - - if finding is not None and row_number > 0: - if finding.title is None: - finding.title = "" - if finding.description is None: - finding.description = "" - - key = hashlib.sha256( - ( - str(finding.unsaved_endpoints[0]) - + "|" - + finding.severity - + "|" - + finding.title - + "|" - + finding.description - ).encode("utf-8") - ).hexdigest() - - if key not in dupes: - dupes[key] = finding - - row_number += 1 - return list(dupes.values()) + return OpenVASCSVParser().get_findings(filename, test) elif str(filename.name).endswith('.xml'): findings = [] tree = ET.parse(filename) diff --git a/dojo/tools/openvas/xml_parser.py b/dojo/tools/openvas/xml_parser.py index 489c3606af8..0f456d42d15 100644 --- a/dojo/tools/openvas/xml_parser.py +++ b/dojo/tools/openvas/xml_parser.py @@ -1,3 +1,12 @@ +import csv +import hashlib +import io +from dateutil.parser import parse +from xml.dom import NamespaceErr +from defusedxml import ElementTree as ET +from dojo.models import Finding, Endpoint + + class OpenVASXMLParser(object): def __init__(self) -> None: pass \ No newline at end of file From 840579d2ac370da5e41094eb2bbdf15e99000dd8 Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Fri, 1 Mar 2024 09:43:27 +0100 Subject: [PATCH 3/5] fix csv parser --- dojo/tools/openvas/csv_parser.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/dojo/tools/openvas/csv_parser.py b/dojo/tools/openvas/csv_parser.py index 6a81ad3444c..92a6de3c028 100644 --- a/dojo/tools/openvas/csv_parser.py +++ b/dojo/tools/openvas/csv_parser.py @@ -228,6 +228,14 @@ def create_chain(self): date_column_strategy.successor = title_column_strategy return date_column_strategy + def read_column_names(self, row): + column_names = dict() + index = 0 + for column in row: + column_names[index] = column + index += 1 + return column_names + def get_findings(self, filename, test): column_names = dict() dupes = dict() From 8f8a60b616f4c60e7b72a89bc252d6dcea4fd27d Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Fri, 1 Mar 2024 09:43:45 +0100 Subject: [PATCH 4/5] fix xml parser --- dojo/tools/openvas/parser.py | 41 ++---------------------- dojo/tools/openvas/xml_parser.py | 55 ++++++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/dojo/tools/openvas/parser.py b/dojo/tools/openvas/parser.py index 586efdc5541..9b24b4a0a00 100755 --- a/dojo/tools/openvas/parser.py +++ b/dojo/tools/openvas/parser.py @@ -41,42 +41,5 @@ def get_findings(self, filename, test): if str(filename.name).endswith('.csv'): return OpenVASCSVParser().get_findings(filename, test) elif str(filename.name).endswith('.xml'): - findings = [] - tree = ET.parse(filename) - root = tree.getroot() - if "report" not in root.tag: - raise NamespaceErr( - "This doesn't seem to be a valid Greenbone OpenVAS XML file." - ) - report = root.find("report") - results = report.find("results") - for result in results: - for finding in result: - if finding.tag == "name": - title = finding.text - description = [f"**Name**: {finding.text}"] - if finding.tag == "host": - title = title + "_" + finding.text - description.append(f"**Host**: {finding.text}") - if finding.tag == "port": - title = title + "_" + finding.text - description.append(f"**Port**: {finding.text}") - if finding.tag == "nvt": - description.append(f"**NVT**: {finding.text}") - if finding.tag == "severity": - severity = self.convert_cvss_score(finding.text) - description.append(f"**Severity**: {finding.text}") - if finding.tag == "qod": - description.append(f"**QOD**: {finding.text}") - if finding.tag == "description": - description.append(f"**Description**: {finding.text}") - - finding = Finding( - title=str(title), - description="\n".join(description), - severity=severity, - dynamic_finding=True, - static_finding=False - ) - findings.append(finding) - return findings + return OpenVASXMLParser().get_findings(filename, test) + \ No newline at end of file diff --git a/dojo/tools/openvas/xml_parser.py b/dojo/tools/openvas/xml_parser.py index 0f456d42d15..78105e226e5 100644 --- a/dojo/tools/openvas/xml_parser.py +++ b/dojo/tools/openvas/xml_parser.py @@ -8,5 +8,56 @@ class OpenVASXMLParser(object): - def __init__(self) -> None: - pass \ No newline at end of file + def get_findings(self, filename, test): + findings = [] + tree = ET.parse(filename) + root = tree.getroot() + if "report" not in root.tag: + raise NamespaceErr( + "This doesn't seem to be a valid Greenbone OpenVAS XML file." + ) + report = root.find("report") + results = report.find("results") + for result in results: + for finding in result: + if finding.tag == "name": + title = finding.text + description = [f"**Name**: {finding.text}"] + if finding.tag == "host": + title = title + "_" + finding.text + description.append(f"**Host**: {finding.text}") + if finding.tag == "port": + title = title + "_" + finding.text + description.append(f"**Port**: {finding.text}") + if finding.tag == "nvt": + description.append(f"**NVT**: {finding.text}") + if finding.tag == "severity": + severity = self.convert_cvss_score(finding.text) + description.append(f"**Severity**: {finding.text}") + if finding.tag == "qod": + description.append(f"**QOD**: {finding.text}") + if finding.tag == "description": + description.append(f"**Description**: {finding.text}") + + finding = Finding( + title=str(title), + description="\n".join(description), + severity=severity, + dynamic_finding=True, + static_finding=False + ) + findings.append(finding) + return findings + + def convert_cvss_score(self, raw_value): + val = float(raw_value) + if val == 0.0: + return "Info" + elif val < 4.0: + return "Low" + elif val < 7.0: + return "Medium" + elif val < 9.0: + return "High" + else: + return "Critical" From 1530d4852beeb02962018cbaba9373fff43604e3 Mon Sep 17 00:00:00 2001 From: Manuel Sommer Date: Fri, 1 Mar 2024 09:45:53 +0100 Subject: [PATCH 5/5] flake8 --- dojo/tools/openvas/csv_parser.py | 13 ++----------- dojo/tools/openvas/parser.py | 26 -------------------------- dojo/tools/openvas/xml_parser.py | 8 ++------ 3 files changed, 4 insertions(+), 43 deletions(-) diff --git a/dojo/tools/openvas/csv_parser.py b/dojo/tools/openvas/csv_parser.py index 92a6de3c028..2c9eac5a3fd 100644 --- a/dojo/tools/openvas/csv_parser.py +++ b/dojo/tools/openvas/csv_parser.py @@ -2,10 +2,9 @@ import hashlib import io from dateutil.parser import parse -from xml.dom import NamespaceErr -from defusedxml import ElementTree as ET from dojo.models import Finding, Endpoint + class ColumnMappingStrategy(object): mapped_column = None @@ -192,6 +191,7 @@ def __init__(self): def map_column_value(self, finding, column_value): finding.duplicate = self.evaluate_bool_value(column_value) + class OpenVASCSVParser(object): def create_chain(self): date_column_strategy = DateColumnMappingStrategy() @@ -210,7 +210,6 @@ def create_chain(self): duplicate_strategy = DuplicateColumnMappingStrategy() port_strategy = PortColumnMappingStrategy() protocol_strategy = ProtocolColumnMappingStrategy() - port_strategy.successor = protocol_strategy duplicate_strategy.successor = port_strategy false_positive_strategy.successor = duplicate_strategy @@ -240,35 +239,29 @@ def get_findings(self, filename, test): column_names = dict() dupes = dict() chain = self.create_chain() - content = filename.read() if isinstance(content, bytes): content = content.decode("utf-8") reader = csv.reader(io.StringIO(content), delimiter=",", quotechar='"') - row_number = 0 for row in reader: finding = Finding(test=test) finding.unsaved_endpoints = [Endpoint()] - if row_number == 0: column_names = self.read_column_names(row) row_number += 1 continue - column_number = 0 for column in row: chain.process_column( column_names[column_number], column, finding ) column_number += 1 - if finding is not None and row_number > 0: if finding.title is None: finding.title = "" if finding.description is None: finding.description = "" - key = hashlib.sha256( ( str(finding.unsaved_endpoints[0]) @@ -280,9 +273,7 @@ def get_findings(self, filename, test): + finding.description ).encode("utf-8") ).hexdigest() - if key not in dupes: dupes[key] = finding - row_number += 1 return list(dupes.values()) diff --git a/dojo/tools/openvas/parser.py b/dojo/tools/openvas/parser.py index 9b24b4a0a00..6a1399f28ef 100755 --- a/dojo/tools/openvas/parser.py +++ b/dojo/tools/openvas/parser.py @@ -2,19 +2,7 @@ from dojo.tools.openvas.xml_parser import OpenVASXMLParser - - - class OpenVASParser(object): - - def read_column_names(self, row): - column_names = dict() - index = 0 - for column in row: - column_names[index] = column - index += 1 - return column_names - def get_scan_types(self): return ["OpenVAS Parser"] @@ -24,22 +12,8 @@ def get_label_for_scan_types(self, scan_type): def get_description_for_scan_types(self, scan_type): return "Import CSV or XML output of Greenbone OpenVAS report." - def convert_cvss_score(self, raw_value): - val = float(raw_value) - if val == 0.0: - return "Info" - elif val < 4.0: - return "Low" - elif val < 7.0: - return "Medium" - elif val < 9.0: - return "High" - else: - return "Critical" - def get_findings(self, filename, test): if str(filename.name).endswith('.csv'): return OpenVASCSVParser().get_findings(filename, test) elif str(filename.name).endswith('.xml'): return OpenVASXMLParser().get_findings(filename, test) - \ No newline at end of file diff --git a/dojo/tools/openvas/xml_parser.py b/dojo/tools/openvas/xml_parser.py index 78105e226e5..bc2c63dd828 100644 --- a/dojo/tools/openvas/xml_parser.py +++ b/dojo/tools/openvas/xml_parser.py @@ -1,10 +1,6 @@ -import csv -import hashlib -import io -from dateutil.parser import parse from xml.dom import NamespaceErr from defusedxml import ElementTree as ET -from dojo.models import Finding, Endpoint +from dojo.models import Finding class OpenVASXMLParser(object): @@ -48,7 +44,7 @@ def get_findings(self, filename, test): ) findings.append(finding) return findings - + def convert_cvss_score(self, raw_value): val = float(raw_value) if val == 0.0: