Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 94 additions & 10 deletions compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
from kombu import Queue, Exchange
from urllib3 import Retry


logger = logging.getLogger()


# -----------------------------------------------
# Celery + Rabbit MQ
# -----------------------------------------------
# Init celery + rabbit queue definitions
app = Celery()
app.config_from_object('celery_config') # grabs celery_config.py
Expand All @@ -38,13 +41,20 @@
]


# -----------------------------------------------
# Directories
# -----------------------------------------------
# Setup base directories used by all submissions
# note: we need to pass this directory to docker-compose so it knows where to store things!
# HOST_DIRECTORY is read from the environment and falls back to "/tmp/codabench/" when unset.
HOST_DIRECTORY = os.environ.get("HOST_DIRECTORY", "/tmp/codabench/")
BASE_DIR = "/codabench/" # base directory inside the container
# Cache directory lives directly under BASE_DIR.
CACHE_DIR = os.path.join(BASE_DIR, "cache")
# Read from the MAX_CACHE_DIR_SIZE_GB environment variable (default 10) and coerced to float.
# NOTE(review): presumably the upper bound, in gigabytes, on CACHE_DIR's size -- the code that
# enforces it is not visible here; confirm.
MAX_CACHE_DIR_SIZE_GB = float(os.environ.get('MAX_CACHE_DIR_SIZE_GB', 10))


# -----------------------------------------------
# Submission status
# -----------------------------------------------
# Status options for submissions
STATUS_NONE = "None"
STATUS_SUBMITTING = "Submitting"
Expand All @@ -65,6 +75,10 @@
STATUS_FAILED,
)


# -----------------------------------------------
# Container Engine
# -----------------------------------------------
# Setup the container engine that we are using
if os.environ.get("CONTAINER_ENGINE_EXECUTABLE"):
CONTAINER_ENGINE_EXECUTABLE = os.environ.get("CONTAINER_ENGINE_EXECUTABLE")
Expand All @@ -75,9 +89,18 @@
CONTAINER_ENGINE_EXECUTABLE = "docker"


# -----------------------------------------------
# Exceptions
# -----------------------------------------------
class SubmissionException(Exception):
    """Raised when a submission run fails; the message is reported as the failure detail."""


class DockerImagePullException(Exception):
    """Raised when pulling the container image for a run fails."""


class ExecutionTimeLimitExceeded(Exception):
    """Raised by the alarm handler when a run exceeds its execution time limit."""


# -----------------------------------------------------------------------------
# The main compute worker entrypoint, this is how a job is ran at the highest
Expand All @@ -94,6 +117,8 @@ def run_wrapper(run_args):
if run.is_scoring:
run.push_scores()
run.push_output()
except DockerImagePullException as e:
run._update_status(STATUS_FAILED, str(e))
except SubmissionException as e:
run._update_status(STATUS_FAILED, str(e))
except SoftTimeLimitExceeded:
Expand Down Expand Up @@ -160,14 +185,14 @@ def is_valid_zip(zip_path):
return False


class ExecutionTimeLimitExceeded(Exception):
pass


def alarm_handler(signum, frame):
    """Abort the current run by raising ExecutionTimeLimitExceeded.

    The (signum, frame) signature matches what the ``signal`` module passes to
    a handler; neither argument is used.
    NOTE(review): presumably registered for SIGALRM -- the registration is not
    visible in this section; confirm.
    """
    raise ExecutionTimeLimitExceeded()


# -----------------------------------------------
# Class Run
# Responsible for running a submission inside a docker container
# -----------------------------------------------
class Run:
"""A "Run" in Codalab is composed of some program, some data to work with, and some signed URLs to upload results
to. There is also a secret key to do special commands for just this submission.
Expand Down Expand Up @@ -340,8 +365,55 @@ def _get_container_image(self, image_name):
container_engine_pull = check_output(cmd)
logger.info("Pull complete for image: {0} with output of {1}".format(image_name, container_engine_pull))
except CalledProcessError:
logger.info("Pull for image: {} returned a non-zero exit code!")
raise SubmissionException(f"Pull for {image_name} failed!")
error_message = f"Pull for image: {image_name} returned a non-zero exit code! Check if the docker image exists on docker hub."
logger.info(error_message)
# Prepare data to be sent to submissions api
docker_pull_fail_data = {
"type": "Docker_Image_Pull_Fail",
"error_message": error_message,
}
# Send data to be written to ingestion logs
self._update_submission(docker_pull_fail_data)
# Send error through web socket to the frontend
asyncio.run(self._send_data_through_socket(error_message))
raise DockerImagePullException(f"Pull for {image_name} failed!")

async def _send_data_through_socket(self, error_message):
    """Send an error message to ``self.websocket_url`` as a "stderr" frame.

    This is used for sending
        - Docker image pull failure logs
        - Execution time limit exceeded logs

    Delivery is best-effort: connection and send failures listed in
    ``websocket_errors`` are logged and swallowed so that the caller can still
    raise its own, more meaningful exception afterwards.

    :param error_message: text to deliver; it is wrapped in a JSON object of
        the form ``{"kind": "stderr", "message": error_message}``.
    """
    logger.info(f"Connecting to {self.websocket_url} to send error message")

    # Errors that mean "could not reach / talk to the socket". The first and
    # last can only be raised while connecting, so connect must be in the try.
    websocket_errors = (
        socket.gaierror,
        websockets.WebSocketException,
        websockets.ConnectionClosedError,
        ConnectionRefusedError,
    )

    websocket = None
    try:
        # connect to web socket
        websocket = await websockets.connect(self.websocket_url)

        # send message
        await websocket.send(json.dumps({
            "kind": "stderr",
            "message": error_message
        }))
    except websocket_errors:
        # handle websocket errors
        logger.info("Error sending failed through websocket")
    else:
        # no error in websocket message sending
        logger.info("Error sent successfully through websocket")
    finally:
        # Close exactly once, and only if the connection was ever opened.
        if websocket is not None:
            logger.info(f"Disconnecting from websocket {self.websocket_url}")
            try:
                await websocket.close()
            except Exception as e:
                # Closing is cleanup only; never let it mask the real failure.
                logger.error(e)

def _get_bundle(self, url, destination, cache=True):
"""Downloads zip from url and unzips into destination. If cache=True then url is hashed and checked
Expand Down Expand Up @@ -384,7 +456,7 @@ def _get_bundle(self, url, destination, cache=True):
raise # Re-raise the last caught BadZipFile exception
else:
logger.info("Failed. Retrying in 60 seconds...")
time.sleep(60) # Wait 60 seconds before retrying
time.sleep(60) # Wait 60 seconds before retrying
# Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it
return bundle_file

Expand Down Expand Up @@ -627,7 +699,7 @@ def _put_file(self, url, file=None, raw_data=None, content_type='application/zip
"""
if file and raw_data:
raise Exception("Cannot put both a file and raw_data")

headers = {
# For Azure only, other systems ignore these headers
'x-ms-blob-type': 'BlockBlob',
Expand Down Expand Up @@ -731,7 +803,19 @@ def start(self):
try:
loop.run_until_complete(gathered_tasks)
except ExecutionTimeLimitExceeded:
raise SubmissionException(f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds")
error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds"
logger.info(error_message)
# Prepare data to be sent to submissions api
execution_time_limit_exceeded_data = {
"type": "Execution_Time_Limit_Exceeded",
"error_message": error_message,
"is_scoring": self.is_scoring
}
# Send data to be written to ingestion/scoring std_err
self._update_submission(execution_time_limit_exceeded_data)
# Send error through web socket to the frontend
asyncio.run(self._send_data_through_socket(error_message))
raise SubmissionException(error_message)
finally:
self.watch = False
for kind, logs in self.logs.items():
Expand Down
44 changes: 43 additions & 1 deletion src/apps/api/views/submissions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import uuid
import logging

from django.db.models import Q
from django_filters.rest_framework import DjangoFilterBackend
Expand All @@ -14,14 +15,17 @@
from rest_framework.settings import api_settings
from rest_framework.viewsets import ModelViewSet
from rest_framework_csv import renderers
from django.core.files.base import ContentFile

from profiles.models import Organization, Membership
from tasks.models import Task
from api.serializers.submissions import SubmissionCreationSerializer, SubmissionSerializer, SubmissionFilesSerializer
from competitions.models import Submission, Phase, CompetitionParticipant
from competitions.models import Submission, SubmissionDetails, Phase, CompetitionParticipant
from leaderboards.strategies import put_on_leaderboard_by_submission_rule
from leaderboards.models import SubmissionScore, Column, Leaderboard

logger = logging.getLogger()


class SubmissionViewSet(ModelViewSet):
queryset = Submission.objects.all().order_by('-pk')
Expand Down Expand Up @@ -50,6 +54,44 @@ def check_object_permissions(self, request, obj):
hostname = request.data['status_details'].replace('scoring_hostname-', '')
obj.scoring_worker_hostname = hostname
obj.save()

# check if type is in request data. type can have the following values
# - Docker_Image_Pull_Fail
# - Execution_Time_Limit_Exceeded
if "type" in self.request.data.keys():

if request.data["type"] in ["Docker_Image_Pull_Fail", "Execution_Time_Limit_Exceeded"]:

# Get the error message
error_message = request.data['error_message']

# Set file name to ingestion std error as default
error_file_name = "prediction_ingestion_stderr"

# Change error file name when error comes from execution time limit
# and error occurred during scoring
if request.data["type"] == "Execution_Time_Limit_Exceeded" and request.data['is_scoring'] == "True":
error_file_name = "scoring_stderr"

try:
# Get submission detail for this submission
submission_detail = SubmissionDetails.objects.get(
name=error_file_name,
submission=obj,
)

# Read the existing content from the file
existing_content = submission_detail.data_file.read().decode("utf-8")

# Append the new error message to the existing content
modified_content = existing_content + "\n" + error_message

# write error message to the file
submission_detail.data_file.save(submission_detail.data_file.name, ContentFile(modified_content.encode("utf-8")))

except SubmissionDetails.DoesNotExist:
logger.warning("SubmissionDetails object not found.")

not_bot_user = self.request.user.is_authenticated and not self.request.user.is_bot

if self.action in ['update_fact_sheet', 're_run_submission']:
Expand Down