Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
### CLI

### Bundles
* Do not exit early when checking incompatible tasks for specified DBR ([#2692](https://github.com/databricks/cli/pull/2692))

### API Changes
4 changes: 3 additions & 1 deletion acceptance/bundle/trampoline/warning_message/output.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

>>> errcode [CLI] bundle validate -t dev
Error: task test_task uses cluster with incompatible DBR version 12.2.x-cpu-ml-scala2.12

Error: Python wheel tasks require compute with DBR 13.3+ to include local libraries. Please change your cluster configuration or use the experimental 'python_wheel_wrapper' setting. See https://docs.databricks.com/dev-tools/bundles/python-wheel.html for more information.

Name: trampoline_warning_message
Expand All @@ -8,7 +10,7 @@ Workspace:
User: [USERNAME]
Path: /Workspace/Users/[USERNAME]/.bundle/trampoline_warning_message/dev

Found 1 error
Found 2 errors

Exit code: 1

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@

>>> errcode [CLI] bundle validate
Error: task test_task uses cluster with incompatible DBR version 7.3.x-scala2.12

Error: Python wheel tasks require compute with DBR 13.3+ to include local libraries. Please change your cluster configuration or use the experimental 'python_wheel_wrapper' setting. See https://docs.databricks.com/dev-tools/bundles/python-wheel.html for more information.

Name: trampoline_warning_message_with_old_spark
Expand All @@ -8,6 +10,6 @@ Workspace:
User: [USERNAME]
Path: /Workspace/Users/[USERNAME]/.bundle/trampoline_warning_message_with_old_spark/dev

Found 1 error
Found 2 errors

Exit code: 1
43 changes: 31 additions & 12 deletions bundle/trampoline/python_dbr_warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package trampoline

import (
"context"
"fmt"
"strconv"
"strings"

Expand Down Expand Up @@ -29,22 +30,32 @@ func (m *wrapperWarning) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagn
return nil
}

if hasIncompatibleWheelTasks(ctx, b) {
return diag.Errorf("Python wheel tasks require compute with DBR 13.3+ to include local libraries. Please change your cluster configuration or use the experimental 'python_wheel_wrapper' setting. See https://docs.databricks.com/dev-tools/bundles/python-wheel.html for more information.")
diags := hasIncompatibleWheelTasks(ctx, b)
if len(diags) > 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style nit: Why return a diag object instead of returning the error directly? Seems like an antipattern to add an error diag if any diags are returned by hasIncompatibleWheelTasks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a common pattern we use across the codebase when we want to collect all diagnostics instead of returning early.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we don't have to return diags from hasIncompatibleWheelTasks, we can just return the error directly. That seems more idiomatic for Go.

diags = append(diags, diag.Diagnostic{
Severity: diag.Error,
Summary: "Python wheel tasks require compute with DBR 13.3+ to include local libraries. Please change your cluster configuration or use the experimental 'python_wheel_wrapper' setting. See https://docs.databricks.com/dev-tools/bundles/python-wheel.html for more information.",
})
}
return nil
return diags
}

// isPythonWheelWrapperOn reports whether the experimental
// 'python_wheel_wrapper' setting is enabled in the bundle configuration.
func isPythonWheelWrapperOn(b *bundle.Bundle) bool {
	if b.Config.Experimental == nil {
		return false
	}
	return b.Config.Experimental.PythonWheelWrapper
}

func hasIncompatibleWheelTasks(ctx context.Context, b *bundle.Bundle) bool {
func hasIncompatibleWheelTasks(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics

tasks := libraries.FindTasksWithLocalLibraries(b)
for _, task := range tasks {
if task.NewCluster != nil {
if lowerThanExpectedVersion(task.NewCluster.SparkVersion) {
return true
diags = append(diags, diag.Diagnostic{
Severity: diag.Error,
Summary: fmt.Sprintf("task %s uses cluster with incompatible DBR version %s", task.TaskKey, task.NewCluster.SparkVersion),
})
continue
}
}

Expand All @@ -53,7 +64,11 @@ func hasIncompatibleWheelTasks(ctx context.Context, b *bundle.Bundle) bool {
for _, cluster := range job.JobClusters {
if task.JobClusterKey == cluster.JobClusterKey && cluster.NewCluster.SparkVersion != "" {
if lowerThanExpectedVersion(cluster.NewCluster.SparkVersion) {
return true
diags = append(diags, diag.Diagnostic{
Severity: diag.Error,
Summary: fmt.Sprintf("job cluster %s uses incompatible DBR version %s", cluster.JobClusterKey, cluster.NewCluster.SparkVersion),
})
continue
}
}
}
Expand All @@ -70,37 +85,41 @@ func hasIncompatibleWheelTasks(ctx context.Context, b *bundle.Bundle) bool {
p, ok := dynvar.PureReferenceToPath(task.ExistingClusterId)
if !ok || len(p) < 3 {
log.Warnf(ctx, "unable to parse cluster key from %s", task.ExistingClusterId)
return false
continue
}

if p[0].Key() != "resources" || p[1].Key() != "clusters" {
log.Warnf(ctx, "incorrect variable reference for cluster id %s", task.ExistingClusterId)
return false
continue
}

clusterKey := p[2].Key()
cluster, ok := b.Config.Resources.Clusters[clusterKey]
if !ok {
log.Warnf(ctx, "unable to find cluster with key %s", clusterKey)
return false
continue
}
version = cluster.SparkVersion
} else {
version, err = getSparkVersionForCluster(ctx, b.WorkspaceClient(), task.ExistingClusterId)
// If there's error getting spark version for cluster, do not mark it as incompatible
if err != nil {
log.Warnf(ctx, "unable to get spark version for cluster %s, err: %s", task.ExistingClusterId, err.Error())
return false
continue
}
}

if lowerThanExpectedVersion(version) {
return true
diags = append(diags, diag.Diagnostic{
Severity: diag.Error,
Summary: fmt.Sprintf("task %s uses cluster with incompatible DBR version %s", task.TaskKey, version),
})
continue
}
}
}

return false
return diags
}

func lowerThanExpectedVersion(sparkVersion string) bool {
Expand Down
23 changes: 16 additions & 7 deletions bundle/trampoline/python_dbr_warning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func TestIncompatibleWheelTasksWithNewCluster(t *testing.T) {
},
}

require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
diags := hasIncompatibleWheelTasks(context.Background(), b)
require.NotEmpty(t, diags)
}

func TestIncompatibleWheelTasksWithJobClusterKey(t *testing.T) {
Expand Down Expand Up @@ -99,10 +100,11 @@ func TestIncompatibleWheelTasksWithJobClusterKey(t *testing.T) {
},
}

require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
diags := hasIncompatibleWheelTasks(context.Background(), b)
require.NotEmpty(t, diags)

diags := bundle.Apply(context.Background(), b, WrapperWarning())
require.ErrorContains(t, diags.Error(), "require compute with DBR 13.3")
diags = bundle.Apply(context.Background(), b, WrapperWarning())
require.ErrorContains(t, diags.Error(), "uses incompatible DBR version 12.2.x-scala2.12")
}

func TestIncompatibleWheelTasksWithExistingClusterId(t *testing.T) {
Expand Down Expand Up @@ -143,8 +145,13 @@ func TestIncompatibleWheelTasksWithExistingClusterId(t *testing.T) {
clustersApi.EXPECT().GetByClusterId(mock.Anything, "test-key-1").Return(&compute.ClusterDetails{
SparkVersion: "12.2.x-scala2.12",
}, nil)
clustersApi.EXPECT().GetByClusterId(mock.Anything, "test-key-2").Return(&compute.ClusterDetails{
SparkVersion: "12.2.x-scala2.12",
}, nil)

require.True(t, hasIncompatibleWheelTasks(context.Background(), b))
diags := hasIncompatibleWheelTasks(context.Background(), b)
require.NotEmpty(t, diags)
require.ErrorContains(t, diags.Error(), "uses cluster with incompatible DBR version 12.2.x-scala2.12")
}

func TestNoIncompatibleWheelTasks(t *testing.T) {
Expand Down Expand Up @@ -249,7 +256,8 @@ func TestNoIncompatibleWheelTasks(t *testing.T) {
SparkVersion: "13.2.x-scala2.12",
}, nil)

require.False(t, hasIncompatibleWheelTasks(context.Background(), b))
diags := hasIncompatibleWheelTasks(context.Background(), b)
require.Empty(t, diags)
}

func TestTasksWithPyPiPackageAreCompatible(t *testing.T) {
Expand Down Expand Up @@ -289,7 +297,8 @@ func TestTasksWithPyPiPackageAreCompatible(t *testing.T) {
m := mocks.NewMockWorkspaceClient(t)
b.SetWorkpaceClient(m.WorkspaceClient)

require.False(t, hasIncompatibleWheelTasks(context.Background(), b))
diags := hasIncompatibleWheelTasks(context.Background(), b)
require.Empty(t, diags)
}

func TestNoWarningWhenPythonWheelWrapperIsOn(t *testing.T) {
Expand Down
Loading