Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions bundle/artifacts/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts/whl"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
Expand Down Expand Up @@ -117,8 +116,6 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
}

func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error {
filesToLibraries := libraries.MapFilesToTaskLibraries(ctx, b)

for i := range a.Files {
f := &a.Files[i]

Expand All @@ -133,31 +130,59 @@ func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, u
log.Infof(ctx, "Upload succeeded")
f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source))

// Lookup all tasks that reference this file.
libs, ok := filesToLibraries[f.Source]
if !ok {
log.Debugf(ctx, "No tasks reference %s", f.Source)
continue
}

// Update all tasks that reference this file.
for _, lib := range libs {
wsfsBase := "/Workspace"
remotePath := path.Join(wsfsBase, f.RemotePath)
if lib.Whl != "" {
lib.Whl = remotePath
continue
// TODO: confirm if we still need to update the remote path to start with /Workspace
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you file an issue for this and link it here? Worth checking indeed (can be async), but we should track it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added here #1388

wsfsBase := "/Workspace"
remotePath := path.Join(wsfsBase, f.RemotePath)

for _, job := range b.Config.Resources.Jobs {
for i := range job.Tasks {
task := &job.Tasks[i]
for j := range task.Libraries {
lib := &task.Libraries[j]
if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) {
lib.Whl = remotePath
}
if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) {
lib.Jar = remotePath
}
}
}
if lib.Jar != "" {
lib.Jar = remotePath
continue

for i := range job.Environments {
env := &job.Environments[i]
for j := range env.Spec.Dependencies {
lib := env.Spec.Dependencies[j]
if isArtifactMatchLibrary(f, lib, b) {
env.Spec.Dependencies[j] = remotePath
}
}
}
}
}

return nil
}

func isArtifactMatchLibrary(f *config.ArtifactFile, libPath string, b *bundle.Bundle) bool {
if !filepath.IsAbs(libPath) {
libPath = filepath.Join(b.RootPath, libPath)
}

// libPath can be a glob pattern, so do the match first
matches, err := filepath.Glob(libPath)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the globs have been expanded already at this point?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pietern I don't think we do this in translate paths now but we should but can be a follow up

if err != nil {
return false
}

for _, m := range matches {
if m == f.Source {
return true
}
}

return false
}

// Function to upload artifact file to Workspace
func uploadArtifactFile(ctx context.Context, file string, client filer.Filer) error {
raw, err := os.ReadFile(file)
Expand Down
91 changes: 91 additions & 0 deletions bundle/artifacts/artifacts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package artifacts

import (
"context"
"path/filepath"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
mockfiler "github.com/databricks/cli/internal/mocks/libs/filer"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestArtifactUpload(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
whlLocalPath := filepath.Join(whlFolder, "source.whl")

b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/foo/bar/artifacts",
},
Artifacts: config.Artifacts{
"whl": {
Type: config.ArtifactPythonWheel,
Files: []config.ArtifactFile{
{Source: whlLocalPath},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
filepath.Join("whl", "source.whl"),
"/Workspace/Users/foo@bar.com/mywheel.whl",
},
},
},
},
},
},
},
},
},
}

artifact := b.Config.Artifacts["whl"]
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source.whl"),
mock.AnythingOfType("*bytes.Reader"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)

err := uploadArtifact(context.Background(), b, artifact, "/foo/bar/artifacts", mockFiler)
require.NoError(t, err)

// Test that libraries path is updated
require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl)
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl)
require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0])
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1])
}
51 changes: 34 additions & 17 deletions bundle/artifacts/whl/from_libraries.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,45 @@ func (*fromLibraries) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
tasks := libraries.FindAllWheelTasksWithLocalLibraries(b)
for _, task := range tasks {
for _, lib := range task.Libraries {
matches, err := filepath.Glob(filepath.Join(b.RootPath, lib.Whl))
// File referenced from libraries section does not exists, skipping
if err != nil {
continue
}

for _, match := range matches {
name := filepath.Base(match)
if b.Config.Artifacts == nil {
b.Config.Artifacts = make(map[string]*config.Artifact)
}
matchAndAdd(ctx, lib.Whl, b)
}
}

log.Debugf(ctx, "Adding an artifact block for %s", match)
b.Config.Artifacts[name] = &config.Artifact{
Files: []config.ArtifactFile{
{Source: match},
},
Type: config.ArtifactPythonWheel,
envs := libraries.FindAllEnvironments(b)
for _, jobEnvs := range envs {
for _, env := range jobEnvs {
if env.Spec != nil {
for _, dep := range env.Spec.Dependencies {
if libraries.IsEnvironmentDependencyLocal(dep) {
matchAndAdd(ctx, dep, b)
}
}
}
}
}

return nil
}

func matchAndAdd(ctx context.Context, lib string, b *bundle.Bundle) {
matches, err := filepath.Glob(filepath.Join(b.RootPath, lib))
// File referenced from libraries section does not exists, skipping
if err != nil {
return
}

for _, match := range matches {
name := filepath.Base(match)
if b.Config.Artifacts == nil {
b.Config.Artifacts = make(map[string]*config.Artifact)
}

log.Debugf(ctx, "Adding an artifact block for %s", match)
b.Config.Artifacts[name] = &config.Artifact{
Files: []config.ArtifactFile{
{Source: match},
},
Type: config.ArtifactPythonWheel,
}
}
}
7 changes: 7 additions & 0 deletions bundle/config/mutator/translate_paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ func translateNoOp(literal, localFullPath, localRelPath, remotePath string) (str
return localRelPath, nil
}

func translateNoOpWithPrefix(literal, localFullPath, localRelPath, remotePath string) (string, error) {
if !strings.HasPrefix(localRelPath, ".") {
localRelPath = "." + string(filepath.Separator) + localRelPath
}
return localRelPath, nil
}

func (m *translatePaths) rewriteValue(b *bundle.Bundle, p dyn.Path, v dyn.Value, fn rewriteFunc, dir string) (dyn.Value, error) {
out := v.MustString()
err := m.rewritePath(dir, b, &out, fn)
Expand Down
47 changes: 41 additions & 6 deletions bundle/config/mutator/translate_paths_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,51 @@ import (
"slices"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/libs/dyn"
)

type jobTaskRewritePattern struct {
pattern dyn.Pattern
fn rewriteFunc
type jobRewritePattern struct {
pattern dyn.Pattern
fn rewriteFunc
skipRewrite func(string) bool
}

func rewritePatterns(base dyn.Pattern) []jobTaskRewritePattern {
return []jobTaskRewritePattern{
func noSkipRewrite(string) bool {
return false
}

func rewritePatterns(base dyn.Pattern) []jobRewritePattern {
return []jobRewritePattern{
{
base.Append(dyn.Key("notebook_task"), dyn.Key("notebook_path")),
translateNotebookPath,
noSkipRewrite,
},
{
base.Append(dyn.Key("spark_python_task"), dyn.Key("python_file")),
translateFilePath,
noSkipRewrite,
},
{
base.Append(dyn.Key("dbt_task"), dyn.Key("project_directory")),
translateDirectoryPath,
noSkipRewrite,
},
{
base.Append(dyn.Key("sql_task"), dyn.Key("file"), dyn.Key("path")),
translateFilePath,
noSkipRewrite,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("whl")),
translateNoOp,
noSkipRewrite,
},
{
base.Append(dyn.Key("libraries"), dyn.AnyIndex(), dyn.Key("jar")),
translateNoOp,
noSkipRewrite,
},
}
}
Expand Down Expand Up @@ -73,9 +85,28 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy
)

// Compile list of patterns and their respective rewrite functions.
jobEnvironmentsPatterns := []jobRewritePattern{
{
dyn.NewPattern(
dyn.Key("resources"),
dyn.Key("jobs"),
dyn.AnyKey(),
dyn.Key("environments"),
dyn.AnyIndex(),
dyn.Key("spec"),
dyn.Key("dependencies"),
dyn.AnyIndex(),
),
translateNoOpWithPrefix,
func(s string) bool {
return !libraries.IsEnvironmentDependencyLocal(s)
},
},
}
taskPatterns := rewritePatterns(base)
forEachPatterns := rewritePatterns(base.Append(dyn.Key("for_each_task"), dyn.Key("task")))
allPatterns := append(taskPatterns, forEachPatterns...)
allPatterns := append(taskPatterns, jobEnvironmentsPatterns...)
allPatterns = append(allPatterns, forEachPatterns...)

for _, t := range allPatterns {
v, err = dyn.MapByPattern(v, t.pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
Expand All @@ -91,6 +122,10 @@ func (m *translatePaths) applyJobTranslations(b *bundle.Bundle, v dyn.Value) (dy
return dyn.InvalidValue, fmt.Errorf("unable to determine directory for job %s: %w", key, err)
}

sv := v.MustString()
if t.skipRewrite(sv) {
return v, nil
}
return m.rewriteRelativeTo(b, p, v, t.fn, dir, fallback[key])
})
if err != nil {
Expand Down
Loading