diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 05f65cbd7..b90de9916 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -51,15 +51,22 @@ def __init__( """Create a replayer to replay workflows from history. See :py:meth:`temporalio.worker.Worker.__init__` for a description of - most of the arguments. The same arguments need to be passed to the - replayer that were passed to the worker when the workflow originally + most of the arguments. Most of the same arguments need to be passed to + the replayer that were passed to the worker when the workflow originally ran. + + Note, unlike the worker, for the replayer the workflow_task_executor + will default to a new thread pool executor with no max_workers set that + will be shared across all replay calls and never explicitly shut down. + Users are encouraged to provide their own if needing more control. """ if not workflows: raise ValueError("At least one workflow must be specified") self._config = ReplayerConfig( workflows=list(workflows), - workflow_task_executor=workflow_task_executor, + workflow_task_executor=( + workflow_task_executor or concurrent.futures.ThreadPoolExecutor() + ), workflow_runner=workflow_runner, unsandboxed_workflow_runner=unsandboxed_workflow_runner, namespace=namespace, @@ -195,6 +202,7 @@ def on_eviction_hook( task_queue=task_queue, workflows=self._config["workflows"], workflow_task_executor=self._config["workflow_task_executor"], + max_concurrent_workflow_tasks=5, workflow_runner=self._config["workflow_runner"], unsandboxed_workflow_runner=self._config["unsandboxed_workflow_runner"], data_converter=self._config["data_converter"], diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 4c34a950e..3aeb6d4e2 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -109,9 +109,10 @@ def __init__( workflow_task_executor: Thread pool executor for workflow tasks. If this is not present, a new :py:class:`concurrent.futures.ThreadPoolExecutor` will be - created with ``max_workers`` set to ``max(os.cpu_count(), 4)``. - The default one will be properly shutdown, but if one is - provided, the caller is responsible for shutting it down after + created with ``max_workers`` set to + ``max_concurrent_workflow_tasks`` if it is present, or 500 + otherwise. The default one will be properly shutdown, but if one + is provided, the caller is responsible for shutting it down after the worker is shut down. workflow_runner: Runner for workflows. unsandboxed_workflow_runner: Runner for workflows that opt-out of @@ -312,6 +313,7 @@ def __init__( task_queue=task_queue, workflows=workflows, workflow_task_executor=workflow_task_executor, + max_concurrent_workflow_tasks=max_concurrent_workflow_tasks, workflow_runner=workflow_runner, unsandboxed_workflow_runner=unsandboxed_workflow_runner, data_converter=client_config["data_converter"], diff --git a/temporalio/worker/_workflow.py b/temporalio/worker/_workflow.py index ca7080935..37e6810c9 100644 --- a/temporalio/worker/_workflow.py +++ b/temporalio/worker/_workflow.py @@ -7,9 +7,12 @@ import logging import os import sys +import threading +from dataclasses import dataclass from datetime import timezone from types import TracebackType from typing import ( + Awaitable, Callable, Dict, List, @@ -25,6 +28,7 @@ import temporalio.bridge.client import temporalio.bridge.proto.workflow_activation import temporalio.bridge.proto.workflow_completion +import temporalio.bridge.runtime import temporalio.bridge.worker import temporalio.client import temporalio.common @@ -59,6 +63,7 @@ def __init__( task_queue: str, workflows: Sequence[Type], workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor], + max_concurrent_workflow_tasks: Optional[int], workflow_runner: WorkflowRunner, unsandboxed_workflow_runner: WorkflowRunner, data_converter: temporalio.converter.DataConverter, @@ -80,7 +85,7 @@ def __init__( self._workflow_task_executor = ( workflow_task_executor or concurrent.futures.ThreadPoolExecutor( - max_workers=max(os.cpu_count() or 4, 4), + max_workers=max_concurrent_workflow_tasks or 500, thread_name_prefix="temporal_workflow_", ) ) @@ -102,7 +107,7 @@ def __init__( **_WorkflowExternFunctions(__temporal_get_metric_meter=lambda: metric_meter) ) self._workflow_failure_exception_types = workflow_failure_exception_types - self._running_workflows: Dict[str, WorkflowInstance] = {} + self._running_workflows: Dict[str, _RunningWorkflow] = {} self._disable_eager_activity_execution = disable_eager_activity_execution self._on_eviction_hook = on_eviction_hook self._disable_safe_eviction = disable_safe_eviction @@ -182,7 +187,7 @@ def notify_shutdown(self) -> None: if self._could_not_evict_count: logger.warning( f"Shutting down workflow worker, but {self._could_not_evict_count} " - + "workflow(s) could not be evicted previously, so the shutdown will hang" + + "workflow(s) could not be evicted previously, so the shutdown may hang" ) # Only call this if run() raised an error @@ -213,6 +218,14 @@ async def _handle_activation( elif job.HasField("initialize_workflow"): init_job = job.initialize_workflow + # If this is a cache removal, it is handled separately + if cache_remove_job: + # Should never happen + if len(act.jobs) != 1: + logger.warning("Unexpected job alongside cache remove job") + await self._handle_cache_eviction(act, cache_remove_job) + return + # Build default success completion (e.g. remove-job-only activations) completion = ( temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion() @@ -220,7 +233,7 @@ async def _handle_activation( completion.successful.SetInParent() try: # Decode the activation if there's a codec and not cache remove job - if self._data_converter.payload_codec and not cache_remove_job: + if self._data_converter.payload_codec: await temporalio.bridge.worker.decode_activation( act, self._data_converter.payload_codec ) @@ -228,19 +241,17 @@ async def _handle_activation( if LOG_PROTOS: logger.debug("Received workflow activation:\n%s", act) - # If the workflow is not running yet and this isn't a cache remove - # job, create it. We do not even fetch a workflow if it's a cache - # remove job and safe evictions are enabled - workflow = None - if not cache_remove_job or not self._disable_safe_eviction: - workflow = self._running_workflows.get(act.run_id) - if not workflow and not cache_remove_job: + # If the workflow is not running yet, create it + workflow = self._running_workflows.get(act.run_id) + if not workflow: # Must have a initialize job to create instance if not init_job: raise RuntimeError( "Missing initialize workflow, workflow could have unexpectedly been removed from cache" ) - workflow = self._create_workflow_instance(act, init_job) + workflow = _RunningWorkflow( + self._create_workflow_instance(act, init_job) + ) self._running_workflows[act.run_id] = workflow elif init_job: # This should never happen @@ -257,37 +268,36 @@ async def _handle_activation( act, ) - # Wait for deadlock timeout and set commands if successful + # Run activation task with deadlock timeout try: completion = await asyncio.wait_for( activate_task, self._deadlock_timeout_seconds ) except asyncio.TimeoutError: - raise _DeadlockError.from_deadlocked_workflow( - workflow, self._deadlock_timeout_seconds - ) from None + # Need to create the deadlock exception up here so it + # captures the trace now instead of later after we may have + # interrupted it + deadlock_exc = _DeadlockError.from_deadlocked_workflow( + workflow.instance, self._deadlock_timeout_seconds + ) + # When we deadlock, we will raise an exception to fail + # the task. But before we do that, we want to try to + # interrupt the thread and put this activation task on + # the workflow so that the successive eviction can wait + # on it before trying to evict. + workflow.attempt_deadlock_interruption() + # Set the task and raise + workflow.deadlocked_activation_task = activate_task + raise deadlock_exc from None except Exception as err: - # We cannot fail a cache eviction, we must just log and not complete - # the activation (failed or otherwise). This should only happen in - # cases of deadlock or tasks not properly completing, and yes this - # means that a slot is forever taken. - # TODO(cretz): Should we build a complex mechanism to continually - # try the eviction until it succeeds? - if cache_remove_job: - logger.exception( - "Failed running eviction job, not evicting. " - + "Since eviction could not be processed, this worker cannot complete and the slot will remain forever used." - ) - self._could_not_evict_count += 1 - return - if isinstance(err, _DeadlockError): err.swap_traceback() logger.exception( "Failed handling activation on workflow with run ID %s", act.run_id ) + # Set completion failure completion.failed.failure.SetInParent() try: @@ -308,20 +318,8 @@ async def _handle_activation( # Always set the run ID on the completion completion.run_id = act.run_id - # If there is a remove-from-cache job, do so. We don't need to log a - # warning if there's not, because create workflow failing for - # unregistered workflow still triggers cache remove job - if cache_remove_job: - if act.run_id in self._running_workflows: - logger.debug( - "Evicting workflow with run ID %s, message: %s", - act.run_id, - cache_remove_job.message, - ) - del self._running_workflows[act.run_id] - # Encode the completion if there's a codec and not cache remove job - if self._data_converter.payload_codec and not cache_remove_job: + if self._data_converter.payload_codec: try: await temporalio.bridge.worker.encode_completion( completion, self._data_converter.payload_codec @@ -344,13 +342,123 @@ async def _handle_activation( "Failed completing activation on workflow with run ID %s", act.run_id ) - # If there is a remove job and an eviction hook, run it - if cache_remove_job and self._on_eviction_hook is not None: + async def _handle_cache_eviction( + self, + act: temporalio.bridge.proto.workflow_activation.WorkflowActivation, + job: temporalio.bridge.proto.workflow_activation.RemoveFromCache, + ) -> None: + logger.debug( + "Evicting workflow with run ID %s, message: %s", act.run_id, job.message + ) + + # Find the workflow to process safe eviction unless safe eviction + # disabled + workflow = None + if not self._disable_safe_eviction: + workflow = self._running_workflows.get(act.run_id) + + # Safe eviction... + if workflow: + # We have to wait on the deadlocked task if it is set. This is + # because eviction may be the result of a deadlocked workflow but + # we cannot safely evict until that task is done with its thread. We + # don't care what errors may have occurred. We intentionally wait + # forever which means a deadlocked task cannot be evicted and give + # its slot back. + if workflow.deadlocked_activation_task: + logger.debug( + "Waiting for deadlocked task to complete on run %s", act.run_id + ) + try: + await workflow.deadlocked_activation_task + except: + pass + + # Process the activation to evict. It is very important that + # eviction complete successfully because this is the only way we can + # confirm the event loop was torn down gracefully and therefore no + # GC'ing of the tasks occurs (which can cause them to wake up in + # different threads). We will wait deadlock timeout amount (2s if + # enabled) before making it clear to users that eviction is being + # swallowed. Any error or timeout of eviction causes us to retry + # forever because something in users code is preventing eviction. + seen_fail = False + handle_eviction_task: Optional[asyncio.Future] = None + while True: + try: + # We only create the eviction task if we haven't already or + # it is done. This is because if it already is running and + # timed out, it's still running (and holding on to a + # thread). But if did complete running but failed with + # another error, we want to re-create the task. + if not handle_eviction_task or handle_eviction_task.done(): + handle_eviction_task = ( + asyncio.get_running_loop().run_in_executor( + self._workflow_task_executor, + workflow.activate, + act, + ) + ) + await asyncio.wait_for( + handle_eviction_task, self._deadlock_timeout_seconds + ) + # Break if it succeeds + break + except BaseException as err: + # Only want to log and mark as could not evict once + if not seen_fail: + seen_fail = True + self._could_not_evict_count += 1 + # We give a different message for timeout vs other + # exception + if isinstance(err, asyncio.TimeoutError): + logger.error( + "Timed out running eviction job for run ID %s, continually " + + "retrying eviction. This is usually caused by inadvertently " + + "catching 'BaseException's like asyncio.CancelledError or " + + "_WorkflowBeingEvictedError and still continuing work. " + + "Since eviction could not be processed, this worker " + + "may not complete and the slot may remain forever used " + + "unless it eventually completes.", + act.run_id, + ) + else: + logger.exception( + "Failed running eviction job for run ID %s, continually retrying " + + "eviction. Since eviction could not be processed, this worker " + + "may not complete and the slot may remain forever used " + + "unless it eventually completes.", + act.run_id, + ) + # We want to wait a couple of seconds before trying to evict again + await asyncio.sleep(2) + # Decrement the could-not-evict-count if it finally succeeded + if seen_fail: + self._could_not_evict_count -= 1 + + # Remove from map and send completion + if act.run_id in self._running_workflows: + del self._running_workflows[act.run_id] + try: + await self._bridge_worker().complete_workflow_activation( + temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion( + run_id=act.run_id, + successful=temporalio.bridge.proto.workflow_completion.Success(), + ) + ) + except Exception: + logger.exception( + "Failed completing eviction activation on workflow with run ID %s", + act.run_id, + ) + + # Run eviction hook if present + if self._on_eviction_hook is not None: try: - self._on_eviction_hook(act.run_id, cache_remove_job) + self._on_eviction_hook(act.run_id, job) except Exception as e: self._throw_after_activation = e - logger.debug("Shutting down worker on eviction") + logger.debug("Shutting down worker on eviction hook exception") self._bridge_worker().initiate_shutdown() def _create_workflow_instance( @@ -520,3 +628,39 @@ def _gen_tb_helper( tb = tb.tb_next return tb + + +class _RunningWorkflow: + def __init__(self, instance: WorkflowInstance): + self.instance = instance + self.deadlocked_activation_task: Optional[Awaitable] = None + self._deadlock_can_be_interrupted_lock = threading.Lock() + self._deadlock_can_be_interrupted = False + + def activate( + self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation + ) -> temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion: + # Mark that the deadlock can be interrupted, do work, then unmark + with self._deadlock_can_be_interrupted_lock: + self._deadlock_can_be_interrupted = True + try: + return self.instance.activate(act) + finally: + with self._deadlock_can_be_interrupted_lock: + self._deadlock_can_be_interrupted = False + + def attempt_deadlock_interruption(self) -> None: + # Need to be under mutex to ensure it can be interrupted + with self._deadlock_can_be_interrupted_lock: + # Do not interrupt if cannot be interrupted anymore + if not self._deadlock_can_be_interrupted: + return + deadlocked_thread_id = self.instance.get_thread_id() + if deadlocked_thread_id: + temporalio.bridge.runtime.Runtime._raise_in_thread( + deadlocked_thread_id, _InterruptDeadlockError + ) + + +class _InterruptDeadlockError(BaseException): + pass diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py index 9388a1b76..bda35ea6b 100644 --- a/tests/helpers/__init__.py +++ b/tests/helpers/__init__.py @@ -46,6 +46,23 @@ def new_worker( ) +async def assert_eventually( + fn: Callable[[], Awaitable], + *, + timeout: timedelta = timedelta(seconds=10), + interval: timedelta = timedelta(milliseconds=200), +) -> None: + start_sec = time.monotonic() + while True: + try: + await fn() + return + except AssertionError: + if timedelta(seconds=time.monotonic() - start_sec) >= timeout: + raise + await asyncio.sleep(interval.total_seconds()) + + T = TypeVar("T") @@ -56,16 +73,26 @@ async def assert_eq_eventually( timeout: timedelta = timedelta(seconds=10), interval: timedelta = timedelta(milliseconds=200), ) -> None: - start_sec = time.monotonic() - last_value = None - while timedelta(seconds=time.monotonic() - start_sec) < timeout: - last_value = await fn() - if expected == last_value: - return - await asyncio.sleep(interval.total_seconds()) - assert ( - expected == last_value - ), f"timed out waiting for equal, asserted against last value of {last_value}" + async def check() -> None: + assert expected == await fn() + + await assert_eventually(check, timeout=timeout, interval=interval) + + +async def assert_task_fail_eventually( + handle: WorkflowHandle, *, message_contains: Optional[str] = None +) -> None: + async def check() -> None: + async for evt in handle.fetch_history_events(): + if evt.HasField("workflow_task_failed_event_attributes") and ( + not message_contains + or message_contains + in evt.workflow_task_failed_event_attributes.failure.message + ): + return + assert False, "Task failure not present" + + await assert_eventually(check) async def worker_versioning_enabled(client: Client) -> bool: diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 95f114902..b81f34bda 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5,9 +5,11 @@ import json import logging import logging.handlers +import os import queue import sys import threading +import time import typing import uuid from abc import ABC, abstractmethod @@ -112,6 +114,8 @@ from tests.helpers import ( admitted_update_task, assert_eq_eventually, + assert_eventually, + assert_task_fail_eventually, assert_workflow_exists_eventually, ensure_search_attributes_present, find_free_port, @@ -6977,6 +6981,245 @@ async def test_update_handler_semaphore_acquisition_respects_timeout( ) +deadlock_interruptible_completed = 0 + + +@workflow.defn(sandboxed=False) +class DeadlockInterruptibleWorkflow: + @workflow.run + async def run(self) -> None: + # Infinite loop, which is interruptible via PyThreadState_SetAsyncExc + try: + while True: + pass + finally: + global deadlock_interruptible_completed + deadlock_interruptible_completed += 1 + + +async def test_workflow_deadlock_interruptible(client: Client): + # TODO(cretz): Improve this test and other deadlock/eviction tests by + # checking slot counts with Core. There are a couple of bugs where used slot + # counts are off by one and slots are released before eviction (see + # https://github.com/temporalio/sdk-core/issues/894). + + # This worker used to not be able to shutdown because we hung evictions on + # deadlock + async with new_worker(client, DeadlockInterruptibleWorkflow) as worker: + # Start the workflow + assert deadlock_interruptible_completed == 0 + handle = await client.start_workflow( + DeadlockInterruptibleWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + # Wait for task fail + await assert_task_fail_eventually(handle, message_contains="deadlock") + + # Confirm workflow was interrupted + async def check_completed(): + assert deadlock_interruptible_completed >= 1 + + await assert_eventually(check_completed) + completed_sec = time.monotonic() + # Confirm worker shutdown didn't hang + assert time.monotonic() - completed_sec < 20 + + +deadlock_uninterruptible_event = threading.Event() +deadlock_uninterruptible_completed = 0 + + +@workflow.defn(sandboxed=False) +class DeadlockUninterruptibleWorkflow: + @workflow.run + async def run(self) -> None: + # Wait on event, which is not interruptible via PyThreadState_SetAsyncExc + try: + deadlock_uninterruptible_event.wait() + finally: + global deadlock_uninterruptible_completed + deadlock_uninterruptible_completed += 1 + + +async def test_workflow_deadlock_uninterruptible(client: Client): + # This worker used to not be able to shutdown because we hung evictions on + # deadlock + async with new_worker(client, DeadlockUninterruptibleWorkflow) as worker: + # Start the workflow + assert deadlock_uninterruptible_completed == 0 + handle = await client.start_workflow( + DeadlockUninterruptibleWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + # Wait for task fail + await assert_task_fail_eventually(handle, message_contains="deadlock") + # Confirm could not be interrupted + assert deadlock_uninterruptible_completed == 0 + + # Now complete the event and confirm the workflow does complete + deadlock_uninterruptible_event.set() + + async def check_completed(): + assert deadlock_uninterruptible_completed >= 1 + + await assert_eventually(check_completed) + completed_sec = time.monotonic() + # Confirm worker shutdown didn't hang + assert time.monotonic() - completed_sec < 20 + + +deadlock_fill_up_block_event = threading.Event() +deadlock_fill_up_block_completed = 0 + + +@workflow.defn(sandboxed=False) +class DeadlockFillUpBlockWorkflow: + @workflow.run + async def run(self) -> None: + try: + deadlock_fill_up_block_event.wait() + finally: + global deadlock_fill_up_block_completed + deadlock_fill_up_block_completed += 1 + + +@workflow.defn(sandboxed=False) +class DeadlockFillUpSimpleWorkflow: + @workflow.run + async def run(self) -> str: + return "done" + + +async def test_workflow_deadlock_fill_up_slots(client: Client): + cpu_count = os.cpu_count() + assert cpu_count + # This worker used to not be able to shutdown because we hung evictions on + # deadlock. + async with new_worker( + client, + DeadlockFillUpBlockWorkflow, + DeadlockFillUpSimpleWorkflow, + # Start the worker with CPU count + 10 task slots + max_concurrent_workflow_tasks=cpu_count + 10, + ) as worker: + # For this test we're going to start cpu_count + 5 workflows that + # deadlock. In previous SDK versions we defaulted to CPU count + # number of workflow threads, so deadlocking that many would prevent + # other code from executing. Now that we default to more workers, we + # can handle more work while some are deadlocked. + + # Start the workflows that deadlock + assert deadlock_fill_up_block_completed == 0 + handles = await asyncio.gather( + *[ + client.start_workflow( + DeadlockFillUpBlockWorkflow.run, + id=f"workflow-deadlock-{i}-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + for i in range(cpu_count + 5) + ] + ) + + # Wait for them all to deadlock + await asyncio.gather( + *[ + assert_task_fail_eventually(h, message_contains="deadlock") + for h in handles + ] + ) + + # Now try to run a regular non-deadlocked workflow. Before recent + # changes, this would also cause a deadlock because it would submit + # to the thread pool but the thread pool didn't have enough room. + assert "done" == await asyncio.wait_for( + client.execute_workflow( + DeadlockFillUpSimpleWorkflow.run, + id=f"workflow-simple-{uuid.uuid4()}", + task_queue=worker.task_queue, + ), + 10, + ) + + # Let the deadlocked ones complete too + deadlock_fill_up_block_event.set() + + async def check_completed(): + assert deadlock_fill_up_block_completed >= len(handles) + + await assert_eventually(check_completed) + completed_sec = time.monotonic() + # Confirm worker shutdown didn't hang + assert time.monotonic() - completed_sec < 20 + + +eviction_swallow_keep_looping = True + + +@workflow.defn(sandboxed=False) +class EvictionSwallowWorkflow: + @workflow.run + async def run(self) -> str: + # Start a task in the background that will prevent eviction because + # eviction requires all tasks complete + async def eviction_swallower(): + global eviction_swallow_keep_looping + while eviction_swallow_keep_looping: + try: + await workflow.wait_condition(lambda: False) + except BaseException: + # Swallow base exception intentionally which prevents + # eviction + pass + + asyncio.create_task(eviction_swallower()) + return "done" + + +async def test_workflow_eviction_swallow(client: Client): + # Add a queue handler to all logging, and remove later + log_queue: queue.Queue[logging.LogRecord] = queue.Queue() + log_handler = logging.handlers.QueueHandler(log_queue) + logging.getLogger().addHandler(log_handler) + try: + async with new_worker(client, EvictionSwallowWorkflow) as worker: + global eviction_swallow_keep_looping + assert eviction_swallow_keep_looping + + # Run workflow that completes but cannot evict + handle = await client.start_workflow( + EvictionSwallowWorkflow.run, + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + assert "done" == await handle.result() + + # Make sure we get the log we expect + async def check_logs(): + try: + while True: + log_record = log_queue.get(block=False) + if log_record.message.startswith( + f"Timed out running eviction job for run ID {handle.result_run_id}" + ): + return + except queue.Empty: + pass + assert False, "log record not found" + + await assert_eventually(check_logs) + + # Let it finish now + eviction_swallow_keep_looping = False + completed_sec = time.monotonic() + # Confirm worker shutdown didn't hang + assert time.monotonic() - completed_sec < 20 + finally: + logging.getLogger().removeHandler(log_handler) + + @activity.defn async def check_priority_activity(should_have_priorty: int) -> str: assert activity.info().priority.priority_key == should_have_priorty