Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 20 additions & 16 deletions dojo/finding/deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,22 +292,26 @@ def build_candidate_scope_queryset(test, mode="deduplication", service=None):
)
queryset = Finding.objects.filter(scope_q)

# Base prefetches for both modes
prefetch_list = ["endpoints", "vulnerability_id_set", "found_by"]

# Additional prefetches for reimport mode: fetch only non-special endpoint statuses with their
# endpoint joined in, so endpoint_manager can read status_finding_non_special directly without
# any extra DB queries
if mode == "reimport":
prefetch_list.append(
Prefetch(
"status_finding",
queryset=Endpoint_Status.objects.exclude(
Q(false_positive=True) | Q(out_of_scope=True) | Q(risk_accepted=True),
).select_related("endpoint"),
to_attr="status_finding_non_special",
),
)
if settings.V3_FEATURE_LOCATIONS:
prefetch_list = ["locations__location__url", "vulnerability_id_set", "found_by"]
else:
# TODO: Delete this after the move to Locations
# Base prefetches for both modes
prefetch_list = ["endpoints", "vulnerability_id_set", "found_by"]

# Additional prefetches for reimport mode: fetch only non-special endpoint statuses with their
# endpoint joined in, so endpoint_manager can read status_finding_non_special directly without
# any extra DB queries
if mode == "reimport":
prefetch_list.append(
Prefetch(
"status_finding",
queryset=Endpoint_Status.objects.exclude(
Q(false_positive=True) | Q(out_of_scope=True) | Q(risk_accepted=True),
).select_related("endpoint"),
to_attr="status_finding_non_special",
),
)

return (
queryset
Expand Down
26 changes: 24 additions & 2 deletions unittests/test_deduplication_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from django.core import serializers
from django.utils import timezone

from dojo.finding.deduplication import set_duplicate
from dojo.finding.deduplication import build_candidate_scope_queryset, set_duplicate
from dojo.importers.default_importer import DefaultImporter
from dojo.models import (
Development_Environment,
Expand All @@ -27,7 +27,12 @@
)
from dojo.url.models import URL

from .dojo_test_case import DojoTestCase, get_unit_tests_scans_path, versioned_fixtures
from .dojo_test_case import (
DojoTestCase,
get_unit_tests_scans_path,
skip_unless_v3,
versioned_fixtures,
)

logger = logging.getLogger(__name__)
deduplicationLogger = logging.getLogger("dojo.specific-loggers.deduplication")
Expand Down Expand Up @@ -1892,3 +1897,20 @@ def enable_dedupe(self, *, enable=True):
system_settings = System_Settings.objects.get()
system_settings.enable_deduplication = enable
system_settings.save()


@versioned_fixtures
class TestDedupeRelatedMethods(DojoTestCase):
fixtures = ["dojo_testdata.json"]

@skip_unless_v3
def test_build_candidate_scope_queryset_does_not_crash_when_locations_enabled_but_endpoint_exists(self):
# Create an Endpoint and associate it with a Finding
with Endpoint.allow_endpoint_init():
endpoint = Endpoint.objects.create(host="test-host.com")
test = Test.objects.get(id=3)
finding = Finding.objects.filter(test=test).first()
finding.endpoints.add(endpoint)
# This used to explode when it tried to prefetch Endpoints when they are disabled, now fixed
finding = build_candidate_scope_queryset(test).filter(id=finding.id).first()
self.assertTrue(finding)
Loading