diff --git a/dojo/celery.py b/dojo/celery.py index 336fd420aca..83d2e5d1e08 100644 --- a/dojo/celery.py +++ b/dojo/celery.py @@ -28,23 +28,26 @@ def __call__(self, *args, **kwargs): """ Restore user context in the celery worker via crum.impersonate. - The apply_async method injects ``async_user`` into kwargs when a task - is dispatched. Here we pop it and set it as the current user in - thread-local storage so that all downstream code — including nested - dojo_dispatch_task calls — sees the correct user via - get_current_user(). + The apply_async method injects ``async_user_id`` into kwargs when a task + is dispatched. Here we pop it, resolve to a user instance, and set it + as the current user in thread-local storage so that all downstream + code — including nested dojo_dispatch_task calls — sees the correct + user via get_current_user(). - When a task is called directly (not via apply_async), async_user is + When a task is called directly (not via apply_async), async_user_id is not in kwargs. In that case we leave the existing crum context intact so that callers who already set a user (e.g. via crum.impersonate in tests or request middleware) are not disrupted. """ - if "async_user" not in kwargs: + if "async_user_id" not in kwargs: return super().__call__(*args, **kwargs) import crum # noqa: PLC0415 - user = kwargs.pop("async_user") + from dojo.models import Dojo_User # noqa: PLC0415 circular import + + user_id = kwargs.pop("async_user_id") + user = Dojo_User.objects.filter(pk=user_id).first() if user_id else None with crum.impersonate(user): return super().__call__(*args, **kwargs) @@ -59,8 +62,9 @@ def apply_async(self, args=None, kwargs=None, **options): # Inject user context for Dojo tasks only. Celery built-in tasks (e.g. # celery.backend_cleanup) do not accept custom kwargs. task_name = self.name or "" - if not task_name.startswith("celery.") and "async_user" not in kwargs: - kwargs["async_user"] = get_current_user() + if not task_name.startswith("celery.") and "async_user_id" not in kwargs: + user = get_current_user() + kwargs["async_user_id"] = user.id if user else None # Control flag used for sync/async decision; never pass into the task itself kwargs.pop("sync", None) @@ -135,8 +139,6 @@ def __call__(self, *args, **kwargs): app = Celery("dojo", task_cls=PgHistoryTask) -# Using a string here means the worker will not have to -# pickle the object when using Windows. app.config_from_object("django.conf:settings", namespace="CELERY") app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) diff --git a/dojo/celery_dispatch.py b/dojo/celery_dispatch.py index c835717efe2..fde94336ec4 100644 --- a/dojo/celery_dispatch.py +++ b/dojo/celery_dispatch.py @@ -18,10 +18,11 @@ def apply_async(self, args: Any | None = None, kwargs: Any | None = None, **opti def _inject_async_user(kwargs: Mapping[str, Any] | None) -> dict[str, Any]: result: dict[str, Any] = dict(kwargs or {}) - if "async_user" not in result: + if "async_user_id" not in result: from dojo.utils import get_current_user # noqa: PLC0415 circular import - result["async_user"] = get_current_user() + user = get_current_user() + result["async_user_id"] = user.id if user else None return result @@ -58,7 +59,7 @@ def dojo_dispatch_task(task_or_sig: _SupportsSi | _SupportsApplyAsync | Signatur """ Dispatch a task/signature using DefectDojo semantics. - - Inject `async_user` if missing. + - Inject `async_user_id` if missing. - Capture and inject pghistory context if available. - Respect `sync=True` (foreground execution) and user `block_execution`. - Support `countdown=` for async dispatch. diff --git a/dojo/decorators.py b/dojo/decorators.py index f3045402079..bc3b898a3e1 100644 --- a/dojo/decorators.py +++ b/dojo/decorators.py @@ -88,7 +88,7 @@ def __wrapper__(*args, **kwargs): from dojo.utils import get_current_user # noqa: PLC0415 circular import user = get_current_user() - kwargs["async_user"] = user + kwargs["async_user_id"] = user.id if user else None # Capture pghistory context to pass to Celery worker # The PgHistoryTask base class will apply this context in the worker diff --git a/dojo/forms.py b/dojo/forms.py index b6ca072df6d..26cb003d0a5 100644 --- a/dojo/forms.py +++ b/dojo/forms.py @@ -1,5 +1,5 @@ +import json import logging -import pickle import re import warnings from datetime import date, datetime @@ -3477,7 +3477,7 @@ def __init__(self, attrs=None): def decompress(self, value): if value: - return pickle.loads(value) + return json.loads(value) return [None, None, None, None, None, None] def format_output(self, rendered_widgets): @@ -3497,7 +3497,7 @@ def __init__(self, *args, **kwargs): super().__init__(list_fields, *args, **kwargs) def compress(self, values): - return pickle.dumps(values) + return json.dumps(values) class CreateChoiceQuestionForm(forms.Form): diff --git a/dojo/models.py b/dojo/models.py index 93a0fd64ed6..153177bc20e 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -1093,7 +1093,16 @@ def save(self, *args, **kwargs): from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import - dojo_dispatch_task(async_update_sla_expiration_dates_sla_config_sync, self, products, severities=severities) + dojo_dispatch_task( + async_update_sla_expiration_dates_sla_config_sync, + self.id, + list(products.values_list("id", flat=True)), + severities=severities, + ) + # The async task refetches and resets async_updating on its own copy. + # Mirror that on this in-memory instance so a subsequent save() on the + # same instance does not trigger the lock-revert path at line 1058. + self.async_updating = False def clean(self): sla_days = [self.critical, self.high, self.medium, self.low] @@ -1255,7 +1264,17 @@ def save(self, *args, **kwargs): from dojo.sla_config.helpers import async_update_sla_expiration_dates_sla_config_sync # noqa: I001, PLC0415 circular import from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import - dojo_dispatch_task(async_update_sla_expiration_dates_sla_config_sync, sla_config, Product.objects.filter(id=self.id)) + dojo_dispatch_task( + async_update_sla_expiration_dates_sla_config_sync, + sla_config.id, + [self.id], + ) + # The async task refetches and resets async_updating on its own copies. + # Mirror that on this in-memory product and the in-memory sla_config so a + # subsequent save() on either does not trigger their lock-revert paths. + self.async_updating = False + if sla_config: + sla_config.async_updating = False def get_absolute_url(self): return reverse("view_product", args=[str(self.id)]) diff --git a/dojo/notifications/helper.py b/dojo/notifications/helper.py index a8c3db6f611..1de664c4a6d 100644 --- a/dojo/notifications/helper.py +++ b/dojo/notifications/helper.py @@ -18,7 +18,6 @@ from dojo import __version__ as dd_version from dojo.authorization.roles_permissions import Permissions -from dojo.celery_dispatch import dojo_dispatch_task from dojo.decorators import we_want_async from dojo.labels import get_labels from dojo.models import ( @@ -834,19 +833,22 @@ def _process_notifications( # Some errors should not be pushed to all channels, only to alerts. # For example reasons why JIRA Issues: https://github.com/DefectDojo/django-DefectDojo/issues/11575 + # Per-channel sends run synchronously inside the surrounding async_create_notification + # task body. Dispatching inner Celery tasks would require JSON-serializable kwargs, but + # callers pass model instances (finding/test/engagement/product/...) and refetching every + # one of them per channel would multiply DB queries; running synchronously avoids both. if not alert_only: + user_id = getattr(notifications.user, "id", None) if self.system_settings.enable_slack_notifications and "slack" in getattr( notifications, event, notifications.other, ): logger.debug("Sending Slack Notification") - dojo_dispatch_task( - send_slack_notification, - event, - user_id=getattr(notifications.user, "id", None), - **kwargs, - ) + try: + send_slack_notification.run(event, user_id=user_id, **kwargs) + except Exception: + logger.exception("Failed to send Slack notification for event %s", event) if self.system_settings.enable_msteams_notifications and "msteams" in getattr( notifications, @@ -854,12 +856,10 @@ def _process_notifications( notifications.other, ): logger.debug("Sending MSTeams Notification") - dojo_dispatch_task( - send_msteams_notification, - event, - user_id=getattr(notifications.user, "id", None), - **kwargs, - ) + try: + send_msteams_notification.run(event, user_id=user_id, **kwargs) + except Exception: + logger.exception("Failed to send MSTeams notification for event %s", event) if self.system_settings.enable_mail_notifications and "mail" in getattr( notifications, @@ -867,12 +867,10 @@ def _process_notifications( notifications.other, ): logger.debug("Sending Mail Notification") - dojo_dispatch_task( - send_mail_notification, - event, - user_id=getattr(notifications.user, "id", None), - **kwargs, - ) + try: + send_mail_notification.run(event, user_id=user_id, **kwargs) + except Exception: + logger.exception("Failed to send Mail notification for event %s", event) if self.system_settings.enable_webhooks_notifications and "webhooks" in getattr( notifications, @@ -880,12 +878,10 @@ def _process_notifications( notifications.other, ): logger.debug("Sending Webhooks Notification") - dojo_dispatch_task( - send_webhooks_notification, - event, - user_id=getattr(notifications.user, "id", None), - **kwargs, - ) + try: + send_webhooks_notification.run(event, user_id=user_id, **kwargs) + except Exception: + logger.exception("Failed to send Webhooks notification for event %s", event) def process_tag_notifications(request, note, parent_url, parent_title): diff --git a/dojo/notifications/tasks.py b/dojo/notifications/tasks.py index afdb1d1d92d..aaf11cf6915 100644 --- a/dojo/notifications/tasks.py +++ b/dojo/notifications/tasks.py @@ -38,7 +38,9 @@ def log_generic_alert(source, title, description): @app.task(bind=True) -def add_alerts(self, runinterval, *args, **kwargs): +def add_alerts(self, *args, **kwargs): + # Run interval matches the beat schedule for this task (see CELERY_BEAT_SCHEDULE). + runinterval = timedelta(hours=1) now = timezone.now() upcoming_engagements = Engagement.objects.filter(target_start__gt=now + timedelta(days=3), target_start__lt=now + timedelta(days=3) + runinterval).order_by("target_start") diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index dde75921aff..f66548b0ec6 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -107,7 +107,7 @@ DD_CELERY_RESULT_BACKEND=(str, "django-db"), DD_CELERY_RESULT_EXPIRES=(int, 86400), DD_CELERY_BEAT_SCHEDULE_FILENAME=(str, root("dojo.celery.beat.db")), - DD_CELERY_TASK_SERIALIZER=(str, "pickle"), + DD_CELERY_TASK_SERIALIZER=(str, "json"), DD_CELERY_LOG_LEVEL=(str, "INFO"), # Hard ceiling on task runtime. When reached, the worker process is sent SIGKILL — no cleanup # code runs. Always set higher than DD_CELERY_TASK_SOFT_TIME_LIMIT. (0 = disabled, no limit) @@ -847,8 +847,9 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param CELERY_TIMEZONE = TIME_ZONE CELERY_RESULT_EXPIRES = env("DD_CELERY_RESULT_EXPIRES") CELERY_BEAT_SCHEDULE_FILENAME = env("DD_CELERY_BEAT_SCHEDULE_FILENAME") -CELERY_ACCEPT_CONTENT = ["pickle", "json", "msgpack", "yaml"] +CELERY_ACCEPT_CONTENT = ["json"] CELERY_TASK_SERIALIZER = env("DD_CELERY_TASK_SERIALIZER") +CELERY_RESULT_SERIALIZER = "json" CELERY_LOG_LEVEL = env("DD_CELERY_LOG_LEVEL") if env("DD_CELERY_TASK_TIME_LIMIT") > 0: @@ -872,7 +873,6 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param "add-alerts": { "task": "dojo.notifications.tasks.add_alerts", "schedule": timedelta(hours=1), - "args": [timedelta(hours=1)], "options": { "expires": int(60 * 60 * 1 * 1.2), # If a task is not executed within 72 minutes, it should be dropped from the queue. Two more tasks should be scheduled in the meantime. }, diff --git a/dojo/sla_config/helpers.py b/dojo/sla_config/helpers.py index 045456f38d7..6cabe47c29d 100644 --- a/dojo/sla_config/helpers.py +++ b/dojo/sla_config/helpers.py @@ -8,7 +8,12 @@ @app.task -def async_update_sla_expiration_dates_sla_config_sync(sla_config: SLA_Configuration, products: list[Product], *args, severities: list[str] | None = None, **kwargs): +def async_update_sla_expiration_dates_sla_config_sync(sla_config_id: int, product_ids: list[int], *args, severities: list[str] | None = None, **kwargs): + sla_config = SLA_Configuration.objects.filter(pk=sla_config_id).first() + if sla_config is None: + logger.info("SLA_Configuration with id %s no longer exists, skipping update", sla_config_id) + return + products = Product.objects.filter(pk__in=product_ids) if method := get_custom_method("FINDING_SLA_EXPIRATION_CALCULATION_METHOD"): method(sla_config, products, severities=severities) else: diff --git a/dojo/survey/views.py b/dojo/survey/views.py index 60c8cd58a15..d17e25e6d2c 100644 --- a/dojo/survey/views.py +++ b/dojo/survey/views.py @@ -1,4 +1,4 @@ -import pickle +import json from datetime import date, timedelta from django.contrib import messages @@ -490,7 +490,7 @@ def create_question(request): order=form.cleaned_data["order"], text=form.cleaned_data["text"], multichoice=choiceQuestionFrom.cleaned_data["multichoice"]) - choices_to_process = pickle.loads(choiceQuestionFrom.cleaned_data["answer_choices"]) + choices_to_process = json.loads(choiceQuestionFrom.cleaned_data["answer_choices"]) for c in choices_to_process: if c is not None and len(c) > 0: diff --git a/dojo/utils.py b/dojo/utils.py index 7c1d80bb0f6..a6e858a5447 100644 --- a/dojo/utils.py +++ b/dojo/utils.py @@ -1737,10 +1737,13 @@ def _get_object_name(obj): @app.task -def async_delete_task(obj, **kwargs): +def async_delete_task(model_label, pk, **kwargs): """ Delete an object and all its related objects using the SQL cascade walker. + Takes ``(model_label, pk)`` (e.g. ``("dojo.product", 42)``) so the task + arguments are JSON-serializable. The instance is refetched in the worker. + Handles Python-level concerns (duplicates, integrators, M2M, file cleanup, product grading) explicitly, then uses cascade_delete_related_objects() for efficient bottom-up SQL deletion of all FK-related tables. The top-level @@ -1749,12 +1752,20 @@ def async_delete_task(obj, **kwargs): Accepts **kwargs for _pgh_context injected by dojo_dispatch_task. Uses PgHistoryTask base class (default) to preserve pghistory context for audit trail. """ + from django.apps import apps # noqa: PLC0415 + from dojo.finding.helper import ( # noqa: PLC0415 circular import bulk_delete_findings, prepare_duplicates_for_delete, ) from dojo.utils_cascade_delete import cascade_delete_related_objects # noqa: PLC0415 circular import + Model = apps.get_model(model_label) + obj = Model.objects.filter(pk=pk).first() + if obj is None: + logger.info("ASYNC_DELETE: %s pk=%s already gone, nothing to do", model_label, pk) + return + logger.debug("ASYNC_DELETE: Deleting %s: %s", _get_object_name(obj), obj) if not isinstance(obj, ASYNC_DELETE_SUPPORTED_TYPES): logger.debug("ASYNC_DELETE: %s async delete not supported. Deleting normally: %s", _get_object_name(obj), obj) @@ -1839,7 +1850,7 @@ def delete(self, obj, **kwargs): """ from dojo.celery_dispatch import dojo_dispatch_task # noqa: PLC0415 circular import - dojo_dispatch_task(async_delete_task, obj, **kwargs) + dojo_dispatch_task(async_delete_task, obj._meta.label_lower, obj.pk, **kwargs) @staticmethod def get_object_name(obj): diff --git a/unittests/test_importers_performance.py b/unittests/test_importers_performance.py index 210599106ae..c72a9b3c890 100644 --- a/unittests/test_importers_performance.py +++ b/unittests/test_importers_performance.py @@ -367,11 +367,11 @@ def test_import_reimport_reimport_performance_pghistory_no_async(self): testuser.usercontactinfo.save() self._import_reimport_performance( - expected_num_queries1=185, + expected_num_queries1=188, expected_num_async_tasks1=2, - expected_num_queries2=132, + expected_num_queries2=133, expected_num_async_tasks2=1, - expected_num_queries3=36, + expected_num_queries3=37, expected_num_async_tasks3=1, expected_num_queries4=100, expected_num_async_tasks4=0, @@ -392,13 +392,13 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr self.system_settings(enable_product_grade=True) self._import_reimport_performance( - expected_num_queries1=192, + expected_num_queries1=198, expected_num_async_tasks1=4, - expected_num_queries2=139, + expected_num_queries2=143, expected_num_async_tasks2=3, - expected_num_queries3=40, + expected_num_queries3=44, expected_num_async_tasks3=3, - expected_num_queries4=107, + expected_num_queries4=109, expected_num_async_tasks4=2, ) @@ -545,9 +545,9 @@ def test_deduplication_performance_pghistory_no_async(self): testuser.usercontactinfo.save() self._deduplication_performance( - expected_num_queries1=123, + expected_num_queries1=126, expected_num_async_tasks1=2, - expected_num_queries2=104, + expected_num_queries2=107, expected_num_async_tasks2=2, ) @@ -657,11 +657,11 @@ def test_import_reimport_reimport_performance_pghistory_no_async(self): testuser.usercontactinfo.save() self._import_reimport_performance( - expected_num_queries1=194, + expected_num_queries1=197, expected_num_async_tasks1=2, - expected_num_queries2=143, + expected_num_queries2=144, expected_num_async_tasks2=1, - expected_num_queries3=46, + expected_num_queries3=47, expected_num_async_tasks3=1, expected_num_queries4=101, expected_num_async_tasks4=0, @@ -682,13 +682,13 @@ def test_import_reimport_reimport_performance_pghistory_no_async_with_product_gr self.system_settings(enable_product_grade=True) self._import_reimport_performance( - expected_num_queries1=204, + expected_num_queries1=210, expected_num_async_tasks1=4, - expected_num_queries2=153, + expected_num_queries2=157, expected_num_async_tasks2=3, - expected_num_queries3=50, + expected_num_queries3=54, expected_num_async_tasks3=3, - expected_num_queries4=111, + expected_num_queries4=113, expected_num_async_tasks4=2, ) @@ -809,8 +809,8 @@ def test_deduplication_performance_pghistory_no_async(self): testuser.usercontactinfo.save() self._deduplication_performance( - expected_num_queries1=132, + expected_num_queries1=135, expected_num_async_tasks1=2, - expected_num_queries2=215, + expected_num_queries2=218, expected_num_async_tasks2=2, ) diff --git a/unittests/test_no_pickle.py b/unittests/test_no_pickle.py new file mode 100644 index 00000000000..f9da16df86b --- /dev/null +++ b/unittests/test_no_pickle.py @@ -0,0 +1,35 @@ +""" +Guard tests preventing the reintroduction of pickle into the dojo app. + +Pickle deserialization of attacker-controllable bytes is arbitrary code +execution. We removed all uses (form widgets, Celery serializer) and these +tests fail loudly if a future change adds them back. +""" + +import re +from pathlib import Path + +from django.conf import settings + +import dojo +from unittests.dojo_test_case import DojoTestCase + + +class TestNoPickle(DojoTestCase): + def test_no_pickle_imports_in_dojo(self): + dojo_root = Path(dojo.__file__).resolve().parent + offenders = [] + import_re = re.compile(r"^\s*(?:import\s+pickle|from\s+pickle\s+import)\b", re.MULTILINE) + for path in dojo_root.rglob("*.py"): + text = path.read_text(encoding="utf-8") + if import_re.search(text): + offenders.append(str(path.relative_to(dojo_root.parent))) + self.assertFalse( + offenders, + f"pickle is forbidden in dojo source. Offenders: {offenders}", + ) + + def test_celery_serializer_is_json_only(self): + self.assertEqual(settings.CELERY_TASK_SERIALIZER, "json") + self.assertEqual(settings.CELERY_ACCEPT_CONTENT, ["json"]) + self.assertEqual(settings.CELERY_RESULT_SERIALIZER, "json") diff --git a/unittests/test_survey_forms.py b/unittests/test_survey_forms.py new file mode 100644 index 00000000000..9526c44424a --- /dev/null +++ b/unittests/test_survey_forms.py @@ -0,0 +1,26 @@ +import json + +from dojo.forms import MultiExampleField, MultiWidgetBasic +from unittests.dojo_test_case import DojoTestCase + + +class TestSurveyChoiceWidget(DojoTestCase): + def test_compress_returns_json_string(self): + field = MultiExampleField(required=False) + values = ["a", "b", "c", None, None, None] + compressed = field.compress(values) + + self.assertIsInstance(compressed, str) + self.assertEqual(json.loads(compressed), values) + + def test_decompress_round_trips(self): + widget = MultiWidgetBasic() + values = ["red", "green", "blue", "yellow", None, None] + compressed = json.dumps(values) + + self.assertEqual(widget.decompress(compressed), values) + + def test_decompress_empty_returns_blank_list(self): + widget = MultiWidgetBasic() + self.assertEqual(widget.decompress(None), [None, None, None, None, None, None]) + self.assertEqual(widget.decompress(""), [None, None, None, None, None, None])