diff --git a/api/inference/v1alpha1/playground_types.go b/api/inference/v1alpha1/playground_types.go index eba28772..11ebcc81 100644 --- a/api/inference/v1alpha1/playground_types.go +++ b/api/inference/v1alpha1/playground_types.go @@ -73,6 +73,9 @@ const ( PlaygroundProgressing = "Progressing" // PlaygroundAvailable indicates the corresponding inference service is available now. PlaygroundAvailable string = "Available" + // SkipModelLoaderAnnoKey indicates whether to skip the model loader, + // enabling the inference engine to manage model loading directly. + SkipModelLoaderAnnoKey = "llmaz.io/skip-model-loader" ) // PlaygroundStatus defines the observed state of Playground diff --git a/docs/examples/README.md b/docs/examples/README.md index 42a745b4..31b188b0 100644 --- a/docs/examples/README.md +++ b/docs/examples/README.md @@ -68,6 +68,10 @@ llama.cpp supports speculative decoding to significantly improve inference perfo [Speculative Decoding](https://arxiv.org/abs/2211.17192) can improve inference performance efficiently, see [example](./speculative-decoding/vllm/) here. +### Loading models with Run:ai Model Streamer with vLLM + +[Run:ai Model Streamer](https://github.com/run-ai/runai-model-streamer/blob/master/docs/README.md) is a library to read tensors in concurrency, while streaming it to GPU memory. vLLM supports loading weights in Safetensors format using the Run:ai Model Streamer. See [example](./runai-streamer/) here. + ### Multi-Host Inference Model size is growing bigger and bigger, Llama 3.1 405B FP16 LLM requires more than 750 GB GPU for weights only, leaving kv cache unconsidered, even with 8 x H100 Nvidia GPUs, 80 GB size of HBM each, can not fit in a single host, requires a multi-host deployment, see [example](./multi-nodes/) here. diff --git a/docs/examples/runai-streamer/playground-streaming-from-file-system.yaml b/docs/examples/runai-streamer/playground-streaming-from-file-system.yaml new file mode 100644 index 00000000..e114dbe5 --- /dev/null +++ b/docs/examples/runai-streamer/playground-streaming-from-file-system.yaml @@ -0,0 +1,44 @@ +# This example demonstrates how to use the Run:ai Model Streamer to load models from the local file system. +# The model-loader initContainer first downloads the model from Hugging Face. +# By using `--load-format runai_streamer`, vLLM leverages the Run:ai Model Streamer to stream models from the local file system. +# While this approach may be slightly slower than streaming directly from S3 (due to the initial download to local disk), +# it still offers faster model loading compared to not using the Streamer, +# as it utilizes multiple threads to concurrently read tensor data from files into a dedicated CPU buffer, +# and then transfers the tensors to GPU memory. +apiVersion: llmaz.io/v1alpha1 +kind: OpenModel +metadata: + name: deepseek-r1-distill-qwen-1-5b +spec: + familyName: deepseek + source: + modelHub: + modelID: deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B + inferenceConfig: + flavors: + - name: t4 # GPU type + limits: + nvidia.com/gpu: 1 +--- +apiVersion: inference.llmaz.io/v1alpha1 +kind: Playground +metadata: + name: deepseek-r1-distill-qwen-1-5b +spec: + replicas: 1 + modelClaim: + modelName: deepseek-r1-distill-qwen-1-5b + backendRuntimeConfig: + backendName: vllm # currently, only vllm supports runai streamer + args: + - --load-format + - runai_streamer + resources: + limits: + cpu: "4" + memory: 16Gi + nvidia.com/gpu: "1" + requests: + cpu: "4" + memory: 16Gi + nvidia.com/gpu: "1" diff --git a/docs/examples/runai-streamer/playground-streaming-from-s3.yaml b/docs/examples/runai-streamer/playground-streaming-from-s3.yaml new file mode 100644 index 00000000..c78ef483 --- /dev/null +++ b/docs/examples/runai-streamer/playground-streaming-from-s3.yaml @@ -0,0 +1,54 @@ +# This example demonstrates how to use the Run:ai Model Streamer to load models directly from S3. +# Adding the annotation `llmaz.io/skip-model-loader: "true"` skips the model-loader initContainer, +# allowing the inference engine to load models directly from remote storage (e.g., S3). +# By using `--load-format runai_streamer`, the vLLM leverages the Run:ai Model Streamer to stream models from S3. +# vLLM will load models into the CPU buffer and then into GPU memory, without the need to download them to local disk first. +# This can significantly improve model loading speed and reduce disk usage. +apiVersion: llmaz.io/v1alpha1 +kind: OpenModel +metadata: + name: deepseek-r1-distill-qwen-1-5b +spec: + familyName: deepseek + source: + # Note: You need to replace with your actual S3 bucket name + # If the s3 bucket need AWS credentials for authentication, + # please run `kubectl create secret generic aws-access-secret --from-literal=AWS_ACCESS_KEY_ID= --from-literal=AWS_SECRET_ACCESS_KEY=` ahead. + uri: s3:///DeepSeek-R1-Distill-Qwen-1.5B + inferenceConfig: + flavors: + - name: t4 # GPU type + limits: + nvidia.com/gpu: 1 +--- +apiVersion: inference.llmaz.io/v1alpha1 +kind: Playground +metadata: + name: deepseek-r1-distill-qwen-1-5b + annotations: + llmaz.io/skip-model-loader: "true" +spec: + replicas: 1 + modelClaim: + modelName: deepseek-r1-distill-qwen-1-5b + backendRuntimeConfig: + backendName: vllm # currently, only vllm supports runai streamer + args: + - --load-format + - runai_streamer + envs: + # The default value is 1 second. Increase it to 10 seconds to avoid timeouts in case of slow network conditions. + - name: RUNAI_STREAMER_S3_REQUEST_TIMEOUT_MS + value: "10000" + # Controls the level of concurrency and number of OS threads reading tensors from the file to the CPU buffer, the default value is 16 + #- name: RUNAI_STREAMER_CONCURRENCY + # value: "32" + resources: + limits: + cpu: "4" + memory: 16Gi + nvidia.com/gpu: "1" + requests: + cpu: "4" + memory: 16Gi + nvidia.com/gpu: "1" diff --git a/pkg/controller/inference/playground_controller.go b/pkg/controller/inference/playground_controller.go index 73dd19ad..5355a586 100644 --- a/pkg/controller/inference/playground_controller.go +++ b/pkg/controller/inference/playground_controller.go @@ -201,6 +201,15 @@ func buildServiceApplyConfiguration(models []*coreapi.OpenModel, playground *inf // Build metadata serviceApplyConfiguration := inferenceclientgo.Service(playground.Name, playground.Namespace) + if annotations := playground.GetAnnotations(); annotations != nil { + // Propagate llmaz.io/skip-model-loader annotation to Inference Service. + if value, exists := annotations[inferenceapi.SkipModelLoaderAnnoKey]; exists { + serviceApplyConfiguration.WithAnnotations(map[string]string{ + inferenceapi.SkipModelLoaderAnnoKey: value, + }) + } + } + // Build spec. spec := inferenceclientgo.ServiceSpec() diff --git a/pkg/controller/inference/service_controller.go b/pkg/controller/inference/service_controller.go index 12b10d80..bc98b53e 100644 --- a/pkg/controller/inference/service_controller.go +++ b/pkg/controller/inference/service_controller.go @@ -196,10 +196,26 @@ func injectModelProperties(template *applyconfigurationv1.LeaderWorkerTemplateAp for i, model := range models { source := modelSource.NewModelSourceProvider(model) + // Skip model-loader initContainer if llmaz.io/skip-model-loader annotation is set. + if !helper.SkipModelLoader(service) { + if isMultiNodesInference { + source.InjectModelLoader(template.LeaderTemplate, i) + } + source.InjectModelLoader(template.WorkerTemplate, i) + } else { + if isMultiNodesInference { + source.InjectModelEnvVars(template.LeaderTemplate) + } + source.InjectModelEnvVars(template.WorkerTemplate) + } + } + + // If model-loader initContainer is injected, we should mount the model-volume to the model-runner container. + if !helper.SkipModelLoader(service) { if isMultiNodesInference { - source.InjectModelLoader(template.LeaderTemplate, i) + modelSource.InjectModelVolume(template.LeaderTemplate) } - source.InjectModelLoader(template.WorkerTemplate, i) + modelSource.InjectModelVolume(template.WorkerTemplate) } // We only consider the main model's requirements for now. diff --git a/pkg/controller_helper/backendruntime/backendruntime.go b/pkg/controller_helper/backendruntime/backendruntime.go index 8358d3f6..054a9c60 100644 --- a/pkg/controller_helper/backendruntime/backendruntime.go +++ b/pkg/controller_helper/backendruntime/backendruntime.go @@ -65,14 +65,14 @@ func (p *BackendRuntimeParser) Args() ([]string, error) { source := modelSource.NewModelSourceProvider(mainModel) modelInfo := map[string]string{ - "ModelPath": source.ModelPath(), + "ModelPath": source.ModelPath(helper.SkipModelLoader(p.playground)), "ModelName": source.ModelName(), } // TODO: This is not that reliable because two models doesn't always means speculative-decoding. // Revisit this later. if len(p.models) > 1 { - modelInfo["DraftModelPath"] = modelSource.NewModelSourceProvider(p.models[1]).ModelPath() + modelInfo["DraftModelPath"] = modelSource.NewModelSourceProvider(p.models[1]).ModelPath(helper.SkipModelLoader(p.playground)) } for _, recommend := range p.backendRuntime.Spec.RecommendedConfigs { diff --git a/pkg/controller_helper/helper.go b/pkg/controller_helper/helper.go index 4d334c42..73ce1c7f 100644 --- a/pkg/controller_helper/helper.go +++ b/pkg/controller_helper/helper.go @@ -21,6 +21,7 @@ import ( coreapi "github.com/inftyai/llmaz/api/core/v1alpha1" inferenceapi "github.com/inftyai/llmaz/api/inference/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -122,3 +123,10 @@ func FirstAssignedFlavor(model *coreapi.OpenModel, playground *inferenceapi.Play return nil } + +func SkipModelLoader(obj metav1.Object) bool { + if annotations := obj.GetAnnotations(); annotations != nil { + return annotations[inferenceapi.SkipModelLoaderAnnoKey] == "true" + } + return false +} diff --git a/pkg/controller_helper/modelsource/modelhub.go b/pkg/controller_helper/modelsource/modelhub.go index 91db583e..b5c08e88 100644 --- a/pkg/controller_helper/modelsource/modelhub.go +++ b/pkg/controller_helper/modelsource/modelhub.go @@ -50,7 +50,13 @@ func (p *ModelHubProvider) ModelName() string { // - modelID: Qwen/Qwen2-0.5B-Instruct-GGUF // fileName: qwen2-0_5b-instruct-q5_k_m.gguf // modelPath: /workspace/models/qwen2-0_5b-instruct-q5_k_m.gguf -func (p *ModelHubProvider) ModelPath() string { +func (p *ModelHubProvider) ModelPath(skipModelLoader bool) string { + // Skip the model loader to allow the inference engine to handle loading models directly from model hub (e.g., Hugging Face, ModelScope). + // In this case, the model ID should be returned (e.g., facebook/opt-125m). + if skipModelLoader { + return p.modelID + } + if p.fileName != nil { return CONTAINER_MODEL_PATH + *p.fileName } @@ -108,59 +114,87 @@ func (p *ModelHubProvider) InjectModelLoader(template *corev1.PodTemplateSpec, i // Both HUGGING_FACE_HUB_TOKEN and HF_TOKEN works. initContainer.Env = append(initContainer.Env, corev1.EnvVar{ - Name: "HUGGING_FACE_HUB_TOKEN", + Name: HUGGING_FACE_HUB_TOKEN, ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: MODELHUB_SECRET_NAME, // if secret not exists, the env is empty. }, - Key: HUGGINGFACE_TOKEN_KEY, + Key: HUGGING_FACE_TOKEN_KEY, Optional: ptr.To[bool](true), }, }, - }, corev1.EnvVar{ - Name: "HF_TOKEN", + }) + + initContainer.Env = append(initContainer.Env, + corev1.EnvVar{ + Name: HUGGING_FACE_TOKEN_KEY, ValueFrom: &corev1.EnvVarSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: MODELHUB_SECRET_NAME, }, - Key: HUGGINGFACE_TOKEN_KEY, + Key: HUGGING_FACE_TOKEN_KEY, Optional: ptr.To[bool](true), }, }, - }, - ) - template.Spec.InitContainers = append(template.Spec.InitContainers, *initContainer) + }) - // Return once not the main model, because all the below has already been injected. - if index != 0 { - return - } + template.Spec.InitContainers = append(template.Spec.InitContainers, *initContainer) +} - // Handle container. +func spreadEnvToInitContainer(containerEnv []corev1.EnvVar, initContainer *corev1.Container) { + initContainer.Env = append(initContainer.Env, containerEnv...) +} +func (p *ModelHubProvider) InjectModelEnvVars(template *corev1.PodTemplateSpec) { for i := range template.Spec.Containers { - // We only consider this container. if template.Spec.Containers[i].Name == MODEL_RUNNER_CONTAINER_NAME { - template.Spec.Containers[i].VolumeMounts = append(template.Spec.Containers[i].VolumeMounts, corev1.VolumeMount{ - Name: MODEL_VOLUME_NAME, - MountPath: CONTAINER_MODEL_PATH, - ReadOnly: true, - }) + // Check if HuggingFace token environment variables already exist + hfHubTokenExists := false + hfTokenExists := false + for _, env := range template.Spec.Containers[i].Env { + if env.Name == HUGGING_FACE_HUB_TOKEN { + hfHubTokenExists = true + } + if env.Name == HUGGING_FACE_TOKEN_KEY { + hfTokenExists = true + } + } + + // Add HUGGING_FACE_HUB_TOKEN if it doesn't exist + if !hfHubTokenExists { + template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env, + corev1.EnvVar{ + Name: HUGGING_FACE_HUB_TOKEN, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: MODELHUB_SECRET_NAME, // if secret not exists, the env is empty. + }, + Key: HUGGING_FACE_TOKEN_KEY, + Optional: ptr.To[bool](true), + }, + }, + }) + } + + // Add HF_TOKEN if it doesn't exist + if !hfTokenExists { + template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env, + corev1.EnvVar{ + Name: HUGGING_FACE_TOKEN_KEY, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: MODELHUB_SECRET_NAME, + }, + Key: HUGGING_FACE_TOKEN_KEY, + Optional: ptr.To[bool](true), + }, + }, + }) + } } } - - // Handle spec. - - template.Spec.Volumes = append(template.Spec.Volumes, corev1.Volume{ - Name: MODEL_VOLUME_NAME, - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, - }) -} - -func spreadEnvToInitContainer(containerEnv []corev1.EnvVar, initContainer *corev1.Container) { - initContainer.Env = append(initContainer.Env, containerEnv...) } diff --git a/pkg/controller_helper/modelsource/modelsource.go b/pkg/controller_helper/modelsource/modelsource.go index 8fa33e57..281a6697 100644 --- a/pkg/controller_helper/modelsource/modelsource.go +++ b/pkg/controller_helper/modelsource/modelsource.go @@ -40,20 +40,28 @@ const ( MODEL_SOURCE_MODEL_OBJ_STORE = "objstore" // secrets - MODELHUB_SECRET_NAME = "modelhub-secret" - HUGGINGFACE_TOKEN_KEY = "HF_TOKEN" + MODELHUB_SECRET_NAME = "modelhub-secret" + HUGGING_FACE_TOKEN_KEY = "HF_TOKEN" + HUGGING_FACE_HUB_TOKEN = "HUGGING_FACE_HUB_TOKEN" OSS_ACCESS_SECRET_NAME = "oss-access-secret" OSS_ACCESS_KEY_ID = "OSS_ACCESS_KEY_ID" OSS_ACCESS_KEY_SECRET = "OSS_ACCESS_KEY_SECRET" + + AWS_ACCESS_SECRET_NAME = "aws-access-secret" + AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" + AWS_ACCESS_KEY_SECRET = "AWS_SECRET_ACCESS_KEY" ) type ModelSourceProvider interface { ModelName() string - ModelPath() string + ModelPath(skipModelLoader bool) string // InjectModelLoader will inject the model loader to the spec, // index refers to the suffix of the initContainer name, like model-loader, model-loader-1. InjectModelLoader(spec *corev1.PodTemplateSpec, index int) + // InjectModelEnvVars will inject the model credentials env to the model-runner container. + // This is used when the model-loader initContainer is not injected, and the model loading is handled by the model-runner container. + InjectModelEnvVars(spec *corev1.PodTemplateSpec) } func NewModelSourceProvider(model *coreapi.OpenModel) ModelSourceProvider { @@ -72,11 +80,13 @@ func NewModelSourceProvider(model *coreapi.OpenModel) ModelSourceProvider { if model.Spec.Source.URI != nil { // We'll validate the format in the webhook, so generally no error should happen here. protocol, value, _ := util.ParseURI(string(*model.Spec.Source.URI)) - provider := &URIProvider{modelName: model.Name, protocol: protocol} + provider := &URIProvider{modelName: model.Name, protocol: protocol, uri: string(*model.Spec.Source.URI)} switch protocol { case OSS: provider.endpoint, provider.bucket, provider.modelPath, _ = util.ParseOSS(value) + case S3, GCS: + provider.bucket, provider.modelPath, _ = util.ParseS3(value) case HostPath: provider.modelPath = value case Ollama: @@ -91,3 +101,39 @@ func NewModelSourceProvider(model *coreapi.OpenModel) ModelSourceProvider { // Should not reach here, it will be validated at webhook in prior. return nil } + +// InjectModelVolume mounts the model-volume to the pod template +// The logic for mounting model-volume to model-runner container is identical in both ModelHubProvider and URIProvider, +// so this function can be reused and only needs to be configured once +func InjectModelVolume(template *corev1.PodTemplateSpec) { + // Handle container. + for i, container := range template.Spec.Containers { + // We only consider this container. + if container.Name == MODEL_RUNNER_CONTAINER_NAME { + template.Spec.Containers[i].VolumeMounts = append(template.Spec.Containers[i].VolumeMounts, corev1.VolumeMount{ + Name: MODEL_VOLUME_NAME, + MountPath: CONTAINER_MODEL_PATH, + ReadOnly: true, + }) + } + } + + // Handle spec. + template.Spec.Volumes = append(template.Spec.Volumes, corev1.Volume{ + Name: MODEL_VOLUME_NAME, + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }) + + // TODO: support OCI image volume + // template.Spec.Volumes = append(template.Spec.Volumes, corev1.Volume{ + // Name: MODEL_VOLUME_NAME, + // VolumeSource: corev1.VolumeSource{ + // Image: &corev1.ImageVolumeSource{ + // Reference: url, + // PullPolicy: corev1.PullIfNotPresent, + // }, + // }, + // }) +} diff --git a/pkg/controller_helper/modelsource/modelsource_test.go b/pkg/controller_helper/modelsource/modelsource_test.go index d8b5df71..3a156670 100644 --- a/pkg/controller_helper/modelsource/modelsource_test.go +++ b/pkg/controller_helper/modelsource/modelsource_test.go @@ -27,36 +27,55 @@ import ( "github.com/inftyai/llmaz/test/util/wrapper" ) -func Test_ModelSourceProvider(t *testing.T) { +func TestModelSourceProvider(t *testing.T) { testCases := []struct { - name string - model *coreapi.OpenModel - wantModelName string - wantModelPath string + name string + model *coreapi.OpenModel + wantModelName string + wantModelPath string + skipModelLoader bool }{ { - name: "model with modelhub configured", - model: util.MockASampleModel(), - wantModelName: "llama3-8b", - wantModelPath: "/workspace/models/models--meta-llama--Meta-Llama-3-8B", + name: "model with modelhub configured", + model: util.MockASampleModel(), + wantModelName: "llama3-8b", + wantModelPath: "/workspace/models/models--meta-llama--Meta-Llama-3-8B", + skipModelLoader: false, }, { - name: "modelhub with GGUF file", - model: wrapper.MakeModel("test-7b").FamilyName("test").ModelSourceWithModelHub("Huggingface").ModelSourceWithModelID("Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf", "", nil, nil).Obj(), - wantModelName: "test-7b", - wantModelPath: "/workspace/models/qwen2-0_5b-instruct-q5_k_m.gguf", + name: "model with modelhub configured and skipModelLoader is true", + model: util.MockASampleModel(), + wantModelName: "llama3-8b", + wantModelPath: "meta-llama/Meta-Llama-3-8B", + skipModelLoader: true, }, { - name: "model with URI configured", - model: wrapper.MakeModel("test-7b").FamilyName("test").ModelSourceWithURI("oss://bucket.endpoint/modelPath/subPath").Obj(), - wantModelName: "test-7b", - wantModelPath: "/workspace/models/models--subPath", + name: "modelhub with GGUF file", + model: wrapper.MakeModel("test-7b").FamilyName("test").ModelSourceWithModelHub("Huggingface").ModelSourceWithModelID("Qwen/Qwen2-0.5B-Instruct-GGUF", "qwen2-0_5b-instruct-q5_k_m.gguf", "", nil, nil).Obj(), + wantModelName: "test-7b", + wantModelPath: "/workspace/models/qwen2-0_5b-instruct-q5_k_m.gguf", + skipModelLoader: false, }, { - name: "URI with GGUF model", - model: wrapper.MakeModel("test-7b").FamilyName("test").ModelSourceWithURI("oss://bucket.endpoint/modelPath/weight.gguf").Obj(), - wantModelName: "test-7b", - wantModelPath: "/workspace/models/weight.gguf", + name: "model with URI configured", + model: wrapper.MakeModel("test-7b").FamilyName("test").ModelSourceWithURI("oss://bucket.endpoint/modelPath/subPath").Obj(), + wantModelName: "test-7b", + wantModelPath: "/workspace/models/models--subPath", + skipModelLoader: false, + }, + { + name: "model with URI configured and skipModelLoader is true", + model: wrapper.MakeModel("test-7b").FamilyName("test").ModelSourceWithURI("oss://bucket.endpoint/modelPath/subPath").Obj(), + wantModelName: "test-7b", + wantModelPath: "oss://bucket.endpoint/modelPath/subPath", + skipModelLoader: true, + }, + { + name: "URI with GGUF model", + model: wrapper.MakeModel("test-7b").FamilyName("test").ModelSourceWithURI("oss://bucket.endpoint/modelPath/weight.gguf").Obj(), + wantModelName: "test-7b", + wantModelPath: "/workspace/models/weight.gguf", + skipModelLoader: false, }, } @@ -66,8 +85,8 @@ func Test_ModelSourceProvider(t *testing.T) { if tc.wantModelName != provider.ModelName() { t.Fatalf("unexpected model name, want %s, got %s", tc.wantModelName, provider.ModelName()) } - if tc.wantModelPath != provider.ModelPath() { - t.Fatalf("unexpected model path, want %s, got %s", tc.wantModelPath, provider.ModelPath()) + if tc.wantModelPath != provider.ModelPath(tc.skipModelLoader) { + t.Fatalf("unexpected model path, want %s, got %s", tc.wantModelPath, provider.ModelPath(tc.skipModelLoader)) } }) } @@ -129,3 +148,105 @@ func TestEnvInjectModelLoader(t *testing.T) { }) } } + +func TestInjectModelEnvVars(t *testing.T) { + tests := []struct { + name string + provider ModelSourceProvider + template *corev1.PodTemplateSpec + expectEnvVars []string + expectSecretRef string + }{ + { + name: "S3 URI provider injects AWS credentials", + provider: &URIProvider{ + modelName: "test-model", + protocol: S3, + }, + template: &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: MODEL_RUNNER_CONTAINER_NAME, + }, + }, + }, + }, + expectEnvVars: []string{AWS_ACCESS_KEY_ID, AWS_ACCESS_KEY_SECRET}, + expectSecretRef: AWS_ACCESS_SECRET_NAME, + }, + { + name: "OSS URI provider injects OSS credentials", + provider: &URIProvider{ + modelName: "test-model", + protocol: OSS, + }, + template: &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: MODEL_RUNNER_CONTAINER_NAME, + }, + }, + }, + }, + expectEnvVars: []string{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET}, + expectSecretRef: OSS_ACCESS_SECRET_NAME, + }, + { + name: "ModelHub provider injects HuggingFace token", + provider: &ModelHubProvider{ + modelName: "test-model", + }, + template: &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: MODEL_RUNNER_CONTAINER_NAME, + }, + }, + }, + }, + expectEnvVars: []string{"HUGGING_FACE_HUB_TOKEN", "HF_TOKEN"}, + expectSecretRef: MODELHUB_SECRET_NAME, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Call InjectModelEnvVars + tt.provider.InjectModelEnvVars(tt.template) + + // Find the model-runner container + var container *corev1.Container + for i, c := range tt.template.Spec.Containers { + if c.Name == MODEL_RUNNER_CONTAINER_NAME { + container = &tt.template.Spec.Containers[i] + break + } + } + assert.NotNil(t, container, "model-runner container not found") + + // Check for expected environment variables + envVarMap := make(map[string]bool) + secretRefFound := false + + for _, env := range container.Env { + envVarMap[env.Name] = true + if env.ValueFrom != nil && env.ValueFrom.SecretKeyRef != nil { + if env.ValueFrom.SecretKeyRef.LocalObjectReference.Name == tt.expectSecretRef { + secretRefFound = true + } + } + } + + // Verify all expected env vars are present + for _, envName := range tt.expectEnvVars { + assert.True(t, envVarMap[envName], "expected env var %s not found", envName) + } + + // Verify secret reference is present + assert.True(t, secretRefFound, "expected secret reference %s not found", tt.expectSecretRef) + }) + } +} diff --git a/pkg/controller_helper/modelsource/uri.go b/pkg/controller_helper/modelsource/uri.go index 6790dd54..1577fc9f 100644 --- a/pkg/controller_helper/modelsource/uri.go +++ b/pkg/controller_helper/modelsource/uri.go @@ -29,7 +29,9 @@ import ( var _ ModelSourceProvider = &URIProvider{} const ( + GCS = "GCS" OSS = "OSS" + S3 = "S3" Ollama = "OLLAMA" HostPath = "HOST" ) @@ -40,6 +42,7 @@ type URIProvider struct { bucket string endpoint string modelPath string + uri string } func (p *URIProvider) ModelName() string { @@ -58,13 +61,18 @@ func (p *URIProvider) ModelName() string { // Example 2: // - uri: bucket.endpoint/modelPath/model.gguf // modelPath: /workspace/models/model.gguf -func (p *URIProvider) ModelPath() string { +func (p *URIProvider) ModelPath(skipModelLoader bool) string { if p.protocol == HostPath { return p.modelPath } - // protocol is oss. + // Skip the model loader to allow the inference engine to handle loading models directly from remote storage (e.g., S3, OSS). + // In this case, the remote model path should be returned (e.g., s3://bucket/modelPath). + if skipModelLoader { + return p.uri + } + // protocol is oss. splits := strings.Split(p.modelPath, "/") if strings.Contains(p.modelPath, ".gguf") { @@ -103,7 +111,6 @@ func (p *URIProvider) InjectModelLoader(template *corev1.PodTemplateSpec, index } // Other protocols. - initContainerName := MODEL_LOADER_CONTAINER_NAME if index != 0 { initContainerName += "-" + strconv.Itoa(index) @@ -160,42 +167,109 @@ func (p *URIProvider) InjectModelLoader(template *corev1.PodTemplateSpec, index } template.Spec.InitContainers = append(template.Spec.InitContainers, *initContainer) +} - // Return once not the main model, because all the below has already been injected. - if index != 0 { - return - } +func (p *URIProvider) InjectModelEnvVars(template *corev1.PodTemplateSpec) { + switch p.protocol { + case S3, GCS: + for i := range template.Spec.Containers { + if template.Spec.Containers[i].Name == MODEL_RUNNER_CONTAINER_NAME { + // Check if AWS credentials already exist + awsKeyIDExists := false + awsKeySecretExists := false + for _, env := range template.Spec.Containers[i].Env { + if env.Name == AWS_ACCESS_KEY_ID { + awsKeyIDExists = true + } + if env.Name == AWS_ACCESS_KEY_SECRET { + awsKeySecretExists = true + } + } - // Handle container. + // Add AWS_ACCESS_KEY_ID if it doesn't exist + if !awsKeyIDExists { + template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env, + corev1.EnvVar{ + Name: AWS_ACCESS_KEY_ID, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: AWS_ACCESS_SECRET_NAME, // if secret not exists, the env is empty. + }, + Key: AWS_ACCESS_KEY_ID, + Optional: ptr.To[bool](true), + }, + }, + }) + } - for i, container := range template.Spec.Containers { - // We only consider this container. - if container.Name == MODEL_RUNNER_CONTAINER_NAME { - template.Spec.Containers[i].VolumeMounts = append(template.Spec.Containers[i].VolumeMounts, corev1.VolumeMount{ - Name: MODEL_VOLUME_NAME, - MountPath: CONTAINER_MODEL_PATH, - ReadOnly: true, - }) + // Add AWS_ACCESS_KEY_SECRET if it doesn't exist + if !awsKeySecretExists { + template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env, + corev1.EnvVar{ + Name: AWS_ACCESS_KEY_SECRET, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: AWS_ACCESS_SECRET_NAME, // if secret not exists, the env is empty. + }, + Key: AWS_ACCESS_KEY_SECRET, + Optional: ptr.To[bool](true), + }, + }, + }) + } + } } - } + case OSS: + for i := range template.Spec.Containers { + if template.Spec.Containers[i].Name == MODEL_RUNNER_CONTAINER_NAME { + // Check if OSS credentials already exist + ossKeyIDExists := false + ossKeySecretExists := false + for _, env := range template.Spec.Containers[i].Env { + if env.Name == OSS_ACCESS_KEY_ID { + ossKeyIDExists = true + } + if env.Name == OSS_ACCESS_KEY_SECRET { + ossKeySecretExists = true + } + } - // Handle spec. + // Add OSS_ACCESS_KEY_ID if it doesn't exist + if !ossKeyIDExists { + template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env, + corev1.EnvVar{ + Name: OSS_ACCESS_KEY_ID, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: OSS_ACCESS_SECRET_NAME, // if secret not exists, the env is empty. + }, + Key: OSS_ACCESS_KEY_ID, + Optional: ptr.To[bool](true), + }, + }, + }) + } - template.Spec.Volumes = append(template.Spec.Volumes, corev1.Volume{ - Name: MODEL_VOLUME_NAME, - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, - }, - }) - - // TODO: support OCI image volume - // template.Spec.Volumes = append(template.Spec.Volumes, corev1.Volume{ - // Name: MODEL_VOLUME_NAME, - // VolumeSource: corev1.VolumeSource{ - // Image: &corev1.ImageVolumeSource{ - // Reference: url, - // PullPolicy: corev1.PullIfNotPresent, - // }, - // }, - // }) + // Add OSS_ACCESS_KEY_SECRET if it doesn't exist + if !ossKeySecretExists { + template.Spec.Containers[i].Env = append(template.Spec.Containers[i].Env, + corev1.EnvVar{ + Name: OSS_ACCESS_KEY_SECRET, + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: OSS_ACCESS_SECRET_NAME, // if secret not exists, the env is empty. + }, + Key: OSS_ACCESS_KEY_SECRET, + Optional: ptr.To[bool](true), + }, + }, + }) + } + } + } + } } diff --git a/pkg/util/uri.go b/pkg/util/uri.go index 28d203c5..69b15ed6 100644 --- a/pkg/util/uri.go +++ b/pkg/util/uri.go @@ -45,3 +45,13 @@ func ParseOSS(address string) (endpoint, bucket, modelPath string, err error) { endpoint, modelPath = splits[0], splits[1] return endpoint, bucket, modelPath, nil } + +// ParseS3 address looks like: / +func ParseS3(address string) (bucket, modelPath string, err error) { + splits := strings.SplitN(address, "/", 2) + if len(splits) != 2 { + return "", "", fmt.Errorf("address not right %s", address) + } + bucket, modelPath = splits[0], splits[1] + return bucket, modelPath, nil +} diff --git a/pkg/webhook/openmodel_webhook.go b/pkg/webhook/openmodel_webhook.go index b3d84c9d..ca8e7c34 100644 --- a/pkg/webhook/openmodel_webhook.go +++ b/pkg/webhook/openmodel_webhook.go @@ -47,7 +47,9 @@ func SetupOpenModelWebhook(mgr ctrl.Manager) error { var _ webhook.CustomDefaulter = &OpenModelWebhook{} var SUPPORTED_OBJ_STORES = map[string]struct{}{ + modelSource.GCS: {}, modelSource.OSS: {}, + modelSource.S3: {}, modelSource.Ollama: {}, modelSource.HostPath: {}, } @@ -111,6 +113,10 @@ func (w *OpenModelWebhook) generateValidate(obj runtime.Object) field.ErrorList if _, _, _, err := util.ParseOSS(address); err != nil { allErrs = append(allErrs, field.Invalid(sourcePath.Child("uri"), *model.Spec.Source.URI, "URI with wrong address")) } + case modelSource.S3, modelSource.GCS: + if _, _, err := util.ParseS3(address); err != nil { + allErrs = append(allErrs, field.Invalid(sourcePath.Child("uri"), *model.Spec.Source.URI, "URI with wrong address")) + } } } } diff --git a/test/e2e/playground_test.go b/test/e2e/playground_test.go index 58fc0704..6087e120 100644 --- a/test/e2e/playground_test.go +++ b/test/e2e/playground_test.go @@ -167,4 +167,62 @@ var _ = ginkgo.Describe("playground e2e tests", func() { validation.ValidateServiceStatusEqualTo(ctx, k8sClient, service, inferenceapi.ServiceAvailable, "ServiceReady", metav1.ConditionTrue) validation.ValidateServicePods(ctx, k8sClient, service) }) + + ginkgo.It("Deploy huggingface model with llmaz.io/skip-model-loader annotation", func() { + model := wrapper.MakeModel("opt-125m").FamilyName("opt").ModelSourceWithModelHub("Huggingface").ModelSourceWithModelID("facebook/opt-125m", "", "", nil, nil).Obj() + gomega.Expect(k8sClient.Create(ctx, model)).To(gomega.Succeed()) + defer func() { + gomega.Expect(k8sClient.Delete(ctx, model)).To(gomega.Succeed()) + }() + + playground := wrapper.MakePlayground("opt-125m", ns.Name). + ModelClaim("opt-125m"). + BackendRuntime("vllm").Replicas(1).Obj() + + if playground.Annotations == nil { + playground.Annotations = make(map[string]string) + } + playground.Annotations["llmaz.io/skip-model-loader"] = "true" + + gomega.Expect(k8sClient.Create(ctx, playground)).To(gomega.Succeed()) + validation.ValidatePlayground(ctx, k8sClient, playground) + //validation.ValidatePlaygroundStatusEqualTo(ctx, k8sClient, playground, inferenceapi.PlaygroundAvailable, "PlaygroundReady", metav1.ConditionTrue) + + service := &inferenceapi.Service{} + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: playground.Name, Namespace: playground.Namespace}, service)).To(gomega.Succeed()) + validation.ValidateService(ctx, k8sClient, service) + // Revisit this when we have GPU resources for e2e test + //validation.ValidateServiceStatusEqualTo(ctx, k8sClient, service, inferenceapi.ServiceAvailable, "ServiceReady", metav1.ConditionTrue) + //validation.ValidateServicePods(ctx, k8sClient, service) + //gomega.Expect(validation.ValidateServiceAvaliable(ctx, k8sClient, cfg, service, validation.CheckServiceAvaliable)).To(gomega.Succeed()) + }) + + ginkgo.It("Deploy S3 model with llmaz.io/skip-model-loader annotation", func() { + model := wrapper.MakeModel("opt-125m").FamilyName("opt").ModelSourceWithURI("s3://test-bucket/opt-125m").Obj() + gomega.Expect(k8sClient.Create(ctx, model)).To(gomega.Succeed()) + defer func() { + gomega.Expect(k8sClient.Delete(ctx, model)).To(gomega.Succeed()) + }() + + playground := wrapper.MakePlayground("opt-125m", ns.Name). + ModelClaim("opt-125m"). + BackendRuntime("vllm").Replicas(1).Obj() + + if playground.Annotations == nil { + playground.Annotations = make(map[string]string) + } + playground.Annotations["llmaz.io/skip-model-loader"] = "true" + + gomega.Expect(k8sClient.Create(ctx, playground)).To(gomega.Succeed()) + validation.ValidatePlayground(ctx, k8sClient, playground) + //validation.ValidatePlaygroundStatusEqualTo(ctx, k8sClient, playground, inferenceapi.PlaygroundAvailable, "PlaygroundReady", metav1.ConditionTrue) + + service := &inferenceapi.Service{} + gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: playground.Name, Namespace: playground.Namespace}, service)).To(gomega.Succeed()) + validation.ValidateService(ctx, k8sClient, service) + // Revisit this when we have GPU resources for e2e test + //validation.ValidateServiceStatusEqualTo(ctx, k8sClient, service, inferenceapi.ServiceAvailable, "ServiceReady", metav1.ConditionTrue) + //validation.ValidateServicePods(ctx, k8sClient, service) + //gomega.Expect(validation.ValidateServiceAvaliable(ctx, k8sClient, cfg, service, validation.CheckServiceAvaliable)).To(gomega.Succeed()) + }) }) diff --git a/test/util/validation/validate_service.go b/test/util/validation/validate_service.go index 6ed13b6c..fe70a439 100644 --- a/test/util/validation/validate_service.go +++ b/test/util/validation/validate_service.go @@ -73,14 +73,25 @@ func ValidateService(ctx context.Context, k8sClient client.Client, service *infe } for index, model := range models { - // Validate injecting modelLoaders - if service.Spec.WorkloadTemplate.LeaderTemplate != nil { - if err := ValidateModelLoader(model, index, *workload.Spec.LeaderWorkerTemplate.LeaderTemplate, service); err != nil { + if helper.SkipModelLoader(service) { + if service.Spec.WorkloadTemplate.LeaderTemplate != nil { + if err := ValidateSkipModelLoader(model, index, *workload.Spec.LeaderWorkerTemplate.LeaderTemplate, service); err != nil { + return err + } + } + if err := ValidateSkipModelLoader(model, index, workload.Spec.LeaderWorkerTemplate.WorkerTemplate, service); err != nil { + return err + } + } else { + // Validate injecting modelLoaders + if service.Spec.WorkloadTemplate.LeaderTemplate != nil { + if err := ValidateModelLoader(model, index, *workload.Spec.LeaderWorkerTemplate.LeaderTemplate, service); err != nil { + return err + } + } + if err := ValidateModelLoader(model, index, workload.Spec.LeaderWorkerTemplate.WorkerTemplate, service); err != nil { return err } - } - if err := ValidateModelLoader(model, index, workload.Spec.LeaderWorkerTemplate.WorkerTemplate, service); err != nil { - return err } } @@ -139,7 +150,7 @@ func ValidateModelLoader(model *coreapi.OpenModel, index int, template corev1.Po var envStrings []string if model.Spec.Source.ModelHub != nil { - envStrings = []string{"MODEL_SOURCE_TYPE", "MODEL_ID", "MODEL_HUB_NAME", "HF_TOKEN", "HUGGING_FACE_HUB_TOKEN"} + envStrings = []string{"MODEL_SOURCE_TYPE", "MODEL_ID", "MODEL_HUB_NAME", modelSource.HUGGING_FACE_TOKEN_KEY, modelSource.HUGGING_FACE_HUB_TOKEN} if model.Spec.Source.ModelHub.Revision != nil { envStrings = append(envStrings, "REVISION") } @@ -151,7 +162,7 @@ func ValidateModelLoader(model *coreapi.OpenModel, index int, template corev1.Po } } if model.Spec.Source.URI != nil { - envStrings = []string{"MODEL_SOURCE_TYPE", "PROVIDER", "ENDPOINT", "BUCKET", "MODEL_PATH", "OSS_ACCESS_KEY_ID", "OSS_ACCESS_KEY_SECRET"} + envStrings = []string{"MODEL_SOURCE_TYPE", "PROVIDER", "ENDPOINT", "BUCKET", "MODEL_PATH", modelSource.OSS_ACCESS_KEY_ID, modelSource.OSS_ACCESS_KEY_SECRET} } for _, str := range envStrings { @@ -251,6 +262,76 @@ func ValidateServicePods(ctx context.Context, k8sClient client.Client, service * }).Should(gomega.Succeed()) } +// ValidateSkipModelLoader validates the model-loader initContainer is not injected into the template +// and checks if the model-runner contains model credentials environment variables +func ValidateSkipModelLoader(model *coreapi.OpenModel, index int, template corev1.PodTemplateSpec, service *inferenceapi.Service) error { + if model.Spec.Source.URI != nil { + protocol, _, _ := pkgUtil.ParseURI(string(*model.Spec.Source.URI)) + if protocol == modelSource.Ollama { + return nil + } + } + + if model.Spec.Source.ModelHub != nil || model.Spec.Source.URI != nil { + // Check if the template does not contain the model-loader initContainer + containerName := modelSource.MODEL_LOADER_CONTAINER_NAME + if index != 0 { + containerName += "-" + strconv.Itoa(index) + } + + for _, container := range template.Spec.InitContainers { + if container.Name == containerName { + return fmt.Errorf("template has model-loader initContainer: %s", container.Name) + } + } + + for _, container := range template.Spec.Containers { + if container.Name == modelSource.MODEL_RUNNER_CONTAINER_NAME { + // Check if the model-runner container contains model credentials environment variables + var envStrings []string + if model.Spec.Source.ModelHub != nil { + envStrings = append(envStrings, modelSource.HUGGING_FACE_TOKEN_KEY, modelSource.HUGGING_FACE_HUB_TOKEN) + } else if model.Spec.Source.URI != nil { + protocol, _, _ := pkgUtil.ParseURI(string(*model.Spec.Source.URI)) + if protocol == modelSource.S3 || protocol == modelSource.GCS { + envStrings = append(envStrings, modelSource.AWS_ACCESS_KEY_ID, modelSource.AWS_ACCESS_KEY_SECRET) + } else if protocol == modelSource.OSS { + envStrings = append(envStrings, modelSource.OSS_ACCESS_KEY_ID, modelSource.OSS_ACCESS_KEY_SECRET) + } + } + + for _, str := range envStrings { + envExist := false + for _, env := range container.Env { + if env.Name == str { + envExist = true + break + } + } + if !envExist { + return fmt.Errorf("env %s doesn't exist", str) + } + } + + // The model-runner container should not mount the model-volume if the model-loader initContainer is not injected + for _, v := range container.VolumeMounts { + if v.Name == modelSource.MODEL_VOLUME_NAME { + return fmt.Errorf("model-runner container has volume mount %s", v.Name) + } + } + } + } + + for _, v := range template.Spec.Volumes { + if v.Name == modelSource.MODEL_VOLUME_NAME { + return errors.New("when skip the model-loader initContainer, the model-volume should not be created") + } + } + } + + return nil +} + type CheckServiceAvailableFunc func() error func ValidateServiceAvaliable(ctx context.Context, k8sClient client.Client, cfg *rest.Config, service *inferenceapi.Service, check CheckServiceAvailableFunc) error {