From 654af261c4ffe58639dd13066c6352fa2ca06bc5 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 8 Aug 2024 12:41:03 +0200 Subject: [PATCH 01/11] Upload local libraries even if they don't have artifact defined --- bundle/artifacts/artifacts.go | 191 ---------- bundle/artifacts/artifacts_test.go | 196 ----------- bundle/artifacts/autodetect.go | 1 - bundle/artifacts/{upload.go => cleanup.go} | 38 +- bundle/artifacts/upload_test.go | 114 ------ bundle/artifacts/whl/from_libraries.go | 79 ----- bundle/config/mutator/translate_paths_jobs.go | 2 +- bundle/libraries/expand_glob_references.go | 200 +++++++++++ .../libraries/expand_glob_references_test.go | 141 ++++++++ bundle/libraries/libraries.go | 2 +- bundle/libraries/local_path.go | 42 ++- bundle/libraries/local_path_test.go | 13 +- bundle/libraries/match.go | 82 ----- bundle/libraries/match_test.go | 12 +- bundle/libraries/upload.go | 263 ++++++++++++++ bundle/libraries/upload_test.go | 331 ++++++++++++++++++ bundle/phases/deploy.go | 5 +- bundle/tests/enviroment_key_test.go | 2 +- bundle/tests/python_wheel_test.go | 44 ++- 19 files changed, 1017 insertions(+), 741 deletions(-) delete mode 100644 bundle/artifacts/artifacts_test.go rename bundle/artifacts/{upload.go => cleanup.go} (51%) delete mode 100644 bundle/artifacts/upload_test.go delete mode 100644 bundle/artifacts/whl/from_libraries.go create mode 100644 bundle/libraries/expand_glob_references.go create mode 100644 bundle/libraries/expand_glob_references_test.go delete mode 100644 bundle/libraries/match.go create mode 100644 bundle/libraries/upload.go create mode 100644 bundle/libraries/upload_test.go diff --git a/bundle/artifacts/artifacts.go b/bundle/artifacts/artifacts.go index 3060d08d9e..e5e55a14d5 100644 --- a/bundle/artifacts/artifacts.go +++ b/bundle/artifacts/artifacts.go @@ -1,25 +1,16 @@ package artifacts import ( - "bytes" "context" - "errors" "fmt" - "os" - "path" - "path/filepath" - "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/artifacts/whl" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/mutator" - "github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" - "github.com/databricks/databricks-sdk-go" ) type mutatorFactory = func(name string) bundle.Mutator @@ -28,8 +19,6 @@ var buildMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactTy config.ArtifactPythonWheel: whl.Build, } -var uploadMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactType]mutatorFactory{} - var prepareMutators map[config.ArtifactType]mutatorFactory = map[config.ArtifactType]mutatorFactory{ config.ArtifactPythonWheel: whl.Prepare, } @@ -43,15 +32,6 @@ func getBuildMutator(t config.ArtifactType, name string) bundle.Mutator { return mutatorFactory(name) } -func getUploadMutator(t config.ArtifactType, name string) bundle.Mutator { - mutatorFactory, ok := uploadMutators[t] - if !ok { - mutatorFactory = BasicUpload - } - - return mutatorFactory(name) -} - func getPrepareMutator(t config.ArtifactType, name string) bundle.Mutator { mutatorFactory, ok := prepareMutators[t] if !ok { @@ -92,174 +72,3 @@ func (m *basicBuild) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnosti return nil } - -// Basic Upload defines a general upload mutator which uploads artifact as a library to workspace -type basicUpload struct { - name string -} - -func BasicUpload(name string) bundle.Mutator { - return &basicUpload{name: name} -} - -func (m *basicUpload) Name() string { - return fmt.Sprintf("artifacts.Upload(%s)", m.name) -} - -func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - artifact, ok := b.Config.Artifacts[m.name] - if !ok { - return diag.Errorf("artifact doesn't exist: %s", m.name) - } - - if len(artifact.Files) == 0 { - return diag.Errorf("artifact source is not configured: %s", m.name) - } - - uploadPath, err := getUploadBasePath(b) - if err != nil { - return diag.FromErr(err) - } - - client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath) - if err != nil { - return diag.FromErr(err) - } - - err = uploadArtifact(ctx, b, artifact, uploadPath, client) - if err != nil { - return diag.Errorf("upload for %s failed, error: %v", m.name, err) - } - - return nil -} - -func getFilerForArtifacts(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) { - if isVolumesPath(uploadPath) { - return filer.NewFilesClient(w, uploadPath) - } - return filer.NewWorkspaceFilesClient(w, uploadPath) -} - -func isVolumesPath(path string) bool { - return strings.HasPrefix(path, "/Volumes/") -} - -func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error { - for i := range a.Files { - f := &a.Files[i] - - filename := filepath.Base(f.Source) - cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename)) - - err := uploadArtifactFile(ctx, f.Source, client) - if err != nil { - return err - } - - log.Infof(ctx, "Upload succeeded") - f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source)) - remotePath := f.RemotePath - - if !strings.HasPrefix(f.RemotePath, "/Workspace/") && !strings.HasPrefix(f.RemotePath, "/Volumes/") { - wsfsBase := "/Workspace" - remotePath = path.Join(wsfsBase, f.RemotePath) - } - - for _, job := range b.Config.Resources.Jobs { - rewriteArtifactPath(b, f, job, remotePath) - } - } - - return nil -} - -func rewriteArtifactPath(b *bundle.Bundle, f *config.ArtifactFile, job *resources.Job, remotePath string) { - // Rewrite artifact path in job task libraries - for i := range job.Tasks { - task := &job.Tasks[i] - for j := range task.Libraries { - lib := &task.Libraries[j] - if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) { - lib.Whl = remotePath - } - if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) { - lib.Jar = remotePath - } - } - - // Rewrite artifact path in job task libraries for ForEachTask - if task.ForEachTask != nil { - forEachTask := task.ForEachTask - for j := range forEachTask.Task.Libraries { - lib := &forEachTask.Task.Libraries[j] - if lib.Whl != "" && isArtifactMatchLibrary(f, lib.Whl, b) { - lib.Whl = remotePath - } - if lib.Jar != "" && isArtifactMatchLibrary(f, lib.Jar, b) { - lib.Jar = remotePath - } - } - } - } - - // Rewrite artifact path in job environments - for i := range job.Environments { - env := &job.Environments[i] - if env.Spec == nil { - continue - } - - for j := range env.Spec.Dependencies { - lib := env.Spec.Dependencies[j] - if isArtifactMatchLibrary(f, lib, b) { - env.Spec.Dependencies[j] = remotePath - } - } - } -} - -func isArtifactMatchLibrary(f *config.ArtifactFile, libPath string, b *bundle.Bundle) bool { - if !filepath.IsAbs(libPath) { - libPath = filepath.Join(b.RootPath, libPath) - } - - // libPath can be a glob pattern, so do the match first - matches, err := filepath.Glob(libPath) - if err != nil { - return false - } - - for _, m := range matches { - if m == f.Source { - return true - } - } - - return false -} - -// Function to upload artifact file to Workspace -func uploadArtifactFile(ctx context.Context, file string, client filer.Filer) error { - raw, err := os.ReadFile(file) - if err != nil { - return fmt.Errorf("unable to read %s: %w", file, errors.Unwrap(err)) - } - - filename := filepath.Base(file) - err = client.Write(ctx, filename, bytes.NewReader(raw), filer.OverwriteIfExists, filer.CreateParentDirectories) - if err != nil { - return fmt.Errorf("unable to import %s: %w", filename, err) - } - - return nil -} - -func getUploadBasePath(b *bundle.Bundle) (string, error) { - artifactPath := b.Config.Workspace.ArtifactPath - if artifactPath == "" { - return "", fmt.Errorf("remote artifact path not configured") - } - - return path.Join(artifactPath, ".internal"), nil -} diff --git a/bundle/artifacts/artifacts_test.go b/bundle/artifacts/artifacts_test.go deleted file mode 100644 index 6d85f3af90..0000000000 --- a/bundle/artifacts/artifacts_test.go +++ /dev/null @@ -1,196 +0,0 @@ -package artifacts - -import ( - "context" - "path/filepath" - "testing" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/config/resources" - mockfiler "github.com/databricks/cli/internal/mocks/libs/filer" - "github.com/databricks/cli/internal/testutil" - "github.com/databricks/cli/libs/filer" - "github.com/databricks/databricks-sdk-go/service/compute" - "github.com/databricks/databricks-sdk-go/service/jobs" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" -) - -func TestArtifactUploadForWorkspace(t *testing.T) { - tmpDir := t.TempDir() - whlFolder := filepath.Join(tmpDir, "whl") - testutil.Touch(t, whlFolder, "source.whl") - whlLocalPath := filepath.Join(whlFolder, "source.whl") - - b := &bundle.Bundle{ - RootPath: tmpDir, - Config: config.Root{ - Workspace: config.Workspace{ - ArtifactPath: "/foo/bar/artifacts", - }, - Artifacts: config.Artifacts{ - "whl": { - Type: config.ArtifactPythonWheel, - Files: []config.ArtifactFile{ - {Source: whlLocalPath}, - }, - }, - }, - Resources: config.Resources{ - Jobs: map[string]*resources.Job{ - "job": { - JobSettings: &jobs.JobSettings{ - Tasks: []jobs.Task{ - { - Libraries: []compute.Library{ - { - Whl: filepath.Join("whl", "*.whl"), - }, - { - Whl: "/Workspace/Users/foo@bar.com/mywheel.whl", - }, - }, - }, - { - ForEachTask: &jobs.ForEachTask{ - Task: jobs.Task{ - Libraries: []compute.Library{ - { - Whl: filepath.Join("whl", "*.whl"), - }, - { - Whl: "/Workspace/Users/foo@bar.com/mywheel.whl", - }, - }, - }, - }, - }, - }, - Environments: []jobs.JobEnvironment{ - { - Spec: &compute.Environment{ - Dependencies: []string{ - filepath.Join("whl", "source.whl"), - "/Workspace/Users/foo@bar.com/mywheel.whl", - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - artifact := b.Config.Artifacts["whl"] - mockFiler := mockfiler.NewMockFiler(t) - mockFiler.EXPECT().Write( - mock.Anything, - filepath.Join("source.whl"), - mock.AnythingOfType("*bytes.Reader"), - filer.OverwriteIfExists, - filer.CreateParentDirectories, - ).Return(nil) - - err := uploadArtifact(context.Background(), b, artifact, "/foo/bar/artifacts", mockFiler) - require.NoError(t, err) - - // Test that libraries path is updated - require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl) - require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl) - require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0]) - require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1]) - require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl) - require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl) -} - -func TestArtifactUploadForVolumes(t *testing.T) { - tmpDir := t.TempDir() - whlFolder := filepath.Join(tmpDir, "whl") - testutil.Touch(t, whlFolder, "source.whl") - whlLocalPath := filepath.Join(whlFolder, "source.whl") - - b := &bundle.Bundle{ - RootPath: tmpDir, - Config: config.Root{ - Workspace: config.Workspace{ - ArtifactPath: "/Volumes/foo/bar/artifacts", - }, - Artifacts: config.Artifacts{ - "whl": { - Type: config.ArtifactPythonWheel, - Files: []config.ArtifactFile{ - {Source: whlLocalPath}, - }, - }, - }, - Resources: config.Resources{ - Jobs: map[string]*resources.Job{ - "job": { - JobSettings: &jobs.JobSettings{ - Tasks: []jobs.Task{ - { - Libraries: []compute.Library{ - { - Whl: filepath.Join("whl", "*.whl"), - }, - { - Whl: "/Volumes/some/path/mywheel.whl", - }, - }, - }, - { - ForEachTask: &jobs.ForEachTask{ - Task: jobs.Task{ - Libraries: []compute.Library{ - { - Whl: filepath.Join("whl", "*.whl"), - }, - { - Whl: "/Volumes/some/path/mywheel.whl", - }, - }, - }, - }, - }, - }, - Environments: []jobs.JobEnvironment{ - { - Spec: &compute.Environment{ - Dependencies: []string{ - filepath.Join("whl", "source.whl"), - "/Volumes/some/path/mywheel.whl", - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - artifact := b.Config.Artifacts["whl"] - mockFiler := mockfiler.NewMockFiler(t) - mockFiler.EXPECT().Write( - mock.Anything, - filepath.Join("source.whl"), - mock.AnythingOfType("*bytes.Reader"), - filer.OverwriteIfExists, - filer.CreateParentDirectories, - ).Return(nil) - - err := uploadArtifact(context.Background(), b, artifact, "/Volumes/foo/bar/artifacts", mockFiler) - require.NoError(t, err) - - // Test that libraries path is updated - require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl) - require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl) - require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0]) - require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1]) - require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl) - require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl) -} diff --git a/bundle/artifacts/autodetect.go b/bundle/artifacts/autodetect.go index 0e94edd820..569a480f00 100644 --- a/bundle/artifacts/autodetect.go +++ b/bundle/artifacts/autodetect.go @@ -29,6 +29,5 @@ func (m *autodetect) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnosti return bundle.Apply(ctx, b, bundle.Seq( whl.DetectPackage(), - whl.DefineArtifactsFromLibraries(), )) } diff --git a/bundle/artifacts/upload.go b/bundle/artifacts/cleanup.go similarity index 51% rename from bundle/artifacts/upload.go rename to bundle/artifacts/cleanup.go index 3af50021e8..58c006dc18 100644 --- a/bundle/artifacts/upload.go +++ b/bundle/artifacts/cleanup.go @@ -2,50 +2,18 @@ package artifacts import ( "context" - "fmt" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" ) -func UploadAll() bundle.Mutator { - return &all{ - name: "Upload", - fn: uploadArtifactByName, - } -} - func CleanUp() bundle.Mutator { return &cleanUp{} } -type upload struct { - name string -} - -func uploadArtifactByName(name string) (bundle.Mutator, error) { - return &upload{name}, nil -} - -func (m *upload) Name() string { - return fmt.Sprintf("artifacts.Upload(%s)", m.name) -} - -func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - artifact, ok := b.Config.Artifacts[m.name] - if !ok { - return diag.Errorf("artifact doesn't exist: %s", m.name) - } - - if len(artifact.Files) == 0 { - return diag.Errorf("artifact source is not configured: %s", m.name) - } - - return bundle.Apply(ctx, b, getUploadMutator(artifact.Type, m.name)) -} - type cleanUp struct{} func (m *cleanUp) Name() string { @@ -53,12 +21,12 @@ func (m *cleanUp) Name() string { } func (m *cleanUp) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - uploadPath, err := getUploadBasePath(b) + uploadPath, err := libraries.GetUploadBasePath(b) if err != nil { return diag.FromErr(err) } - client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath) + client, err := libraries.GetFilerForLibraries(b.WorkspaceClient(), uploadPath) if err != nil { return diag.FromErr(err) } diff --git a/bundle/artifacts/upload_test.go b/bundle/artifacts/upload_test.go deleted file mode 100644 index 202086bd37..0000000000 --- a/bundle/artifacts/upload_test.go +++ /dev/null @@ -1,114 +0,0 @@ -package artifacts - -import ( - "context" - "os" - "path/filepath" - "testing" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/internal/bundletest" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/testfile" - "github.com/stretchr/testify/require" -) - -type noop struct{} - -func (n *noop) Apply(context.Context, *bundle.Bundle) diag.Diagnostics { - return nil -} - -func (n *noop) Name() string { - return "noop" -} - -func TestExpandGlobFilesSource(t *testing.T) { - rootPath := t.TempDir() - err := os.Mkdir(filepath.Join(rootPath, "test"), 0755) - require.NoError(t, err) - - t1 := testfile.CreateFile(t, filepath.Join(rootPath, "test", "myjar1.jar")) - t1.Close(t) - - t2 := testfile.CreateFile(t, filepath.Join(rootPath, "test", "myjar2.jar")) - t2.Close(t) - - b := &bundle.Bundle{ - RootPath: rootPath, - Config: config.Root{ - Artifacts: map[string]*config.Artifact{ - "test": { - Type: "custom", - Files: []config.ArtifactFile{ - { - Source: filepath.Join("..", "test", "*.jar"), - }, - }, - }, - }, - }, - } - - bundletest.SetLocation(b, ".", filepath.Join(rootPath, "resources", "artifacts.yml")) - - u := &upload{"test"} - uploadMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - bm := &build{"test"} - buildMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - pm := &prepare{"test"} - prepareMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - diags := bundle.Apply(context.Background(), b, bundle.Seq(pm, bm, u)) - require.NoError(t, diags.Error()) - - require.Equal(t, 2, len(b.Config.Artifacts["test"].Files)) - require.Equal(t, filepath.Join(rootPath, "test", "myjar1.jar"), b.Config.Artifacts["test"].Files[0].Source) - require.Equal(t, filepath.Join(rootPath, "test", "myjar2.jar"), b.Config.Artifacts["test"].Files[1].Source) -} - -func TestExpandGlobFilesSourceWithNoMatches(t *testing.T) { - rootPath := t.TempDir() - err := os.Mkdir(filepath.Join(rootPath, "test"), 0755) - require.NoError(t, err) - - b := &bundle.Bundle{ - RootPath: rootPath, - Config: config.Root{ - Artifacts: map[string]*config.Artifact{ - "test": { - Type: "custom", - Files: []config.ArtifactFile{ - { - Source: filepath.Join("..", "test", "myjar.jar"), - }, - }, - }, - }, - }, - } - - bundletest.SetLocation(b, ".", filepath.Join(rootPath, "resources", "artifacts.yml")) - - u := &upload{"test"} - uploadMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - bm := &build{"test"} - buildMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - diags := bundle.Apply(context.Background(), b, bundle.Seq(bm, u)) - require.ErrorContains(t, diags.Error(), "no matching files") -} diff --git a/bundle/artifacts/whl/from_libraries.go b/bundle/artifacts/whl/from_libraries.go deleted file mode 100644 index 79161a82de..0000000000 --- a/bundle/artifacts/whl/from_libraries.go +++ /dev/null @@ -1,79 +0,0 @@ -package whl - -import ( - "context" - "path/filepath" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/libraries" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/log" -) - -type fromLibraries struct{} - -func DefineArtifactsFromLibraries() bundle.Mutator { - return &fromLibraries{} -} - -func (m *fromLibraries) Name() string { - return "artifacts.whl.DefineArtifactsFromLibraries" -} - -func (*fromLibraries) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - if len(b.Config.Artifacts) != 0 { - log.Debugf(ctx, "Skipping defining artifacts from libraries because artifacts section is explicitly defined") - return nil - } - - tasks := libraries.FindTasksWithLocalLibraries(b) - for _, task := range tasks { - // Skip tasks that are not PythonWheelTasks for now, we can later support Jars too - if task.PythonWheelTask == nil { - continue - } - - for _, lib := range task.Libraries { - matchAndAdd(ctx, lib.Whl, b) - } - } - - envs := libraries.FindAllEnvironments(b) - for _, jobEnvs := range envs { - for _, env := range jobEnvs { - if env.Spec != nil { - for _, dep := range env.Spec.Dependencies { - if libraries.IsEnvironmentDependencyLocal(dep) { - matchAndAdd(ctx, dep, b) - } - } - } - } - } - - return nil -} - -func matchAndAdd(ctx context.Context, lib string, b *bundle.Bundle) { - matches, err := filepath.Glob(filepath.Join(b.RootPath, lib)) - // File referenced from libraries section does not exists, skipping - if err != nil { - return - } - - for _, match := range matches { - name := filepath.Base(match) - if b.Config.Artifacts == nil { - b.Config.Artifacts = make(map[string]*config.Artifact) - } - - log.Debugf(ctx, "Adding an artifact block for %s", match) - b.Config.Artifacts[name] = &config.Artifact{ - Files: []config.ArtifactFile{ - {Source: match}, - }, - Type: config.ArtifactPythonWheel, - } - } -} diff --git a/bundle/config/mutator/translate_paths_jobs.go b/bundle/config/mutator/translate_paths_jobs.go index 60cc8bb9aa..6febf4f8f7 100644 --- a/bundle/config/mutator/translate_paths_jobs.go +++ b/bundle/config/mutator/translate_paths_jobs.go @@ -78,7 +78,7 @@ func (t *translateContext) jobRewritePatterns() []jobRewritePattern { ), t.translateNoOpWithPrefix, func(s string) bool { - return !libraries.IsEnvironmentDependencyLocal(s) + return !libraries.IsLibraryLocal(s) }, }, } diff --git a/bundle/libraries/expand_glob_references.go b/bundle/libraries/expand_glob_references.go new file mode 100644 index 0000000000..af100da3f3 --- /dev/null +++ b/bundle/libraries/expand_glob_references.go @@ -0,0 +1,200 @@ +package libraries + +import ( + "context" + "fmt" + "path/filepath" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" +) + +type expand struct { +} + +func matchWarning(p dyn.Path, message string) diag.Diagnostic { + return diag.Diagnostic{ + Severity: diag.Warning, + Summary: message, + Paths: []dyn.Path{ + p.Append(), + }, + } +} + +func getLibDetails(v dyn.Value) (string, string, bool) { + m := v.MustMap() + whl, ok := m.GetByString("whl") + if ok { + return whl.MustString(), "whl", true + } + + jar, ok := m.GetByString("jar") + if ok { + return jar.MustString(), "jar", true + } + + return "", "", false +} + +func findMatches(b *bundle.Bundle, path string) ([]string, error) { + matches, err := filepath.Glob(filepath.Join(b.RootPath, path)) + if err != nil { + return nil, err + } + + if len(matches) == 0 { + return nil, fmt.Errorf("no matching files for %s", path) + } + + return matches, nil +} + +func expandLibraries(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostics, []dyn.Value) { + var output []dyn.Value + var diags diag.Diagnostics + + libs := v.MustSequence() + for i, lib := range libs { + lp := p.Append(dyn.Index(i)) + path, libType, supported := getLibDetails(lib) + if !supported { + output = append(output, lib) + continue + } + + if !IsLibraryLocal(path) { + output = append(output, lib) + continue + } + + matches, err := findMatches(b, path) + if err != nil { + diags = diags.Append(matchWarning(lp, err.Error())) + continue + } + + for _, match := range matches { + output = append(output, dyn.NewValue(map[string]dyn.Value{ + libType: dyn.V(match), + }, lib.Locations())) + } + } + + return diags, output +} + +func expandEnvironmentDeps(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostics, []dyn.Value) { + var output []dyn.Value + var diags diag.Diagnostics + + deps := v.MustSequence() + for i, dep := range deps { + lp := p.Append(dyn.Index(i)) + path := dep.MustString() + if !IsLibraryLocal(path) { + output = append(output, dep) + continue + } + + matches, err := findMatches(b, path) + if err != nil { + diags = diags.Append(matchWarning(lp, err.Error())) + continue + } + + for _, match := range matches { + output = append(output, dyn.NewValue(match, dep.Locations())) + } + } + + return diags, output +} + +type expandPattern struct { + pattern dyn.Pattern + fn func(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostics, []dyn.Value) +} + +func (e *expand) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + taskLibraries := dyn.NewPattern( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.AnyKey(), + dyn.Key("tasks"), + dyn.AnyIndex(), + dyn.Key("libraries"), + ) + + forEachTaskLibraries := dyn.NewPattern( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.AnyKey(), + dyn.Key("tasks"), + dyn.AnyIndex(), + dyn.Key("for_each_task"), + dyn.Key("task"), + dyn.Key("libraries"), + ) + + envDepsPattern := dyn.NewPattern( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.AnyKey(), + dyn.Key("environments"), + dyn.AnyIndex(), + dyn.Key("spec"), + dyn.Key("dependencies"), + ) + + expanders := []expandPattern{ + { + pattern: taskLibraries, + fn: expandLibraries, + }, + { + pattern: forEachTaskLibraries, + fn: expandLibraries, + }, + { + pattern: envDepsPattern, + fn: expandEnvironmentDeps, + }, + } + + var diags diag.Diagnostics + + err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) { + var err error + for _, expander := range expanders { + v, err = dyn.MapByPattern(v, expander.pattern, func(p dyn.Path, lv dyn.Value) (dyn.Value, error) { + d, output := expander.fn(b, p, lv) + diags = diags.Extend(d) + return dyn.V(output), err + }) + + if err != nil { + return dyn.InvalidValue, err + } + } + + return v, nil + }) + + if err != nil { + diags = diags.Extend(diag.FromErr(err)) + } + + return diags +} + +func (e *expand) Name() string { + return "libraries.ExpandGlobReferences" +} + +// ExpandGlobReferences expands any glob references in the libraries or environments section +// to corresponding local paths +func ExpandGlobReferences() bundle.Mutator { + return &expand{} +} diff --git a/bundle/libraries/expand_glob_references_test.go b/bundle/libraries/expand_glob_references_test.go new file mode 100644 index 0000000000..ef190f7f09 --- /dev/null +++ b/bundle/libraries/expand_glob_references_test.go @@ -0,0 +1,141 @@ +package libraries + +import ( + "context" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/internal/testutil" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/require" +) + +func TestGlobReferencesExpandedForTaskLibraries(t *testing.T) { + dir := t.TempDir() + testutil.Touch(t, dir, "whl", "my1.whl") + testutil.Touch(t, dir, "whl", "my2.whl") + testutil.Touch(t, dir, "jar", "my1.jar") + testutil.Touch(t, dir, "jar", "my2.jar") + + b := &bundle.Bundle{ + RootPath: dir, + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + TaskKey: "task", + Libraries: []compute.Library{ + { + Whl: "whl/*.whl", + }, + { + Whl: "/Workspace/path/to/whl/my.whl", + }, + { + Jar: "./jar/*.jar", + }, + { + Egg: "egg/*.egg", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml")) + + diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) + require.Empty(t, diags) + + job := b.Config.Resources.Jobs["job"] + task := job.JobSettings.Tasks[0] + require.Equal(t, []compute.Library{ + { + Whl: filepath.Join(dir, "whl", "my1.whl"), + }, + { + Whl: filepath.Join(dir, "whl", "my2.whl"), + }, + { + Whl: "/Workspace/path/to/whl/my.whl", + }, + { + Jar: filepath.Join(dir, "jar", "my1.jar"), + }, + { + Jar: filepath.Join(dir, "jar", "my2.jar"), + }, + { + Egg: "egg/*.egg", + }, + }, task.Libraries) +} + +func TestGlobReferencesExpandedForEnvironmentsDeps(t *testing.T) { + dir := t.TempDir() + testutil.Touch(t, dir, "whl", "my1.whl") + testutil.Touch(t, dir, "whl", "my2.whl") + testutil.Touch(t, dir, "jar", "my1.jar") + testutil.Touch(t, dir, "jar", "my2.jar") + + b := &bundle.Bundle{ + RootPath: dir, + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + TaskKey: "task", + EnvironmentKey: "env", + }, + }, + Environments: []jobs.JobEnvironment{ + { + EnvironmentKey: "env", + Spec: &compute.Environment{ + Dependencies: []string{ + "./whl/*.whl", + "/Workspace/path/to/whl/my.whl", + "./jar/*.jar", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml")) + + diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) + require.Empty(t, diags) + + job := b.Config.Resources.Jobs["job"] + env := job.JobSettings.Environments[0] + require.Equal(t, []string{ + filepath.Join(dir, "whl", "my1.whl"), + filepath.Join(dir, "whl", "my2.whl"), + "/Workspace/path/to/whl/my.whl", + filepath.Join(dir, "jar", "my1.jar"), + filepath.Join(dir, "jar", "my2.jar"), + }, env.Spec.Dependencies) + +} diff --git a/bundle/libraries/libraries.go b/bundle/libraries/libraries.go index 72e5bcc66b..2b189e3897 100644 --- a/bundle/libraries/libraries.go +++ b/bundle/libraries/libraries.go @@ -35,7 +35,7 @@ func isEnvsWithLocalLibraries(envs []jobs.JobEnvironment) bool { } for _, l := range e.Spec.Dependencies { - if IsEnvironmentDependencyLocal(l) { + if IsLibraryLocal(l) { return true } } diff --git a/bundle/libraries/local_path.go b/bundle/libraries/local_path.go index f1e3788f24..7a0b4b4291 100644 --- a/bundle/libraries/local_path.go +++ b/bundle/libraries/local_path.go @@ -34,18 +34,36 @@ func IsLocalPath(p string) bool { return false } - // If path starts with /, it's a remote absolute path - return !path.IsAbs(p) + // if path starts with . or .., it's a local path + if strings.HasPrefix(p, ".") || strings.HasPrefix(p, "..") { + return true + } + + // If the path is absolute, it's might be a remote path. + if path.IsAbs(p) { + possiblePrefixes := []string{ + "/Workspace", "/Users", "/mnt", "/dbfs", + "/Volumes", "/Shared", + } + + for _, prefix := range possiblePrefixes { + if strings.HasPrefix(p, prefix) { + return false + } + } + } + + return true } -// IsEnvironmentDependencyLocal returns true if the specified dependency +// IsLibraryLocal returns true if the specified library or environment dependency // should be interpreted as a local path. -// We use this to check if the dependency in environment spec is local. +// We use this to check if the dependency in environment spec is local or that library is local. // We can't use IsLocalPath beacuse environment dependencies can be // a pypi package name which can be misinterpreted as a local path by IsLocalPath. -func IsEnvironmentDependencyLocal(dep string) bool { +func IsLibraryLocal(dep string) bool { possiblePrefixes := []string{ - ".", + ".", "..", } for _, prefix := range possiblePrefixes { @@ -54,7 +72,17 @@ func IsEnvironmentDependencyLocal(dep string) bool { } } - return false + // If the dependency is a requirements file, it's not a valid local path + if strings.HasPrefix(dep, "-r") { + return false + } + + // If the dependency has no extension, it's a PyPi package name + if path.Ext(dep) == "" { + return false + } + + return IsLocalPath(dep) } func isRemoteStorageScheme(path string) bool { diff --git a/bundle/libraries/local_path_test.go b/bundle/libraries/local_path_test.go index d2492d6b12..6cdee3c79b 100644 --- a/bundle/libraries/local_path_test.go +++ b/bundle/libraries/local_path_test.go @@ -10,16 +10,17 @@ import ( func TestIsLocalPath(t *testing.T) { // Relative paths, paths with the file scheme, and Windows paths. + assert.True(t, IsLocalPath("some/local/path")) assert.True(t, IsLocalPath("./some/local/path")) assert.True(t, IsLocalPath("file://path/to/package")) assert.True(t, IsLocalPath("C:\\path\\to\\package")) + assert.True(t, IsLocalPath("/some/full/path")) assert.True(t, IsLocalPath("myfile.txt")) assert.True(t, IsLocalPath("./myfile.txt")) assert.True(t, IsLocalPath("../myfile.txt")) assert.True(t, IsLocalPath("file:///foo/bar/myfile.txt")) - // Absolute paths. - assert.False(t, IsLocalPath("/some/full/path")) + // Remote paths. assert.False(t, IsLocalPath("/Workspace/path/to/package")) assert.False(t, IsLocalPath("/Users/path/to/package")) @@ -48,6 +49,8 @@ func TestIsEnvironmentDependencyLocal(t *testing.T) { path string expected bool }){ + {path: "local/*.whl", expected: true}, + {path: "local/test.whl", expected: true}, {path: "./local/*.whl", expected: true}, {path: ".\\local\\*.whl", expected: true}, {path: "./local/mypath.whl", expected: true}, @@ -59,14 +62,12 @@ func TestIsEnvironmentDependencyLocal(t *testing.T) { {path: "../../local/*.whl", expected: true}, {path: "..\\..\\local\\*.whl", expected: true}, {path: "pypipackage", expected: false}, - {path: "pypipackage/test.whl", expected: false}, - {path: "pypipackage/*.whl", expected: false}, {path: "/Volumes/catalog/schema/volume/path.whl", expected: false}, {path: "/Workspace/my_project/dist.whl", expected: false}, {path: "-r /Workspace/my_project/requirements.txt", expected: false}, } - for _, tc := range testCases { - require.Equal(t, IsEnvironmentDependencyLocal(tc.path), tc.expected) + for i, tc := range testCases { + require.Equalf(t, tc.expected, IsLibraryLocal(tc.path), "failed case: %d, path: %s", i, tc.path) } } diff --git a/bundle/libraries/match.go b/bundle/libraries/match.go deleted file mode 100644 index 4feb4225d6..0000000000 --- a/bundle/libraries/match.go +++ /dev/null @@ -1,82 +0,0 @@ -package libraries - -import ( - "context" - "fmt" - "path/filepath" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/databricks-sdk-go/service/compute" - "github.com/databricks/databricks-sdk-go/service/jobs" -) - -type match struct { -} - -func ValidateLocalLibrariesExist() bundle.Mutator { - return &match{} -} - -func (a *match) Name() string { - return "libraries.ValidateLocalLibrariesExist" -} - -func (a *match) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - for _, job := range b.Config.Resources.Jobs { - err := validateEnvironments(job.Environments, b) - if err != nil { - return diag.FromErr(err) - } - - for _, task := range job.JobSettings.Tasks { - err := validateTaskLibraries(task.Libraries, b) - if err != nil { - return diag.FromErr(err) - } - } - } - - return nil -} - -func validateTaskLibraries(libs []compute.Library, b *bundle.Bundle) error { - for _, lib := range libs { - path := libraryPath(&lib) - if path == "" || !IsLocalPath(path) { - continue - } - - matches, err := filepath.Glob(filepath.Join(b.RootPath, path)) - if err != nil { - return err - } - - if len(matches) == 0 { - return fmt.Errorf("file %s is referenced in libraries section but doesn't exist on the local file system", libraryPath(&lib)) - } - } - - return nil -} - -func validateEnvironments(envs []jobs.JobEnvironment, b *bundle.Bundle) error { - for _, env := range envs { - if env.Spec == nil { - continue - } - - for _, dep := range env.Spec.Dependencies { - matches, err := filepath.Glob(filepath.Join(b.RootPath, dep)) - if err != nil { - return err - } - - if len(matches) == 0 && IsEnvironmentDependencyLocal(dep) { - return fmt.Errorf("file %s is referenced in environments section but doesn't exist on the local file system", dep) - } - } - } - - return nil -} diff --git a/bundle/libraries/match_test.go b/bundle/libraries/match_test.go index bb4b15107f..e44e505a20 100644 --- a/bundle/libraries/match_test.go +++ b/bundle/libraries/match_test.go @@ -42,7 +42,7 @@ func TestValidateEnvironments(t *testing.T) { }, } - diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist()) + diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) require.Nil(t, diags) } @@ -74,9 +74,9 @@ func TestValidateEnvironmentsNoFile(t *testing.T) { }, } - diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist()) + diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) require.Len(t, diags, 1) - require.Equal(t, "file ./wheel.whl is referenced in environments section but doesn't exist on the local file system", diags[0].Summary) + require.Equal(t, "no matching files for ./wheel.whl", diags[0].Summary) } func TestValidateTaskLibraries(t *testing.T) { @@ -109,7 +109,7 @@ func TestValidateTaskLibraries(t *testing.T) { }, } - diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist()) + diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) require.Nil(t, diags) } @@ -142,7 +142,7 @@ func TestValidateTaskLibrariesNoFile(t *testing.T) { }, } - diags := bundle.Apply(context.Background(), b, ValidateLocalLibrariesExist()) + diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) require.Len(t, diags, 1) - require.Equal(t, "file ./wheel.whl is referenced in libraries section but doesn't exist on the local file system", diags[0].Summary) + require.Equal(t, "no matching files for ./wheel.whl", diags[0].Summary) } diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go new file mode 100644 index 0000000000..ffd0bf5006 --- /dev/null +++ b/bundle/libraries/upload.go @@ -0,0 +1,263 @@ +package libraries + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "path" + "path/filepath" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/filer" + + "github.com/databricks/databricks-sdk-go" + + "golang.org/x/sync/errgroup" +) + +func Upload() bundle.Mutator { + return &upload{} +} + +func UploadWithClient(client filer.Filer) bundle.Mutator { + return &upload{ + client: client, + } +} + +type upload struct { + client filer.Filer +} + +type configLocation struct { + configPath dyn.Path + location dyn.Location +} + +type libsLocations = map[string]([]configLocation) + +// Collect all libraries from the bundle configuration and their config paths. +// By this stage all glob references are expanded and we have a list of all libraries that need to be uploaded. +// We collect them from task libraries, foreach task libraries environment dependencies and artifacts. +// We return a map of library source to a list of config paths and locations where the library is used. +// We use map so we don't upload the same library multiple times. +// Instead we upload it once and update all the config paths to point to the uploaded location. +func collectLocalLibraries(b *bundle.Bundle) libsLocations { + libs := make(map[string]([]configLocation)) + + for i, job := range b.Config.Resources.Jobs { + for j, task := range job.Tasks { + for k, lib := range task.Libraries { + if !IsLocalLibrary(&lib) { + continue + } + + p := dyn.NewPath( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.Key(i), + dyn.Key("tasks"), + dyn.Index(j), + dyn.Key("libraries"), + dyn.Index(k), + ) + if lib.Whl != "" { + libs[lib.Whl] = append(libs[lib.Whl], configLocation{ + configPath: p.Append(dyn.Key("whl")), + location: b.Config.GetLocation(p.String()), + }) + } + + if lib.Jar != "" { + libs[lib.Jar] = append(libs[lib.Jar], configLocation{ + configPath: p.Append(dyn.Key("jar")), + location: b.Config.GetLocation(p.String()), + }) + } + } + + if task.ForEachTask != nil { + for l, lib := range task.ForEachTask.Task.Libraries { + if !IsLocalLibrary(&lib) { + continue + } + + p := dyn.NewPath( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.Key(i), + dyn.Key("tasks"), + dyn.Index(j), + dyn.Key("for_each_task"), + dyn.Key("task"), + dyn.Key("libraries"), + dyn.Index(l), + ) + + if lib.Whl != "" { + libs[lib.Whl] = append(libs[lib.Whl], configLocation{ + configPath: p.Append(dyn.Key("whl")), + location: b.Config.GetLocation(p.String()), + }) + } + + if lib.Jar != "" { + libs[lib.Jar] = append(libs[lib.Jar], configLocation{ + configPath: p.Append(dyn.Key("jar")), + location: b.Config.GetLocation(p.String()), + }) + } + } + } + } + + for j, env := range job.Environments { + if env.Spec == nil { + continue + } + + for k, dep := range env.Spec.Dependencies { + if !IsLibraryLocal(dep) { + continue + } + + p := dyn.NewPath( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.Key(i), + dyn.Key("environments"), + dyn.Index(j), + dyn.Key("spec"), + dyn.Key("dependencies"), + dyn.Index(k), + ) + + libs[dep] = append(libs[dep], configLocation{ + configPath: p, + location: b.Config.GetLocation(p.String()), + }) + } + } + } + + for key, artifact := range b.Config.Artifacts { + for i, file := range artifact.Files { + p := dyn.NewPath( + dyn.Key("artifacts"), + dyn.Key(key), + dyn.Key("files"), + dyn.Index(i), + ) + + libs[file.Source] = append(libs[file.Source], configLocation{ + configPath: p.Append(dyn.Key("remote_path")), + location: b.Config.GetLocation(p.String()), + }) + } + } + + return libs +} + +func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + uploadPath, err := GetUploadBasePath(b) + if err != nil { + return diag.FromErr(err) + } + + // If the client is not initialized, initialize it + // We use client field in mutator to allow for mocking client in testing + if u.client == nil { + filer, err := GetFilerForLibraries(b.WorkspaceClient(), uploadPath) + if err != nil { + return diag.FromErr(err) + } + + u.client = filer + } + + var diags diag.Diagnostics + + libs := collectLocalLibraries(b) + errs, errCtx := errgroup.WithContext(ctx) + for source := range libs { + errs.Go(func() error { + return UploadFile(errCtx, source, u.client) + }) + } + + if err := errs.Wait(); err != nil { + return diag.FromErr(err) + } + + // Update all the config paths to point to the uploaded location + for source, locations := range libs { + err = b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) { + remotePath := path.Join(uploadPath, filepath.Base(source)) + + // If the remote path does not start with /Workspace or /Volumes, prepend /Workspace + if !strings.HasPrefix(remotePath, "/Workspace") && !strings.HasPrefix(remotePath, "/Volumes") { + remotePath = "/Workspace" + remotePath + } + for _, location := range locations { + v, err = dyn.SetByPath(v, location.configPath, dyn.NewValue(remotePath, []dyn.Location{location.location})) + if err != nil { + return v, err + } + } + + return v, nil + }) + + if err != nil { + diags = diags.Extend(diag.FromErr(err)) + } + } + + return diags +} + +func (u *upload) Name() string { + return "libraries.Upload" +} + +func GetFilerForLibraries(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) { + if isVolumesPath(uploadPath) { + return filer.NewFilesClient(w, uploadPath) + } + return filer.NewWorkspaceFilesClient(w, uploadPath) +} + +func isVolumesPath(path string) bool { + return strings.HasPrefix(path, "/Volumes/") +} + +// Function to upload file (a library, artifact and etc) to Workspace or UC volume +func UploadFile(ctx context.Context, file string, client filer.Filer) error { + raw, err := os.ReadFile(file) + if err != nil { + return fmt.Errorf("unable to read %s: %w", file, errors.Unwrap(err)) + } + + filename := filepath.Base(file) + err = client.Write(ctx, filename, bytes.NewReader(raw), filer.OverwriteIfExists, filer.CreateParentDirectories) + if err != nil { + return fmt.Errorf("unable to import %s: %w", filename, err) + } + + return nil +} + +func GetUploadBasePath(b *bundle.Bundle) (string, error) { + artifactPath := b.Config.Workspace.ArtifactPath + if artifactPath == "" { + return "", fmt.Errorf("remote artifact path not configured") + } + + return path.Join(artifactPath, ".internal"), nil +} diff --git a/bundle/libraries/upload_test.go b/bundle/libraries/upload_test.go new file mode 100644 index 0000000000..de4f53012c --- /dev/null +++ b/bundle/libraries/upload_test.go @@ -0,0 +1,331 @@ +package libraries + +import ( + "context" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + mockfiler "github.com/databricks/cli/internal/mocks/libs/filer" + "github.com/databricks/cli/internal/testutil" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestArtifactUploadForWorkspace(t *testing.T) { + tmpDir := t.TempDir() + whlFolder := filepath.Join(tmpDir, "whl") + testutil.Touch(t, whlFolder, "source.whl") + whlLocalPath := filepath.Join(whlFolder, "source.whl") + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/foo/bar/artifacts", + }, + Artifacts: config.Artifacts{ + "whl": { + Type: config.ArtifactPythonWheel, + Files: []config.ArtifactFile{ + {Source: whlLocalPath}, + }, + }, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + Libraries: []compute.Library{ + { + Whl: filepath.Join("whl", "*.whl"), + }, + { + Whl: "/Workspace/Users/foo@bar.com/mywheel.whl", + }, + }, + }, + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + Libraries: []compute.Library{ + { + Whl: filepath.Join("whl", "*.whl"), + }, + { + Whl: "/Workspace/Users/foo@bar.com/mywheel.whl", + }, + }, + }, + }, + }, + }, + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + filepath.Join("whl", "source.whl"), + "/Workspace/Users/foo@bar.com/mywheel.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + mockFiler := mockfiler.NewMockFiler(t) + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil) + + diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler))) + require.NoError(t, diags.Error()) + + // Test that libraries path is updated + require.Equal(t, "/Workspace/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl) + require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl) + require.Equal(t, "/Workspace/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0]) + require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1]) + require.Equal(t, "/Workspace/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl) + require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl) +} + +func TestArtifactUploadForVolumes(t *testing.T) { + tmpDir := t.TempDir() + whlFolder := filepath.Join(tmpDir, "whl") + testutil.Touch(t, whlFolder, "source.whl") + whlLocalPath := filepath.Join(whlFolder, "source.whl") + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Volumes/foo/bar/artifacts", + }, + Artifacts: config.Artifacts{ + "whl": { + Type: config.ArtifactPythonWheel, + Files: []config.ArtifactFile{ + {Source: whlLocalPath}, + }, + }, + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + Libraries: []compute.Library{ + { + Whl: filepath.Join("whl", "*.whl"), + }, + { + Whl: "/Volumes/some/path/mywheel.whl", + }, + }, + }, + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + Libraries: []compute.Library{ + { + Whl: filepath.Join("whl", "*.whl"), + }, + { + Whl: "/Volumes/some/path/mywheel.whl", + }, + }, + }, + }, + }, + }, + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + filepath.Join("whl", "source.whl"), + "/Volumes/some/path/mywheel.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + mockFiler := mockfiler.NewMockFiler(t) + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil) + + diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler))) + require.NoError(t, diags.Error()) + + // Test that libraries path is updated + require.Equal(t, "/Volumes/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl) + require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl) + require.Equal(t, "/Volumes/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0]) + require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1]) + require.Equal(t, "/Volumes/foo/bar/artifacts/.internal/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl) + require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl) +} + +func TestArtifactUploadWithNoLibraryReference(t *testing.T) { + tmpDir := t.TempDir() + whlFolder := filepath.Join(tmpDir, "whl") + testutil.Touch(t, whlFolder, "source.whl") + whlLocalPath := filepath.Join(whlFolder, "source.whl") + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/Workspace/foo/bar/artifacts", + }, + Artifacts: config.Artifacts{ + "whl": { + Type: config.ArtifactPythonWheel, + Files: []config.ArtifactFile{ + {Source: whlLocalPath}, + }, + }, + }, + }, + } + + mockFiler := mockfiler.NewMockFiler(t) + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil) + + diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler))) + require.NoError(t, diags.Error()) + + require.Equal(t, "/Workspace/foo/bar/artifacts/.internal/source.whl", b.Config.Artifacts["whl"].Files[0].RemotePath) +} + +func TestUploadMultipleLibraries(t *testing.T) { + tmpDir := t.TempDir() + whlFolder := filepath.Join(tmpDir, "whl") + testutil.Touch(t, whlFolder, "source1.whl") + testutil.Touch(t, whlFolder, "source2.whl") + testutil.Touch(t, whlFolder, "source3.whl") + testutil.Touch(t, whlFolder, "source4.whl") + + b := &bundle.Bundle{ + RootPath: tmpDir, + Config: config.Root{ + Workspace: config.Workspace{ + ArtifactPath: "/foo/bar/artifacts", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + Libraries: []compute.Library{ + { + Whl: filepath.Join("whl", "*.whl"), + }, + { + Whl: "/Workspace/Users/foo@bar.com/mywheel.whl", + }, + }, + }, + }, + Environments: []jobs.JobEnvironment{ + { + Spec: &compute.Environment{ + Dependencies: []string{ + filepath.Join("whl", "*.whl"), + "/Workspace/Users/foo@bar.com/mywheel.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + mockFiler := mockfiler.NewMockFiler(t) + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source1.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil).Once() + + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source2.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil).Once() + + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source3.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil).Once() + + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("source4.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil).Once() + + diags := bundle.Apply(context.Background(), b, bundle.Seq(ExpandGlobReferences(), UploadWithClient(mockFiler))) + require.NoError(t, diags.Error()) + + // Test that libraries path is updated + require.Len(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, 5) + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/foo/bar/artifacts/.internal/source1.whl"}) + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/foo/bar/artifacts/.internal/source2.whl"}) + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/foo/bar/artifacts/.internal/source3.whl"}) + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/foo/bar/artifacts/.internal/source4.whl"}) + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries, compute.Library{Whl: "/Workspace/Users/foo@bar.com/mywheel.whl"}) + + require.Len(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, 5) + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source1.whl") + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source2.whl") + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source3.whl") + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/foo/bar/artifacts/.internal/source4.whl") + require.Contains(t, b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies, "/Workspace/Users/foo@bar.com/mywheel.whl") +} diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 6929f74baf..48ca73024a 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -113,9 +113,10 @@ func Deploy() bundle.Mutator { terraform.StatePull(), deploy.StatePull(), mutator.ValidateGitDetails(), - libraries.ValidateLocalLibrariesExist(), + libraries.ExpandGlobReferences(), artifacts.CleanUp(), - artifacts.UploadAll(), + libraries.ExpandGlobReferences(), + libraries.Upload(), python.TransformWheelTask(), files.Upload(), deploy.StateUpdate(), diff --git a/bundle/tests/enviroment_key_test.go b/bundle/tests/enviroment_key_test.go index aed3964db5..135ef19177 100644 --- a/bundle/tests/enviroment_key_test.go +++ b/bundle/tests/enviroment_key_test.go @@ -18,6 +18,6 @@ func TestEnvironmentKeyProvidedAndNoPanic(t *testing.T) { b, diags := loadTargetWithDiags("./environment_key_only", "default") require.Empty(t, diags) - diags = bundle.Apply(context.Background(), b, libraries.ValidateLocalLibrariesExist()) + diags = bundle.Apply(context.Background(), b, libraries.ExpandGlobReferences()) require.Empty(t, diags) } diff --git a/bundle/tests/python_wheel_test.go b/bundle/tests/python_wheel_test.go index 05e4fdfafd..318467d3fa 100644 --- a/bundle/tests/python_wheel_test.go +++ b/bundle/tests/python_wheel_test.go @@ -8,6 +8,9 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/bundle/phases" + mockfiler "github.com/databricks/cli/internal/mocks/libs/filer" + "github.com/databricks/cli/libs/filer" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) @@ -23,7 +26,7 @@ func TestPythonWheelBuild(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(matches)) - match := libraries.ValidateLocalLibrariesExist() + match := libraries.ExpandGlobReferences() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -40,7 +43,7 @@ func TestPythonWheelBuildAutoDetect(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(matches)) - match := libraries.ValidateLocalLibrariesExist() + match := libraries.ExpandGlobReferences() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -57,7 +60,7 @@ func TestPythonWheelBuildAutoDetectWithNotebookTask(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(matches)) - match := libraries.ValidateLocalLibrariesExist() + match := libraries.ExpandGlobReferences() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -70,7 +73,7 @@ func TestPythonWheelWithDBFSLib(t *testing.T) { diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build())) require.NoError(t, diags.Error()) - match := libraries.ValidateLocalLibrariesExist() + match := libraries.ExpandGlobReferences() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -80,21 +83,24 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) { b, err := bundle.Load(ctx, "./python_wheel/python_wheel_no_artifact_no_setup") require.NoError(t, err) - diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build())) - require.NoError(t, diags.Error()) + b.Config.Workspace.ArtifactPath = "/foo/bar" - match := libraries.ValidateLocalLibrariesExist() - diags = bundle.Apply(ctx, b, match) - require.ErrorContains(t, diags.Error(), "./non-existing/*.whl") + mockFiler := mockfiler.NewMockFiler(t) + mockFiler.EXPECT().Write( + mock.Anything, + filepath.Join("my_test_code-0.0.1-py3-none-any.whl"), + mock.AnythingOfType("*bytes.Reader"), + filer.OverwriteIfExists, + filer.CreateParentDirectories, + ).Return(nil) + + u := libraries.UploadWithClient(mockFiler) + diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build(), libraries.ExpandGlobReferences(), u)) + require.NoError(t, diags.Error()) - require.NotZero(t, len(b.Config.Artifacts)) + require.Len(t, diags, 1) - artifact := b.Config.Artifacts["my_test_code-0.0.1-py3-none-any.whl"] - require.NotNil(t, artifact) - require.Empty(t, artifact.BuildCommand) - require.Contains(t, artifact.Files[0].Source, filepath.Join(b.RootPath, "package", - "my_test_code-0.0.1-py3-none-any.whl", - )) + require.Equal(t, "/foo/bar/.internal/my_test_code-0.0.1-py3-none-any.whl", b.Config.Resources.Jobs["test_job"].JobSettings.Tasks[0].Libraries[0].Whl) } func TestPythonWheelBuildWithEnvironmentKey(t *testing.T) { @@ -109,7 +115,7 @@ func TestPythonWheelBuildWithEnvironmentKey(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(matches)) - match := libraries.ValidateLocalLibrariesExist() + match := libraries.ExpandGlobReferences() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -126,7 +132,7 @@ func TestPythonWheelBuildMultiple(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, len(matches)) - match := libraries.ValidateLocalLibrariesExist() + match := libraries.ExpandGlobReferences() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } @@ -139,7 +145,7 @@ func TestPythonWheelNoBuild(t *testing.T) { diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build())) require.NoError(t, diags.Error()) - match := libraries.ValidateLocalLibrariesExist() + match := libraries.ExpandGlobReferences() diags = bundle.Apply(ctx, b, match) require.NoError(t, diags.Error()) } From 94c960607cabc1f82b86fcb11bacf075298bc098 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 8 Aug 2024 12:49:36 +0200 Subject: [PATCH 02/11] merge + fix --- bundle/libraries/upload.go | 6 +++++- bundle/tests/python_wheel_test.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index ffd0bf5006..f7b95d1374 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -11,6 +11,7 @@ import ( "strings" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/filer" @@ -239,17 +240,20 @@ func isVolumesPath(path string) bool { // Function to upload file (a library, artifact and etc) to Workspace or UC volume func UploadFile(ctx context.Context, file string, client filer.Filer) error { + filename := filepath.Base(file) + cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename)) + raw, err := os.ReadFile(file) if err != nil { return fmt.Errorf("unable to read %s: %w", file, errors.Unwrap(err)) } - filename := filepath.Base(file) err = client.Write(ctx, filename, bytes.NewReader(raw), filer.OverwriteIfExists, filer.CreateParentDirectories) if err != nil { return fmt.Errorf("unable to import %s: %w", filename, err) } + cmdio.LogString(ctx, "Upload succeeded") return nil } diff --git a/bundle/tests/python_wheel_test.go b/bundle/tests/python_wheel_test.go index 318467d3fa..0b3c7b306d 100644 --- a/bundle/tests/python_wheel_test.go +++ b/bundle/tests/python_wheel_test.go @@ -100,7 +100,7 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) { require.Len(t, diags, 1) - require.Equal(t, "/foo/bar/.internal/my_test_code-0.0.1-py3-none-any.whl", b.Config.Resources.Jobs["test_job"].JobSettings.Tasks[0].Libraries[0].Whl) + require.Equal(t, "/Workspace/foo/bar/.internal/my_test_code-0.0.1-py3-none-any.whl", b.Config.Resources.Jobs["test_job"].JobSettings.Tasks[0].Libraries[0].Whl) } func TestPythonWheelBuildWithEnvironmentKey(t *testing.T) { From d584e3b912da67f607cd14dcfd1129073992dc54 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 8 Aug 2024 12:54:13 +0200 Subject: [PATCH 03/11] fixed test --- internal/bundle/artifacts_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/bundle/artifacts_test.go b/internal/bundle/artifacts_test.go index 46c236a4e9..bae8073fcb 100644 --- a/internal/bundle/artifacts_test.go +++ b/internal/bundle/artifacts_test.go @@ -8,9 +8,9 @@ import ( "testing" "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/artifacts" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/internal" "github.com/databricks/cli/internal/acc" "github.com/databricks/databricks-sdk-go/service/compute" @@ -74,7 +74,7 @@ func TestAccUploadArtifactFileToCorrectRemotePath(t *testing.T) { }, } - diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test")) + diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload())) require.NoError(t, diags.Error()) // The remote path attribute on the artifact file should have been set. @@ -138,7 +138,7 @@ func TestAccUploadArtifactFileToCorrectRemotePathWithEnvironments(t *testing.T) }, } - diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test")) + diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload())) require.NoError(t, diags.Error()) // The remote path attribute on the artifact file should have been set. @@ -207,7 +207,7 @@ func TestAccUploadArtifactFileToCorrectRemotePathForVolumes(t *testing.T) { }, } - diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test")) + diags := bundle.Apply(ctx, b, bundle.Seq(libraries.ExpandGlobReferences(), libraries.Upload())) require.NoError(t, diags.Error()) // The remote path attribute on the artifact file should have been set. From cfd23412e4e879ddef5fcfe8998bd6ba340bef6a Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 8 Aug 2024 13:55:47 +0200 Subject: [PATCH 04/11] fixed typo --- bundle/libraries/upload.go | 3 ++- bundle/phases/deploy.go | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index f7b95d1374..eac92a1505 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go" @@ -253,7 +254,7 @@ func UploadFile(ctx context.Context, file string, client filer.Filer) error { return fmt.Errorf("unable to import %s: %w", filename, err) } - cmdio.LogString(ctx, "Upload succeeded") + log.Infof(ctx, "Upload succeeded") return nil } diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 48ca73024a..ca967c321a 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -113,7 +113,6 @@ func Deploy() bundle.Mutator { terraform.StatePull(), deploy.StatePull(), mutator.ValidateGitDetails(), - libraries.ExpandGlobReferences(), artifacts.CleanUp(), libraries.ExpandGlobReferences(), libraries.Upload(), From 67a37e4888e65330bc6584b9327e3b5e6804e943 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 8 Aug 2024 17:02:27 +0200 Subject: [PATCH 05/11] fixes to local path --- bundle/artifacts/{cleanup.go => upload.go} | 0 bundle/artifacts/upload_test.go | 115 +++++++++++++++++++++ bundle/libraries/expand_glob_references.go | 13 +-- bundle/libraries/libraries.go | 2 +- bundle/libraries/local_path.go | 69 +++++++------ bundle/libraries/local_path_test.go | 36 +++++-- bundle/libraries/upload.go | 4 +- 7 files changed, 186 insertions(+), 53 deletions(-) rename bundle/artifacts/{cleanup.go => upload.go} (100%) create mode 100644 bundle/artifacts/upload_test.go diff --git a/bundle/artifacts/cleanup.go b/bundle/artifacts/upload.go similarity index 100% rename from bundle/artifacts/cleanup.go rename to bundle/artifacts/upload.go diff --git a/bundle/artifacts/upload_test.go b/bundle/artifacts/upload_test.go new file mode 100644 index 0000000000..bec209a23b --- /dev/null +++ b/bundle/artifacts/upload_test.go @@ -0,0 +1,115 @@ +package artifacts + +import ( + "context" + "os" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/bundle/libraries" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/testfile" + "github.com/stretchr/testify/require" +) + +type noop struct{} + +func (n *noop) Apply(context.Context, *bundle.Bundle) diag.Diagnostics { + return nil +} + +func (n *noop) Name() string { + return "noop" +} + +func TestExpandGlobFilesSource(t *testing.T) { + rootPath := t.TempDir() + err := os.Mkdir(filepath.Join(rootPath, "test"), 0755) + require.NoError(t, err) + + t1 := testfile.CreateFile(t, filepath.Join(rootPath, "test", "myjar1.jar")) + t1.Close(t) + + t2 := testfile.CreateFile(t, filepath.Join(rootPath, "test", "myjar2.jar")) + t2.Close(t) + + b := &bundle.Bundle{ + RootPath: rootPath, + Config: config.Root{ + Artifacts: map[string]*config.Artifact{ + "test": { + Type: "custom", + Files: []config.ArtifactFile{ + { + Source: filepath.Join("..", "test", "*.jar"), + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, ".", filepath.Join(rootPath, "resources", "artifacts.yml")) + + u := &upload{"test"} + uploadMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { + return &noop{} + } + + bm := &build{"test"} + buildMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { + return &noop{} + } + + pm := &prepare{"test"} + prepareMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { + return &noop{} + } + + diags := bundle.Apply(context.Background(), b, bundle.Seq(pm, bm, u, libraries.ExpandGlobReferences())) + require.NoError(t, diags.Error()) + + require.Equal(t, 2, len(b.Config.Artifacts["test"].Files)) + require.Equal(t, filepath.Join(rootPath, "test", "myjar1.jar"), b.Config.Artifacts["test"].Files[0].Source) + require.Equal(t, filepath.Join(rootPath, "test", "myjar2.jar"), b.Config.Artifacts["test"].Files[1].Source) +} + +func TestExpandGlobFilesSourceWithNoMatches(t *testing.T) { + rootPath := t.TempDir() + err := os.Mkdir(filepath.Join(rootPath, "test"), 0755) + require.NoError(t, err) + + b := &bundle.Bundle{ + RootPath: rootPath, + Config: config.Root{ + Artifacts: map[string]*config.Artifact{ + "test": { + Type: "custom", + Files: []config.ArtifactFile{ + { + Source: filepath.Join("..", "test", "myjar.jar"), + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, ".", filepath.Join(rootPath, "resources", "artifacts.yml")) + + u := &upload{"test"} + uploadMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { + return &noop{} + } + + bm := &build{"test"} + buildMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { + return &noop{} + } + + diags := bundle.Apply(context.Background(), b, bundle.Seq(bm, u, libraries.ExpandGlobReferences())) + require.ErrorContains(t, diags.Error(), "no files found for") +} diff --git a/bundle/libraries/expand_glob_references.go b/bundle/libraries/expand_glob_references.go index af100da3f3..01bac9b281 100644 --- a/bundle/libraries/expand_glob_references.go +++ b/bundle/libraries/expand_glob_references.go @@ -59,15 +59,12 @@ func expandLibraries(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostic for i, lib := range libs { lp := p.Append(dyn.Index(i)) path, libType, supported := getLibDetails(lib) - if !supported { + if !supported || !IsLibraryLocal(path) { output = append(output, lib) continue } - if !IsLibraryLocal(path) { - output = append(output, lib) - continue - } + lp = lp.Append(dyn.Key(libType)) matches, err := findMatches(b, path) if err != nil { @@ -138,7 +135,7 @@ func (e *expand) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { dyn.Key("libraries"), ) - envDepsPattern := dyn.NewPattern( + envDeps := dyn.NewPattern( dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), @@ -158,7 +155,7 @@ func (e *expand) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { fn: expandLibraries, }, { - pattern: envDepsPattern, + pattern: envDeps, fn: expandEnvironmentDeps, }, } @@ -171,7 +168,7 @@ func (e *expand) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { v, err = dyn.MapByPattern(v, expander.pattern, func(p dyn.Path, lv dyn.Value) (dyn.Value, error) { d, output := expander.fn(b, p, lv) diags = diags.Extend(d) - return dyn.V(output), err + return dyn.V(output), nil }) if err != nil { diff --git a/bundle/libraries/libraries.go b/bundle/libraries/libraries.go index 2b189e3897..33b848dd93 100644 --- a/bundle/libraries/libraries.go +++ b/bundle/libraries/libraries.go @@ -67,7 +67,7 @@ func FindTasksWithLocalLibraries(b *bundle.Bundle) []jobs.Task { func isTaskWithLocalLibraries(task jobs.Task) bool { for _, l := range task.Libraries { - if IsLocalLibrary(&l) { + if IsLibraryLocal(libraryPath(&l)) { return true } } diff --git a/bundle/libraries/local_path.go b/bundle/libraries/local_path.go index 7a0b4b4291..44ea97bb14 100644 --- a/bundle/libraries/local_path.go +++ b/bundle/libraries/local_path.go @@ -4,8 +4,6 @@ import ( "net/url" "path" "strings" - - "github.com/databricks/databricks-sdk-go/service/compute" ) // IsLocalPath returns true if the specified path indicates that @@ -29,31 +27,41 @@ func IsLocalPath(p string) bool { return true } - // If the path has another scheme, it's a remote path. - if isRemoteStorageScheme(p) { + if IsRemotePath(p) { return false } - // if path starts with . or .., it's a local path - if strings.HasPrefix(p, ".") || strings.HasPrefix(p, "..") { + // If the path is absolute, it's a remote path. + return !path.IsAbs(p) +} + +func IsRemotePath(p string) bool { + if isRemoteStorageScheme(p) { return true } + // if the path is not absolute, it's not a remote path + if !path.IsAbs(p) { + return false + } + // If the path is absolute, it's might be a remote path. - if path.IsAbs(p) { - possiblePrefixes := []string{ - "/Workspace", "/Users", "/mnt", "/dbfs", - "/Volumes", "/Shared", - } + possiblePrefixes := []string{ + "/Workspace", + "/Users", + "/Volumes", + "/Shared", + "/mnt", + "/dbfs", + } - for _, prefix := range possiblePrefixes { - if strings.HasPrefix(p, prefix) { - return false - } + for _, prefix := range possiblePrefixes { + if strings.HasPrefix(p, prefix) { + return true } } - return true + return false } // IsLibraryLocal returns true if the specified library or environment dependency @@ -63,7 +71,7 @@ func IsLocalPath(p string) bool { // a pypi package name which can be misinterpreted as a local path by IsLocalPath. func IsLibraryLocal(dep string) bool { possiblePrefixes := []string{ - ".", "..", + ".", } for _, prefix := range possiblePrefixes { @@ -78,11 +86,20 @@ func IsLibraryLocal(dep string) bool { } // If the dependency has no extension, it's a PyPi package name - if path.Ext(dep) == "" { + if isPackage(dep) { + return false + } + + if IsRemotePath(dep) { return false } - return IsLocalPath(dep) + return true +} + +func isPackage(name string) bool { + // If the dependency has no extension, it's a PyPi package name + return path.Ext(name) == "" } func isRemoteStorageScheme(path string) bool { @@ -95,16 +112,6 @@ func isRemoteStorageScheme(path string) bool { return false } - // If the path starts with scheme:/ format, it's a correct remote storage scheme - return strings.HasPrefix(path, url.Scheme+":/") -} - -// IsLocalLibrary returns true if the specified library refers to a local path. -func IsLocalLibrary(library *compute.Library) bool { - path := libraryPath(library) - if path == "" { - return false - } - - return IsLocalPath(path) + // If the path starts with scheme:/ format (not file), it's a correct remote storage scheme + return strings.HasPrefix(path, url.Scheme+":/") && url.Scheme != "file" } diff --git a/bundle/libraries/local_path_test.go b/bundle/libraries/local_path_test.go index 6cdee3c79b..0ad5c93dbd 100644 --- a/bundle/libraries/local_path_test.go +++ b/bundle/libraries/local_path_test.go @@ -3,7 +3,6 @@ package libraries import ( "testing" - "github.com/databricks/databricks-sdk-go/service/compute" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -14,13 +13,14 @@ func TestIsLocalPath(t *testing.T) { assert.True(t, IsLocalPath("./some/local/path")) assert.True(t, IsLocalPath("file://path/to/package")) assert.True(t, IsLocalPath("C:\\path\\to\\package")) - assert.True(t, IsLocalPath("/some/full/path")) + assert.True(t, IsLocalPath("myfile.txt")) assert.True(t, IsLocalPath("./myfile.txt")) assert.True(t, IsLocalPath("../myfile.txt")) assert.True(t, IsLocalPath("file:///foo/bar/myfile.txt")) // Remote paths. + assert.False(t, IsLocalPath("/some/full/path")) assert.False(t, IsLocalPath("/Workspace/path/to/package")) assert.False(t, IsLocalPath("/Users/path/to/package")) @@ -31,17 +31,31 @@ func TestIsLocalPath(t *testing.T) { assert.False(t, IsLocalPath("abfss://path/to/package")) } -func TestIsLocalLibrary(t *testing.T) { - // Local paths. - assert.True(t, IsLocalLibrary(&compute.Library{Whl: "./file.whl"})) - assert.True(t, IsLocalLibrary(&compute.Library{Jar: "../target/some.jar"})) +func TestIsRemotePath(t *testing.T) { + // Paths with schemes. + assert.True(t, IsRemotePath("dbfs://path/to/package")) + assert.True(t, IsRemotePath("dbfs:/path/to/package")) + assert.True(t, IsRemotePath("s3://path/to/package")) + assert.True(t, IsRemotePath("abfss://path/to/package")) + + // Remote paths. + assert.True(t, IsRemotePath("/Workspace/path/to/package")) + assert.True(t, IsRemotePath("/Users/path/to/package")) + + // Relative paths, paths with the file scheme, and Windows paths. + assert.False(t, IsRemotePath("some/local/path")) + assert.False(t, IsRemotePath("./some/local/path")) + assert.False(t, IsRemotePath("file://path/to/package")) + + assert.False(t, IsRemotePath("myfile.txt")) + assert.False(t, IsRemotePath("./myfile.txt")) + assert.False(t, IsRemotePath("../myfile.txt")) - // Non-local paths. - assert.False(t, IsLocalLibrary(&compute.Library{Whl: "/Workspace/path/to/file.whl"})) - assert.False(t, IsLocalLibrary(&compute.Library{Jar: "s3:/bucket/path/some.jar"})) + // Local absolute paths. + assert.False(t, IsRemotePath("/some/full/path")) + assert.False(t, IsRemotePath("file:///foo/bar/myfile.txt")) + assert.False(t, IsRemotePath("C:\\path\\to\\package")) - // Empty. - assert.False(t, IsLocalLibrary(&compute.Library{})) } func TestIsEnvironmentDependencyLocal(t *testing.T) { diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index eac92a1505..c13a9bf693 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -55,7 +55,7 @@ func collectLocalLibraries(b *bundle.Bundle) libsLocations { for i, job := range b.Config.Resources.Jobs { for j, task := range job.Tasks { for k, lib := range task.Libraries { - if !IsLocalLibrary(&lib) { + if !IsLibraryLocal(libraryPath(&lib)) { continue } @@ -85,7 +85,7 @@ func collectLocalLibraries(b *bundle.Bundle) libsLocations { if task.ForEachTask != nil { for l, lib := range task.ForEachTask.Task.Libraries { - if !IsLocalLibrary(&lib) { + if !IsLibraryLocal(libraryPath(&lib)) { continue } From 9487d15905ffb2e76b67da8740f000720b35c92c Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 8 Aug 2024 17:09:08 +0200 Subject: [PATCH 06/11] removed file --- bundle/artifacts/upload_test.go | 115 -------------------------------- 1 file changed, 115 deletions(-) delete mode 100644 bundle/artifacts/upload_test.go diff --git a/bundle/artifacts/upload_test.go b/bundle/artifacts/upload_test.go deleted file mode 100644 index bec209a23b..0000000000 --- a/bundle/artifacts/upload_test.go +++ /dev/null @@ -1,115 +0,0 @@ -package artifacts - -import ( - "context" - "os" - "path/filepath" - "testing" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/internal/bundletest" - "github.com/databricks/cli/bundle/libraries" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/testfile" - "github.com/stretchr/testify/require" -) - -type noop struct{} - -func (n *noop) Apply(context.Context, *bundle.Bundle) diag.Diagnostics { - return nil -} - -func (n *noop) Name() string { - return "noop" -} - -func TestExpandGlobFilesSource(t *testing.T) { - rootPath := t.TempDir() - err := os.Mkdir(filepath.Join(rootPath, "test"), 0755) - require.NoError(t, err) - - t1 := testfile.CreateFile(t, filepath.Join(rootPath, "test", "myjar1.jar")) - t1.Close(t) - - t2 := testfile.CreateFile(t, filepath.Join(rootPath, "test", "myjar2.jar")) - t2.Close(t) - - b := &bundle.Bundle{ - RootPath: rootPath, - Config: config.Root{ - Artifacts: map[string]*config.Artifact{ - "test": { - Type: "custom", - Files: []config.ArtifactFile{ - { - Source: filepath.Join("..", "test", "*.jar"), - }, - }, - }, - }, - }, - } - - bundletest.SetLocation(b, ".", filepath.Join(rootPath, "resources", "artifacts.yml")) - - u := &upload{"test"} - uploadMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - bm := &build{"test"} - buildMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - pm := &prepare{"test"} - prepareMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - diags := bundle.Apply(context.Background(), b, bundle.Seq(pm, bm, u, libraries.ExpandGlobReferences())) - require.NoError(t, diags.Error()) - - require.Equal(t, 2, len(b.Config.Artifacts["test"].Files)) - require.Equal(t, filepath.Join(rootPath, "test", "myjar1.jar"), b.Config.Artifacts["test"].Files[0].Source) - require.Equal(t, filepath.Join(rootPath, "test", "myjar2.jar"), b.Config.Artifacts["test"].Files[1].Source) -} - -func TestExpandGlobFilesSourceWithNoMatches(t *testing.T) { - rootPath := t.TempDir() - err := os.Mkdir(filepath.Join(rootPath, "test"), 0755) - require.NoError(t, err) - - b := &bundle.Bundle{ - RootPath: rootPath, - Config: config.Root{ - Artifacts: map[string]*config.Artifact{ - "test": { - Type: "custom", - Files: []config.ArtifactFile{ - { - Source: filepath.Join("..", "test", "myjar.jar"), - }, - }, - }, - }, - }, - } - - bundletest.SetLocation(b, ".", filepath.Join(rootPath, "resources", "artifacts.yml")) - - u := &upload{"test"} - uploadMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - bm := &build{"test"} - buildMutators[config.ArtifactType("custom")] = func(name string) bundle.Mutator { - return &noop{} - } - - diags := bundle.Apply(context.Background(), b, bundle.Seq(bm, u, libraries.ExpandGlobReferences())) - require.ErrorContains(t, diags.Error(), "no files found for") -} From 8e3d4c774a4b4cf7e06de0971029a497008155d1 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 8 Aug 2024 17:17:24 +0200 Subject: [PATCH 07/11] added test for expanding absolute paths --- bundle/libraries/expand_glob_references.go | 4 ++-- bundle/libraries/expand_glob_references_test.go | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/bundle/libraries/expand_glob_references.go b/bundle/libraries/expand_glob_references.go index 01bac9b281..5c5b5db676 100644 --- a/bundle/libraries/expand_glob_references.go +++ b/bundle/libraries/expand_glob_references.go @@ -59,7 +59,7 @@ func expandLibraries(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostic for i, lib := range libs { lp := p.Append(dyn.Index(i)) path, libType, supported := getLibDetails(lib) - if !supported || !IsLibraryLocal(path) { + if !supported || !IsLocalPath(path) { output = append(output, lib) continue } @@ -90,7 +90,7 @@ func expandEnvironmentDeps(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diag for i, dep := range deps { lp := p.Append(dyn.Index(i)) path := dep.MustString() - if !IsLibraryLocal(path) { + if !IsLocalPath(path) || !IsLibraryLocal(path) { output = append(output, dep) continue } diff --git a/bundle/libraries/expand_glob_references_test.go b/bundle/libraries/expand_glob_references_test.go index ef190f7f09..eeec8cf7da 100644 --- a/bundle/libraries/expand_glob_references_test.go +++ b/bundle/libraries/expand_glob_references_test.go @@ -45,6 +45,12 @@ func TestGlobReferencesExpandedForTaskLibraries(t *testing.T) { { Egg: "egg/*.egg", }, + { + Jar: "/Workspace/path/to/jar/*.jar", + }, + { + Whl: "/some/full/path/to/whl/*.whl", + }, }, }, }, @@ -81,6 +87,12 @@ func TestGlobReferencesExpandedForTaskLibraries(t *testing.T) { { Egg: "egg/*.egg", }, + { + Jar: "/Workspace/path/to/jar/*.jar", + }, + { + Whl: "/some/full/path/to/whl/*.whl", + }, }, task.Libraries) } @@ -112,6 +124,7 @@ func TestGlobReferencesExpandedForEnvironmentsDeps(t *testing.T) { "./whl/*.whl", "/Workspace/path/to/whl/my.whl", "./jar/*.jar", + "/some/local/path/to/whl/*.whl", }, }, }, @@ -136,6 +149,6 @@ func TestGlobReferencesExpandedForEnvironmentsDeps(t *testing.T) { "/Workspace/path/to/whl/my.whl", filepath.Join(dir, "jar", "my1.jar"), filepath.Join(dir, "jar", "my2.jar"), + "/some/local/path/to/whl/*.whl", }, env.Spec.Dependencies) - } From 54084c4ed9f63f98584f704a0cf88ae438413f56 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Thu, 8 Aug 2024 18:28:37 +0200 Subject: [PATCH 08/11] use dyn map by pattern --- bundle/libraries/expand_glob_references.go | 86 ++++++----- bundle/libraries/match_test.go | 4 +- bundle/libraries/upload.go | 163 ++++++++------------- 3 files changed, 111 insertions(+), 142 deletions(-) diff --git a/bundle/libraries/expand_glob_references.go b/bundle/libraries/expand_glob_references.go index 5c5b5db676..adbdcbad12 100644 --- a/bundle/libraries/expand_glob_references.go +++ b/bundle/libraries/expand_glob_references.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path/filepath" + "strings" "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/diag" @@ -13,13 +14,14 @@ import ( type expand struct { } -func matchWarning(p dyn.Path, message string) diag.Diagnostic { +func matchWarning(p dyn.Path, l []dyn.Location, message string) diag.Diagnostic { return diag.Diagnostic{ Severity: diag.Warning, Summary: message, Paths: []dyn.Path{ p.Append(), }, + Locations: l, } } @@ -45,12 +47,22 @@ func findMatches(b *bundle.Bundle, path string) ([]string, error) { } if len(matches) == 0 { - return nil, fmt.Errorf("no matching files for %s", path) + if isGlobPattern(path) { + return nil, fmt.Errorf("no files match pattern: %s", path) + } else { + return nil, fmt.Errorf("file doesn't exist %s", path) + } } return matches, nil } +// Checks if the path is a glob pattern +// It can contain *, [] or ? characters +func isGlobPattern(path string) bool { + return strings.ContainsAny(path, "*?[") +} + func expandLibraries(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostics, []dyn.Value) { var output []dyn.Value var diags diag.Diagnostics @@ -68,7 +80,7 @@ func expandLibraries(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostic matches, err := findMatches(b, path) if err != nil { - diags = diags.Append(matchWarning(lp, err.Error())) + diags = diags.Append(matchWarning(lp, v.Locations(), err.Error())) continue } @@ -97,7 +109,7 @@ func expandEnvironmentDeps(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diag matches, err := findMatches(b, path) if err != nil { - diags = diags.Append(matchWarning(lp, err.Error())) + diags = diags.Append(matchWarning(lp, v.Locations(), err.Error())) continue } @@ -114,48 +126,48 @@ type expandPattern struct { fn func(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostics, []dyn.Value) } -func (e *expand) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - taskLibraries := dyn.NewPattern( - dyn.Key("resources"), - dyn.Key("jobs"), - dyn.AnyKey(), - dyn.Key("tasks"), - dyn.AnyIndex(), - dyn.Key("libraries"), - ) - - forEachTaskLibraries := dyn.NewPattern( - dyn.Key("resources"), - dyn.Key("jobs"), - dyn.AnyKey(), - dyn.Key("tasks"), - dyn.AnyIndex(), - dyn.Key("for_each_task"), - dyn.Key("task"), - dyn.Key("libraries"), - ) - - envDeps := dyn.NewPattern( - dyn.Key("resources"), - dyn.Key("jobs"), - dyn.AnyKey(), - dyn.Key("environments"), - dyn.AnyIndex(), - dyn.Key("spec"), - dyn.Key("dependencies"), - ) +var taskLibrariesPattern = dyn.NewPattern( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.AnyKey(), + dyn.Key("tasks"), + dyn.AnyIndex(), + dyn.Key("libraries"), +) +var forEachTaskLibrariesPattern = dyn.NewPattern( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.AnyKey(), + dyn.Key("tasks"), + dyn.AnyIndex(), + dyn.Key("for_each_task"), + dyn.Key("task"), + dyn.Key("libraries"), +) + +var envDepsPattern = dyn.NewPattern( + dyn.Key("resources"), + dyn.Key("jobs"), + dyn.AnyKey(), + dyn.Key("environments"), + dyn.AnyIndex(), + dyn.Key("spec"), + dyn.Key("dependencies"), +) + +func (e *expand) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { expanders := []expandPattern{ { - pattern: taskLibraries, + pattern: taskLibrariesPattern, fn: expandLibraries, }, { - pattern: forEachTaskLibraries, + pattern: forEachTaskLibrariesPattern, fn: expandLibraries, }, { - pattern: envDeps, + pattern: envDepsPattern, fn: expandEnvironmentDeps, }, } diff --git a/bundle/libraries/match_test.go b/bundle/libraries/match_test.go index e44e505a20..e60504c844 100644 --- a/bundle/libraries/match_test.go +++ b/bundle/libraries/match_test.go @@ -76,7 +76,7 @@ func TestValidateEnvironmentsNoFile(t *testing.T) { diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) require.Len(t, diags, 1) - require.Equal(t, "no matching files for ./wheel.whl", diags[0].Summary) + require.Equal(t, "file doesn't exist ./wheel.whl", diags[0].Summary) } func TestValidateTaskLibraries(t *testing.T) { @@ -144,5 +144,5 @@ func TestValidateTaskLibrariesNoFile(t *testing.T) { diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) require.Len(t, diags, 1) - require.Equal(t, "no matching files for ./wheel.whl", diags[0].Summary) + require.Equal(t, "file doesn't exist ./wheel.whl", diags[0].Summary) } diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index c13a9bf693..d0fcdc9e73 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -22,6 +22,11 @@ import ( "golang.org/x/sync/errgroup" ) +// The Files API backend has a rate limit of 10 concurrent +// requests and 100 QPS. We limit the number of concurrent requests to 5 to +// avoid hitting the rate limit. +var maxFilesRequestsInFlight = 5 + func Upload() bundle.Mutator { return &upload{} } @@ -41,129 +46,75 @@ type configLocation struct { location dyn.Location } -type libsLocations = map[string]([]configLocation) - // Collect all libraries from the bundle configuration and their config paths. // By this stage all glob references are expanded and we have a list of all libraries that need to be uploaded. // We collect them from task libraries, foreach task libraries environment dependencies and artifacts. // We return a map of library source to a list of config paths and locations where the library is used. // We use map so we don't upload the same library multiple times. // Instead we upload it once and update all the config paths to point to the uploaded location. -func collectLocalLibraries(b *bundle.Bundle) libsLocations { +func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error) { libs := make(map[string]([]configLocation)) - for i, job := range b.Config.Resources.Jobs { - for j, task := range job.Tasks { - for k, lib := range task.Libraries { - if !IsLibraryLocal(libraryPath(&lib)) { - continue - } - - p := dyn.NewPath( - dyn.Key("resources"), - dyn.Key("jobs"), - dyn.Key(i), - dyn.Key("tasks"), - dyn.Index(j), - dyn.Key("libraries"), - dyn.Index(k), - ) - if lib.Whl != "" { - libs[lib.Whl] = append(libs[lib.Whl], configLocation{ - configPath: p.Append(dyn.Key("whl")), - location: b.Config.GetLocation(p.String()), - }) - } - - if lib.Jar != "" { - libs[lib.Jar] = append(libs[lib.Jar], configLocation{ - configPath: p.Append(dyn.Key("jar")), - location: b.Config.GetLocation(p.String()), - }) - } - } + patterns := []dyn.Pattern{ + taskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("whl")), + taskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("jar")), + forEachTaskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("whl")), + forEachTaskLibrariesPattern.Append(dyn.AnyIndex(), dyn.Key("jar")), + envDepsPattern.Append(dyn.AnyIndex()), + } - if task.ForEachTask != nil { - for l, lib := range task.ForEachTask.Task.Libraries { - if !IsLibraryLocal(libraryPath(&lib)) { - continue - } - - p := dyn.NewPath( - dyn.Key("resources"), - dyn.Key("jobs"), - dyn.Key(i), - dyn.Key("tasks"), - dyn.Index(j), - dyn.Key("for_each_task"), - dyn.Key("task"), - dyn.Key("libraries"), - dyn.Index(l), - ) - - if lib.Whl != "" { - libs[lib.Whl] = append(libs[lib.Whl], configLocation{ - configPath: p.Append(dyn.Key("whl")), - location: b.Config.GetLocation(p.String()), - }) - } - - if lib.Jar != "" { - libs[lib.Jar] = append(libs[lib.Jar], configLocation{ - configPath: p.Append(dyn.Key("jar")), - location: b.Config.GetLocation(p.String()), - }) - } + for _, pattern := range patterns { + err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) { + return dyn.MapByPattern(v, pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { + source := v.MustString() + if !IsLibraryLocal(source) { + return v, nil } - } - } - for j, env := range job.Environments { - if env.Spec == nil { - continue - } + libs[source] = append(libs[source], configLocation{ + configPath: p.Append(), // Hack to get the copy of path + location: v.Location(), + }) - for k, dep := range env.Spec.Dependencies { - if !IsLibraryLocal(dep) { - continue - } + return v, nil + }) + }) - p := dyn.NewPath( - dyn.Key("resources"), - dyn.Key("jobs"), - dyn.Key(i), - dyn.Key("environments"), - dyn.Index(j), - dyn.Key("spec"), - dyn.Key("dependencies"), - dyn.Index(k), - ) - - libs[dep] = append(libs[dep], configLocation{ - configPath: p, - location: b.Config.GetLocation(p.String()), - }) - } + if err != nil { + return nil, err } } - for key, artifact := range b.Config.Artifacts { - for i, file := range artifact.Files { - p := dyn.NewPath( - dyn.Key("artifacts"), - dyn.Key(key), - dyn.Key("files"), - dyn.Index(i), - ) + artifactPattern := dyn.NewPattern( + dyn.Key("artifacts"), + dyn.AnyKey(), + dyn.Key("files"), + dyn.AnyIndex(), + ) + + err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) { + return dyn.MapByPattern(v, artifactPattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { + file := v.MustMap() + sv, ok := file.GetByString("source") + if !ok { + return v, nil + } - libs[file.Source] = append(libs[file.Source], configLocation{ + source := sv.MustString() + libs[source] = append(libs[source], configLocation{ configPath: p.Append(dyn.Key("remote_path")), - location: b.Config.GetLocation(p.String()), + location: v.Location(), }) - } + + return v, nil + }) + }) + + if err != nil { + return nil, err } - return libs + return libs, nil } func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { @@ -185,8 +136,14 @@ func (u *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { var diags diag.Diagnostics - libs := collectLocalLibraries(b) + libs, err := collectLocalLibraries(b) + if err != nil { + return diag.FromErr(err) + } + errs, errCtx := errgroup.WithContext(ctx) + errs.SetLimit(maxFilesRequestsInFlight) + for source := range libs { errs.Go(func() error { return UploadFile(errCtx, source, u.client) From 54b22a67e727f0f58c9cdf817c26bd60631eaa5e Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Tue, 13 Aug 2024 10:59:07 +0200 Subject: [PATCH 09/11] fixes for is local path --- bundle/libraries/expand_glob_references.go | 26 ++++++++---- .../libraries/expand_glob_references_test.go | 16 ++++---- bundle/libraries/local_path.go | 40 ++----------------- bundle/libraries/local_path_test.go | 32 +-------------- bundle/libraries/upload.go | 9 +++-- bundle/libraries/upload_test.go | 14 +++---- .../bundle.yml | 7 ---- bundle/tests/python_wheel_test.go | 5 +-- 8 files changed, 47 insertions(+), 102 deletions(-) diff --git a/bundle/libraries/expand_glob_references.go b/bundle/libraries/expand_glob_references.go index adbdcbad12..9e90a2a17f 100644 --- a/bundle/libraries/expand_glob_references.go +++ b/bundle/libraries/expand_glob_references.go @@ -14,9 +14,9 @@ import ( type expand struct { } -func matchWarning(p dyn.Path, l []dyn.Location, message string) diag.Diagnostic { +func matchError(p dyn.Path, l []dyn.Location, message string) diag.Diagnostic { return diag.Diagnostic{ - Severity: diag.Warning, + Severity: diag.Error, Summary: message, Paths: []dyn.Path{ p.Append(), @@ -54,6 +54,15 @@ func findMatches(b *bundle.Bundle, path string) ([]string, error) { } } + // We make the matched path relative to the root path before storing it + // to allow upload mutator to distinguish between local and remote paths + for i, match := range matches { + matches[i], err = filepath.Rel(b.RootPath, match) + if err != nil { + return nil, err + } + } + return matches, nil } @@ -71,7 +80,7 @@ func expandLibraries(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostic for i, lib := range libs { lp := p.Append(dyn.Index(i)) path, libType, supported := getLibDetails(lib) - if !supported || !IsLocalPath(path) { + if !supported || !IsLibraryLocal(path) { output = append(output, lib) continue } @@ -80,7 +89,7 @@ func expandLibraries(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diagnostic matches, err := findMatches(b, path) if err != nil { - diags = diags.Append(matchWarning(lp, v.Locations(), err.Error())) + diags = diags.Append(matchError(lp, lib.Locations(), err.Error())) continue } @@ -102,14 +111,14 @@ func expandEnvironmentDeps(b *bundle.Bundle, p dyn.Path, v dyn.Value) (diag.Diag for i, dep := range deps { lp := p.Append(dyn.Index(i)) path := dep.MustString() - if !IsLocalPath(path) || !IsLibraryLocal(path) { + if !IsLibraryLocal(path) { output = append(output, dep) continue } matches, err := findMatches(b, path) if err != nil { - diags = diags.Append(matchWarning(lp, v.Locations(), err.Error())) + diags = diags.Append(matchError(lp, dep.Locations(), err.Error())) continue } @@ -203,7 +212,10 @@ func (e *expand) Name() string { } // ExpandGlobReferences expands any glob references in the libraries or environments section -// to corresponding local paths +// to corresponding local paths. +// We only expand local paths (i.e. paths that are relative to the root path). +// After expanding we make the paths relative to the root path to allow upload mutator later in the chain to +// distinguish between local and remote paths. func ExpandGlobReferences() bundle.Mutator { return &expand{} } diff --git a/bundle/libraries/expand_glob_references_test.go b/bundle/libraries/expand_glob_references_test.go index eeec8cf7da..8c77fe7b55 100644 --- a/bundle/libraries/expand_glob_references_test.go +++ b/bundle/libraries/expand_glob_references_test.go @@ -70,19 +70,19 @@ func TestGlobReferencesExpandedForTaskLibraries(t *testing.T) { task := job.JobSettings.Tasks[0] require.Equal(t, []compute.Library{ { - Whl: filepath.Join(dir, "whl", "my1.whl"), + Whl: filepath.Join("whl", "my1.whl"), }, { - Whl: filepath.Join(dir, "whl", "my2.whl"), + Whl: filepath.Join("whl", "my2.whl"), }, { Whl: "/Workspace/path/to/whl/my.whl", }, { - Jar: filepath.Join(dir, "jar", "my1.jar"), + Jar: filepath.Join("jar", "my1.jar"), }, { - Jar: filepath.Join(dir, "jar", "my2.jar"), + Jar: filepath.Join("jar", "my2.jar"), }, { Egg: "egg/*.egg", @@ -144,11 +144,11 @@ func TestGlobReferencesExpandedForEnvironmentsDeps(t *testing.T) { job := b.Config.Resources.Jobs["job"] env := job.JobSettings.Environments[0] require.Equal(t, []string{ - filepath.Join(dir, "whl", "my1.whl"), - filepath.Join(dir, "whl", "my2.whl"), + filepath.Join("whl", "my1.whl"), + filepath.Join("whl", "my2.whl"), "/Workspace/path/to/whl/my.whl", - filepath.Join(dir, "jar", "my1.jar"), - filepath.Join(dir, "jar", "my2.jar"), + filepath.Join("jar", "my1.jar"), + filepath.Join("jar", "my2.jar"), "/some/local/path/to/whl/*.whl", }, env.Spec.Dependencies) } diff --git a/bundle/libraries/local_path.go b/bundle/libraries/local_path.go index 44ea97bb14..5b5ec6c076 100644 --- a/bundle/libraries/local_path.go +++ b/bundle/libraries/local_path.go @@ -27,41 +27,13 @@ func IsLocalPath(p string) bool { return true } - if IsRemotePath(p) { - return false - } - - // If the path is absolute, it's a remote path. - return !path.IsAbs(p) -} - -func IsRemotePath(p string) bool { + // If the path has another scheme, it's a remote path. if isRemoteStorageScheme(p) { - return true - } - - // if the path is not absolute, it's not a remote path - if !path.IsAbs(p) { return false } - // If the path is absolute, it's might be a remote path. - possiblePrefixes := []string{ - "/Workspace", - "/Users", - "/Volumes", - "/Shared", - "/mnt", - "/dbfs", - } - - for _, prefix := range possiblePrefixes { - if strings.HasPrefix(p, prefix) { - return true - } - } - - return false + // If path starts with /, it's a remote absolute path + return !path.IsAbs(p) } // IsLibraryLocal returns true if the specified library or environment dependency @@ -90,11 +62,7 @@ func IsLibraryLocal(dep string) bool { return false } - if IsRemotePath(dep) { - return false - } - - return true + return IsLocalPath(dep) } func isPackage(name string) bool { diff --git a/bundle/libraries/local_path_test.go b/bundle/libraries/local_path_test.go index 0ad5c93dbd..1249452abf 100644 --- a/bundle/libraries/local_path_test.go +++ b/bundle/libraries/local_path_test.go @@ -13,13 +13,12 @@ func TestIsLocalPath(t *testing.T) { assert.True(t, IsLocalPath("./some/local/path")) assert.True(t, IsLocalPath("file://path/to/package")) assert.True(t, IsLocalPath("C:\\path\\to\\package")) - assert.True(t, IsLocalPath("myfile.txt")) assert.True(t, IsLocalPath("./myfile.txt")) assert.True(t, IsLocalPath("../myfile.txt")) assert.True(t, IsLocalPath("file:///foo/bar/myfile.txt")) - // Remote paths. + // Absolute paths. assert.False(t, IsLocalPath("/some/full/path")) assert.False(t, IsLocalPath("/Workspace/path/to/package")) assert.False(t, IsLocalPath("/Users/path/to/package")) @@ -31,34 +30,7 @@ func TestIsLocalPath(t *testing.T) { assert.False(t, IsLocalPath("abfss://path/to/package")) } -func TestIsRemotePath(t *testing.T) { - // Paths with schemes. - assert.True(t, IsRemotePath("dbfs://path/to/package")) - assert.True(t, IsRemotePath("dbfs:/path/to/package")) - assert.True(t, IsRemotePath("s3://path/to/package")) - assert.True(t, IsRemotePath("abfss://path/to/package")) - - // Remote paths. - assert.True(t, IsRemotePath("/Workspace/path/to/package")) - assert.True(t, IsRemotePath("/Users/path/to/package")) - - // Relative paths, paths with the file scheme, and Windows paths. - assert.False(t, IsRemotePath("some/local/path")) - assert.False(t, IsRemotePath("./some/local/path")) - assert.False(t, IsRemotePath("file://path/to/package")) - - assert.False(t, IsRemotePath("myfile.txt")) - assert.False(t, IsRemotePath("./myfile.txt")) - assert.False(t, IsRemotePath("../myfile.txt")) - - // Local absolute paths. - assert.False(t, IsRemotePath("/some/full/path")) - assert.False(t, IsRemotePath("file:///foo/bar/myfile.txt")) - assert.False(t, IsRemotePath("C:\\path\\to\\package")) - -} - -func TestIsEnvironmentDependencyLocal(t *testing.T) { +func TestIsLibraryLocal(t *testing.T) { testCases := [](struct { path string expected bool diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index d0fcdc9e73..4d509217f7 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -1,7 +1,6 @@ package libraries import ( - "bytes" "context" "errors" "fmt" @@ -71,6 +70,7 @@ func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error return v, nil } + source = filepath.Join(b.RootPath, source) libs[source] = append(libs[source], configLocation{ configPath: p.Append(), // Hack to get the copy of path location: v.Location(), @@ -201,12 +201,13 @@ func UploadFile(ctx context.Context, file string, client filer.Filer) error { filename := filepath.Base(file) cmdio.LogString(ctx, fmt.Sprintf("Uploading %s...", filename)) - raw, err := os.ReadFile(file) + f, err := os.Open(file) if err != nil { - return fmt.Errorf("unable to read %s: %w", file, errors.Unwrap(err)) + return fmt.Errorf("unable to open %s: %w", file, errors.Unwrap(err)) } + defer f.Close() - err = client.Write(ctx, filename, bytes.NewReader(raw), filer.OverwriteIfExists, filer.CreateParentDirectories) + err = client.Write(ctx, filename, f, filer.OverwriteIfExists, filer.CreateParentDirectories) if err != nil { return fmt.Errorf("unable to import %s: %w", filename, err) } diff --git a/bundle/libraries/upload_test.go b/bundle/libraries/upload_test.go index de4f53012c..82fe6e7c7c 100644 --- a/bundle/libraries/upload_test.go +++ b/bundle/libraries/upload_test.go @@ -88,7 +88,7 @@ func TestArtifactUploadForWorkspace(t *testing.T) { mockFiler.EXPECT().Write( mock.Anything, filepath.Join("source.whl"), - mock.AnythingOfType("*bytes.Reader"), + mock.AnythingOfType("*os.File"), filer.OverwriteIfExists, filer.CreateParentDirectories, ).Return(nil) @@ -176,7 +176,7 @@ func TestArtifactUploadForVolumes(t *testing.T) { mockFiler.EXPECT().Write( mock.Anything, filepath.Join("source.whl"), - mock.AnythingOfType("*bytes.Reader"), + mock.AnythingOfType("*os.File"), filer.OverwriteIfExists, filer.CreateParentDirectories, ).Return(nil) @@ -220,7 +220,7 @@ func TestArtifactUploadWithNoLibraryReference(t *testing.T) { mockFiler.EXPECT().Write( mock.Anything, filepath.Join("source.whl"), - mock.AnythingOfType("*bytes.Reader"), + mock.AnythingOfType("*os.File"), filer.OverwriteIfExists, filer.CreateParentDirectories, ).Return(nil) @@ -282,7 +282,7 @@ func TestUploadMultipleLibraries(t *testing.T) { mockFiler.EXPECT().Write( mock.Anything, filepath.Join("source1.whl"), - mock.AnythingOfType("*bytes.Reader"), + mock.AnythingOfType("*os.File"), filer.OverwriteIfExists, filer.CreateParentDirectories, ).Return(nil).Once() @@ -290,7 +290,7 @@ func TestUploadMultipleLibraries(t *testing.T) { mockFiler.EXPECT().Write( mock.Anything, filepath.Join("source2.whl"), - mock.AnythingOfType("*bytes.Reader"), + mock.AnythingOfType("*os.File"), filer.OverwriteIfExists, filer.CreateParentDirectories, ).Return(nil).Once() @@ -298,7 +298,7 @@ func TestUploadMultipleLibraries(t *testing.T) { mockFiler.EXPECT().Write( mock.Anything, filepath.Join("source3.whl"), - mock.AnythingOfType("*bytes.Reader"), + mock.AnythingOfType("*os.File"), filer.OverwriteIfExists, filer.CreateParentDirectories, ).Return(nil).Once() @@ -306,7 +306,7 @@ func TestUploadMultipleLibraries(t *testing.T) { mockFiler.EXPECT().Write( mock.Anything, filepath.Join("source4.whl"), - mock.AnythingOfType("*bytes.Reader"), + mock.AnythingOfType("*os.File"), filer.OverwriteIfExists, filer.CreateParentDirectories, ).Return(nil).Once() diff --git a/bundle/tests/python_wheel/python_wheel_no_artifact_no_setup/bundle.yml b/bundle/tests/python_wheel/python_wheel_no_artifact_no_setup/bundle.yml index 1bac4ebadf..492861969d 100644 --- a/bundle/tests/python_wheel/python_wheel_no_artifact_no_setup/bundle.yml +++ b/bundle/tests/python_wheel/python_wheel_no_artifact_no_setup/bundle.yml @@ -13,10 +13,3 @@ resources: entry_point: "run" libraries: - whl: ./package/*.whl - - task_key: TestTask2 - existing_cluster_id: "0717-aaaaa-bbbbbb" - python_wheel_task: - package_name: "my_test_code" - entry_point: "run" - libraries: - - whl: ./non-existing/*.whl diff --git a/bundle/tests/python_wheel_test.go b/bundle/tests/python_wheel_test.go index 0b3c7b306d..c4d85703cc 100644 --- a/bundle/tests/python_wheel_test.go +++ b/bundle/tests/python_wheel_test.go @@ -89,7 +89,7 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) { mockFiler.EXPECT().Write( mock.Anything, filepath.Join("my_test_code-0.0.1-py3-none-any.whl"), - mock.AnythingOfType("*bytes.Reader"), + mock.AnythingOfType("*os.File"), filer.OverwriteIfExists, filer.CreateParentDirectories, ).Return(nil) @@ -97,8 +97,7 @@ func TestPythonWheelBuildNoBuildJustUpload(t *testing.T) { u := libraries.UploadWithClient(mockFiler) diags := bundle.Apply(ctx, b, bundle.Seq(phases.Load(), phases.Build(), libraries.ExpandGlobReferences(), u)) require.NoError(t, diags.Error()) - - require.Len(t, diags, 1) + require.Empty(t, diags) require.Equal(t, "/Workspace/foo/bar/.internal/my_test_code-0.0.1-py3-none-any.whl", b.Config.Resources.Jobs["test_job"].JobSettings.Tasks[0].Libraries[0].Whl) } From f31cce1848f2d94aea46c560ab8c41d7804edae3 Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Tue, 13 Aug 2024 12:58:14 +0200 Subject: [PATCH 10/11] Update bundle/libraries/upload.go Co-authored-by: Pieter Noordhuis --- bundle/libraries/upload.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index 4d509217f7..9abd0419e0 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -47,7 +47,7 @@ type configLocation struct { // Collect all libraries from the bundle configuration and their config paths. // By this stage all glob references are expanded and we have a list of all libraries that need to be uploaded. -// We collect them from task libraries, foreach task libraries environment dependencies and artifacts. +// We collect them from task libraries, foreach task libraries, environment dependencies, and artifacts. // We return a map of library source to a list of config paths and locations where the library is used. // We use map so we don't upload the same library multiple times. // Instead we upload it once and update all the config paths to point to the uploaded location. From 27f4e06734b73ae1db5ad7a3da7e38603f211d7b Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Tue, 13 Aug 2024 13:08:52 +0200 Subject: [PATCH 11/11] fixes --- .../libraries/expand_glob_references_test.go | 85 +++++++++++++++++++ bundle/libraries/local_path_test.go | 3 + bundle/libraries/upload.go | 18 +++- 3 files changed, 103 insertions(+), 3 deletions(-) diff --git a/bundle/libraries/expand_glob_references_test.go b/bundle/libraries/expand_glob_references_test.go index 8c77fe7b55..34855b539d 100644 --- a/bundle/libraries/expand_glob_references_test.go +++ b/bundle/libraries/expand_glob_references_test.go @@ -96,6 +96,91 @@ func TestGlobReferencesExpandedForTaskLibraries(t *testing.T) { }, task.Libraries) } +func TestGlobReferencesExpandedForForeachTaskLibraries(t *testing.T) { + dir := t.TempDir() + testutil.Touch(t, dir, "whl", "my1.whl") + testutil.Touch(t, dir, "whl", "my2.whl") + testutil.Touch(t, dir, "jar", "my1.jar") + testutil.Touch(t, dir, "jar", "my2.jar") + + b := &bundle.Bundle{ + RootPath: dir, + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + TaskKey: "task", + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + Libraries: []compute.Library{ + { + Whl: "whl/*.whl", + }, + { + Whl: "/Workspace/path/to/whl/my.whl", + }, + { + Jar: "./jar/*.jar", + }, + { + Egg: "egg/*.egg", + }, + { + Jar: "/Workspace/path/to/jar/*.jar", + }, + { + Whl: "/some/full/path/to/whl/*.whl", + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, ".", filepath.Join(dir, "resource.yml")) + + diags := bundle.Apply(context.Background(), b, ExpandGlobReferences()) + require.Empty(t, diags) + + job := b.Config.Resources.Jobs["job"] + task := job.JobSettings.Tasks[0].ForEachTask.Task + require.Equal(t, []compute.Library{ + { + Whl: filepath.Join("whl", "my1.whl"), + }, + { + Whl: filepath.Join("whl", "my2.whl"), + }, + { + Whl: "/Workspace/path/to/whl/my.whl", + }, + { + Jar: filepath.Join("jar", "my1.jar"), + }, + { + Jar: filepath.Join("jar", "my2.jar"), + }, + { + Egg: "egg/*.egg", + }, + { + Jar: "/Workspace/path/to/jar/*.jar", + }, + { + Whl: "/some/full/path/to/whl/*.whl", + }, + }, task.Libraries) +} + func TestGlobReferencesExpandedForEnvironmentsDeps(t *testing.T) { dir := t.TempDir() testutil.Touch(t, dir, "whl", "my1.whl") diff --git a/bundle/libraries/local_path_test.go b/bundle/libraries/local_path_test.go index 1249452abf..be4028d522 100644 --- a/bundle/libraries/local_path_test.go +++ b/bundle/libraries/local_path_test.go @@ -47,10 +47,13 @@ func TestIsLibraryLocal(t *testing.T) { {path: ".\\..\\local\\*.whl", expected: true}, {path: "../../local/*.whl", expected: true}, {path: "..\\..\\local\\*.whl", expected: true}, + {path: "file://path/to/package/whl.whl", expected: true}, {path: "pypipackage", expected: false}, {path: "/Volumes/catalog/schema/volume/path.whl", expected: false}, {path: "/Workspace/my_project/dist.whl", expected: false}, {path: "-r /Workspace/my_project/requirements.txt", expected: false}, + {path: "s3://mybucket/path/to/package", expected: false}, + {path: "dbfs:/mnt/path/to/package", expected: false}, } for i, tc := range testCases { diff --git a/bundle/libraries/upload.go b/bundle/libraries/upload.go index 9abd0419e0..be7cc41db5 100644 --- a/bundle/libraries/upload.go +++ b/bundle/libraries/upload.go @@ -65,7 +65,11 @@ func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error for _, pattern := range patterns { err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) { return dyn.MapByPattern(v, pattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { - source := v.MustString() + source, ok := v.AsString() + if !ok { + return v, fmt.Errorf("expected string, got %s", v.Kind()) + } + if !IsLibraryLocal(source) { return v, nil } @@ -94,13 +98,21 @@ func collectLocalLibraries(b *bundle.Bundle) (map[string][]configLocation, error err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) { return dyn.MapByPattern(v, artifactPattern, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { - file := v.MustMap() + file, ok := v.AsMap() + if !ok { + return v, fmt.Errorf("expected map, got %s", v.Kind()) + } + sv, ok := file.GetByString("source") if !ok { return v, nil } - source := sv.MustString() + source, ok := sv.AsString() + if !ok { + return v, fmt.Errorf("expected string, got %s", v.Kind()) + } + libs[source] = append(libs[source], configLocation{ configPath: p.Append(dyn.Key("remote_path")), location: v.Location(),