Risk Acceptance
| {{ risk_acceptance.accepted_findings_count }} |
{% if risk_acceptance.filename %}
Yes
-
|
{% else %}
diff --git a/dojo/templates/dojo/view_risk_acceptance.html b/dojo/templates/dojo/view_risk_acceptance.html
index 5086fc2d6c3..16c43b579a4 100644
--- a/dojo/templates/dojo/view_risk_acceptance.html
+++ b/dojo/templates/dojo/view_risk_acceptance.html
@@ -142,13 +142,13 @@ Decision & Recommendation
{{ risk_acceptance.get_recommendation_display }}
- {{ risk_acceptance.recommendation_details|markdown_render }}
+ {{ risk_acceptance.recommendation_details }}
|
{{ risk_acceptance.get_decision_display }}
|
- {{ risk_acceptance.decision_details|markdown_render }}
+ {{ risk_acceptance.decision_details }}
|
diff --git a/dojo/url/api/serializer.py b/dojo/url/api/serializer.py
index f35a9abf726..516193cba4c 100644
--- a/dojo/url/api/serializer.py
+++ b/dojo/url/api/serializer.py
@@ -11,4 +11,4 @@ class Meta:
"""Meta class for URLSerializer."""
model = URL
- fields = "__all__"
+ exclude = ("location", "hash")
diff --git a/dojo/utils.py b/dojo/utils.py
index ba1b5ed0d7c..a5d8a13ed81 100644
--- a/dojo/utils.py
+++ b/dojo/utils.py
@@ -140,11 +140,6 @@ def do_false_positive_history(finding, *args, **kwargs):
existing_non_fp_findings = existing_findings.filter(active=True).exclude(false_p=True)
to_mark_as_fp.update(set(existing_non_fp_findings))
- # Remove the async user kwarg because save() really does not like it
- # Would rather not add anything to Finding.save()
- if "async_user" in kwargs:
- kwargs.pop("async_user")
-
for find in to_mark_as_fp:
deduplicationLogger.debug(
"FALSE_POSITIVE_HISTORY: Marking Finding %i:%s from %s as false positive",
@@ -1971,7 +1966,7 @@ def mass_model_updater(model_type, models, function, fields, page_size=1000, ord
def to_str_typed(obj):
"""For code that handles multiple types of objects, print not only __str__ but prefix the type of the object"""
- return f"{type(obj)}: {obj}"
+ return f"{type(obj).__name__}: {obj}"
def get_product(obj):
@@ -2065,7 +2060,7 @@ def async_delete_chunk_task(objects, **kwargs):
"""
Module-level Celery task to delete a chunk of objects.
- Accepts **kwargs for async_user and _pgh_context injected by dojo_dispatch_task.
+ Accepts **kwargs for _pgh_context injected by dojo_dispatch_task.
Uses PgHistoryTask base class (default) to preserve pghistory context for audit trail.
"""
max_retries = 3
@@ -2119,7 +2114,7 @@ def async_delete_crawl_task(obj, model_list, **kwargs):
"""
Module-level Celery task to crawl and delete related objects.
- Accepts **kwargs for async_user and _pgh_context injected by dojo_dispatch_task.
+ Accepts **kwargs for _pgh_context injected by dojo_dispatch_task.
Uses PgHistoryTask base class (default) to preserve pghistory context for audit trail.
"""
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import
@@ -2158,7 +2153,7 @@ def async_delete_task(obj, **kwargs):
"""
Module-level Celery task to delete an object and its related objects.
- Accepts **kwargs for async_user and _pgh_context injected by dojo_dispatch_task.
+ Accepts **kwargs for _pgh_context injected by dojo_dispatch_task.
Uses PgHistoryTask base class (default) to preserve pghistory context for audit trail.
"""
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import
@@ -2196,7 +2191,7 @@ def delete(self, obj, **kwargs):
Entry point to delete an object asynchronously.
Dispatches to async_delete_task via dojo_dispatch_task to ensure proper
- handling of async_user and _pgh_context.
+ handling of user context and _pgh_context.
"""
from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import
diff --git a/helm/defectdojo/Chart.yaml b/helm/defectdojo/Chart.yaml
index fe3807b46a0..cac4e2ce84a 100644
--- a/helm/defectdojo/Chart.yaml
+++ b/helm/defectdojo/Chart.yaml
@@ -1,8 +1,8 @@
apiVersion: v2
-appVersion: "2.55.2"
+appVersion: "2.55.3"
description: A Helm chart for Kubernetes to install DefectDojo
name: defectdojo
-version: 1.9.12
+version: 1.9.13
icon: https://defectdojo.com/hubfs/DefectDojo_favicon.png
maintainers:
- name: madchap
@@ -34,4 +34,4 @@ dependencies:
# description: Critical bug
annotations:
artifacthub.io/prerelease: "false"
- artifacthub.io/changes: "- kind: changed\n description: Bump DefectDojo to 2.55.2\n"
+ artifacthub.io/changes: "- kind: changed\n description: Bump DefectDojo to 2.55.3\n"
diff --git a/helm/defectdojo/README.md b/helm/defectdojo/README.md
index 480b35e158c..27e93da4c3f 100644
--- a/helm/defectdojo/README.md
+++ b/helm/defectdojo/README.md
@@ -511,7 +511,7 @@ The HELM schema will be generated for you.
# General information about chart values
- 
+ 
A Helm chart for Kubernetes to install DefectDojo
diff --git a/requirements.txt b/requirements.txt
index 52396d021d7..0bf77d07125 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -29,9 +29,9 @@ PyGithub==2.8.1
lxml==6.0.2
Markdown==3.10.1
openpyxl==3.1.5
-Pillow==12.1.0 # required by django-imagekit
+Pillow==12.1.1 # required by django-imagekit
psycopg[c]==3.3.2
-cryptography==46.0.4
+cryptography==46.0.5
python-dateutil==2.9.0.post0
redis==7.1.0
requests==2.32.5
diff --git a/unittests/test_async_delete.py b/unittests/test_async_delete.py
index 341723e8296..b8320d24707 100644
--- a/unittests/test_async_delete.py
+++ b/unittests/test_async_delete.py
@@ -2,10 +2,10 @@
Unit tests for async_delete functionality.
These tests verify that the async_delete class works correctly with dojo_dispatch_task,
-which injects async_user and _pgh_context kwargs into task calls.
+which injects user context and _pgh_context kwargs into task calls.
The original bug was that @app.task decorated instance methods didn't properly handle
-the injected kwargs, causing TypeError: unexpected keyword argument 'async_user'.
+the injected kwargs, causing TypeError for unexpected keyword arguments.
"""
import logging
@@ -120,8 +120,8 @@ def test_async_delete_simple_object(self):
# Use impersonate to set current user context (required for block_execution to work)
with impersonate(self.testuser):
- # This would raise TypeError before the fix:
- # TypeError: delete() got an unexpected keyword argument 'async_user'
+ # This would raise TypeError before the fix when injected kwargs
+ # were not handled properly by task functions
async_del = async_delete()
async_del.delete(finding)
diff --git a/unittests/test_bulk_edit_validation.py b/unittests/test_bulk_edit_validation.py
index 84ce98e396f..63c84568b02 100644
--- a/unittests/test_bulk_edit_validation.py
+++ b/unittests/test_bulk_edit_validation.py
@@ -409,8 +409,8 @@ def test_bulk_edit_duplicate_finding_severity_update_works(self):
# View-Level Validation Tests (Active + Risk Acceptance)
- def test_bulk_edit_active_finding_cannot_accept_risk(self):
- """Test that active findings cannot accept risk via bulk edit"""
+ def test_bulk_edit_active_finding_can_accept_risk(self):
+ """Test that active findings can accept risk via bulk edit (matching individual behavior)"""
# Enable simple risk acceptance on product
self.product.enable_simple_risk_acceptance = True
self.product.save()
@@ -427,22 +427,26 @@ def test_bulk_edit_active_finding_cannot_accept_risk(self):
follow=True,
)
- # Verify finding is NOT risk accepted
+ # Verify finding IS risk accepted and becomes inactive
self.active_finding.refresh_from_db()
- self.assertFalse(
+ self.assertTrue(
self.active_finding.risk_accepted,
- "Active finding should not be risk accepted",
+ "Active finding should be risk accepted",
+ )
+ self.assertFalse(
+ self.active_finding.active,
+ "Risk accepted finding should become inactive",
)
- # Verify warning message
+ # Verify no warning message about active findings
messages = self._get_messages_text(response)
warning_messages = [
m for m in messages if "active findings" in m.lower() and "risk" in m.lower()
]
- self.assertGreater(
+ self.assertEqual(
len(warning_messages),
0,
- f"Expected warning about active findings and risk acceptance, got: {messages}",
+ f"Unexpected warning about active findings: {warning_messages}",
)
def test_bulk_edit_inactive_finding_can_accept_risk(self):
@@ -553,12 +557,15 @@ def test_bulk_edit_shows_success_message_with_actual_count(self):
self._assert_finding_status(normal2, active=True)
def test_bulk_edit_shows_multiple_warning_messages(self):
- """Test that multiple warning messages appear for different conflicts"""
+ """
+ Test that warning messages appear for conflicts (duplicate status)
+ and that active findings can now be risk accepted successfully
+ """
# Enable simple risk acceptance
self.product.enable_simple_risk_acceptance = True
self.product.save()
- # First, try to set duplicate finding as active (will be skipped)
+ # First, try to set duplicate finding as active (will be skipped with warning)
post_data1 = self._bulk_edit_post_data(
[self.duplicate_finding.id],
active=True, # Will conflict with duplicate
@@ -569,11 +576,11 @@ def test_bulk_edit_shows_multiple_warning_messages(self):
follow=True,
)
- # Then, try to risk accept active finding (will be skipped)
+ # Then, risk accept active finding (should succeed - no longer a conflict)
post_data2 = self._bulk_edit_post_data(
[self.active_finding.id],
risk_acceptance=True,
- risk_accept=True, # Will conflict with active
+ risk_accept=True, # Should work now!
)
response2 = self.client.post(
reverse("finding_bulk_update_all"),
@@ -586,25 +593,34 @@ def test_bulk_edit_shows_multiple_warning_messages(self):
messages2 = self._get_messages_text(response2)
all_messages = messages1 + messages2
+ # Verify duplicate warning appears
duplicate_warnings = [
m for m in all_messages if "duplicate findings" in m.lower()
]
+ self.assertGreater(
+ len(duplicate_warnings),
+ 0,
+ f"Expected duplicate warning, got: {all_messages}",
+ )
+
+ # Verify NO warning about active findings and risk acceptance
active_warnings = [
m
for m
in all_messages
if "active findings" in m.lower() and "risk" in m.lower()
]
-
- self.assertGreater(
- len(duplicate_warnings),
- 0,
- f"Expected duplicate warning, got: {all_messages}",
- )
- self.assertGreater(
+ self.assertEqual(
len(active_warnings),
0,
- f"Expected active risk acceptance warning, got: {all_messages}",
+ f"Unexpected active risk acceptance warning: {active_warnings}",
+ )
+
+ # Verify active finding was successfully risk accepted
+ self.active_finding.refresh_from_db()
+ self.assertTrue(
+ self.active_finding.risk_accepted,
+ "Active finding should be risk accepted successfully",
)
def test_bulk_edit_no_warning_when_no_conflicts(self):
diff --git a/unittests/test_middleware_login_required.py b/unittests/test_middleware_login_required.py
new file mode 100644
index 00000000000..8f25e7dd3ae
--- /dev/null
+++ b/unittests/test_middleware_login_required.py
@@ -0,0 +1,65 @@
+from types import SimpleNamespace
+from unittest.mock import Mock, patch
+
+from django.contrib.auth.models import AnonymousUser
+from django.http import HttpResponse
+from django.test import RequestFactory
+from rest_framework.authentication import TokenAuthentication
+from rest_framework.authtoken.models import Token
+from rest_framework.permissions import IsAuthenticated
+from rest_framework.response import Response
+from rest_framework.views import APIView
+
+from dojo.middleware import LoginRequiredMiddleware
+from dojo.models import User
+
+from .dojo_test_case import DojoTestCase, versioned_fixtures
+
+
+class TokenAuthenticatedView(APIView):
+ authentication_classes = (TokenAuthentication,)
+ permission_classes = (IsAuthenticated,)
+
+ def get(self, request):
+ return Response({"username": request.user.username})
+
+
+@versioned_fixtures
+class TestLoginRequiredMiddlewareDdUser(DojoTestCase):
+ fixtures = ["dojo_testdata.json"]
+
+ def setUp(self):
+ super().setUp()
+ self.factory = RequestFactory()
+ self.admin = User.objects.get(username="admin")
+
+ def test_sets_dd_user_for_session_authenticated_request(self):
+ request = self.factory.get("/dashboard")
+ request.user = self.admin
+
+ middleware = LoginRequiredMiddleware(lambda _request: HttpResponse("OK"))
+ fake_uwsgi = SimpleNamespace(set_logvar=Mock())
+
+ with patch.dict("sys.modules", {"uwsgi": fake_uwsgi}):
+ response = middleware(request)
+
+ self.assertEqual(200, response.status_code)
+ fake_uwsgi.set_logvar.assert_called_once_with("dd_user", str(self.admin))
+
+ def test_sets_dd_user_for_drf_token_authenticated_request(self):
+ token, _ = Token.objects.get_or_create(user=self.admin)
+
+ request = self.factory.get(
+ "/api/v2/mock-endpoint/",
+ HTTP_AUTHORIZATION=f"Token {token.key}",
+ )
+ request.user = AnonymousUser()
+
+ middleware = LoginRequiredMiddleware(TokenAuthenticatedView.as_view())
+ fake_uwsgi = SimpleNamespace(set_logvar=Mock())
+
+ with patch.dict("sys.modules", {"uwsgi": fake_uwsgi}):
+ response = middleware(request)
+
+ self.assertEqual(200, response.status_code)
+ fake_uwsgi.set_logvar.assert_called_once_with("dd_user", str(self.admin))
diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py
index 3354dce111a..5666f4de1d8 100644
--- a/unittests/test_rest_framework.py
+++ b/unittests/test_rest_framework.py
@@ -856,6 +856,63 @@ def test_detail_configuration_not_authorized(self):
response = self.client.get(relative_url)
self.assertEqual(200, response.status_code, response.content[:1000])
+ class RelatedObjectsTest(BaseClassTest):
+ def test_notes_can_be_added_by_users_with_read_only_permissions(self):
+ self.setUp_global_reader()
+ response = self.client.get(self.url, format="json")
+ self.assertEqual(200, response.status_code, response.content[:1000])
+ engagement_id = response.data["results"][0]["id"]
+ # Attempt to add a note with reader permissions
+ relative_url = f"{self.url}{engagement_id}/notes/"
+ response = self.client.post(relative_url, {"entry": "string"})
+ self.assertEqual(201, response.status_code, response.content[:1000])
+
+ @parameterized.expand(
+ [
+ ("files", {"title": "test"}),
+ ("tags", {"tags": ["apple", "banana", "cherry"]}),
+ ],
+ )
+ def test_related_objects(self, related_object_path, payload):
+ """
+ Tests that BaseRelatedObjectPermission enforces the permissions not associated
+ with the base object. For example, even though a request to add a tag to an
+ engagement is a POST, we do not need engagement add permissions, but rather
+ engagement edit permissions since that is what is defined in the
+ UserHasEngagementRelatedObjectPermission class
+ """
+ self.setUp_global_reader()
+ # Skip tags for engagement and tests
+ if related_object_path == "tags" and self.endpoint_model in {Engagement, Test}:
+ return
+ # Get an object
+ response = self.client.get(self.url, format="json")
+ self.assertEqual(200, response.status_code, response.content[:1000])
+ object_id = response.data["results"][0]["id"]
+ # Attempt to add a related object
+ relative_url = f"{self.url}{object_id}/{related_object_path}/"
+ response = self.client.post(relative_url, payload)
+ self.assertEqual(403, response.status_code, response.content[:1000])
+ # Now switch to a user with edit permissions (but not create)
+ self.setUp_global_writer()
+ # Retry adding the related object
+ if related_object_path == "files":
+ # Convert bytes to a mock uploaded file
+ response = self.client.post(
+ relative_url,
+ {
+ "file": SimpleUploadedFile(
+ name="test_file.txt",
+ content=b"empty",
+ content_type="text/plain",
+ ),
+ **payload,
+ },
+ )
+ else:
+ response = self.client.post(relative_url, payload)
+ self.assertIn(response.status_code, [200, 201], response.content[:1000])
+
@versioned_fixtures
class AppAnalysisTest(BaseClass.BaseClassTest):
@@ -1487,7 +1544,7 @@ def test_update_object_not_authorized(self):
@versioned_fixtures
-class EngagementTest(BaseClass.BaseClassTest):
+class EngagementTest(BaseClass.RelatedObjectsTest, BaseClass.BaseClassTest):
fixtures = ["dojo_testdata.json"]
def __init__(self, *args, **kwargs):
@@ -1517,42 +1574,6 @@ def __init__(self, *args, **kwargs):
self.deleted_objects = 23
BaseClass.RESTEndpointTest.__init__(self, *args, **kwargs)
- @parameterized.expand(
- [
- ("files", {"title": "test", "file": b"empty"}),
- ("notes", {"entry": "string"}),
- ],
- )
- def test_related_objects(self, related_object_path, payload):
- """
- Tests that BaseRelatedObjectPermission enforces the permissions not associated
- with the base object. For example, even though a request to add a note to an
- engagement is a POST, we do not need engagement add permissions, but rather
- engagement edit permissions since that is what is defined in the
- UserHasEngagementRelatedObjectPermission class
- """
- self.setUp_global_reader()
- # Get an engagement
- response = self.client.get(self.url, format="json")
- self.assertEqual(200, response.status_code, response.content[:1000])
- engagement_id = response.data["results"][0]["id"]
- # Attempt to add a related object
- relative_url = f"{self.url}{engagement_id}/{related_object_path}/"
- response = self.client.post(relative_url, payload)
- self.assertEqual(403, response.status_code, response.content[:1000])
- # Now switch to a user with edit permissions (but not create)
- self.setUp_global_writer()
- # Retry adding the related object
- if related_object_path == "files":
- # Convert bytes to a mock uploaded file
- payload["file"] = SimpleUploadedFile(
- name="test_file.txt",
- content=payload["file"], # the b"empty"
- content_type="text/plain",
- )
- response = self.client.post(relative_url, payload)
- self.assertEqual(201, response.status_code, response.content[:1000])
-
@versioned_fixtures
class RiskAcceptanceTest(BaseClass.BaseClassTest):
@@ -1726,7 +1747,7 @@ def test_file_with_quoted_name(self):
@versioned_fixtures
-class FindingsTest(BaseClass.BaseClassTest):
+class FindingsTest(BaseClass.RelatedObjectsTest, BaseClass.BaseClassTest):
fixtures = ["dojo_testdata.json"]
def __init__(self, *args, **kwargs):
@@ -1767,7 +1788,8 @@ def __init__(self, *args, **kwargs):
"files": [],
"tags": ["tag1", "tag_2"],
}
- self.update_fields = {"duplicate": False, "active": True, "push_to_jira": "True", "tags": ["finding_tag_new"]}
+ # Do not push to jira here as it will make the request fail due to jira not being configured
+ self.update_fields = {"duplicate": False, "active": True, "tags": ["finding_tag_new"]}
self.test_type = TestType.OBJECT_PERMISSIONS
self.permission_check_class = Finding
self.permission_create = Permissions.Finding_Add
@@ -2415,7 +2437,7 @@ def test_severity_validation(self):
@versioned_fixtures
-class TestsTest(BaseClass.BaseClassTest):
+class TestsTest(BaseClass.RelatedObjectsTest, BaseClass.BaseClassTest):
fixtures = ["dojo_testdata.json"]
def __init__(self, *args, **kwargs):