From 7ad90c03ed4133b4a6106e188fc50585fa8ede5f Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sat, 17 Jan 2026 18:29:48 +0100 Subject: [PATCH 1/3] tags from parser: fix parsers, add tests and fallback --- dojo/importers/default_importer.py | 4 +- dojo/importers/default_reimporter.py | 4 +- dojo/tools/codechecker/parser.py | 7 +- dojo/tools/meterian/parser.py | 2 +- dojo/tools/sarif/parser.py | 2 +- .../tools/sonarqube/sonarqube_restapi_json.py | 8 +- dojo/tools/sysdig_cli/parser.py | 2 +- dojo/tools/trivy/parser.py | 8 +- dojo/tools/trivy_operator/checks_handler.py | 2 +- dojo/tools/trivy_operator/secrets_handler.py | 2 +- .../trivy_operator/vulnerability_handler.py | 2 +- unittests/test_tags.py | 295 ++++++++++++++---- unittests/tools/test_meterian_parser.py | 6 +- unittests/tools/test_sarif_parser.py | 2 +- unittests/tools/test_trivy_operator_parser.py | 2 +- unittests/tools/test_trivy_parser.py | 10 +- 16 files changed, 273 insertions(+), 85 deletions(-) diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index 35fe6712387..bbdb2f7cb95 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -212,7 +212,9 @@ def process_findings( if self.service is not None: unsaved_finding.service = self.service - # Force parsers to use unsaved_tags (stored in below after saving) + # Merge any tags set by parser into unsaved_tags before nullifying + if unsaved_finding.tags: + unsaved_finding.unsaved_tags = (unsaved_finding.unsaved_tags or []) + list(unsaved_finding.tags) unsaved_finding.tags = None finding = self.process_cve(unsaved_finding) # Calculate hash_code before saving based on unsaved_endpoints and unsaved_vulnerability_ids diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index 47ce8c61acd..d241aed12bb 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -912,7 +912,9 @@ def finding_post_processing( self.endpoint_manager.chunk_endpoints_and_disperse(finding, finding_from_report.unsaved_endpoints) if len(self.endpoints_to_add) > 0: self.endpoint_manager.chunk_endpoints_and_disperse(finding, self.endpoints_to_add) - # Parsers must use unsaved_tags to store tags, so we can clean them + # Merge any tags set by parser into unsaved_tags + if finding_from_report.tags: + finding_from_report.unsaved_tags = (finding_from_report.unsaved_tags or []) + list(finding_from_report.tags) if finding_from_report.unsaved_tags: cleaned_tags = clean_tags(finding_from_report.unsaved_tags) if isinstance(cleaned_tags, list): diff --git a/dojo/tools/codechecker/parser.py b/dojo/tools/codechecker/parser.py index 16eff099c76..f1bf1586650 100644 --- a/dojo/tools/codechecker/parser.py +++ b/dojo/tools/codechecker/parser.py @@ -98,7 +98,7 @@ def get_item(vuln): else: title = unique_id_from_tool - return Finding( + finding = Finding( title=title, description=description, severity=severity, @@ -113,10 +113,9 @@ def get_item(vuln): sast_source_line=sast_source_line, static_finding=True, dynamic_finding=False, - tags=[ - vuln["analyzer_name"], - ], ) + finding.unsaved_tags = [vuln["analyzer_name"]] + return finding def get_mapped_severity(severity): diff --git a/dojo/tools/meterian/parser.py b/dojo/tools/meterian/parser.py index 492dec89c77..d882faa23dd 100644 --- a/dojo/tools/meterian/parser.py +++ b/dojo/tools/meterian/parser.py @@ -69,8 +69,8 @@ def do_get_findings(self, single_security_report, scan_date, test): dynamic_finding=False, file_path="Manifest file", unique_id_from_tool=advisory["id"], - tags=[language], ) + finding.unsaved_tags = [language] if "cve" in advisory: if advisory["cve"] != "N/A": diff --git a/dojo/tools/sarif/parser.py b/dojo/tools/sarif/parser.py index a32c9b7ea37..d554e1d51e3 100644 --- a/dojo/tools/sarif/parser.py +++ b/dojo/tools/sarif/parser.py @@ -269,7 +269,7 @@ def get_items_from_result(self, result, rules, artifacts, run_date): # manage tags provided in the report and rule and remove duplicated tags = list(set(get_properties_tags(rule) + get_properties_tags(result))) tags = [s.removeprefix("external/cwe/") for s in tags] - finding.tags = tags + finding.unsaved_tags = tags # manage fingerprints # fingerprinting in SARIF is more complete than in current implementation diff --git a/dojo/tools/sonarqube/sonarqube_restapi_json.py b/dojo/tools/sonarqube/sonarqube_restapi_json.py index 9feb7a14397..a01dd44037d 100644 --- a/dojo/tools/sonarqube/sonarqube_restapi_json.py +++ b/dojo/tools/sonarqube/sonarqube_restapi_json.py @@ -55,10 +55,10 @@ def get_json_items(self, json_content, test, mode): severity=self.severitytranslator(issue.get("severity")), static_finding=True, dynamic_finding=False, - tags=["bug"], line=line, date=date, ) + item.unsaved_tags = ["bug"] elif issue.get("type") == "VULNERABILITY": key = issue.get("key") rule = issue.get("rule") @@ -129,10 +129,10 @@ def get_json_items(self, json_content, test, mode): cwe=cwe, cvssv3_score=cvss, file_path=component, - tags=["vulnerability"], line=line, date=date, ) + item.unsaved_tags = ["vulnerability"] vulnids = [] if "Reference: CVE" in message: cve_pattern = r"Reference: CVE-\d{4}-\d{4,7}" @@ -200,10 +200,10 @@ def get_json_items(self, json_content, test, mode): static_finding=True, dynamic_finding=False, file_path=component, - tags=["code_smell"], line=line, date=date, ) + item.unsaved_tags = ["code_smell"] items.append(item) if json_content.get("hotspots"): for hotspot in json_content.get("hotspots"): @@ -249,10 +249,10 @@ def get_json_items(self, json_content, test, mode): static_finding=True, dynamic_finding=False, file_path=component, - tags=["hotspot"], line=line, date=date, ) + item.unsaved_tags = ["hotspot"] items.append(item) return items diff --git a/dojo/tools/sysdig_cli/parser.py b/dojo/tools/sysdig_cli/parser.py index 90b5fd82e2f..3b88bcec408 100644 --- a/dojo/tools/sysdig_cli/parser.py +++ b/dojo/tools/sysdig_cli/parser.py @@ -138,7 +138,7 @@ def parse_csv(self, arr_data, test): tags = [] if row.vulnerability_id: tags.append(clean_tags("VulnId:" + row.vulnerability_id)) - finding.tags = tags + finding.unsaved_tags = tags finding.dynamic_finding = False finding.static_finding = True finding.description += "\n\n###Vulnerability Details" diff --git a/dojo/tools/trivy/parser.py b/dojo/tools/trivy/parser.py index 184aa88c2ac..6308070d71a 100644 --- a/dojo/tools/trivy/parser.py +++ b/dojo/tools/trivy/parser.py @@ -323,10 +323,10 @@ def get_result_items(self, test, results, service_name=None, artifact_name=""): static_finding=True, dynamic_finding=False, fix_available=fix_available, - tags=[vul_type, target_class], service=service_name, **status_fields, ) + finding.unsaved_tags = [vul_type, target_class] if vuln_id: finding.unsaved_vulnerability_ids = [vuln_id] @@ -379,9 +379,9 @@ def get_result_items(self, test, results, service_name=None, artifact_name=""): fix_available=True, static_finding=True, dynamic_finding=False, - tags=[target_type, target_class], service=service_name, ) + finding.unsaved_tags = [target_type, target_class] items.append(finding) secrets = target_data.get("Secrets", []) @@ -410,9 +410,9 @@ def get_result_items(self, test, results, service_name=None, artifact_name=""): static_finding=True, dynamic_finding=False, fix_available=True, - tags=[target_class], service=service_name, ) + finding.unsaved_tags = [target_class] items.append(finding) licenses = target_data.get("Licenses", []) @@ -444,9 +444,9 @@ def get_result_items(self, test, results, service_name=None, artifact_name=""): static_finding=True, dynamic_finding=False, fix_available=True, - tags=[target_class], service=service_name, ) + finding.unsaved_tags = [target_class] items.append(finding) return items diff --git a/dojo/tools/trivy_operator/checks_handler.py b/dojo/tools/trivy_operator/checks_handler.py index cc2fe0d9b67..e9d27edcdb9 100644 --- a/dojo/tools/trivy_operator/checks_handler.py +++ b/dojo/tools/trivy_operator/checks_handler.py @@ -48,7 +48,7 @@ def handle_checks(self, labels, checks, test): fix_available=True, ) if resource_namespace: - finding.tags = resource_namespace + finding.unsaved_tags = [resource_namespace] if check_id: finding.unsaved_vulnerability_ids = [UniformTrivyVulnID().return_uniformed_vulnid(check_id)] findings.append(finding) diff --git a/dojo/tools/trivy_operator/secrets_handler.py b/dojo/tools/trivy_operator/secrets_handler.py index ae387b2c8ec..e0a1b1996f5 100644 --- a/dojo/tools/trivy_operator/secrets_handler.py +++ b/dojo/tools/trivy_operator/secrets_handler.py @@ -56,6 +56,6 @@ def handle_secrets(self, labels, secrets, test): fix_available=True, ) if resource_namespace: - finding.tags = resource_namespace + finding.unsaved_tags = [resource_namespace] findings.append(finding) return findings diff --git a/dojo/tools/trivy_operator/vulnerability_handler.py b/dojo/tools/trivy_operator/vulnerability_handler.py index 266a5b37cb4..685f0760763 100644 --- a/dojo/tools/trivy_operator/vulnerability_handler.py +++ b/dojo/tools/trivy_operator/vulnerability_handler.py @@ -86,9 +86,9 @@ def handle_vulns(self, labels, vulnerabilities, test): dynamic_finding=False, service=service, file_path=file_path, - tags=[tag for tag in finding_tags if tag], fix_available=fix_available, ) + finding.unsaved_tags = [tag for tag in finding_tags if tag] if vuln_id: finding.unsaved_vulnerability_ids = [UniformTrivyVulnID().return_uniformed_vulnid(vuln_id)] findings.append(finding) diff --git a/unittests/test_tags.py b/unittests/test_tags.py index f4723ae8bcb..b08990e9bd2 100644 --- a/unittests/test_tags.py +++ b/unittests/test_tags.py @@ -1,5 +1,10 @@ import logging import random +from pathlib import Path + +from django.contrib.auth.models import User +from django.test import Client +from django.urls import reverse from dojo.models import Finding, Product, Test from dojo.product.helpers import propagate_tags_on_product_sync @@ -244,7 +249,38 @@ def test_finding_create_tags_with_slashes(self): # logger.debug('looking for tag %s in tag list %s', tag, response['tags']) self.assertIn(tag, response["tags"]) + def test_import_multipart_tags(self): + """API-specific test for multipart form data tag handling.""" + with (self.zap_sample5_filename).open(encoding="utf-8") as testfile: + data = { + "engagement": [1], + "file": [testfile], + "scan_type": ["ZAP Scan"], + "tags": ["bug,security", "urgent"], # Attempting to mimic the two "tag" fields (-F 'tags=tag1' -F 'tags=tag2') + } + response = self.import_scan(data, 201) + # Make sure the serializer returns the correct tags + success_tags = ["bug", "security", "urgent"] + self.assertEqual(response["tags"], success_tags) + # Check that the test has the same issue + test_id = response["test"] + response = self.get_test_api(test_id) + self.assertEqual(len(success_tags), len(response.get("tags"))) + for tag in success_tags: + self.assertIn(tag, response["tags"]) + + +class TagImportMixin: + + """Mixin containing import/reimport tag tests that can be run via API or UI.""" + + def setUp(self): + self.zap_sample5_filename = get_unit_tests_scans_path("zap") / "5_zap_sample_one.xml" + self.generic_sample_with_tags_filename = get_unit_tests_scans_path("generic") / "generic_report1.json" + self.generic_sample_with_more_tags_filename = get_unit_tests_scans_path("generic") / "generic_report1_more_tags.json" + def test_import_and_reimport_with_tags(self): + """Test that tags passed as import parameter are applied to the test.""" tags = ["tag1", "tag2"] import0 = self.import_scan_with_params(self.zap_sample5_filename, tags=tags) test_id = import0["test"] @@ -267,26 +303,8 @@ def test_import_and_reimport_with_tags(self): for tag in tags: self.assertIn(tag, response["tags"]) - def test_import_multipart_tags(self): - with (self.zap_sample5_filename).open(encoding="utf-8") as testfile: - data = { - "engagement": [1], - "file": [testfile], - "scan_type": ["ZAP Scan"], - "tags": ["bug,security", "urgent"], # Attempting to mimic the two "tag" fields (-F 'tags=tag1' -F 'tags=tag2') - } - response = self.import_scan(data, 201) - # Make sure the serializer returns the correct tags - success_tags = ["bug", "security", "urgent"] - self.assertEqual(response["tags"], success_tags) - # Check that the test has the same issue - test_id = response["test"] - response = self.get_test_api(test_id) - self.assertEqual(len(success_tags), len(response.get("tags"))) - for tag in success_tags: - self.assertIn(tag, response["tags"]) - def test_import_report_with_tags(self): + """Test that parser-generated tags on findings are preserved during import/reimport.""" def assert_tags_in_findings(findings: list[dict], expected_finding_count: int, desired_tags: list[str]) -> None: self.assertEqual(expected_finding_count, len(findings)) for finding in findings: @@ -309,7 +327,85 @@ def assert_tags_in_findings(findings: list[dict], expected_finding_count: int, d assert_tags_in_findings(findings, 2, ["security", "network", "hardened"]) +class TagImportTestAPI(DojoAPITestCase, TagImportMixin): + + """Test tag handling during import/reimport via API.""" + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + super().setUp() + testuser = User.objects.get(username="admin") + testuser.usercontactinfo.block_execution = True + testuser.usercontactinfo.save() + self.login_as_admin() + TagImportMixin.setUp(self) + + +class TagImportTestUI(DojoAPITestCase, TagImportMixin): + + """Test tag handling during import/reimport via UI.""" + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + super().setUp() + testuser = User.objects.get(username="admin") + testuser.usercontactinfo.block_execution = True + testuser.usercontactinfo.save() + self.login_as_admin() + self.client_ui = Client() + self.client_ui.force_login(self.get_test_admin()) + TagImportMixin.setUp(self) + + def import_scan_with_params(self, filename, scan_type="ZAP Scan", engagement=1, minimum_severity="Low", *, + active=True, verified=False, tags=None, close_old_findings=False, **kwargs): + """Override to use UI import instead of API.""" + with Path(filename).open(encoding="utf-8") as testfile: + payload = { + "minimum_severity": minimum_severity, + "active": "force_to_true" if active else "force_to_false", + "verified": "force_to_true" if verified else "force_to_false", + "scan_type": scan_type, + "file": testfile, + "environment": 1, + "close_old_findings": close_old_findings, + } + if tags is not None: + # Tagulous form field expects comma-separated string + payload["tags"] = ",".join(tags) if isinstance(tags, list) else tags + + response = self.client_ui.post(reverse("import_scan_results", args=(engagement,)), payload) + self.assertEqual(302, response.status_code, response.content[:1000]) + test_id = int(response.url.split("/")[-1]) + return {"test": test_id} + + def reimport_scan_with_params(self, test_id, filename, scan_type="ZAP Scan", minimum_severity="Low", *, + active=True, verified=False, tags=None, close_old_findings=True, **kwargs): + """Override to use UI reimport instead of API.""" + with Path(filename).open(encoding="utf-8") as testfile: + payload = { + "minimum_severity": minimum_severity, + "active": "force_to_true" if active else "force_to_false", + "verified": "force_to_true" if verified else "force_to_false", + "scan_type": scan_type, + "file": testfile, + "close_old_findings": close_old_findings, + } + if tags is not None: + # Tagulous form field expects comma-separated string + payload["tags"] = ",".join(tags) if isinstance(tags, list) else tags + + response = self.client_ui.post(reverse("re_import_scan_results", args=(test_id,)), payload) + self.assertEqual(302, response.status_code, response.content[:1000]) + new_test_id = int(response.url.split("/")[-1]) + return {"test": new_test_id} + + class InheritedTagsTests(DojoAPITestCase): + + """Non-import tests for inherited tags functionality.""" + fixtures = ["dojo_testdata.json"] def setUp(self, *args, **kwargs): @@ -323,6 +419,56 @@ def setUp(self, *args, **kwargs): def _convert_instance_tags_to_list(self, instance) -> list: return [tag.name for tag in instance.tags.all()] + def test_new_engagement_then_add_tag_to_engagement_then_remove_tag_to_engagement(self): + # Create the engagement + engagement = self.create_engagement("Inherited Tags Engagement", self.product) + test = self.create_test(engagement=engagement, scan_type="ZAP Scan") + # Check to see if tags match the product + product_tags = self._convert_instance_tags_to_list(self.product) + self.assertEqual(product_tags, self._convert_instance_tags_to_list(engagement)) + self.assertEqual(product_tags, self._convert_instance_tags_to_list(test)) + # Add a tag on the engagement) + engagement_tags_before_addition = self._convert_instance_tags_to_list(engagement) + engagement.tags.add("engagement_only_tag") + # Check to see that the update was successful + self.assertEqual(["engagement_only_tag", *engagement_tags_before_addition], self._convert_instance_tags_to_list(engagement)) + # Check to see that tests were not impacted + self.assertEqual(product_tags, self._convert_instance_tags_to_list(test)) + # remove a tag on the engagement + engagement_tags_before_removal = self._convert_instance_tags_to_list(engagement) + engagement.tags.remove("engagement_only_tag") + # Check to see that the update was successful + engagement_tags_before_removal.remove("engagement_only_tag") + self.assertEqual(engagement_tags_before_removal, self._convert_instance_tags_to_list(engagement)) + # Check to see that tests were not impacted + self.assertEqual(product_tags, self._convert_instance_tags_to_list(test)) + + def test_new_engagement_then_remove_inherited_tag(self): + # Create the engagement + engagement = self.create_engagement("Inherited Tags Engagement", self.product) + # Check to see if tags match the product + product_tags = self._convert_instance_tags_to_list(self.product) + self.assertEqual(product_tags, self._convert_instance_tags_to_list(engagement)) + # Remove an inherited tag + engagement_tags_before_removal = self._convert_instance_tags_to_list(engagement) + engagement.tags.remove("inherit") + # Check to see that the inherited tag could not be removed + self.assertEqual(engagement_tags_before_removal, self._convert_instance_tags_to_list(engagement)) + + +class InheritedTagsImportMixin: + + """Mixin containing inherited tags import/reimport tests that can be run via API or UI.""" + + def setUp(self): + self.system_settings(enable_product_tag_inheritance=True) + self.product = self.create_product("Inherited Tags Test", tags=["inherit", "these", "tags"]) + self.scans_path = get_unit_tests_scans_path("zap") + self.zap_sample5_filename = self.scans_path / "5_zap_sample_one.xml" + + def _convert_instance_tags_to_list(self, instance) -> list: + return [tag.name for tag in instance.tags.all()] + def _import_and_return_objects(self, test_id=None, *, reimport=False, tags=None) -> dict: # Import some findings to create all objects engagement = self.create_engagement("Inherited Tags Engagement", self.product) @@ -372,42 +518,6 @@ def test_import_with_tags_then_reimport_with_different_tag(self): product_tags_plus_reimport_tag.insert(1, "reimport_tag") self.assertEqual(product_tags_plus_reimport_tag, self._convert_instance_tags_to_list(objects.get("test"))) - def test_new_engagement_then_add_tag_to_engagement_then_remove_tag_to_engagement(self): - # Create the engagement - engagement = self.create_engagement("Inherited Tags Engagement", self.product) - test = self.create_test(engagement=engagement, scan_type="ZAP Scan") - # Check to see if tags match the product - product_tags = self._convert_instance_tags_to_list(self.product) - self.assertEqual(product_tags, self._convert_instance_tags_to_list(engagement)) - self.assertEqual(product_tags, self._convert_instance_tags_to_list(test)) - # Add a tag on the engagement) - engagement_tags_before_addition = self._convert_instance_tags_to_list(engagement) - engagement.tags.add("engagement_only_tag") - # Check to see that the update was successful - self.assertEqual(["engagement_only_tag", *engagement_tags_before_addition], self._convert_instance_tags_to_list(engagement)) - # Check to see that tests were not impacted - self.assertEqual(product_tags, self._convert_instance_tags_to_list(test)) - # remove a tag on the engagement - engagement_tags_before_removal = self._convert_instance_tags_to_list(engagement) - engagement.tags.remove("engagement_only_tag") - # Check to see that the update was successful - engagement_tags_before_removal.remove("engagement_only_tag") - self.assertEqual(engagement_tags_before_removal, self._convert_instance_tags_to_list(engagement)) - # Check to see that tests were not impacted - self.assertEqual(product_tags, self._convert_instance_tags_to_list(test)) - - def test_new_engagement_then_remove_inherited_tag(self): - # Create the engagement - engagement = self.create_engagement("Inherited Tags Engagement", self.product) - # Check to see if tags match the product - product_tags = self._convert_instance_tags_to_list(self.product) - self.assertEqual(product_tags, self._convert_instance_tags_to_list(engagement)) - # Remove an inherited tag - engagement_tags_before_removal = self._convert_instance_tags_to_list(engagement) - engagement.tags.remove("inherit") - # Check to see that the inherited tag could not be removed - self.assertEqual(engagement_tags_before_removal, self._convert_instance_tags_to_list(engagement)) - def test_remove_tag_from_product_then_add_tag_to_product(self): # Import some findings to create all objects objects = self._import_and_return_objects() @@ -439,3 +549,78 @@ def test_remove_tag_from_product_then_add_tag_to_product(self): self.assertEqual(product_tags_post_addition, self._convert_instance_tags_to_list(objects.get("endpoint"))) self.assertEqual(product_tags_post_addition, self._convert_instance_tags_to_list(objects.get("test"))) self.assertEqual(product_tags_post_addition, self._convert_instance_tags_to_list(objects.get("finding"))) + + +class InheritedTagsImportTestAPI(DojoAPITestCase, InheritedTagsImportMixin): + + """Test inherited tags during import/reimport via API.""" + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + super().setUp() + testuser = User.objects.get(username="admin") + testuser.usercontactinfo.block_execution = True + testuser.usercontactinfo.save() + self.login_as_admin() + InheritedTagsImportMixin.setUp(self) + + +class InheritedTagsImportTestUI(DojoAPITestCase, InheritedTagsImportMixin): + + """Test inherited tags during import/reimport via UI.""" + + fixtures = ["dojo_testdata.json"] + + def setUp(self): + super().setUp() + testuser = User.objects.get(username="admin") + testuser.usercontactinfo.block_execution = True + testuser.usercontactinfo.save() + self.login_as_admin() + self.client_ui = Client() + self.client_ui.force_login(self.get_test_admin()) + InheritedTagsImportMixin.setUp(self) + + def import_scan_with_params(self, filename, scan_type="ZAP Scan", engagement=1, minimum_severity="Low", *, + active=True, verified=False, tags=None, close_old_findings=False, **kwargs): + """Override to use UI import instead of API.""" + with Path(filename).open(encoding="utf-8") as testfile: + payload = { + "minimum_severity": minimum_severity, + "active": "force_to_true" if active else "force_to_false", + "verified": "force_to_true" if verified else "force_to_false", + "scan_type": scan_type, + "file": testfile, + "environment": 1, + "close_old_findings": close_old_findings, + } + if tags is not None: + # Tagulous form field expects comma-separated string + payload["tags"] = ",".join(tags) if isinstance(tags, list) else tags + + response = self.client_ui.post(reverse("import_scan_results", args=(engagement,)), payload) + self.assertEqual(302, response.status_code, response.content[:1000]) + test_id = int(response.url.split("/")[-1]) + return {"test": test_id} + + def reimport_scan_with_params(self, test_id, filename, scan_type="ZAP Scan", minimum_severity="Low", *, + active=True, verified=False, tags=None, close_old_findings=True, **kwargs): + """Override to use UI reimport instead of API.""" + with Path(filename).open(encoding="utf-8") as testfile: + payload = { + "minimum_severity": minimum_severity, + "active": "force_to_true" if active else "force_to_false", + "verified": "force_to_true" if verified else "force_to_false", + "scan_type": scan_type, + "file": testfile, + "close_old_findings": close_old_findings, + } + if tags is not None: + # Tagulous form field expects comma-separated string + payload["tags"] = ",".join(tags) if isinstance(tags, list) else tags + + response = self.client_ui.post(reverse("re_import_scan_results", args=(test_id,)), payload) + self.assertEqual(302, response.status_code, response.content[:1000]) + new_test_id = int(response.url.split("/")[-1]) + return {"test": new_test_id} diff --git a/unittests/tools/test_meterian_parser.py b/unittests/tools/test_meterian_parser.py index 794856840d0..b08226a7916 100644 --- a/unittests/tools/test_meterian_parser.py +++ b/unittests/tools/test_meterian_parser.py @@ -67,7 +67,7 @@ def test_meterianParser_finding_has_fields(self): self.assertIn("https://github.com/knowledgecode/date-and-time/security/advisories/GHSA-r92x-f52r-x54g", finding.references, "found " + finding.references) self.assertIn("https://github.com/knowledgecode/date-and-time/commit/9e4b501eacddccc8b1f559fb414f48472ee17c2a", finding.references, "found " + finding.references) self.assertIn("Manifest file", finding.file_path) - self.assertEqual(["nodejs"], finding.tags) + self.assertEqual(["nodejs"], finding.unsaved_tags) def test_meterianParser_finding_has_no_remediation(self): with (get_unit_tests_scans_path("meterian") / "report_one_vuln_no_remediation.json").open(encoding="utf-8") as testfile: @@ -90,5 +90,5 @@ def test_meterianParser_dual_language_report_has_two_findins(self): findings = parser.get_findings(testfile, Test()) self.assertEqual(2, len(findings)) - self.assertIn("nodejs", findings[0].tags) - self.assertIn("ruby", findings[1].tags) + self.assertIn("nodejs", findings[0].unsaved_tags) + self.assertIn("ruby", findings[1].unsaved_tags) diff --git a/unittests/tools/test_sarif_parser.py b/unittests/tools/test_sarif_parser.py index 227a584fb06..77fe9e9a7fe 100644 --- a/unittests/tools/test_sarif_parser.py +++ b/unittests/tools/test_sarif_parser.py @@ -585,7 +585,7 @@ def test_tags_from_result_properties(self): parser = SarifParser() findings = parser.get_findings(testfile, Test()) item = findings[0] - self.assertEqual(["Scan"], item.tags) + self.assertEqual(["Scan"], item.unsaved_tags) def test_severity_in_properties(self): with (get_unit_tests_scans_path("sarif") / "issue_10191.json").open(encoding="utf-8") as testfile: diff --git a/unittests/tools/test_trivy_operator_parser.py b/unittests/tools/test_trivy_operator_parser.py index c7641902b82..2eaa0b6ac42 100644 --- a/unittests/tools/test_trivy_operator_parser.py +++ b/unittests/tools/test_trivy_operator_parser.py @@ -129,7 +129,7 @@ def test_vulnerabilityreport_extended(self): self.assertEqual("3.6.13-2ubuntu1.10", finding.mitigation) self.assertEqual(5.9, finding.cvssv3_score) self.assertEqual("ubuntu:20.04 (ubuntu 20.04)", finding.file_path) - self.assertEqual("lbc, os-pkgs, ubuntu", str(finding.tags)) + self.assertEqual(["lbc", "ubuntu", "os-pkgs"], finding.unsaved_tags) def test_cis_benchmark(self): with sample_path("cis_benchmark.json").open(encoding="utf-8") as test_file: diff --git a/unittests/tools/test_trivy_parser.py b/unittests/tools/test_trivy_parser.py index f6c08156fee..c5333c2bd16 100644 --- a/unittests/tools/test_trivy_parser.py +++ b/unittests/tools/test_trivy_parser.py @@ -92,7 +92,7 @@ def test_misconfigurations_and_secrets(self): references = """https://avd.aquasec.com/misconfig/ds002 https://docs.docker.com/develop/develop-images/dockerfile_best-practices/""" self.assertEqual(references, finding.references) - self.assertEqual(["config", "dockerfile"], finding.tags) + self.assertEqual(["dockerfile", "config"], finding.unsaved_tags) finding = findings[3] self.assertEqual("Secret detected in Dockerfile - GitHub Personal Access Token", finding.title) self.assertEqual("Critical", finding.severity) @@ -103,7 +103,7 @@ def test_misconfigurations_and_secrets(self): self.assertEqual(description, finding.description) self.assertEqual("Dockerfile", finding.file_path) self.assertEqual(24, finding.line) - self.assertEqual(["secret"], finding.tags) + self.assertEqual(["secret"], finding.unsaved_tags) def test_kubernetes(self): with sample_path("kubernetes.json").open(encoding="utf-8") as test_file: @@ -124,7 +124,7 @@ def test_kubernetes(self): self.assertEqual("1.8.2.2", finding.mitigation) self.assertEqual(1, len(finding.unsaved_vulnerability_ids)) self.assertEqual("CVE-2020-27350", finding.unsaved_vulnerability_ids[0]) - self.assertEqual(["debian", "os-pkgs"], finding.tags) + self.assertEqual(["debian", "os-pkgs"], finding.unsaved_tags) self.assertEqual("apt", finding.component_name) self.assertEqual("1.8.2.1", finding.component_version) self.assertEqual("default / Deployment / redis-follower", finding.service) @@ -143,7 +143,7 @@ def test_kubernetes(self): self.assertEqual("1.8.2.2", finding.mitigation) self.assertEqual(1, len(finding.unsaved_vulnerability_ids)) self.assertEqual("CVE-2020-27350", finding.unsaved_vulnerability_ids[0]) - self.assertEqual(["debian", "os-pkgs"], finding.tags) + self.assertEqual(["debian", "os-pkgs"], finding.unsaved_tags) self.assertEqual("apt", finding.component_name) self.assertEqual("1.8.2.1", finding.component_version) self.assertEqual("default / Deployment / redis-leader", finding.service) @@ -171,7 +171,7 @@ def test_kubernetes(self): self.assertEqual(re_description.strip(), re_finding_description.strip()) self.assertEqual("Set 'set containers[].securityContext.allowPrivilegeEscalation' to 'false'.", finding.mitigation) self.assertIsNone(finding.unsaved_vulnerability_ids) - self.assertEqual(["config", "kubernetes"], finding.tags) + self.assertEqual(["kubernetes", "config"], finding.unsaved_tags) self.assertIsNone(finding.component_name) self.assertIsNone(finding.component_version) self.assertEqual("default / Deployment / redis-follower", finding.service) From 23c67b2c084712a6bffb350a2832a41e707175ef Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sat, 17 Jan 2026 21:02:59 +0100 Subject: [PATCH 2/3] fix tag merge --- dojo/importers/default_importer.py | 7 +++++-- dojo/importers/default_reimporter.py | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index bbdb2f7cb95..fc2faeefe46 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -213,8 +213,11 @@ def process_findings( unsaved_finding.service = self.service # Merge any tags set by parser into unsaved_tags before nullifying - if unsaved_finding.tags: - unsaved_finding.unsaved_tags = (unsaved_finding.unsaved_tags or []) + list(unsaved_finding.tags) + tags_from_parser = unsaved_finding.tags if isinstance(unsaved_finding.tags, list) else [] + unsaved_tags_from_parser = unsaved_finding.unsaved_tags if isinstance(unsaved_finding.unsaved_tags, list) else [] + merged_tags = unsaved_tags_from_parser + tags_from_parser + if merged_tags: + unsaved_finding.unsaved_tags = merged_tags unsaved_finding.tags = None finding = self.process_cve(unsaved_finding) # Calculate hash_code before saving based on unsaved_endpoints and unsaved_vulnerability_ids diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index d241aed12bb..cfa1d47745a 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -913,8 +913,11 @@ def finding_post_processing( if len(self.endpoints_to_add) > 0: self.endpoint_manager.chunk_endpoints_and_disperse(finding, self.endpoints_to_add) # Merge any tags set by parser into unsaved_tags - if finding_from_report.tags: - finding_from_report.unsaved_tags = (finding_from_report.unsaved_tags or []) + list(finding_from_report.tags) + tags_from_parser = finding_from_report.tags if isinstance(finding_from_report.tags, list) else [] + unsaved_tags_from_parser = finding_from_report.unsaved_tags if isinstance(finding_from_report.unsaved_tags, list) else [] + merged_tags = unsaved_tags_from_parser + tags_from_parser + if merged_tags: + finding_from_report.unsaved_tags = merged_tags if finding_from_report.unsaved_tags: cleaned_tags = clean_tags(finding_from_report.unsaved_tags) if isinstance(cleaned_tags, list): From b005a9e72e3b1115bcccf328dfa3daef0a4b611c Mon Sep 17 00:00:00 2001 From: Valentijn Scholten Date: Sat, 17 Jan 2026 21:43:11 +0100 Subject: [PATCH 3/3] comments --- dojo/importers/default_importer.py | 3 ++- dojo/importers/default_reimporter.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index fc2faeefe46..2bbdb4b58eb 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -212,7 +212,8 @@ def process_findings( if self.service is not None: unsaved_finding.service = self.service - # Merge any tags set by parser into unsaved_tags before nullifying + # Parsers shouldn't use the tags field, and use unsaved_tags instead. + # Merge any tags set by parser into unsaved_tags tags_from_parser = unsaved_finding.tags if isinstance(unsaved_finding.tags, list) else [] unsaved_tags_from_parser = unsaved_finding.unsaved_tags if isinstance(unsaved_finding.unsaved_tags, list) else [] merged_tags = unsaved_tags_from_parser + tags_from_parser diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index cfa1d47745a..94c5bd4d127 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -912,6 +912,7 @@ def finding_post_processing( self.endpoint_manager.chunk_endpoints_and_disperse(finding, finding_from_report.unsaved_endpoints) if len(self.endpoints_to_add) > 0: self.endpoint_manager.chunk_endpoints_and_disperse(finding, self.endpoints_to_add) + # Parsers shouldn't use the tags field, and use unsaved_tags instead. # Merge any tags set by parser into unsaved_tags tags_from_parser = finding_from_report.tags if isinstance(finding_from_report.tags, list) else [] unsaved_tags_from_parser = finding_from_report.unsaved_tags if isinstance(finding_from_report.unsaved_tags, list) else []