From 2d857cd3c895395d69cfd773e24b5585ec2906fd Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 15:41:08 +0000 Subject: [PATCH 01/23] remove shutdown from LLMEngine --- vllm/v1/engine/async_llm.py | 7 ++++--- vllm/v1/engine/core_client.py | 4 +--- vllm/v1/engine/llm_engine.py | 10 ---------- vllm/v1/utils.py | 3 --- 4 files changed, 5 insertions(+), 19 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 3f097ca7f439..40f2a12839f2 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -1,6 +1,7 @@ import asyncio import os import signal +import weakref from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union from vllm.config import ModelConfig, VllmConfig @@ -41,6 +42,9 @@ def __init__( log_requests: bool = True, start_engine_loop: bool = True, ) -> None: + # Call self.shutdown at exit to clean up + # and ensure workers will be terminated. + self._finalizer = weakref.finalize(self, self.shutdown) # The child processes will send SIGQUIT when unrecoverable # errors happen. We kill the process tree here so that the @@ -103,9 +107,6 @@ def sigquit_handler(signum, frame): self.output_handler: Optional[asyncio.Task] = None - def __del__(self): - self.shutdown() - @classmethod def from_engine_args( cls, diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 3293205e110a..302bbc09c5ea 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,5 +1,6 @@ from typing import List, Optional, Type +import weakref import msgspec import zmq import zmq.asyncio @@ -107,9 +108,6 @@ def abort_requests(self, request_ids: List[str]) -> None: def shutdown(self): self.engine_core.shutdown() - def __del__(self): - self.shutdown() - def profile(self, is_start: bool = True) -> None: self.engine_core.profile(is_start) diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index a19109559eab..cc86e6491d86 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -28,7 +28,6 @@ class LLMEngine: - """Legacy LLMEngine for backwards compatibility.""" def __init__( self, @@ -42,8 +41,6 @@ def __init__( use_cached_outputs: bool = False, multiprocess_mode: bool = False, ) -> None: - - # TODO: Can we avoid this? self.model_config = vllm_config.model_config # Tokenizer (+ ensure liveness if running in another process). @@ -205,10 +202,3 @@ def get_tokenizer_group( f"found type: {type(tokenizer_group)}") return tokenizer_group - - def __del__(self): - self.shutdown() - - def shutdown(self): - if engine_core := getattr(self, "engine_core", None): - engine_core.shutdown() diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index 19e0dd17237c..d7d9cf82b04c 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -114,9 +114,6 @@ def __init__( raise RuntimeError(f"{process_name} initialization failed. " "See root cause above.") - def __del__(self): - self.shutdown() - def shutdown(self): # Shutdown the process if needed. if hasattr(self, "proc") and self.proc.is_alive(): From 9e70c5f9cf606ab24c6e28ae7df952c47b68c26b Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 15:43:52 +0000 Subject: [PATCH 02/23] format --- vllm/v1/engine/core_client.py | 1 - vllm/v1/utils.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 302bbc09c5ea..e4a61a521f3f 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,6 +1,5 @@ from typing import List, Optional, Type -import weakref import msgspec import zmq import zmq.asyncio diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index d7d9cf82b04c..76de4d171bf5 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -91,6 +91,7 @@ def __init__( target_fn: Callable, process_kwargs: Dict[Any, Any], ): + # Ensure cleanup of background process during GC. self._finalizer = weakref.finalize(self, self.shutdown) context = get_mp_context() From f34875c115f4b7f7bb0ac6c608eae826b9c96ccd Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 15:52:54 +0000 Subject: [PATCH 03/23] no need for shutdown in asyncllm --- vllm/v1/engine/async_llm.py | 11 +---------- vllm/v1/engine/llm_engine.py | 3 +++ 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 40f2a12839f2..6f55bcc65009 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -44,7 +44,7 @@ def __init__( ) -> None: # Call self.shutdown at exit to clean up # and ensure workers will be terminated. - self._finalizer = weakref.finalize(self, self.shutdown) + # self._finalizer = weakref.finalize(self, self.shutdown) # The child processes will send SIGQUIT when unrecoverable # errors happen. We kill the process tree here so that the @@ -137,15 +137,6 @@ def from_engine_args( stat_loggers=stat_loggers, ) - def shutdown(self): - """Shutdown, cleaning up the background proc and IPC.""" - - if engine_core := getattr(self, "engine_core", None): - engine_core.shutdown() - - if handler := getattr(self, "output_handler", None): - handler.cancel() - @classmethod def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]: executor_class: Type[Executor] diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index cc86e6491d86..1f49de67d749 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -28,6 +28,7 @@ class LLMEngine: + """Legacy LLMEngine for backwards compatibility.""" def __init__( self, @@ -41,6 +42,8 @@ def __init__( use_cached_outputs: bool = False, multiprocess_mode: bool = False, ) -> None: + + # TODO: Can we avoid this? self.model_config = vllm_config.model_config # Tokenizer (+ ensure liveness if running in another process). From 7a777d9ea0f83f706ac6ce3d6eff23f61d61d933 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 15:53:40 +0000 Subject: [PATCH 04/23] remove from asyncllm --- vllm/v1/engine/async_llm.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 6f55bcc65009..8916f94d3491 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -1,7 +1,6 @@ import asyncio import os import signal -import weakref from typing import AsyncGenerator, Dict, List, Mapping, Optional, Type, Union from vllm.config import ModelConfig, VllmConfig @@ -42,9 +41,6 @@ def __init__( log_requests: bool = True, start_engine_loop: bool = True, ) -> None: - # Call self.shutdown at exit to clean up - # and ensure workers will be terminated. - # self._finalizer = weakref.finalize(self, self.shutdown) # The child processes will send SIGQUIT when unrecoverable # errors happen. We kill the process tree here so that the From dfc9deed21aeac4f1fe4dfb5714b3bf3bc6edf0a Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 16:01:19 +0000 Subject: [PATCH 05/23] stash --- vllm/v1/engine/core.py | 171 +++++++++++++++++++++++++++------- vllm/v1/engine/core_client.py | 2 +- 2 files changed, 140 insertions(+), 33 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 5840541d774b..ff1b587686dc 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -1,29 +1,29 @@ -import pickle import queue import signal import threading import time +from abc import ABC, abstractmethod from multiprocessing.connection import Connection -from typing import List, Tuple, Type +from typing import List, Optional, Tuple, Type import psutil import zmq -import zmq.asyncio from msgspec import msgpack from vllm.config import CacheConfig, VllmConfig from vllm.logger import init_logger from vllm.transformers_utils.config import ( maybe_register_config_serialize_by_value) -from vllm.utils import get_exception_traceback, zmq_socket_ctx +from vllm.utils import get_exception_traceback, make_zmq_socket, zmq_socket_ctx from vllm.v1.core.scheduler import Scheduler -from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs, - EngineCoreProfile, EngineCoreRequest, - EngineCoreRequestType, EngineCoreRequestUnion) +from vllm.v1.engine import (EngineCoreAbort, EngineCoreOutput, + EngineCoreOutputs, EngineCoreProfile, + EngineCoreRequest, EngineCoreRequestType, + EngineCoreRequestUnion) from vllm.v1.engine.mm_input_mapper import MMInputMapperServer from vllm.v1.executor.abstract import Executor from vllm.v1.request import Request, RequestStatus -from vllm.v1.serial_utils import PickleEncoder +from vllm.v1.utils import BackgroundProcHandle from vllm.version import __version__ as VLLM_VERSION logger = init_logger(__name__) @@ -127,7 +127,8 @@ def step(self) -> List[EngineCoreOutput]: return engine_core_outputs def shutdown(self): - self.model_executor.shutdown() + pass + # self.model_executor.shutdown() def profile(self, is_start: bool = True): self.model_executor.profile(is_start) @@ -164,6 +165,24 @@ def __init__( # Send Readiness signal to EngineClient. ready_pipe.send({"status": "READY"}) + @staticmethod + def make_process( + vllm_config: VllmConfig, + executor_class: Type[Executor], + input_path: str, + output_path: str, + log_stats: bool, + ) -> BackgroundProcHandle: + return BackgroundProcHandle(input_path=input_path, + output_path=output_path, + process_name="EngineCore", + target_fn=EngineCoreProc.run_engine_core, + process_kwargs={ + "vllm_config": vllm_config, + "executor_class": executor_class, + "log_stats": log_stats, + }) + @staticmethod def run_engine_core(*args, **kwargs): """Launch EngineCore busy loop in background process.""" @@ -260,36 +279,18 @@ def _handle_client_request(self, request: EngineCoreRequestUnion) -> None: self.add_request(request) elif isinstance(request, EngineCoreProfile): self.model_executor.profile(request.is_start) + elif isinstance(request, EngineCoreAbort): + self.abort_requests(request.request_ids) else: - # TODO: make an EngineCoreAbort wrapper - assert isinstance(request, list) - self.abort_requests(request) + raise ValueError("Unknown request type: {request}") def process_input_socket(self, input_path: str): """Input socket IO thread.""" - # Msgpack serialization decoding. - decoder_add_req = PickleEncoder() - decoder_abort_req = PickleEncoder() - with zmq_socket_ctx(input_path, zmq.constants.PULL) as socket: while True: - # (RequestType, RequestData) - type_frame, data_frame = socket.recv_multipart(copy=False) - request_type = type_frame.buffer - request_data = data_frame.buffer - - # Deserialize the request data. - if request_type == EngineCoreRequestType.ADD.value: - request = decoder_add_req.decode(request_data) - elif request_type == EngineCoreRequestType.ABORT.value: - request = decoder_abort_req.decode(request_data) - elif request_type == EngineCoreRequestType.PROFILE.value: - request = pickle.loads(request_data) - else: - raise ValueError(f"Unknown RequestType: {request_type}") - # Push to input queue for core busy loop. + request = socket.recv_pyobj() self.input_queue.put_nowait(request) def process_output_socket(self, output_path: str): @@ -305,4 +306,110 @@ def process_output_socket(self, output_path: str): engine_core_outputs = self.output_queue.get() outputs = EngineCoreOutputs(outputs=engine_core_outputs) encoder.encode_into(outputs, buffer) - socket.send_multipart((buffer, ), copy=False) + msg = (EngineCoreRequestType.FROM_ENGINE_CORE.value, buffer) + socket.send_multipart(msg, copy=False) + + +class EngineCoreClient(ABC): + """Client used To interact with EngineCore.""" + + @abstractmethod + def get_output(self) -> List[EngineCoreOutput]: + ... + + @abstractmethod + def add_request(self, request: EngineCoreRequest) -> None: + ... + + @abstractmethod + def abort_requests(self, request_ids: List[str]) -> None: + ... + + @abstractmethod + def profile(self, is_start: bool = True) -> None: + ... + + @abstractmethod + def shutdown(self): + ... + + +class InprocEngineCoreClient(EngineCoreClient): + """ + InprocClient: client for in-process EngineCore. Intended + for use in LLMEngine for V0-style add_request() and step() + EngineCore setup in this process (no busy loop). + * pushes EngineCoreRequest directly into the EngineCore + * pulls EngineCoreOutputs by stepping the EngineCore + """ + + def __init__(self, engine_core: EngineCore): + self.engine_core = engine_core + + def get_output(self) -> List[EngineCoreOutput]: + return self.engine_core.step() + + def add_request(self, request: EngineCoreRequest) -> None: + self.engine_core.add_request(request) + + def abort_requests(self, request_ids: List[str]) -> None: + self.engine_core.abort_requests(request_ids) + + def profile(self, is_start: bool = True) -> None: + self.engine_core.profile(is_start) + + def shutdown(self): + self.engine_core.shutdown() + + +class MpEngineCoreClient(EngineCoreClient): + """ + MPClient: client for multi-proc EngineCore. + EngineCore runs in a background process busy loop, getting + new EngineCoreRequests and returning EngineCoreOutputs + + * pushes EngineCoreRequests via input_socket + * pulls EngineCoreOutputs via output_socket + """ + + def __init__( + self, + input_path: str, + output_path: str, + proc_handle: Optional[BackgroundProcHandle] = None, + ) -> None: + + # Use msgpack for hotpath serialization. + self.decoder = msgpack.Decoder(EngineCoreOutputs) + + # Setup ZMQ IO. + self.ctx = zmq.Context(io_threads=2) # type: ignore[attr-defined] + self.input_socket = make_zmq_socket(self.ctx, input_path, + zmq.constants.PUSH) + self.output_socket = make_zmq_socket(self.ctx, output_path, + zmq.constants.PULL) + + # Optionally hold the proc handle for cleanup at shutdown(). + self.proc_handle = proc_handle + + def get_output(self) -> List[EngineCoreOutput]: + # TODO(rob): use copy=False + (msg_type, msg_bytes) = self.output_socket.recv_multipart() + assert msg_type == EngineCoreRequestType.FROM_ENGINE_CORE.value + return self.decoder.decode(msg_bytes).outputs + + def add_request(self, request: EngineCoreRequest) -> None: + self.input_socket.send_pyobj(request) + + def abort_requests(self, request_ids: List[str]) -> None: + self.input_socket.send_pyobj(EngineCoreAbort(request_ids)) + + def profile(self, is_start: bool = True) -> None: + self.input_socket.send_pyobj(EngineCoreProfile(is_start)) + + def shutdown(self) -> None: + if hasattr(self, "ctx"): + self.ctx.destroy(linger=0) + + if hasattr(self, "proc_handle") and self.proc_handle: + self.proc_handle.shutdown() diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index e4a61a521f3f..40b69750c405 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -105,7 +105,7 @@ def abort_requests(self, request_ids: List[str]) -> None: self.engine_core.abort_requests(request_ids) def shutdown(self): - self.engine_core.shutdown() + pass def profile(self, is_start: bool = True) -> None: self.engine_core.profile(is_start) From c72b45a51416f8d918757e16416bc2de58823dc2 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 16:07:53 +0000 Subject: [PATCH 06/23] update --- vllm/v1/engine/core.py | 178 ++++++---------------------------- vllm/v1/engine/core_client.py | 18 +--- 2 files changed, 36 insertions(+), 160 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index ff1b587686dc..dcd758a938fe 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -1,29 +1,29 @@ +import pickle import queue import signal import threading import time -from abc import ABC, abstractmethod from multiprocessing.connection import Connection -from typing import List, Optional, Tuple, Type +from typing import List, Tuple, Type import psutil import zmq +import zmq.asyncio from msgspec import msgpack from vllm.config import CacheConfig, VllmConfig from vllm.logger import init_logger from vllm.transformers_utils.config import ( maybe_register_config_serialize_by_value) -from vllm.utils import get_exception_traceback, make_zmq_socket, zmq_socket_ctx +from vllm.utils import get_exception_traceback, zmq_socket_ctx from vllm.v1.core.scheduler import Scheduler -from vllm.v1.engine import (EngineCoreAbort, EngineCoreOutput, - EngineCoreOutputs, EngineCoreProfile, - EngineCoreRequest, EngineCoreRequestType, - EngineCoreRequestUnion) +from vllm.v1.engine import (EngineCoreOutput, EngineCoreOutputs, + EngineCoreProfile, EngineCoreRequest, + EngineCoreRequestType, EngineCoreRequestUnion) from vllm.v1.engine.mm_input_mapper import MMInputMapperServer from vllm.v1.executor.abstract import Executor from vllm.v1.request import Request, RequestStatus -from vllm.v1.utils import BackgroundProcHandle +from vllm.v1.serial_utils import PickleEncoder from vllm.version import __version__ as VLLM_VERSION logger = init_logger(__name__) @@ -126,10 +126,6 @@ def step(self) -> List[EngineCoreOutput]: scheduler_output, output) return engine_core_outputs - def shutdown(self): - pass - # self.model_executor.shutdown() - def profile(self, is_start: bool = True): self.model_executor.profile(is_start) @@ -165,24 +161,6 @@ def __init__( # Send Readiness signal to EngineClient. ready_pipe.send({"status": "READY"}) - @staticmethod - def make_process( - vllm_config: VllmConfig, - executor_class: Type[Executor], - input_path: str, - output_path: str, - log_stats: bool, - ) -> BackgroundProcHandle: - return BackgroundProcHandle(input_path=input_path, - output_path=output_path, - process_name="EngineCore", - target_fn=EngineCoreProc.run_engine_core, - process_kwargs={ - "vllm_config": vllm_config, - "executor_class": executor_class, - "log_stats": log_stats, - }) - @staticmethod def run_engine_core(*args, **kwargs): """Launch EngineCore busy loop in background process.""" @@ -206,7 +184,6 @@ def signal_handler(signum, frame): signal.signal(signal.SIGINT, signal_handler) parent_process = psutil.Process().parent() - engine_core = None try: engine_core = EngineCoreProc(*args, **kwargs) engine_core.run_busy_loop() @@ -219,11 +196,6 @@ def signal_handler(signum, frame): logger.error("EngineCore hit an exception: %s", traceback) parent_process.send_signal(signal.SIGQUIT) - finally: - if engine_core is not None: - engine_core.shutdown() - engine_core = None - def run_busy_loop(self): """Core busy loop of the EngineCore.""" @@ -279,18 +251,36 @@ def _handle_client_request(self, request: EngineCoreRequestUnion) -> None: self.add_request(request) elif isinstance(request, EngineCoreProfile): self.model_executor.profile(request.is_start) - elif isinstance(request, EngineCoreAbort): - self.abort_requests(request.request_ids) else: - raise ValueError("Unknown request type: {request}") + # TODO: make an EngineCoreAbort wrapper + assert isinstance(request, list) + self.abort_requests(request) def process_input_socket(self, input_path: str): """Input socket IO thread.""" + # Msgpack serialization decoding. + decoder_add_req = PickleEncoder() + decoder_abort_req = PickleEncoder() + with zmq_socket_ctx(input_path, zmq.constants.PULL) as socket: while True: + # (RequestType, RequestData) + type_frame, data_frame = socket.recv_multipart(copy=False) + request_type = type_frame.buffer + request_data = data_frame.buffer + + # Deserialize the request data. + if request_type == EngineCoreRequestType.ADD.value: + request = decoder_add_req.decode(request_data) + elif request_type == EngineCoreRequestType.ABORT.value: + request = decoder_abort_req.decode(request_data) + elif request_type == EngineCoreRequestType.PROFILE.value: + request = pickle.loads(request_data) + else: + raise ValueError(f"Unknown RequestType: {request_type}") + # Push to input queue for core busy loop. - request = socket.recv_pyobj() self.input_queue.put_nowait(request) def process_output_socket(self, output_path: str): @@ -306,110 +296,4 @@ def process_output_socket(self, output_path: str): engine_core_outputs = self.output_queue.get() outputs = EngineCoreOutputs(outputs=engine_core_outputs) encoder.encode_into(outputs, buffer) - msg = (EngineCoreRequestType.FROM_ENGINE_CORE.value, buffer) - socket.send_multipart(msg, copy=False) - - -class EngineCoreClient(ABC): - """Client used To interact with EngineCore.""" - - @abstractmethod - def get_output(self) -> List[EngineCoreOutput]: - ... - - @abstractmethod - def add_request(self, request: EngineCoreRequest) -> None: - ... - - @abstractmethod - def abort_requests(self, request_ids: List[str]) -> None: - ... - - @abstractmethod - def profile(self, is_start: bool = True) -> None: - ... - - @abstractmethod - def shutdown(self): - ... - - -class InprocEngineCoreClient(EngineCoreClient): - """ - InprocClient: client for in-process EngineCore. Intended - for use in LLMEngine for V0-style add_request() and step() - EngineCore setup in this process (no busy loop). - * pushes EngineCoreRequest directly into the EngineCore - * pulls EngineCoreOutputs by stepping the EngineCore - """ - - def __init__(self, engine_core: EngineCore): - self.engine_core = engine_core - - def get_output(self) -> List[EngineCoreOutput]: - return self.engine_core.step() - - def add_request(self, request: EngineCoreRequest) -> None: - self.engine_core.add_request(request) - - def abort_requests(self, request_ids: List[str]) -> None: - self.engine_core.abort_requests(request_ids) - - def profile(self, is_start: bool = True) -> None: - self.engine_core.profile(is_start) - - def shutdown(self): - self.engine_core.shutdown() - - -class MpEngineCoreClient(EngineCoreClient): - """ - MPClient: client for multi-proc EngineCore. - EngineCore runs in a background process busy loop, getting - new EngineCoreRequests and returning EngineCoreOutputs - - * pushes EngineCoreRequests via input_socket - * pulls EngineCoreOutputs via output_socket - """ - - def __init__( - self, - input_path: str, - output_path: str, - proc_handle: Optional[BackgroundProcHandle] = None, - ) -> None: - - # Use msgpack for hotpath serialization. - self.decoder = msgpack.Decoder(EngineCoreOutputs) - - # Setup ZMQ IO. - self.ctx = zmq.Context(io_threads=2) # type: ignore[attr-defined] - self.input_socket = make_zmq_socket(self.ctx, input_path, - zmq.constants.PUSH) - self.output_socket = make_zmq_socket(self.ctx, output_path, - zmq.constants.PULL) - - # Optionally hold the proc handle for cleanup at shutdown(). - self.proc_handle = proc_handle - - def get_output(self) -> List[EngineCoreOutput]: - # TODO(rob): use copy=False - (msg_type, msg_bytes) = self.output_socket.recv_multipart() - assert msg_type == EngineCoreRequestType.FROM_ENGINE_CORE.value - return self.decoder.decode(msg_bytes).outputs - - def add_request(self, request: EngineCoreRequest) -> None: - self.input_socket.send_pyobj(request) - - def abort_requests(self, request_ids: List[str]) -> None: - self.input_socket.send_pyobj(EngineCoreAbort(request_ids)) - - def profile(self, is_start: bool = True) -> None: - self.input_socket.send_pyobj(EngineCoreProfile(is_start)) - - def shutdown(self) -> None: - if hasattr(self, "ctx"): - self.ctx.destroy(linger=0) - - if hasattr(self, "proc_handle") and self.proc_handle: - self.proc_handle.shutdown() + socket.send_multipart((buffer, ), copy=False) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 40b69750c405..3f2b2457d3fe 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,4 +1,5 @@ -from typing import List, Optional, Type +import weakref +from typing import List, Type import msgspec import zmq @@ -52,9 +53,6 @@ def make_client( return InprocClient(vllm_config, executor_class, log_stats) - def shutdown(self): - pass - def get_output(self) -> List[EngineCoreOutput]: raise NotImplementedError @@ -104,9 +102,6 @@ def add_request(self, request: EngineCoreRequest) -> None: def abort_requests(self, request_ids: List[str]) -> None: self.engine_core.abort_requests(request_ids) - def shutdown(self): - pass - def profile(self, is_start: bool = True) -> None: self.engine_core.profile(is_start) @@ -131,6 +126,9 @@ def __init__( executor_class: Type[Executor], log_stats: bool = False, ): + # Ensure cleanup of ZMQ during GC. + self._finalizer = weakref.finalize(self, self.shutdown) + # Serialization setup. self.encoder = PickleEncoder() self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs) @@ -150,7 +148,6 @@ def __init__( zmq.constants.PUSH) # Start EngineCore in background process. - self.proc_handle: Optional[BackgroundProcHandle] self.proc_handle = BackgroundProcHandle( input_path=input_path, output_path=output_path, @@ -163,13 +160,8 @@ def __init__( }) def shutdown(self): - # Shut down the zmq context. self.ctx.destroy(linger=0) - if hasattr(self, "proc_handle") and self.proc_handle: - self.proc_handle.shutdown() - self.proc_handle = None - class SyncMPClient(MPClient): """Synchronous client for multi-proc EngineCore.""" From 4e2dc000707e0c7f488020efa6fd9e3edb126714 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 21:37:51 +0000 Subject: [PATCH 07/23] fix --- vllm/entrypoints/llm.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py index fadf297e9f6a..7c0de3b3e548 100644 --- a/vllm/entrypoints/llm.py +++ b/vllm/entrypoints/llm.py @@ -232,11 +232,6 @@ def __init__( self.request_counter = Counter() - def __del__(self): - if hasattr(self, 'llm_engine') and self.llm_engine and hasattr( - self.llm_engine, "shutdown"): - self.llm_engine.shutdown() - @staticmethod def get_engine_class() -> Type[LLMEngine]: if envs.VLLM_USE_V1: From 0b4b6af9613bb98a1e63d831b694bcde649b535c Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 23:04:56 +0000 Subject: [PATCH 08/23] added back explicit del --- vllm/v1/executor/multiproc_executor.py | 3 +++ vllm/v1/utils.py | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index ed64e7741390..cc2304728f5e 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -300,6 +300,9 @@ def shutdown(self): self.worker_response_mq = None destroy_model_parallel() destroy_distributed_environment() + + def __del__(self): + self.shutdown() @staticmethod def worker_main(*args, **kwargs): diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index 76de4d171bf5..a189c2eb01b4 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -130,3 +130,7 @@ def shutdown(self): socket_file = ipc_socket.replace("ipc://", "") if os and os.path.exists(socket_file): os.remove(socket_file) + + def __del__(self): + print("CALLED DEL") + self.shutdown() From 4c445af76310ec7e6e9c40404a5b245e887dd404 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Tue, 31 Dec 2024 23:33:36 +0000 Subject: [PATCH 09/23] stash --- vllm/v1/engine/core_client.py | 8 ++-- vllm/v1/executor/multiproc_executor.py | 3 -- vllm/v1/utils.py | 52 +++++++++++++------------- 3 files changed, 29 insertions(+), 34 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 3f2b2457d3fe..a0607215bf19 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -126,8 +126,8 @@ def __init__( executor_class: Type[Executor], log_stats: bool = False, ): - # Ensure cleanup of ZMQ during GC. - self._finalizer = weakref.finalize(self, self.shutdown) + # # Ensure cleanup of ZMQ during GC. + # self._finalizer = weakref.finalize(self, self.shutdown) # Serialization setup. self.encoder = PickleEncoder() @@ -159,8 +159,8 @@ def __init__( "log_stats": log_stats, }) - def shutdown(self): - self.ctx.destroy(linger=0) + # def shutdown(self): + # self.ctx.destroy(linger=0) class SyncMPClient(MPClient): diff --git a/vllm/v1/executor/multiproc_executor.py b/vllm/v1/executor/multiproc_executor.py index cc2304728f5e..ed64e7741390 100644 --- a/vllm/v1/executor/multiproc_executor.py +++ b/vllm/v1/executor/multiproc_executor.py @@ -300,9 +300,6 @@ def shutdown(self): self.worker_response_mq = None destroy_model_parallel() destroy_distributed_environment() - - def __del__(self): - self.shutdown() @staticmethod def worker_main(*args, **kwargs): diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index a189c2eb01b4..efe5e3880fb5 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -91,9 +91,6 @@ def __init__( target_fn: Callable, process_kwargs: Dict[Any, Any], ): - # Ensure cleanup of background process during GC. - self._finalizer = weakref.finalize(self, self.shutdown) - context = get_mp_context() reader, writer = context.Pipe(duplex=False) @@ -103,34 +100,35 @@ def __init__( process_kwargs["ready_pipe"] = writer process_kwargs["input_path"] = input_path process_kwargs["output_path"] = output_path - self.input_path = input_path - self.output_path = output_path + # self.input_path = input_path + # self.output_path = output_path - # Run Detokenizer busy loop in background process. - self.proc = context.Process(target=target_fn, kwargs=process_kwargs) - self.proc.start() + # Run busy loop in background process. + proc = context.Process(target=target_fn, kwargs=process_kwargs) + self._finalizer = weakref.finalize( + self, shutdown, proc, input_path, output_path) + proc.start() # Wait for startup. if reader.recv()["status"] != "READY": raise RuntimeError(f"{process_name} initialization failed. " "See root cause above.") - def shutdown(self): - # Shutdown the process if needed. - if hasattr(self, "proc") and self.proc.is_alive(): - self.proc.terminate() - self.proc.join(5) - - if self.proc.is_alive(): - kill_process_tree(self.proc.pid) - - # Remove zmq ipc socket files - ipc_sockets = [self.output_path, self.input_path] - for ipc_socket in ipc_sockets: - socket_file = ipc_socket.replace("ipc://", "") - if os and os.path.exists(socket_file): - os.remove(socket_file) - - def __del__(self): - print("CALLED DEL") - self.shutdown() + +# Note(rob): shutdown function cannot be a bound method, +# else the gc cannot collect the object. +def shutdown(proc, input_path, output_path): + # Shutdown the process. + if proc.is_alive(): + proc.terminate() + proc.join(5) + + if proc.is_alive(): + kill_process_tree(proc.pid) + + # Remove zmq ipc socket files + ipc_sockets = [output_path, input_path] + for ipc_socket in ipc_sockets: + socket_file = ipc_socket.replace("ipc://", "") + if os and os.path.exists(socket_file): + os.remove(socket_file) From 567b424b3ed9f2bacfc91e05cdb3b6afd9b9d59d Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Wed, 1 Jan 2025 14:11:46 +0000 Subject: [PATCH 10/23] working --- vllm/v1/engine/core_client.py | 22 ++++++++++------------ vllm/v1/utils.py | 9 ++++----- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index a0607215bf19..a6f71770eeb4 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -126,25 +126,26 @@ def __init__( executor_class: Type[Executor], log_stats: bool = False, ): - # # Ensure cleanup of ZMQ during GC. - # self._finalizer = weakref.finalize(self, self.shutdown) - # Serialization setup. self.encoder = PickleEncoder() self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs) # ZMQ setup. - if asyncio_mode: - self.ctx = zmq.asyncio.Context() - else: - self.ctx = zmq.Context() # type: ignore[attr-defined] + ctx = ( + zmq.asyncio.Context() # type: ignore[attr-defined] + if asyncio_mode else zmq.Context()) # type: ignore[attr-defined] + + # Note(rob): shutdown function cannot be a bound method, + # else the gc cannot collect the object. + self._finalizer = weakref.finalize(self, lambda x: x.destroy(linger=0), + ctx) # Paths and sockets for IPC. output_path = get_open_zmq_ipc_path() input_path = get_open_zmq_ipc_path() - self.output_socket = make_zmq_socket(self.ctx, output_path, + self.output_socket = make_zmq_socket(ctx, output_path, zmq.constants.PULL) - self.input_socket = make_zmq_socket(self.ctx, input_path, + self.input_socket = make_zmq_socket(ctx, input_path, zmq.constants.PUSH) # Start EngineCore in background process. @@ -159,9 +160,6 @@ def __init__( "log_stats": log_stats, }) - # def shutdown(self): - # self.ctx.destroy(linger=0) - class SyncMPClient(MPClient): """Synchronous client for multi-proc EngineCore.""" diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index efe5e3880fb5..4cf1c9198508 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -1,3 +1,4 @@ +import multiprocessing import os import weakref from collections.abc import Sequence @@ -100,13 +101,11 @@ def __init__( process_kwargs["ready_pipe"] = writer process_kwargs["input_path"] = input_path process_kwargs["output_path"] = output_path - # self.input_path = input_path - # self.output_path = output_path # Run busy loop in background process. proc = context.Process(target=target_fn, kwargs=process_kwargs) - self._finalizer = weakref.finalize( - self, shutdown, proc, input_path, output_path) + self._finalizer = weakref.finalize(self, shutdown, proc, input_path, + output_path) proc.start() # Wait for startup. @@ -117,7 +116,7 @@ def __init__( # Note(rob): shutdown function cannot be a bound method, # else the gc cannot collect the object. -def shutdown(proc, input_path, output_path): +def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str): # Shutdown the process. if proc.is_alive(): proc.terminate() From 7d04b981d6ddcf7d4c6b0101acc213ba1ab561e2 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 12:52:18 +0000 Subject: [PATCH 11/23] fix failing test --- tests/v1/engine/test_async_llm.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/v1/engine/test_async_llm.py b/tests/v1/engine/test_async_llm.py index fffb5b8100ec..70c6a32dc48e 100644 --- a/tests/v1/engine/test_async_llm.py +++ b/tests/v1/engine/test_async_llm.py @@ -65,5 +65,3 @@ async def test_load(monkeypatch): assert failed_request_id is None, ( f"{failed_request_id} generated {tokens} but " f"expected {NUM_EXPECTED_TOKENS}") - - engine.shutdown() From 62e1022dd90eb89caf21787592bc340abbfc5816 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 13:01:06 +0000 Subject: [PATCH 12/23] remove explicit shutdown calls --- tests/v1/engine/test_engine_core_client.py | 6 ------ vllm/entrypoints/openai/api_server.py | 12 +++--------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/tests/v1/engine/test_engine_core_client.py b/tests/v1/engine/test_engine_core_client.py index 729975e4ea8c..20d4e6f63b33 100644 --- a/tests/v1/engine/test_engine_core_client.py +++ b/tests/v1/engine/test_engine_core_client.py @@ -142,9 +142,6 @@ def test_engine_core_client(monkeypatch, multiprocessing_mode: bool): client.abort_requests([request.request_id]) - # Shutdown the client. - client.shutdown() - @pytest.mark.asyncio async def test_engine_core_client_asyncio(monkeypatch): @@ -200,6 +197,3 @@ async def test_engine_core_client_asyncio(monkeypatch): else: assert len(outputs[req_id]) == MAX_TOKENS, ( f"{len(outputs[req_id])=}, {MAX_TOKENS=}") - - # Shutdown the client. - client.shutdown() diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index bac72d87376d..eb846265e08f 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -137,15 +137,9 @@ async def build_async_engine_client_from_engine_args( if (MQLLMEngineClient.is_unsupported_config(engine_args) or envs.VLLM_USE_V1 or disable_frontend_multiprocessing): - engine_client: Optional[EngineClient] = None - try: - engine_client = AsyncLLMEngine.from_engine_args( - engine_args=engine_args, - usage_context=UsageContext.OPENAI_API_SERVER) - yield engine_client - finally: - if engine_client and hasattr(engine_client, "shutdown"): - engine_client.shutdown() + yield AsyncLLMEngine.from_engine_args( + engine_args=engine_args, + usage_context=UsageContext.OPENAI_API_SERVER) # MQLLMEngine. else: From 0b0ca0809f3ae96e6d511e9343bbf881b93a0f69 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:42:44 +0000 Subject: [PATCH 13/23] updated --- tests/v1/engine/test_async_llm.py | 2 ++ vllm/v1/engine/async_llm.py | 12 ++++++++++++ vllm/v1/engine/core.py | 3 +++ vllm/v1/engine/core_client.py | 14 ++++++++++++++ vllm/v1/utils.py | 3 +++ 5 files changed, 34 insertions(+) diff --git a/tests/v1/engine/test_async_llm.py b/tests/v1/engine/test_async_llm.py index 70c6a32dc48e..fffb5b8100ec 100644 --- a/tests/v1/engine/test_async_llm.py +++ b/tests/v1/engine/test_async_llm.py @@ -65,3 +65,5 @@ async def test_load(monkeypatch): assert failed_request_id is None, ( f"{failed_request_id} generated {tokens} but " f"expected {NUM_EXPECTED_TOKENS}") + + engine.shutdown() diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 8916f94d3491..fa29e62b9e91 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -133,6 +133,15 @@ def from_engine_args( stat_loggers=stat_loggers, ) + def shutdown(self): + """Shutdown background resources.""" + + if engine_core := getattr(self, "engine_core", None): + engine_core.shutdown() + + if handler := getattr(self, "output_handler", None): + handler.cancel() + @classmethod def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]: executor_class: Type[Executor] @@ -287,6 +296,9 @@ async def _run_output_handler(self): logger.exception("EngineCore output handler hit an error: %s", e) kill_process_tree(os.getpid()) + finally: + logger.debug("AsyncLLM output handler shutting down.") + async def abort(self, request_id: str) -> None: """Abort RequestId in self, detokenizer, and engine core.""" diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index dcd758a938fe..6439989ead90 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -129,6 +129,9 @@ def step(self) -> List[EngineCoreOutput]: def profile(self, is_start: bool = True): self.model_executor.profile(is_start) + def shutdown(self): + self.model_executor.shutdown() + class EngineCoreProc(EngineCore): """ZMQ-wrapper for running EngineCore in background process.""" diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index a6f71770eeb4..a5de3f1e64f8 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -53,6 +53,9 @@ def make_client( return InprocClient(vllm_config, executor_class, log_stats) + def shutdown(self): + raise NotImplementedError + def get_output(self) -> List[EngineCoreOutput]: raise NotImplementedError @@ -93,6 +96,9 @@ class InprocClient(EngineCoreClient): def __init__(self, *args, **kwargs): self.engine_core = EngineCore(*args, **kwargs) + def shutdown(self): + self.engine_core.shutdown() + def get_output(self) -> List[EngineCoreOutput]: return self.engine_core.step() @@ -160,6 +166,14 @@ def __init__( "log_stats": log_stats, }) + def shutdown(self): + """Clean up background resources.""" + + self._finalizer() + + if hasattr(self, "proc_handle"): + self.proc_handle.shutdown() + class SyncMPClient(MPClient): """Synchronous client for multi-proc EngineCore.""" diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index 4cf1c9198508..ca60bc990c1f 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -113,6 +113,9 @@ def __init__( raise RuntimeError(f"{process_name} initialization failed. " "See root cause above.") + def shutdown(self): + self._finalizer() + # Note(rob): shutdown function cannot be a bound method, # else the gc cannot collect the object. From 729938a2dd3036d5f3b0daaea4271b74b1cfb64c Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:44:06 +0000 Subject: [PATCH 14/23] pdated --- vllm/v1/engine/async_llm.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index fa29e62b9e91..255eaee5aa77 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -296,9 +296,6 @@ async def _run_output_handler(self): logger.exception("EngineCore output handler hit an error: %s", e) kill_process_tree(os.getpid()) - finally: - logger.debug("AsyncLLM output handler shutting down.") - async def abort(self, request_id: str) -> None: """Abort RequestId in self, detokenizer, and engine core.""" From 0259241efe8fe75a1ba1fe283bed0524bbb1ab83 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:45:25 +0000 Subject: [PATCH 15/23] update --- vllm/entrypoints/openai/api_server.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py index eb846265e08f..bac72d87376d 100644 --- a/vllm/entrypoints/openai/api_server.py +++ b/vllm/entrypoints/openai/api_server.py @@ -137,9 +137,15 @@ async def build_async_engine_client_from_engine_args( if (MQLLMEngineClient.is_unsupported_config(engine_args) or envs.VLLM_USE_V1 or disable_frontend_multiprocessing): - yield AsyncLLMEngine.from_engine_args( - engine_args=engine_args, - usage_context=UsageContext.OPENAI_API_SERVER) + engine_client: Optional[EngineClient] = None + try: + engine_client = AsyncLLMEngine.from_engine_args( + engine_args=engine_args, + usage_context=UsageContext.OPENAI_API_SERVER) + yield engine_client + finally: + if engine_client and hasattr(engine_client, "shutdown"): + engine_client.shutdown() # MQLLMEngine. else: From 58e4b366e8aa2d1ab80876e32f65ef28770d1107 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:51:14 +0000 Subject: [PATCH 16/23] working --- vllm/v1/engine/core_client.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index a5de3f1e64f8..95bec9dd25cb 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -1,4 +1,5 @@ import weakref +from abc import ABC, abstractmethod from typing import List, Type import msgspec @@ -19,7 +20,7 @@ logger = init_logger(__name__) -class EngineCoreClient: +class EngineCoreClient(ABC): """ EngineCoreClient: subclasses handle different methods for pushing and pulling from the EngineCore for asyncio / multiprocessing. @@ -53,8 +54,9 @@ def make_client( return InprocClient(vllm_config, executor_class, log_stats) + @abstractmethod def shutdown(self): - raise NotImplementedError + ... def get_output(self) -> List[EngineCoreOutput]: raise NotImplementedError From cacf6b081bb12dc4f6dfc1e47e21eb9d4896c2d6 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 14:54:47 +0000 Subject: [PATCH 17/23] updated --- vllm/v1/engine/core_client.py | 8 ++++---- vllm/v1/utils.py | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 95bec9dd25cb..c7127d0b83bf 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -139,21 +139,21 @@ def __init__( self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs) # ZMQ setup. - ctx = ( + self.ctx = ( zmq.asyncio.Context() # type: ignore[attr-defined] if asyncio_mode else zmq.Context()) # type: ignore[attr-defined] # Note(rob): shutdown function cannot be a bound method, # else the gc cannot collect the object. self._finalizer = weakref.finalize(self, lambda x: x.destroy(linger=0), - ctx) + self.ctx) # Paths and sockets for IPC. output_path = get_open_zmq_ipc_path() input_path = get_open_zmq_ipc_path() - self.output_socket = make_zmq_socket(ctx, output_path, + self.output_socket = make_zmq_socket(self.ctx, output_path, zmq.constants.PULL) - self.input_socket = make_zmq_socket(ctx, input_path, + self.input_socket = make_zmq_socket(self.ctx, input_path, zmq.constants.PUSH) # Start EngineCore in background process. diff --git a/vllm/v1/utils.py b/vllm/v1/utils.py index ca60bc990c1f..b0a7affbebb7 100644 --- a/vllm/v1/utils.py +++ b/vllm/v1/utils.py @@ -103,10 +103,10 @@ def __init__( process_kwargs["output_path"] = output_path # Run busy loop in background process. - proc = context.Process(target=target_fn, kwargs=process_kwargs) - self._finalizer = weakref.finalize(self, shutdown, proc, input_path, - output_path) - proc.start() + self.proc = context.Process(target=target_fn, kwargs=process_kwargs) + self._finalizer = weakref.finalize(self, shutdown, self.proc, + input_path, output_path) + self.proc.start() # Wait for startup. if reader.recv()["status"] != "READY": @@ -128,7 +128,7 @@ def shutdown(proc: multiprocessing.Process, input_path: str, output_path: str): if proc.is_alive(): kill_process_tree(proc.pid) - # Remove zmq ipc socket files + # Remove zmq ipc socket files. ipc_sockets = [output_path, input_path] for ipc_socket in ipc_sockets: socket_file = ipc_socket.replace("ipc://", "") From ccc747dad9ed37fed4b255d3cfa2ef2aab81a417 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:16:29 +0000 Subject: [PATCH 18/23] fixup --- vllm/v1/engine/core.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 6439989ead90..075e24898539 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -187,6 +187,7 @@ def signal_handler(signum, frame): signal.signal(signal.SIGINT, signal_handler) parent_process = psutil.Process().parent() + engine_core = None try: engine_core = EngineCoreProc(*args, **kwargs) engine_core.run_busy_loop() @@ -199,6 +200,10 @@ def signal_handler(signum, frame): logger.error("EngineCore hit an exception: %s", traceback) parent_process.send_signal(signal.SIGQUIT) + finally: + if engine_core is not None: + engine_core.shutdown() + def run_busy_loop(self): """Core busy loop of the EngineCore.""" From ddc2a97632a5ce262b347fcd020c33a9033581bf Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:17:11 +0000 Subject: [PATCH 19/23] fixup --- vllm/v1/engine/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index 075e24898539..13a50a4f855e 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -126,12 +126,12 @@ def step(self) -> List[EngineCoreOutput]: scheduler_output, output) return engine_core_outputs - def profile(self, is_start: bool = True): - self.model_executor.profile(is_start) - def shutdown(self): self.model_executor.shutdown() + def profile(self, is_start: bool = True): + self.model_executor.profile(is_start) + class EngineCoreProc(EngineCore): """ZMQ-wrapper for running EngineCore in background process.""" From af0d529a78acc5cd89d051172fd233e1dffb9987 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:17:41 +0000 Subject: [PATCH 20/23] reduce cruft --- vllm/v1/engine/async_llm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 255eaee5aa77..ff7a0c28dd91 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -134,7 +134,7 @@ def from_engine_args( ) def shutdown(self): - """Shutdown background resources.""" + """Shutdown, cleaning up the background proc and IPC.""" if engine_core := getattr(self, "engine_core", None): engine_core.shutdown() From 17e152b1188d9c51d584bb5bc2f500e5e9c67ec5 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:18:36 +0000 Subject: [PATCH 21/23] updated --- vllm/v1/engine/core_client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index c7127d0b83bf..8b0ff913e0c5 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -98,9 +98,6 @@ class InprocClient(EngineCoreClient): def __init__(self, *args, **kwargs): self.engine_core = EngineCore(*args, **kwargs) - def shutdown(self): - self.engine_core.shutdown() - def get_output(self) -> List[EngineCoreOutput]: return self.engine_core.step() @@ -110,6 +107,9 @@ def add_request(self, request: EngineCoreRequest) -> None: def abort_requests(self, request_ids: List[str]) -> None: self.engine_core.abort_requests(request_ids) + def shutdown(self): + self.engine_core.shutdown() + def profile(self, is_start: bool = True) -> None: self.engine_core.profile(is_start) From 37859d7b462b0aaa878220fa466fcc24fafc3898 Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:24:25 +0000 Subject: [PATCH 22/23] finish --- vllm/v1/engine/core_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index 8b0ff913e0c5..f6b167b3e759 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -167,7 +167,7 @@ def __init__( "executor_class": executor_class, "log_stats": log_stats, }) - + def shutdown(self): """Clean up background resources.""" From c29f3290f7fff5eeda74bdcbf496b2fe5ba9fafa Mon Sep 17 00:00:00 2001 From: "rshaw@neuralmagic.com" Date: Fri, 3 Jan 2025 15:27:54 +0000 Subject: [PATCH 23/23] updated --- vllm/v1/engine/core_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py index f6b167b3e759..e009f3448bf6 100644 --- a/vllm/v1/engine/core_client.py +++ b/vllm/v1/engine/core_client.py @@ -167,15 +167,14 @@ def __init__( "executor_class": executor_class, "log_stats": log_stats, }) - + def shutdown(self): """Clean up background resources.""" - - self._finalizer() - if hasattr(self, "proc_handle"): self.proc_handle.shutdown() + self._finalizer() + class SyncMPClient(MPClient): """Synchronous client for multi-proc EngineCore."""