diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 875051090..5535ef77d 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -26,9 +26,12 @@ from kombu import Queue, Exchange from urllib3 import Retry - logger = logging.getLogger() + +# ----------------------------------------------- +# Celery + Rabbit MQ +# ----------------------------------------------- # Init celery + rabbit queue definitions app = Celery() app.config_from_object('celery_config') # grabs celery_config.py @@ -38,6 +41,9 @@ ] +# ----------------------------------------------- +# Directories +# ----------------------------------------------- # Setup base directories used by all submissions # note: we need to pass this directory to docker-compose so it knows where to store things! HOST_DIRECTORY = os.environ.get("HOST_DIRECTORY", "/tmp/codabench/") @@ -45,6 +51,10 @@ CACHE_DIR = os.path.join(BASE_DIR, "cache") MAX_CACHE_DIR_SIZE_GB = float(os.environ.get('MAX_CACHE_DIR_SIZE_GB', 10)) + +# ----------------------------------------------- +# Submission status +# ----------------------------------------------- # Status options for submissions STATUS_NONE = "None" STATUS_SUBMITTING = "Submitting" @@ -65,6 +75,10 @@ STATUS_FAILED, ) + +# ----------------------------------------------- +# Container Engine +# ----------------------------------------------- # Setup the container engine that we are using if os.environ.get("CONTAINER_ENGINE_EXECUTABLE"): CONTAINER_ENGINE_EXECUTABLE = os.environ.get("CONTAINER_ENGINE_EXECUTABLE") @@ -75,9 +89,18 @@ CONTAINER_ENGINE_EXECUTABLE = "docker" +# ----------------------------------------------- +# Exceptions +# ----------------------------------------------- class SubmissionException(Exception): pass +class DockerImagePullException(Exception): + pass + +class ExecutionTimeLimitExceeded(Exception): + pass + # ----------------------------------------------------------------------------- # The main compute worker entrypoint, this is how a job is ran at the highest @@ -94,6 +117,8 @@ def run_wrapper(run_args): if run.is_scoring: run.push_scores() run.push_output() + except DockerImagePullException as e: + run._update_status(STATUS_FAILED, str(e)) except SubmissionException as e: run._update_status(STATUS_FAILED, str(e)) except SoftTimeLimitExceeded: @@ -160,14 +185,14 @@ def is_valid_zip(zip_path): return False -class ExecutionTimeLimitExceeded(Exception): - pass - - def alarm_handler(signum, frame): raise ExecutionTimeLimitExceeded +# ----------------------------------------------- +# Class Run +# Respnosible for running a submission inside a docker container +# ----------------------------------------------- class Run: """A "Run" in Codalab is composed of some program, some data to work with, and some signed URLs to upload results to. There is also a secret key to do special commands for just this submission. @@ -340,8 +365,53 @@ def _get_container_image(self, image_name): container_engine_pull = check_output(cmd) logger.info("Pull complete for image: {0} with output of {1}".format(image_name, container_engine_pull)) except CalledProcessError: - logger.info("Pull for image: {} returned a non-zero exit code!") - raise SubmissionException(f"Pull for {image_name} failed!") + error_message = f"Pull for image: {image_name} returned a non-zero exit code! Check if the docker image exists on docker hub." + logger.info(error_message) + # Prepare data to be sent to submissions api + docker_pull_fail_data = { + "type": "Docker_Image_Pull_Fail", + "error_message": error_message, + } + # Send data to be written to ingestion logs + self._update_submission(docker_pull_fail_data) + # Send error through web socket to the frontend + asyncio.run(self._send_docker_image_pull_fail_through_socket(error_message)) + raise DockerImagePullException(f"Pull for {image_name} failed!") + + async def _send_docker_image_pull_fail_through_socket(self, error_message): + """ + This function gets an error messages and sends it through a web socket + """ + logger.info(f"Connecting to {self.websocket_url} to send docker image pull error") + + # connect to web socket + websocket = await websockets.connect(self.websocket_url) + + # define websocket errors + websocket_errors = (socket.gaierror, websockets.WebSocketException, websockets.ConnectionClosedError, ConnectionRefusedError) + + try: + # send message + await websocket.send(json.dumps({ + "kind": "stderr", + "message": error_message + })) + + except websocket_errors: + # handle websocket errors + logger.info(f"Docker image pull error sending failed through websocket") + try: + await websocket.close() + except Exception as e: + logger.error(e) + else: + # no error in websocket message sending + logger.info(f"Docker image pull error sent successfully through websocket") + + logger.info(f"Disconnecting from websocket {self.websocket_url}") + + # close websocket + await websocket.close() def _get_bundle(self, url, destination, cache=True): """Downloads zip from url and unzips into destination. If cache=True then url is hashed and checked diff --git a/src/apps/api/views/submissions.py b/src/apps/api/views/submissions.py index d4d82fba2..a1088d5d8 100644 --- a/src/apps/api/views/submissions.py +++ b/src/apps/api/views/submissions.py @@ -1,5 +1,6 @@ import json import uuid +import logging from django.db.models import Q from django_filters.rest_framework import DjangoFilterBackend @@ -14,14 +15,17 @@ from rest_framework.settings import api_settings from rest_framework.viewsets import ModelViewSet from rest_framework_csv import renderers +from django.core.files.base import ContentFile from profiles.models import Organization, Membership from tasks.models import Task from api.serializers.submissions import SubmissionCreationSerializer, SubmissionSerializer, SubmissionFilesSerializer -from competitions.models import Submission, Phase, CompetitionParticipant +from competitions.models import Submission, SubmissionDetails, Phase, CompetitionParticipant from leaderboards.strategies import put_on_leaderboard_by_submission_rule from leaderboards.models import SubmissionScore, Column, Leaderboard +logger = logging.getLogger() + class SubmissionViewSet(ModelViewSet): queryset = Submission.objects.all().order_by('-pk') @@ -50,6 +54,25 @@ def check_object_permissions(self, request, obj): hostname = request.data['status_details'].replace('scoring_hostname-', '') obj.scoring_worker_hostname = hostname obj.save() + + # Set docker pull failure logs in ingestion std_err + # check if type: Docker_Image_Pull_Fail is there in the data + if "type" in self.request.data.keys(): + if request.data["type"] == "Docker_Image_Pull_Fail": + # Get the error message + error_message = request.data['error_message'] + try: + # Get submission detail for this submissions with name = prediction_ingestion_stderr + submission_detail = SubmissionDetails.objects.get( + name="prediction_ingestion_stderr", + submission=obj, + ) + # write the docker pull image error message to the file + submission_detail.data_file.save(submission_detail.data_file.name, ContentFile(error_message.encode("utf-8"))) + + except SubmissionDetails.DoesNotExist: + logger.warning("SubmissionDetails object not found.") + not_bot_user = self.request.user.is_authenticated and not self.request.user.is_bot if self.action in ['update_fact_sheet', 're_run_submission']: