diff --git a/clients/python/src/examples/cli.py b/clients/python/src/examples/cli.py index 35717a29..1689ef60 100644 --- a/clients/python/src/examples/cli.py +++ b/clients/python/src/examples/cli.py @@ -117,6 +117,7 @@ def worker( processing_pool_name="examples", process_type="forkserver", grpc_port=grpc_port, + push_task_timeout=5, ) else: worker = TaskWorker( diff --git a/clients/python/src/taskbroker_client/worker/worker.py b/clients/python/src/taskbroker_client/worker/worker.py index be69db2e..9c9b7fb6 100644 --- a/clients/python/src/taskbroker_client/worker/worker.py +++ b/clients/python/src/taskbroker_client/worker/worker.py @@ -59,8 +59,9 @@ class WorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer): gRPC servicer that receives task activations pushed from the broker """ - def __init__(self, worker: TaskWorkerProcessingPool) -> None: + def __init__(self, worker: TaskWorkerProcessingPool, push_task_timeout: float = 5) -> None: self.worker_pool = worker + self.push_task_timeout = push_task_timeout def PushTask( self, @@ -81,8 +82,8 @@ def PushTask( receive_timestamp=time.monotonic(), ) - # Push the task to the worker queue (wait at most 5 seconds) - if not self.worker_pool.push_task(inflight, timeout=5): + # Push the task to the worker queue (wait at most N seconds) + if not self.worker_pool.push_task(inflight, timeout=self.push_task_timeout): self.worker_pool._metrics.incr( "taskworker.worker.push_rpc", tags={"result": "busy", "processing_pool": self.worker_pool._processing_pool_name}, @@ -139,6 +140,7 @@ def __init__( health_check_file_path: str | None = None, health_check_sec_per_touch: float = DEFAULT_WORKER_HEALTH_CHECK_SEC_PER_TOUCH, grpc_port: int = 50052, + push_task_timeout: float = 5, update_in_batches: bool = False, ) -> None: app = import_app(app_module) @@ -195,6 +197,7 @@ def __init__( self._grpc_port = grpc_port self._grpc_secrets = parse_rpc_secret_list(app.config["rpc_secret"]) + self._push_task_timeout = push_task_timeout def _create_client( self, @@ -343,7 +346,7 @@ def signal_handler(*args: Any) -> None: ) taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server( - WorkerServicer(self.worker_pool), server + WorkerServicer(self.worker_pool, self._push_task_timeout), server ) # The health service is used by the K8s readiness check