Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions contentcuration/contentcuration/viewsets/sync/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
bulk creates, updates, and deletes.
"""
from celery import states
from django.db.models import Exists
from django.db.models import OuterRef
from django.db.models import Q
from django_celery_results.models import TaskResult
from django_cte import CTEQuerySet
from django_cte import With
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
Expand Down Expand Up @@ -126,38 +126,27 @@ def return_changes(self, request, channel_revs):
return {"changes": changes, "errors": errors, "successes": successes}

def return_tasks(self, request, channel_revs):
custom_task_objects = CustomTaskMetadata.objects.filter(
channel_id__in=channel_revs.keys()
).annotate(
has_matching_task=Exists(
TaskResult.objects.filter(
task_id=OuterRef('task_id'),
status__in=[states.STARTED, states.FAILURE],
).exclude(
custom_task_cte = With(CustomTaskMetadata.objects.filter(channel_id__in=channel_revs.keys()))
task_result_querySet = CTEQuerySet(model=TaskResult)
query = custom_task_cte.join(task_result_querySet, task_id=custom_task_cte.col.task_id)\
.with_cte(custom_task_cte)\
.filter(status__in=[states.STARTED, states.FAILURE],)\
.exclude(
task_name__in=[apply_channel_changes_task.name, apply_user_changes_task.name]
)
)
).filter(
has_matching_task=True
).annotate(
progress=custom_task_cte.col.progress,
channel_id=custom_task_cte.col.channel_id,
)

response_payload = {
"tasks": [],
}

for custom_task in custom_task_objects:
task_data = {
"task_id": custom_task.task_id,
"task_name": custom_task.task_name,
"traceback": custom_task.task_result.traceback,
"progress": custom_task.progress,
"channel_id": custom_task.channel_id,
"status": custom_task.task_result.status,
if query.exists():
response_payload = {
"tasks": query.values("task_id", "task_name", "traceback", "progress", "channel_id", "status"),
}

# Add the task data to the response_payload
response_payload["tasks"].append(task_data)

return response_payload

def post(self, request):
Expand Down