From 6b881c971aced1be05b53c578bad29a1173c586b Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 11 Jun 2026 14:00:53 -0400 Subject: [PATCH 1/2] fix: Allow configuring the push_task_timeout --- clients/python/src/examples/cli.py | 1 + clients/python/src/taskbroker_client/worker/worker.py | 11 +++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/clients/python/src/examples/cli.py b/clients/python/src/examples/cli.py index 4c9fdc60..8f253d75 100644 --- a/clients/python/src/examples/cli.py +++ b/clients/python/src/examples/cli.py @@ -98,6 +98,7 @@ def worker(rpc_host: str, concurrency: int, push_mode: bool, grpc_port: int) -> processing_pool_name="examples", process_type="forkserver", grpc_port=grpc_port, + push_task_timeout=5, ) else: worker = TaskWorker( diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index f69ecc4b..345fcfee 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -58,8 +58,9 @@ class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer): gRPC servicer that receives task activations pushed from the broker """ - def __init__(self, worker: TaskWorkerProcessingPool) -> None: + def __init__(self, worker: TaskWorkerProcessingPool, push_task_timeout: float = 5) -> None: self.worker_pool = worker + self.push_task_timeout = push_task_timeout def PushTask( self, @@ -80,8 +81,8 @@ def PushTask( receive_timestamp=time.monotonic(), ) - # Push the task to the worker queue (wait at most 5 seconds) - if not self.worker_pool.push_task(inflight, timeout=5): + # Push the task to the worker queue (wait at most N seconds) + if not self.worker_pool.push_task(inflight, timeout=self.push_task_timeout): self.worker_pool._metrics.incr( "taskworker.worker.push_rpc", tags={"result": "busy", "processing_pool": self.worker_pool._processing_pool_name}, @@ -127,6 +128,7 @@ def __init__( app_module: str, broker_service: str, max_child_task_count: int | None = None, + push_task_timeout: float = 5, namespace: str | None = None, concurrency: int = 1, child_tasks_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE, @@ -192,6 +194,7 @@ def __init__( self._grpc_port = grpc_port self._grpc_secrets = parse_rpc_secret_list(app.config["rpc_secret"]) + self._push_task_timeout = push_task_timeout def _send_result( self, result: ProcessingResult, is_draining: bool = False @@ -318,7 +321,7 @@ def signal_handler(*args: Any) -> None: ) taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server( - WorkerServicer(self.worker_pool), server + WorkerServicer(self.worker_pool, self._push_task_timeout), server ) # The health service is used by the K8s readiness check From fc940f5440dac3d3d15abd863d83e6d6b5f5323b Mon Sep 17 00:00:00 2001 From: Evan Hicks Date: Thu, 11 Jun 2026 14:26:47 -0400 Subject: [PATCH 2/2] move down --- clients/python/src/taskbroker_client/worker/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index 345fcfee..cd976b56 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -128,7 +128,6 @@ def __init__( app_module: str, broker_service: str, max_child_task_count: int | None = None, - push_task_timeout: float = 5, namespace: str | None = None, concurrency: int = 1, child_tasks_queue_maxsize: int = DEFAULT_WORKER_QUEUE_SIZE, @@ -140,6 +139,7 @@ def __init__( health_check_file_path: str | None = None, health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, grpc_port: int = 50052, + push_task_timeout: float = 5, ) -> None: app = import_app(app_module)