diff --git a/dojo/db_migrations/0201_populate_finding_sla_expiration_date.py b/dojo/db_migrations/0201_populate_finding_sla_expiration_date.py new file mode 100644 index 00000000000..4b886301de7 --- /dev/null +++ b/dojo/db_migrations/0201_populate_finding_sla_expiration_date.py @@ -0,0 +1,133 @@ +from django.db import migrations +from django.utils import timezone +from datetime import datetime +from django.conf import settings +from dateutil.relativedelta import relativedelta +import logging + +from dojo.utils import get_work_days + +logger = logging.getLogger(__name__) + + +def calculate_sla_expiration_dates(apps, schema_editor): + System_Settings = apps.get_model('dojo', 'System_Settings') + + ss, _ = System_Settings.objects.get_or_create() + if not ss.enable_finding_sla: + return + + logger.info('Calculating SLA expiration dates for all findings') + + SLA_Configuration = apps.get_model('dojo', 'SLA_Configuration') + Finding = apps.get_model('dojo', 'Finding') + + findings = Finding.objects.filter(sla_expiration_date__isnull=True).order_by('id').only('id', 'sla_start_date', 'date', 'severity', 'test', 'mitigated') + + page_size = 1000 + total_count = Finding.objects.filter(id__gt=0).count() + logger.info('Found %d findings to be updated', total_count) + + i = 0 + batch = [] + last_id = 0 + total_pages = (total_count // page_size) + 2 + for p in range(1, total_pages): + page = findings.filter(id__gt=last_id)[:page_size] + for find in page: + i += 1 + last_id = find.id + + start_date = find.sla_start_date if find.sla_start_date else find.date + + sla_config = SLA_Configuration.objects.filter(id=find.test.engagement.product.sla_configuration_id).first() + sla_period = getattr(sla_config, find.severity.lower(), None) + + days = None + if settings.SLA_BUSINESS_DAYS: + if find.mitigated: + days = get_work_days(find.date, find.mitigated.date()) + else: + days = get_work_days(find.date, timezone.now().date()) + else: + if isinstance(start_date, datetime): + start_date = start_date.date() + + if find.mitigated: + days = (find.mitigated.date() - start_date).days + else: + days = (timezone.now().date() - start_date).days + + days = days if days > 0 else 0 + + days_remaining = None + if sla_period: + days_remaining = sla_period - days + + if days_remaining: + if find.mitigated: + find.sla_expiration_date = find.mitigated.date() + relativedelta(days=days_remaining) + else: + find.sla_expiration_date = timezone.now().date() + relativedelta(days=days_remaining) + + batch.append(find) + + if (i > 0 and i % page_size == 0): + Finding.objects.bulk_update(batch, ['sla_expiration_date']) + batch = [] + logger.info('%s out of %s findings processed...', i, total_count) + + Finding.objects.bulk_update(batch, ['sla_expiration_date']) + batch = [] + logger.info('%s out of %s findings processed...', i, total_count) + + +def reset_sla_expiration_dates(apps, schema_editor): + System_Settings = apps.get_model('dojo', 'System_Settings') + + ss, _ = System_Settings.objects.get_or_create() + if not ss.enable_finding_sla: + return + + logger.info('Resetting SLA expiration dates for all findings') + + Finding = apps.get_model('dojo', 'Finding') + + findings = Finding.objects.filter(sla_expiration_date__isnull=False).order_by('id').only('id') + + page_size = 1000 + total_count = Finding.objects.filter(id__gt=0).count() + logger.info('Found %d findings to be reset', total_count) + + i = 0 + batch = [] + last_id = 0 + total_pages = (total_count // page_size) + 2 + for p in range(1, total_pages): + page = findings.filter(id__gt=last_id)[:page_size] + for find in page: + i += 1 + last_id = find.id + + find.sla_expiration_date = None + batch.append(find) + + if (i > 0 and i % page_size == 0): + Finding.objects.bulk_update(batch, ['sla_expiration_date']) + batch = [] + logger.info('%s out of %s findings processed...', i, total_count) + + Finding.objects.bulk_update(batch, ['sla_expiration_date']) + batch = [] + logger.info('%s out of %s findings processed...', i, total_count) + + +class Migration(migrations.Migration): + + dependencies = [ + ('dojo', '0200_finding_sla_expiration_date_product_async_updating_and_more'), + ] + + operations = [ + migrations.RunPython(calculate_sla_expiration_dates, reset_sla_expiration_dates), + ] diff --git a/dojo/filters.py b/dojo/filters.py index 51279d76a9a..723c52337f3 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -11,6 +11,7 @@ from django.conf import settings import six from django.utils.translation import gettext_lazy as _ +from django.utils import timezone from django_filters import FilterSet, CharFilter, OrderingFilter, \ ModelMultipleChoiceFilter, ModelChoiceFilter, MultipleChoiceFilter, \ BooleanFilter, NumberFilter, DateFilter @@ -148,16 +149,12 @@ def any(self, qs, name): return qs def sla_satisfied(self, qs, name): - for finding in qs: - if finding.violates_sla: - qs = qs.exclude(id=finding.id) - return qs + # return findings that have an sla expiration date after today or no sla expiration date + return qs.filter(Q(sla_expiration_date__isnull=True) | Q(sla_expiration_date__gt=timezone.now().date())) def sla_violated(self, qs, name): - for finding in qs: - if not finding.violates_sla: - qs = qs.exclude(id=finding.id) - return qs + # return active findings that have an sla expiration date before today + return qs.filter(Q(active=True) & Q(sla_expiration_date__lt=timezone.now().date())) options = { None: (_('Any'), any), @@ -184,13 +181,13 @@ def any(self, qs, name): def sla_satisifed(self, qs, name): for product in qs: - if product.violates_sla: + if product.violates_sla(): qs = qs.exclude(id=product.id) return qs def sla_violated(self, qs, name): for product in qs: - if not product.violates_sla: + if not product.violates_sla(): qs = qs.exclude(id=product.id) return qs diff --git a/dojo/models.py b/dojo/models.py index 7bda3997c0c..45d522963ee 100755 --- a/dojo/models.py +++ b/dojo/models.py @@ -1102,7 +1102,7 @@ def findings_active_verified_count(self): @cached_property def endpoint_host_count(self): # active_endpoints is (should be) prefetched - endpoints = self.active_endpoints + endpoints = getattr(self, 'active_endpoints', None) hosts = [] for e in endpoints: @@ -1116,7 +1116,10 @@ def endpoint_host_count(self): @cached_property def endpoint_count(self): # active_endpoints is (should be) prefetched - return len(self.active_endpoints) + endpoints = getattr(self, 'active_endpoints', None) + if endpoints: + return len(self.active_endpoints) + return None def open_findings(self, start_date=None, end_date=None): if start_date is None or end_date is None: @@ -1192,13 +1195,11 @@ def get_absolute_url(self): from django.urls import reverse return reverse('view_product', args=[str(self.id)]) - @property def violates_sla(self): - findings = Finding.objects.filter(test__engagement__product=self, active=True) - for f in findings: - if f.violates_sla: - return True - return False + findings = Finding.objects.filter(test__engagement__product=self, + active=True, + sla_expiration_date__lt=timezone.now().date()) + return findings.count() > 0 class Product_Member(models.Model): @@ -2887,20 +2888,19 @@ def set_sla_expiration_date(self): self.sla_expiration_date = get_current_date() + relativedelta(days=days_remaining) def sla_days_remaining(self): - sla_calculation = None - sla_period = self.get_sla_period() - if sla_period: - sla_calculation = sla_period - self.sla_age - return sla_calculation - - def sla_deadline(self): - days_remaining = self.sla_days_remaining() - if days_remaining: + if self.sla_expiration_date: if self.mitigated: - return self.mitigated.date() + relativedelta(days=days_remaining) - return get_current_date() + relativedelta(days=days_remaining) + mitigated_date = self.mitigated + if isinstance(mitigated_date, datetime): + mitigated_date = self.mitigated.date() + return (self.sla_expiration_date - mitigated_date).days + else: + return (self.sla_expiration_date - get_current_date()).days return None + def sla_deadline(self): + return self.sla_expiration_date + def github(self): try: return self.github_issue @@ -3294,8 +3294,7 @@ def inherit_tags(self, potentially_existing_tags): @property def violates_sla(self): - days_remaining = self.sla_days_remaining() - return days_remaining < 0 if days_remaining else False + return (self.sla_expiration_date and self.sla_expiration_date < timezone.now()) class FindingAdmin(admin.ModelAdmin):