Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions design/unopinionated-deployments.md
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,14 @@ prefill to the chosen worker; the engines themselves transfer the KV cache over
their configured connector. Modelplane injects the sidecar, labels the pods as
either prefill or decode, and configures the endpoint picker accordingly.

Because the engines transfer the KV cache over their connector (e.g. vLLM's
`NixlConnector`), the engine image must ship that connector's runtime: the NIXL
library. Recent vanilla `vllm/vllm-openai` images include it, so a disaggregated
deployment pins a current tag rather than an old one. Since the engine image and
flags are the user's, this is a deployment prerequisite Modelplane does not
provide; failing it surfaces as engines crashlooping with `NIXL is not
available`.

### Scheduling

The fleet scheduler places each ModelReplica on one InferenceCluster. However
Expand Down
7 changes: 7 additions & 0 deletions docs/content/models/model-deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ and long context. For small models or low traffic the KV-transfer overhead
outweighs the benefit, so aggregated serving (optionally with chunked prefill) is
the default.

Disaggregation requires the **engine image to provide the NIXL KV-transfer
runtime**. vLLM's `NixlConnector` (and SGLang's prefill/decode transfer) import
the `nixl` package, so disaggregated engines crash at startup with `NIXL is not
available` on an image that lacks it. Recent vanilla `vllm/vllm-openai` images
ship NIXL, so pin a current tag rather than an old one. The engine image is
yours to choose, so this is a prerequisite Modelplane does not bundle for you.

## Examples

{{< tabs >}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ NVIDIA
Envoy
Traefik
MetalLB
NIXL
NixlConnector
Prometheus
ArgoCD
FluxCD
Expand Down
176 changes: 161 additions & 15 deletions functions/compose-model-replica/function/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,64 @@
# The pd-sidecar takes ENGINE_PORT (8000), so the decode engine listens here.
_DECODE_ENGINE_PORT = 8001

# NIXL KV-transfer plumbing injected onto every disaggregated engine.
_NIXL_SHM_VOLUME = "nixl-shm"
_NIXL_SIDE_CHANNEL_PORT = "5557"

# Selector labels shared by both engines' serving pods (the InferencePool
# matchLabels) and the per-role label the picker partitions on.
_LABEL_ROLE = "llm-d.ai/role"
_LABEL_INFERENCE_SERVING = "llm-d.ai/inference-serving"

# EndpointPickerConfig for the disaggregated profile. The apiVersion is the GIE
# group the EPP binary registers (inference.networking.x-k8s.io/v1alpha1).
_EPP_CONFIG_YAML = """\
#
# The decider in disagg-profile-handler is what makes a request disaggregate: it
# runs the prefill profile, picks a prefill endpoint, and the handler sets the
# x-prefiller-host-port header the routing sidecar uses to send the prefill phase
# there (KV then flows prefill->decode over NIXL). The selective
# prefix-based-pd-decider disaggregates only when a request's uncached suffix is
# at least nonCachedTokens long, so short or cache-hot prompts skip the prefill
# hop (and its KV-transfer cost) and serve decode-only.
#
# Three things must line up or it silently never disaggregates, and the EPP
# image we run (llm-d-inference-scheduler v0.8.0, embedding
# gateway-api-inference-extension v1.5.0) makes the defaults wrong on every one:
# 1. nonCachedTokens defaults to 0, which the decider treats as "disabled"
# (always decode-only). Set it explicitly.
# 2. The decider reads a PrefixCacheMatchInfo attribute that prefix-cache-scorer
# no longer produces (GIE v1.5.0 split production into a separate plugin and
# made prepare-data default-on, so the old `prepareDataPlugins` feature gate
# the v0.8.0 docs still mention is *unregistered* and crashes the EPP). The
# producer is now an explicit plugin: approx-prefix-cache-producer.
# 3. That producer defaults to autoTune: true, which leaves its block size 0
# and never populates the attribute. Pin autoTune: false + blockSizeTokens.
# (Verified live: with this config the prefill engine's request_prefill_time
# counter increments for long prompts and stays flat for short ones; with the
# defaults it stayed at zero for everything.)
#
# blockSizeTokens MUST match the engine's KV block size or prefix-cache routing
# silently degrades (#179). It's derived best-effort from the engine flags via
# _kv_block_size() (BLOCK_SIZE_TOKENS placeholder), defaulting to vLLM's 16.
_EPP_CONFIG_TEMPLATE = """\
apiVersion: inference.networking.x-k8s.io/v1alpha1
kind: EndpointPickerConfig
plugins:
- type: approx-prefix-cache-producer
parameters:
autoTune: false
blockSizeTokens: BLOCK_SIZE_TOKENS
maxPrefixBlocksToMatch: 256
lruCapacityPerServer: 31250
- type: prefix-cache-scorer
- type: disagg-headers-handler
- type: queue-scorer
- type: prefill-filter
- type: decode-filter
- type: max-score-picker
- type: prefix-cache-scorer
- type: queue-scorer
- type: prefix-based-pd-decider
parameters:
nonCachedTokens: 16
- type: disagg-profile-handler
parameters:
deciders:
Expand All @@ -53,12 +94,70 @@
plugins:
- pluginRef: prefill-filter
- pluginRef: max-score-picker
- pluginRef: prefix-cache-scorer
weight: 2
- pluginRef: queue-scorer
weight: 1
- name: decode
plugins:
- pluginRef: decode-filter
- pluginRef: max-score-picker
- pluginRef: prefix-cache-scorer
weight: 2
- pluginRef: queue-scorer
weight: 1
"""

_DEFAULT_KV_BLOCK_SIZE = 16


def _epp_config_yaml(block_size: int) -> str:
"""Render the EPP config with the engine's KV block size."""
return _EPP_CONFIG_TEMPLATE.replace("BLOCK_SIZE_TOKENS", str(block_size))


def _flag_value(args: list, *flags: str) -> str | None:
"""Best-effort value of a `--flag value` or `--flag=value` engine arg.

Engine flags belong to the user (per #137); callers only peek. Returns the
first match's raw string value, or None if no flag is present.
"""
for i, a in enumerate(args or []):
for flag in flags:
if a == flag and i + 1 < len(args):
return args[i + 1]
if a.startswith(flag + "="):
return a.split("=", 1)[1]
return None


def _kv_block_size(engine_args: list) -> int:
"""HACK: best-effort read the engine's KV block size from its flags so the
EPP prefix-cache producer chunks prefixes the same way the engine does.

We peek for the common flags — vLLM's --block-size and SGLang's --page-size
— and fall back to vLLM's default of 16. A mismatch silently degrades
prefix-cache routing with no error (#179), so deriving it beats hardcoding.
The durable fix is a typed/overridable knob on the serving block (#179);
until then, this peek.
"""
raw = _flag_value(engine_args, "--block-size", "--page-size")
if raw is not None:
try:
return int(raw)
except ValueError:
pass
return _DEFAULT_KV_BLOCK_SIZE


def _engine_args(obj: k8sobjv1alpha1.Object) -> list:
"""The engine container's args from a workload Object (best-effort)."""
for tmpl in _serving_pod_templates(obj.spec.forProvider.manifest):
for c in tmpl["spec"]["containers"]:
if c.get("name") == "engine":
return c.get("args", [])
return []


def apply(
composed: dict[str, k8sobjv1alpha1.Object],
Expand Down Expand Up @@ -96,20 +195,36 @@ def _disaggregated(
pd-sidecar to decode, and adds the InferencePool, endpoint picker, and an
HTTPRoute pointing at the pool. The engine workloads are reused as-is apart
from the label/sidecar decoration.

Engine-image prerequisite: PrefillDecode needs the engine image to ship the
NIXL runtime. vLLM's NixlConnector (and SGLang's PD transfer) import the
`nixl` package, so an image without it crashloops at startup with "NIXL is
not available". Recent vanilla vllm/vllm-openai images ship NIXL, so pin a
current tag. Engine images are the user's (#137), so Modelplane can't bundle
this; it is a deployment prerequisite, not something the composition provides.
"""
name = replica.metadata.name
prefill = next(e for e in replica.spec.engines if e.phase == "Prefill")
decode = next(e for e in replica.spec.engines if e.phase == "Decode")

out = dict(composed)
prefill_key = base.workload_key(prefill)
decode_key = base.workload_key(decode)
_label_role(out[base.workload_key(prefill)], role="prefill", app=name)
_label_role(out[prefill_key], role="prefill", app=name)
_label_role(out[decode_key], role="decode", app=name)
_add_sidecar_to_decode(out[decode_key])

# Both engines need NIXL KV-transfer plumbing the ModelDeployment schema
# can't express (no fieldRef env, no volumes). Inject it for them.
_inject_nixl_plumbing(out[prefill_key])
_inject_nixl_plumbing(out[decode_key])

# The EPP's prefix-cache producer must chunk prefixes at the decode engine's
# KV block size; derive it from the decode engine's flags (HACK, #179).
block_size = _kv_block_size(_engine_args(out[decode_key]))
out["inference-pool"] = base.wrap_object(provider_config, _inference_pool(name))
out[base.ROUTE_KEY] = base.wrap_object(provider_config, _http_route(replica, name))
out.update(_epp_objects(name, provider_config))
out.update(_epp_objects(name, provider_config, block_size))
return out


Expand Down Expand Up @@ -143,13 +258,8 @@ def _decode_port(engine: dict) -> int:
expects _DECODE_ENGINE_PORT. The user owns the engine flags (per #137), so we
also best-effort honor an explicit --port override rather than assume one.
"""
args = engine.get("args", [])
for i, a in enumerate(args):
if a.startswith("--port="):
return int(a.split("=", 1)[1])
if a == "--port" and i + 1 < len(args):
return int(args[i + 1])
return _DECODE_ENGINE_PORT
raw = _flag_value(engine.get("args", []), "--port")
return int(raw) if raw is not None else _DECODE_ENGINE_PORT


def _add_sidecar_to_decode(obj: k8sobjv1alpha1.Object) -> None:
Expand Down Expand Up @@ -184,6 +294,38 @@ def _add_sidecar_to_decode(obj: k8sobjv1alpha1.Object) -> None:
)


def _inject_nixl_plumbing(obj: k8sobjv1alpha1.Object) -> None:
"""Add the NIXL KV-transfer plumbing every disaggregated engine needs but
that the ModelDeployment schema can't express (no fieldRef env, no volumes).

Two pieces, both infra-level and always-correct for PrefillDecode, so we
inject them the same way we inject the sidecar rather than asking the user:
- a Memory-backed /dev/shm: vLLM's NixlConnector uses shared memory, and
the container default (64Mi) is far too small.
- VLLM_NIXL_SIDE_CHANNEL_HOST set to the pod IP (+ a fixed port) so peer
engines can reach this one's NIXL metadata channel. Without it the
engine advertises an unreachable address and cross-pod KV transfer
fails — requests get a 500 with no error in the engine logs.
"""
for tmpl in _serving_pod_templates(obj.spec.forProvider.manifest):
spec = tmpl["spec"]
volumes = spec.setdefault("volumes", [])
if not any(v.get("name") == _NIXL_SHM_VOLUME for v in volumes):
volumes.append({"name": _NIXL_SHM_VOLUME, "emptyDir": {"medium": "Memory"}})
engine = next(c for c in spec["containers"] if c["name"] == "engine")
mounts = engine.setdefault("volumeMounts", [])
if not any(m.get("mountPath") == "/dev/shm" for m in mounts):
mounts.append({"name": _NIXL_SHM_VOLUME, "mountPath": "/dev/shm"})
env = engine.setdefault("env", [])
existing = {e.get("name") for e in env}
if "VLLM_NIXL_SIDE_CHANNEL_HOST" not in existing:
env.append(
{"name": "VLLM_NIXL_SIDE_CHANNEL_HOST", "valueFrom": {"fieldRef": {"fieldPath": "status.podIP"}}}
)
if "VLLM_NIXL_SIDE_CHANNEL_PORT" not in existing:
env.append({"name": "VLLM_NIXL_SIDE_CHANNEL_PORT", "value": _NIXL_SIDE_CHANNEL_PORT})


def _inference_pool(name: str) -> dict:
return {
"apiVersion": "inference.networking.k8s.io/v1",
Expand Down Expand Up @@ -222,8 +364,12 @@ def _http_route(replica: v1alpha1.ModelReplica, name: str) -> dict:
}


def _epp_objects(name: str, provider_config: str) -> dict[str, k8sobjv1alpha1.Object]:
"""The hardcoded endpoint picker: ServiceAccount, RBAC, ConfigMap, Deployment, Service."""
def _epp_objects(name: str, provider_config: str, block_size: int) -> dict[str, k8sobjv1alpha1.Object]:
"""The endpoint picker: ServiceAccount, RBAC, ConfigMap, Deployment, Service.

block_size is the engine's KV block size, rendered into the prefix-cache
producer so its prefix chunking matches the engine.
"""
ns = base.REMOTE_NAMESPACE
epp = f"{name}-epp"
sa = {"apiVersion": "v1", "kind": "ServiceAccount", "metadata": {"name": epp, "namespace": ns}}
Expand Down Expand Up @@ -258,7 +404,7 @@ def _epp_objects(name: str, provider_config: str) -> dict[str, k8sobjv1alpha1.Ob
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {"name": epp, "namespace": ns},
"data": {"pd-epp-config.yaml": _EPP_CONFIG_YAML},
"data": {"pd-epp-config.yaml": _epp_config_yaml(block_size)},
}
deployment = {
"apiVersion": "apps/v1",
Expand Down
58 changes: 58 additions & 0 deletions functions/compose-model-replica/tests/test_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,40 @@ def test_replaces_unified_service_with_pool_and_epp(self):
self.assertEqual(pool["kind"], "InferencePool")
self.assertEqual(pool["spec"]["endpointPickerRef"]["name"], "r-epp")

def test_injects_nixl_plumbing(self):
"""Both disagg engines get the NIXL plumbing the schema can't express:
a Memory /dev/shm and VLLM_NIXL_SIDE_CHANNEL_HOST = pod IP."""
out = self._apply()
for role in ("prefill", "decode"):
pod = self._serving_pod(out, role)["spec"]
self.assertTrue(
any(v.get("emptyDir", {}).get("medium") == "Memory" for v in pod["volumes"]),
f"{role} missing Memory /dev/shm volume",
)
engine = next(c for c in pod["containers"] if c["name"] == "engine")
self.assertIn("/dev/shm", [m["mountPath"] for m in engine["volumeMounts"]])
host = next((e for e in engine["env"] if e["name"] == "VLLM_NIXL_SIDE_CHANNEL_HOST"), None)
self.assertIsNotNone(host, f"{role} missing VLLM_NIXL_SIDE_CHANNEL_HOST")
self.assertEqual(host["valueFrom"]["fieldRef"]["fieldPath"], "status.podIP")
self.assertIn("VLLM_NIXL_SIDE_CHANNEL_PORT", [e["name"] for e in engine["env"]])

def test_epp_config_arms_the_pd_decider(self):
"""PrefillDecode silently serves decode-only unless the PD decider is armed.

Selective prefix-based-pd-decider needs all of: nonCachedTokens > 0 (0 =
disabled), the approx-prefix-cache-producer plugin that populates the
attribute it reads, and that producer pinned to autoTune: false (the
true default never populates). And it must NOT carry the prepareDataPlugins
feature gate, which the v0.8.0 EPP image rejects and crashloops on.
"""
cfg = self._apply()["epp-config"].spec.forProvider.manifest["data"]["pd-epp-config.yaml"]
self.assertIn("prefix-based-pd-decider", cfg)
self.assertIn("nonCachedTokens: 16", cfg)
self.assertIn("approx-prefix-cache-producer", cfg)
self.assertIn("autoTune: false", cfg)
self.assertNotIn("nonCachedTokens: 0", cfg)
self.assertNotIn("prepareDataPlugins", cfg)

def test_epp_role_watches_inferenceobjectives(self):
"""The picker watches InferenceObjectives (GIE x-k8s.io group); the Role must allow it."""
rules = self._apply()["epp-role"].spec.forProvider.manifest["rules"]
Expand Down Expand Up @@ -804,5 +838,29 @@ def test_adds_service_and_route(self):
self.assertNotIn("inference-pool", out)


class TestKvBlockSize(unittest.TestCase):
"""The EPP prefix-cache producer's blockSizeTokens is derived best-effort
from the engine flags (#179) so it matches the engine's KV block size."""

def test_defaults_to_16_when_absent(self):
self.assertEqual(routing._kv_block_size([]), 16)
self.assertEqual(routing._kv_block_size(["--model=/mnt/models"]), 16)

def test_reads_vllm_block_size(self):
self.assertEqual(routing._kv_block_size(["--block-size", "32"]), 32)
self.assertEqual(routing._kv_block_size(["--model=/m", "--block-size=8"]), 8)

def test_reads_sglang_page_size(self):
self.assertEqual(routing._kv_block_size(["--page-size=64"]), 64)

def test_non_integer_falls_back_to_default(self):
self.assertEqual(routing._kv_block_size(["--block-size", "auto"]), 16)

def test_rendered_config_uses_block_size(self):
cfg = routing._epp_config_yaml(32)
self.assertIn("blockSizeTokens: 32", cfg)
self.assertNotIn("BLOCK_SIZE_TOKENS", cfg)


if __name__ == "__main__":
unittest.main()
Loading