Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .env_sample
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,10 @@ ENABLE_SIGN_IN=True
# GS_PUBLIC_BUCKET_NAME=public
# GS_PRIVATE_BUCKET_NAME=private
# GOOGLE_APPLICATION_CREDENTIALS=/app/certs/google-storage-api.json


# -----------------------------------------------------------------------------
# Logging (Serialized outputs the logs in JSON format)
# -----------------------------------------------------------------------------
LOG_LEVEL=info
SERIALIZED=false
2 changes: 1 addition & 1 deletion Containerfile.compute_worker_podman
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ RUN poetry config virtualenvs.prefer-active-python true
COPY ./compute_worker/pyproject.toml ./
COPY ./compute_worker/poetry.lock ./
RUN poetry install

COPY ./src/settings/logs_loguru.py /usr/bin
CMD celery -A compute_worker worker \
-l info \
-Q compute-worker \
Expand Down
1 change: 1 addition & 0 deletions Dockerfile.compute_worker
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ COPY ./compute_worker/poetry.lock ./
RUN poetry config virtualenvs.prefer-active-python true && poetry install

ADD compute_worker .
COPY ./src/settings/logs_loguru.py /usr/bin

CMD celery -A compute_worker worker \
-l info \
Expand Down
47 changes: 29 additions & 18 deletions compute_worker/compute_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import glob
import hashlib
import json
import logging
import os
import shutil
import signal
Expand All @@ -26,12 +25,21 @@
from kombu import Queue, Exchange
from urllib3 import Retry

logger = logging.getLogger()
# This is only needed for the pytests to pass
import sys
sys.path.append('/app/src/settings/')

from celery import signals
import logging
logger = logging.getLogger(__name__)
from logs_loguru import configure_logging

# -----------------------------------------------
# Celery + Rabbit MQ
# -----------------------------------------------
@signals.setup_logging.connect
def setup_celery_logging(**kwargs):
pass
# Init celery + rabbit queue definitions
app = Celery()
app.config_from_object('celery_config') # grabs celery_config.py
Expand All @@ -40,7 +48,10 @@
Queue('compute-worker', Exchange('compute-worker'), routing_key='compute-worker', queue_arguments={'x-max-priority': 10}),
]


# -----------------------------------------------
# Logging
# -----------------------------------------------
configure_logging(os.environ.get("LOG_LEVEL", "INFO"),os.environ.get("SERIALIZED", 'false'))
# -----------------------------------------------
# Directories
# -----------------------------------------------
Expand Down Expand Up @@ -98,7 +109,6 @@ class DockerImagePullException(Exception):
class ExecutionTimeLimitExceeded(Exception):
pass


# -----------------------------------------------------------------------------
# The main compute worker entrypoint, this is how a job is ran at the highest
# level.
Expand Down Expand Up @@ -337,7 +347,7 @@ def _update_submission(self, data):
if resp.status_code == 200:
logger.info("Submission updated successfully!")
else:
logger.info(f"Submission patch failed with status = {resp.status_code}, and response = \n{resp.content}")
logger.error(f"Submission patch failed with status = {resp.status_code}, and response = \n{resp.content}")
raise SubmissionException("Failure updating submission data.")

def _update_status(self, status, extra_information=None):
Expand Down Expand Up @@ -370,7 +380,7 @@ def _get_container_image(self, image_name):
retries += 1
if retries >= max_retries:
error_message = f"Pull for image: {image_name} returned a non-zero exit code! Check if the docker image exists on docker hub. {pull_error}"
logger.info(error_message)
logger.error(error_message)
# Prepare data to be sent to submissions api
docker_pull_fail_data = {
"type": "Docker_Image_Pull_Fail",
Expand All @@ -383,7 +393,7 @@ def _get_container_image(self, image_name):
asyncio.run(self._send_data_through_socket(error_message))
raise DockerImagePullException(f"Pull for {image_name} failed!")
else:
logger.info("Failed. Retrying in 5 seconds...")
logger.warning("Failed. Retrying in 5 seconds...")
time.sleep(5) # Wait 5 seconds before retrying

async def _send_data_through_socket(self, error_message):
Expand Down Expand Up @@ -413,7 +423,7 @@ async def _send_data_through_socket(self, error_message):

except websocket_errors:
# handle websocket errors
logger.info(f"Error sending failed through websocket")
logger.error(f"Error sending failed through websocket")
try:
await websocket.close()
except Exception as e:
Expand Down Expand Up @@ -467,7 +477,7 @@ def _get_bundle(self, url, destination, cache=True):
if retries >= max_retries:
raise # Re-raise the last caught BadZipFile exception
else:
logger.info("Failed. Retrying in 60 seconds...")
logger.warning("Failed. Retrying in 60 seconds...")
time.sleep(60) # Wait 60 seconds before retrying
# Return the zip file path for other uses, e.g. for creating a MD5 hash to identify it
return bundle_file
Expand Down Expand Up @@ -544,7 +554,7 @@ async def _readline_or_chunk(stream):
except asyncio.TimeoutError:
continue
except websocket_errors:
logger.debug("\n\nWebsocket error (line 538)\n\n")
logger.error("\n\nWebsocket error (line 538)\n\n")
try:
# do we need to await websocket.close() on the old socket? before making a new one probably not?
await websocket.close()
Expand All @@ -557,7 +567,7 @@ async def _readline_or_chunk(stream):
tries = 0
while tries < 3 and not websocket.open:
try:
logger.debug(f"\n\nAttempting to reconnect in 2 seconds (attempt {tries+1}/3)")
logger.warning(f"\n\nAttempting to reconnect in 2 seconds (attempt {tries+1}/3)")
websocket = await websockets.connect(websocket_url)
logger.debug(f"\n\nSuccessfully reconnected to {websocket_url}")
except websocket_errors:
Expand Down Expand Up @@ -606,7 +616,7 @@ async def _run_program_directory(self, program_dir, kind):
"""
# If the directory doesn't even exist, move on
if not os.path.exists(program_dir):
logger.info(f"{program_dir} not found, no program to execute")
logger.error(f"{program_dir} not found, no program to execute")

# Communicate that the program is closing
self.completed_program_counter += 1
Expand All @@ -619,7 +629,7 @@ async def _run_program_directory(self, program_dir, kind):
else:
# Display a warning in logs when there is no metadata file in submission/program dir
if kind == "program":
logger.info(
logger.warning(
"Program directory missing metadata, assuming it's going to be handled by ingestion"
)
return
Expand All @@ -636,12 +646,13 @@ async def _run_program_directory(self, program_dir, kind):
else:
command = None
except yaml.YAMLError as e:
logger.error("Error parsing YAML file: ", e)
print("Error parsing YAML file: ", e)
command = None
if not command and kind == "ingestion":
raise SubmissionException("Program directory missing 'command' in metadata")
elif not command:
logger.info(
logger.warning(
f"Warning: {program_dir} has no command in metadata, continuing anyway "
f"(may be meant to be consumed by an ingestion program)"
)
Expand Down Expand Up @@ -852,7 +863,7 @@ def start(self):
loop.run_until_complete(gathered_tasks)
except ExecutionTimeLimitExceeded:
error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds"
logger.info(error_message)
logger.error(error_message)
# Prepare data to be sent to submissions api
execution_time_limit_exceeded_data = {
"type": "Execution_Time_Limit_Exceeded",
Expand All @@ -873,14 +884,14 @@ def start(self):
elapsed_time = self.execution_time_limit
return_code = logs["proc"].returncode
if return_code is None:
logger.info('No return code from Process. Killing it')
logger.warning('No return code from Process. Killing it')
if kind == 'ingestion':
program_to_kill = self.ingestion_container_name
else:
program_to_kill = self.program_container_name
# Try and stop the program. If stop does not succeed
kill_code = subprocess.call([CONTAINER_ENGINE_EXECUTABLE, 'stop', str(program_to_kill)])
logger.info(f'Kill process returned {kill_code}')
logger.warning(f'Kill process returned {kill_code}')
if kind == 'program':
self.program_exit_code = return_code
self.program_elapsed_time = elapsed_time
Expand Down Expand Up @@ -966,7 +977,7 @@ def push_output(self):

def clean_up(self):
if os.environ.get("CODALAB_IGNORE_CLEANUP_STEP"):
logger.info(f"CODALAB_IGNORE_CLEANUP_STEP mode enabled, ignoring clean up of: {self.root_dir}")
logger.warning(f"CODALAB_IGNORE_CLEANUP_STEP mode enabled, ignoring clean up of: {self.root_dir}")
return

logger.info(f"Destroying submission temp dir: {self.root_dir}")
Expand Down
Loading