From d7da8f146e5e4773fb127a8e698d5f16e4d94b14 Mon Sep 17 00:00:00 2001
From: Gleb Kanterov
Date: Mon, 23 Sep 2024 12:17:25 +0200
Subject: [PATCH 1/6] Add JobTaskClusterSpec validate mutator

---
 .../config/validate/job_task_cluster_spec.go  | 155 ++++++++++++++
 .../validate/job_task_cluster_spec_test.go    | 201 ++++++++++++++++++
 bundle/config/validate/validate.go            |   1 +
 3 files changed, 357 insertions(+)
 create mode 100644 bundle/config/validate/job_task_cluster_spec.go
 create mode 100644 bundle/config/validate/job_task_cluster_spec_test.go

diff --git a/bundle/config/validate/job_task_cluster_spec.go b/bundle/config/validate/job_task_cluster_spec.go
new file mode 100644
index 0000000000..e525a0f16d
--- /dev/null
+++ b/bundle/config/validate/job_task_cluster_spec.go
@@ -0,0 +1,155 @@
+package validate
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/databricks/cli/bundle"
+	"github.com/databricks/cli/libs/diag"
+	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/databricks-sdk-go/service/jobs"
+)
+
+// JobTaskClusterSpec validates that job tasks have cluster spec defined
+// if task requires a cluster
+func JobTaskClusterSpec() bundle.ReadOnlyMutator {
+	return &jobTaskClusterSpec{}
+}
+
+type jobTaskClusterSpec struct {
+}
+
+func (v *jobTaskClusterSpec) Name() string {
+	return "validate:job_task_cluster_spec"
+}
+
+func (v *jobTaskClusterSpec) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
+	diags := diag.Diagnostics{}
+
+	jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))
+
+	for resourceName, job := range rb.Config().Resources.Jobs {
+		resourcePath := jobsPath.Append(dyn.Key(resourceName))
+
+		for taskIndex, task := range job.Tasks {
+			taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex))
+
+			diags = diags.Extend(validateJobTask(rb, task, taskPath))
+		}
+	}
+
+	return diags
+}
+
+func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics {
+	diags := diag.Diagnostics{}
+
+	var specified []string
+	var unspecified []string
+
+	if task.JobClusterKey != "" {
+		specified = append(specified, "job_cluster_key")
+	} else {
+		unspecified = append(unspecified, "job_cluster_key")
+	}
+
+	if task.EnvironmentKey != "" {
+		specified = append(specified, "environment_key")
+	} else {
+		unspecified = append(unspecified, "environment_key")
+	}
+
+	if task.ExistingClusterId != "" {
+		specified = append(specified, "existing_cluster_id")
+	} else {
+		unspecified = append(unspecified, "existing_cluster_id")
+	}
+
+	if task.NewCluster != nil {
+		specified = append(specified, "new_cluster")
+	} else {
+		unspecified = append(unspecified, "new_cluster")
+	}
+
+	if task.ForEachTask != nil {
+		forEachTaskPath := taskPath.Append(dyn.Key("for_each_task"), dyn.Key("task"))
+
+		diags = diags.Extend(validateJobTask(rb, task.ForEachTask.Task, forEachTaskPath))
+	}
+
+	if isComputeTask(task) && len(specified) == 0 {
+		if task.NotebookTask != nil {
+			// notebook tasks without cluster spec will use notebook environment
+		} else {
+			// path might be not very helpful, adding user-specified task key clarifies the context
+			detail := fmt.Sprintf("Task %q has a task type that requires a cluster, but no cluster is specified", task.TaskKey)
+
+			diags = diags.Append(diag.Diagnostic{
+				Severity:  diag.Error,
+				Summary:   fmt.Sprintf("One of the following fields must be set: %s", strings.Join(unspecified, ", ")),
+				Detail:    detail,
+				Locations: rb.Config().GetLocations(taskPath.String()),
+				Paths:     []dyn.Path{taskPath},
+			})
+		}
+	}
+
+	return diags
+}
+
+// isComputeTask returns true if the task requires a cluster
+func isComputeTask(task jobs.Task) bool {
+	if task.NotebookTask != nil {
+		// if warehouse_id is set, it's SQL notebook that doesn't need cluster
+		if task.NotebookTask.WarehouseId != "" {
+			return false
+		} else {
+			return true
+		}
+	}
+
+	if task.PythonWheelTask != nil {
+		return true
+	}
+
+	if task.DbtTask != nil {
+		return true
+	}
+
+	if task.SparkJarTask != nil {
+		return true
+	}
+
+	if task.SparkSubmitTask != nil {
+		return true
+	}
+
+	if task.SparkPythonTask != nil {
+		return true
+	}
+
+	if task.SqlTask != nil {
+		return false
+	}
+
+	if task.PipelineTask != nil {
+		// while pipelines use clusters, pipeline tasks don't, they only trigger pipelines
+		return false
+	}
+
+	if task.RunJobTask != nil {
+		return false
+	}
+
+	if task.ConditionTask != nil {
+		return false
+	}
+
+	// for each task doesn't use clusters, underlying task(s) can though
+	if task.ForEachTask != nil {
+		return false
+	}
+
+	return false
+}
diff --git a/bundle/config/validate/job_task_cluster_spec_test.go b/bundle/config/validate/job_task_cluster_spec_test.go
new file mode 100644
index 0000000000..c52f927ad5
--- /dev/null
+++ b/bundle/config/validate/job_task_cluster_spec_test.go
@@ -0,0 +1,201 @@
+package validate
+
+import (
+	"context"
+	"testing"
+
+	"github.com/databricks/cli/bundle"
+	"github.com/databricks/cli/bundle/config"
+	"github.com/databricks/cli/bundle/config/resources"
+	"github.com/databricks/databricks-sdk-go/service/compute"
+	"github.com/databricks/databricks-sdk-go/service/jobs"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestJobTaskClusterSpec(t *testing.T) {
+	expectedSummary := "One of the following fields must be set: job_cluster_key, environment_key, existing_cluster_id, new_cluster"
+
+	type testCase struct {
+		name         string
+		task         jobs.Task
+		errorPath    string
+		errorDetail  string
+		errorSummary string
+	}
+
+	testCases := []testCase{
+		{
+			name: "valid notebook task",
+			task: jobs.Task{
+				// while a cluster is needed, it will use notebook environment to create one
+				NotebookTask: &jobs.NotebookTask{},
+			},
+		},
+		{
+			name: "valid notebook task (job_cluster_key)",
+			task: jobs.Task{
+				JobClusterKey: "cluster1",
+				NotebookTask:  &jobs.NotebookTask{},
+			},
+		},
+		{
+			name: "valid notebook task (new_cluster)",
+			task: jobs.Task{
+				NewCluster:   &compute.ClusterSpec{},
+				NotebookTask: &jobs.NotebookTask{},
+			},
+		},
+		{
+			name: "valid notebook task (existing_cluster_id)",
+			task: jobs.Task{
+				ExistingClusterId: "cluster1",
+				NotebookTask:      &jobs.NotebookTask{},
+			},
+		},
+		{
+			name: "valid SQL notebook task",
+			task: jobs.Task{
+				NotebookTask: &jobs.NotebookTask{
+					WarehouseId: "warehouse1",
+				},
+			},
+		},
+		{
+			name: "valid python wheel task",
+			task: jobs.Task{
+				JobClusterKey:   "cluster1",
+				PythonWheelTask: &jobs.PythonWheelTask{},
+			},
+		},
+		{
+			name: "valid python wheel task (environment_key)",
+			task: jobs.Task{
+				EnvironmentKey:  "environment1",
+				PythonWheelTask: &jobs.PythonWheelTask{},
+			},
+		},
+		{
+			name: "valid dbt task",
+			task: jobs.Task{
+				JobClusterKey: "cluster1",
+				DbtTask:       &jobs.DbtTask{},
+			},
+		},
+		{
+			name: "valid spark jar task",
+			task: jobs.Task{
+				JobClusterKey: "cluster1",
+				SparkJarTask:  &jobs.SparkJarTask{},
+			},
+		},
+		{
+			name: "valid spark submit",
+			task: jobs.Task{
+				NewCluster:      &compute.ClusterSpec{},
+				SparkSubmitTask: &jobs.SparkSubmitTask{},
+			},
+		},
+		{
+			name: "valid spark python task",
+			task: jobs.Task{
+				JobClusterKey:   "cluster1",
+				SparkPythonTask: &jobs.SparkPythonTask{},
+			},
+		},
+		{
+			name: "valid SQL task",
+			task: jobs.Task{
+				SqlTask: &jobs.SqlTask{},
+			},
+		},
+		{
+			name: "valid pipeline task",
+			task: jobs.Task{
+				PipelineTask: &jobs.PipelineTask{},
+			},
+		},
+		{
+			name: "valid run job task",
+			task: jobs.Task{
+				RunJobTask: &jobs.RunJobTask{},
+			},
+		},
+		{
+			name: "valid condition task",
+			task: jobs.Task{
+				ConditionTask: &jobs.ConditionTask{},
+			},
+		},
+		{
+			name: "valid for each task",
+			task: jobs.Task{
+				ForEachTask: &jobs.ForEachTask{
+					Task: jobs.Task{
+						JobClusterKey: "cluster1",
+						NotebookTask:  &jobs.NotebookTask{},
+					},
+				},
+			},
+		},
+		{
+			name: "invalid python wheel task",
+			task: jobs.Task{
+				PythonWheelTask: &jobs.PythonWheelTask{},
+				TaskKey:         "my_task",
+			},
+			errorPath:    "resources.jobs.job1.tasks[0]",
+			errorDetail:  "Task \"my_task\" has a task type that requires a cluster, but no cluster is specified",
+			errorSummary: expectedSummary,
+		},
+		{
+			name: "invalid for each task",
+			task: jobs.Task{
+				ForEachTask: &jobs.ForEachTask{
+					Task: jobs.Task{
+						PythonWheelTask: &jobs.PythonWheelTask{},
+						TaskKey:         "my_task",
+					},
+				},
+			},
+			errorPath:    "resources.jobs.job1.tasks[0].for_each_task.task",
+			errorDetail:  "Task \"my_task\" has a task type that requires a cluster, but no cluster is specified",
+			errorSummary: expectedSummary,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			job := &resources.Job{
+				JobSettings: &jobs.JobSettings{
+					Tasks: []jobs.Task{tc.task},
+				},
+			}
+
+			b := createBundle(map[string]*resources.Job{"job1": job})
+			diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobTaskClusterSpec())
+
+			if tc.errorPath != "" || tc.errorDetail != "" || tc.errorSummary != "" {
+				assert.Len(t, diags, 1)
+				assert.Len(t, diags[0].Paths, 1)
+
+				diag := diags[0]
+
+				assert.Equal(t, tc.errorPath, diag.Paths[0].String())
+				assert.Equal(t, tc.errorSummary, diag.Summary)
+				assert.Equal(t, tc.errorDetail, diag.Detail)
+			} else {
+				assert.ElementsMatch(t, []string{}, diags)
+			}
+		})
+	}
+}
+
+func createBundle(jobs map[string]*resources.Job) *bundle.Bundle {
+	return &bundle.Bundle{
+		Config: config.Root{
+			Resources: config.Resources{
+				Jobs: jobs,
+			},
+		},
+	}
+}
diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go
index b4da0bc053..79f42bd232 100644
--- a/bundle/config/validate/validate.go
+++ b/bundle/config/validate/validate.go
@@ -34,6 +34,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
 		JobClusterKeyDefined(),
 		FilesToSync(),
 		ValidateSyncPatterns(),
+		JobTaskClusterSpec(),
 	))
 }
 

From d3032c493c4476448cfbdad52ca5421075413db2 Mon Sep 17 00:00:00 2001
From: Gleb Kanterov
Date: Tue, 24 Sep 2024 15:55:32 +0200
Subject: [PATCH 2/6] Improve messages

---
 bundle/config/validate/job_task_cluster_spec.go      | 8 +++++---
 bundle/config/validate/job_task_cluster_spec_test.go | 4 ++--
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/bundle/config/validate/job_task_cluster_spec.go b/bundle/config/validate/job_task_cluster_spec.go
index e525a0f16d..f175f23481 100644
--- a/bundle/config/validate/job_task_cluster_spec.go
+++ b/bundle/config/validate/job_task_cluster_spec.go
@@ -83,7 +83,7 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path
 			// notebook tasks without cluster spec will use notebook environment
 		} else {
 			// path might be not very helpful, adding user-specified task key clarifies the context
-			detail := fmt.Sprintf("Task %q has a task type that requires a cluster, but no cluster is specified", task.TaskKey)
+			detail := fmt.Sprintf("Task %q has a task type that requires a cluster or environment, but neither is specified", task.TaskKey)
 
 			diags = diags.Append(diag.Diagnostic{
 				Severity:  diag.Error,
@@ -98,13 +98,15 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path
 	return diags
 }
 
-// isComputeTask returns true if the task requires a cluster
+// isComputeTask returns true if the task runs on a cluster or serverless GC
 func isComputeTask(task jobs.Task) bool {
 	if task.NotebookTask != nil {
-		// if warehouse_id is set, it's SQL notebook that doesn't need cluster
+		// if warehouse_id is set, it's SQL notebook that doesn't need cluster or serverless GC
 		if task.NotebookTask.WarehouseId != "" {
 			return false
 		} else {
+			// task settings don't require specifying a cluster/serverless GC, but task itself can run on one
+			// we handle that case separately in validateJobTask
 			return true
 		}
 	}
diff --git a/bundle/config/validate/job_task_cluster_spec_test.go b/bundle/config/validate/job_task_cluster_spec_test.go
index c52f927ad5..9145fb2d87 100644
--- a/bundle/config/validate/job_task_cluster_spec_test.go
+++ b/bundle/config/validate/job_task_cluster_spec_test.go
@@ -144,7 +144,7 @@ func TestJobTaskClusterSpec(t *testing.T) {
 				TaskKey:         "my_task",
 			},
 			errorPath:    "resources.jobs.job1.tasks[0]",
-			errorDetail:  "Task \"my_task\" has a task type that requires a cluster, but no cluster is specified",
+			errorDetail:  "Task \"my_task\" has a task type that requires a cluster or environment, but neither is specified",
 			errorSummary: expectedSummary,
 		},
 		{
@@ -158,7 +158,7 @@ func TestJobTaskClusterSpec(t *testing.T) {
 				},
 			},
 			errorPath:    "resources.jobs.job1.tasks[0].for_each_task.task",
-			errorDetail:  "Task \"my_task\" has a task type that requires a cluster, but no cluster is specified",
+			errorDetail:  "Task \"my_task\" has a task type that requires a cluster or environment, but neither is specified",
 			errorSummary: expectedSummary,
 		},
 	}

From 3a3569e1a780dbeee0dc5e97359a3e0f07c8e0e7 Mon Sep 17 00:00:00 2001
From: Gleb Kanterov
Date: Wed, 25 Sep 2024 10:26:17 +0200
Subject: [PATCH 3/6] Better messages

---
 bundle/config/validate/job_task_cluster_spec.go      |  8 ++++++--
 bundle/config/validate/job_task_cluster_spec_test.go | 12 +++++++-----
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/bundle/config/validate/job_task_cluster_spec.go b/bundle/config/validate/job_task_cluster_spec.go
index f175f23481..c119e6dc80 100644
--- a/bundle/config/validate/job_task_cluster_spec.go
+++ b/bundle/config/validate/job_task_cluster_spec.go
@@ -83,11 +83,15 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path
 			// notebook tasks without cluster spec will use notebook environment
 		} else {
 			// path might be not very helpful, adding user-specified task key clarifies the context
-			detail := fmt.Sprintf("Task %q has a task type that requires a cluster or environment, but neither is specified", task.TaskKey)
+			detail := fmt.Sprintf(
+				"Task %q requires a cluster or an environment to run. Specify one of the following fields: %s",
+				task.TaskKey,
+				strings.Join(unspecified, ", "),
+			)
 
 			diags = diags.Append(diag.Diagnostic{
 				Severity:  diag.Error,
-				Summary:   fmt.Sprintf("One of the following fields must be set: %s", strings.Join(unspecified, ", ")),
+				Summary:   fmt.Sprintf("Missing required cluster or environment settings"),
 				Detail:    detail,
 				Locations: rb.Config().GetLocations(taskPath.String()),
 				Paths:     []dyn.Path{taskPath},
diff --git a/bundle/config/validate/job_task_cluster_spec_test.go b/bundle/config/validate/job_task_cluster_spec_test.go
index 9145fb2d87..8f3bed8397 100644
--- a/bundle/config/validate/job_task_cluster_spec_test.go
+++ b/bundle/config/validate/job_task_cluster_spec_test.go
@@ -13,7 +13,7 @@ import (
 )
 
 func TestJobTaskClusterSpec(t *testing.T) {
-	expectedSummary := "One of the following fields must be set: job_cluster_key, environment_key, existing_cluster_id, new_cluster"
+	expectedSummary := "Missing required cluster or environment settings"
 
 	type testCase struct {
 		name         string
@@ -143,8 +143,9 @@ func TestJobTaskClusterSpec(t *testing.T) {
 				PythonWheelTask: &jobs.PythonWheelTask{},
 				TaskKey:         "my_task",
 			},
-			errorPath:    "resources.jobs.job1.tasks[0]",
-			errorDetail:  "Task \"my_task\" has a task type that requires a cluster or environment, but neither is specified",
+			errorPath: "resources.jobs.job1.tasks[0]",
+			errorDetail: "Task \"my_task\" requires a cluster or an environment to run. Specify one of the " +
+				"following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster",
 			errorSummary: expectedSummary,
 		},
 		{
@@ -157,8 +158,9 @@ func TestJobTaskClusterSpec(t *testing.T) {
 					},
 				},
 			},
-			errorPath:    "resources.jobs.job1.tasks[0].for_each_task.task",
-			errorDetail:  "Task \"my_task\" has a task type that requires a cluster or environment, but neither is specified",
+			errorPath: "resources.jobs.job1.tasks[0].for_each_task.task",
+			errorDetail: "Task \"my_task\" requires a cluster or an environment to run. Specify one of the " +
+				"following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster",
 			errorSummary: expectedSummary,
 		},
 	}

From 3a28d20d04785f6bfe24523ea09f6fa5be3cff47 Mon Sep 17 00:00:00 2001
From: Gleb Kanterov
Date: Wed, 25 Sep 2024 10:28:46 +0200
Subject: [PATCH 4/6] Fix lint

---
 bundle/config/validate/job_task_cluster_spec.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bundle/config/validate/job_task_cluster_spec.go b/bundle/config/validate/job_task_cluster_spec.go
index c119e6dc80..14bd960bbb 100644
--- a/bundle/config/validate/job_task_cluster_spec.go
+++ b/bundle/config/validate/job_task_cluster_spec.go
@@ -91,7 +91,7 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path
 
 			diags = diags.Append(diag.Diagnostic{
 				Severity:  diag.Error,
-				Summary:   fmt.Sprintf("Missing required cluster or environment settings"),
+				Summary:   "Missing required cluster or environment settings",
 				Detail:    detail,
 				Locations: rb.Config().GetLocations(taskPath.String()),
 				Paths:     []dyn.Path{taskPath},

From 3b49d5e89db9f5265775a9d1f7873c4c191f7f00 Mon Sep 17 00:00:00 2001
From: Gleb Kanterov
Date: Wed, 25 Sep 2024 13:19:04 +0200
Subject: [PATCH 5/6] Update bundle/config/validate/job_task_cluster_spec.go

Co-authored-by: Pieter Noordhuis
---
 bundle/config/validate/job_task_cluster_spec.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bundle/config/validate/job_task_cluster_spec.go b/bundle/config/validate/job_task_cluster_spec.go
index 14bd960bbb..b80befcdff 100644
--- a/bundle/config/validate/job_task_cluster_spec.go
+++ b/bundle/config/validate/job_task_cluster_spec.go
@@ -84,7 +84,7 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path
 		} else {
 			// path might be not very helpful, adding user-specified task key clarifies the context
 			detail := fmt.Sprintf(
-				"Task %q requires a cluster or an environment to run. Specify one of the following fields: %s",
+				"Task %q requires a cluster or an environment to run.\nSpecify one of the following fields: %s.",
 				task.TaskKey,
 				strings.Join(unspecified, ", "),
 			)

From 434aa17bde830cbe61962c06a66d88cef9008390 Mon Sep 17 00:00:00 2001
From: Gleb Kanterov
Date: Wed, 25 Sep 2024 13:20:47 +0200
Subject: [PATCH 6/6] Fix tests

---
 bundle/config/validate/job_task_cluster_spec_test.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/bundle/config/validate/job_task_cluster_spec_test.go b/bundle/config/validate/job_task_cluster_spec_test.go
index 8f3bed8397..a3a7ccf258 100644
--- a/bundle/config/validate/job_task_cluster_spec_test.go
+++ b/bundle/config/validate/job_task_cluster_spec_test.go
@@ -144,8 +144,8 @@ func TestJobTaskClusterSpec(t *testing.T) {
 				TaskKey:         "my_task",
 			},
 			errorPath: "resources.jobs.job1.tasks[0]",
-			errorDetail: "Task \"my_task\" requires a cluster or an environment to run. Specify one of the " +
-				"following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster",
+			errorDetail: `Task "my_task" requires a cluster or an environment to run.
+Specify one of the following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster.`,
 			errorSummary: expectedSummary,
 		},
 		{
@@ -159,8 +159,8 @@ func TestJobTaskClusterSpec(t *testing.T) {
 				},
 			},
 			errorPath: "resources.jobs.job1.tasks[0].for_each_task.task",
-			errorDetail: "Task \"my_task\" requires a cluster or an environment to run. Specify one of the " +
-				"following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster",
+			errorDetail: `Task "my_task" requires a cluster or an environment to run.
+Specify one of the following fields: job_cluster_key, environment_key, existing_cluster_id, new_cluster.`,
 			errorSummary: expectedSummary,
 		},
 	}