From 9f4bf1261b6327d3327bb7813ad17e1ba67080c3 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 1 Jun 2023 11:23:31 +0200 Subject: [PATCH 01/26] Add workspace import_dir command to the CLI --- cmd/workspace/workspace/import_dir.go | 84 ++++++++++++++++++++++++ cmd/workspace/workspace/import_events.go | 67 +++++++++++++++++++ libs/cmdio/render.go | 4 ++ libs/sync/event.go | 13 ++++ libs/sync/repofiles/repofiles.go | 17 +++-- libs/sync/repofiles/repofiles_test.go | 42 ++++++------ libs/sync/sync.go | 17 ++++- 7 files changed, 216 insertions(+), 28 deletions(-) create mode 100644 cmd/workspace/workspace/import_dir.go create mode 100644 cmd/workspace/workspace/import_events.go diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go new file mode 100644 index 0000000000..385deca4ac --- /dev/null +++ b/cmd/workspace/workspace/import_dir.go @@ -0,0 +1,84 @@ +package workspace + +import ( + "github.com/databricks/cli/cmd/root" + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/sync" + "github.com/databricks/databricks-sdk-go" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" +) + +// TODO: add some comments here +var importDirCmd = &cobra.Command{ + Use: "import_dir SOURCE_PATH TARGET_PATH", + Short: `Recursively imports a directory from local to the Databricks workspace.`, + Long: ` + Imports directory to the workspace. + + This command respects your git ignore configuration. Notebooks with extensions + .scala, .py, .sql, .r, .R, .ipynb are stripped of their extensions. +`, + + Annotations: map[string]string{}, + PreRunE: root.MustWorkspaceClient, + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) (err error) { + ctx := cmd.Context() + sourcePath := args[0] + targetPath := args[1] + + // Initialize syncer to do a full sync with the correct from source to target. + // This will upload the local files + opts := sync.SyncOptions{ + LocalPath: sourcePath, + RemotePath: targetPath, + Full: true, + WorkspaceClient: databricks.Must(databricks.NewWorkspaceClient()), + DisallowOverwrites: !importDirOverwrite, + } + s, err := sync.New(ctx, opts) + if err != nil { + return err + } + + // Initialize error wait group, and spawn the progress event emitter inside + // the error wait group + group, ctx := errgroup.WithContext(ctx) + group.Go( + func() error { + return renderSyncEvents(ctx, s.Events(), s) + }) + + // Start Uploading local files + cmdio.Render(ctx, newImportStartedEvent(sourcePath, targetPath)) + err = s.RunOnce(ctx) + if err != nil { + return err + } + // Upload completed, close the syncer + s.Close() + + // Wait for any inflight progress events to be emitted + if err := group.Wait(); err != nil { + return err + } + + // Render import completetion event + cmdio.Render(ctx, newImportCompleteEvent(sourcePath, targetPath)) + return nil + }, +} + +var importDirOverwrite bool + +func init() { + importDirCmd.Annotations["template"] = cmdio.Heredoc(` + {{if eq .Type "IMPORT_STARTED"}}Import started + {{else if eq .Type "UPLOAD_COMPLETE"}}Uploaded {{.SourcePath}} -> {{.TargetPath}} + {{else if eq .Type "IMPORT_COMPLETE"}}Import completed + {{end}} + `) + importDirCmd.Flags().BoolVar(&importDirOverwrite, "overwrite", false, "Overwrite if file already exists in the workspace") + Cmd.AddCommand(importDirCmd) +} diff --git a/cmd/workspace/workspace/import_events.go b/cmd/workspace/workspace/import_events.go new file mode 100644 index 0000000000..7627691c86 --- /dev/null +++ b/cmd/workspace/workspace/import_events.go @@ -0,0 +1,67 @@ +package workspace + +import ( + "context" + + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/sync" +) + +type fileIOEvent struct { + SourcePath string `json:"source_path,omitempty"` + TargetPath string `json:"target_path,omitempty"` + Type string `json:"type"` +} + +func newImportStartedEvent(sourcePath, targetPath string) fileIOEvent { + return fileIOEvent{ + SourcePath: sourcePath, + TargetPath: targetPath, + Type: "IMPORT_STARTED", + } +} + +func newImportCompleteEvent(sourcePath, targetPath string) fileIOEvent { + return fileIOEvent{ + Type: "IMPORT_COMPLETE", + } +} + +func newUploadCompleteEvent(sourcePath, targetPath string) fileIOEvent { + return fileIOEvent{ + SourcePath: sourcePath, + TargetPath: targetPath, + Type: "UPLOAD_COMPLETE", + } +} + +func renderSyncEvents(ctx context.Context, eventChannel <-chan sync.Event, syncer *sync.Sync) error { + for { + select { + case <-ctx.Done(): + return nil + case e, ok := <-eventChannel: + if !ok { + return nil + } + + // We parse progress events from the sync to track when file uploads + // are complete and emit the corresponding events + if e.String() != "" && e.Type() == sync.EventTypeProgress { + progressEvent := e.(*sync.EventSyncProgress) + if progressEvent.Progress < 1 { + return nil + } + // TODO: test this works with windows paths + remotePath, err := syncer.RemotePath(progressEvent.Path) + if err != nil { + return err + } + err = cmdio.Render(ctx, newUploadCompleteEvent(progressEvent.Path, remotePath)) + if err != nil { + return err + } + } + } + } +} diff --git a/libs/cmdio/render.go b/libs/cmdio/render.go index 8aff2b8d2c..c5abe2cbe8 100644 --- a/libs/cmdio/render.go +++ b/libs/cmdio/render.go @@ -49,6 +49,10 @@ func renderJson(w io.Writer, v any) error { return err } _, err = w.Write(pretty) + if err != nil { + return err + } + _, err = w.Write([]byte("\n")) return err } diff --git a/libs/sync/event.go b/libs/sync/event.go index 8e5c0efa2e..dcba223275 100644 --- a/libs/sync/event.go +++ b/libs/sync/event.go @@ -24,6 +24,7 @@ const ( type Event interface { fmt.Stringer + Type() EventType } type EventBase struct { @@ -73,6 +74,10 @@ func (e *EventStart) String() string { return fmt.Sprintf("Action: %s", e.EventChanges.String()) } +func (e *EventStart) Type() EventType { + return EventTypeStart +} + func newEventStart(seq int, put []string, delete []string) Event { return &EventStart{ EventBase: newEventBase(seq, EventTypeStart), @@ -106,6 +111,10 @@ func (e *EventSyncProgress) String() string { } } +func (e *EventSyncProgress) Type() EventType { + return EventTypeProgress +} + func newEventProgress(seq int, action EventAction, path string, progress float32) Event { return &EventSyncProgress{ EventBase: newEventBase(seq, EventTypeProgress), @@ -133,6 +142,10 @@ func (e *EventSyncComplete) String() string { return "Complete" } +func (e *EventSyncComplete) Type() EventType { + return EventTypeComplete +} + func newEventComplete(seq int, put []string, delete []string) Event { return &EventSyncComplete{ EventBase: newEventBase(seq, EventTypeComplete), diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go index 8fcabc113e..9771021732 100644 --- a/libs/sync/repofiles/repofiles.go +++ b/libs/sync/repofiles/repofiles.go @@ -17,23 +17,30 @@ import ( "github.com/databricks/databricks-sdk-go/service/workspace" ) +type RepoFileOptions struct { + OverwriteIfExists bool +} + // RepoFiles wraps reading and writing into a remote repo with safeguards to prevent // accidental deletion of repos and more robust methods to overwrite workspace files type RepoFiles struct { + *RepoFileOptions + repoRoot string localRoot string workspaceClient *databricks.WorkspaceClient } -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { +func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient, opts *RepoFileOptions) *RepoFiles { return &RepoFiles{ repoRoot: repoRoot, localRoot: localRoot, workspaceClient: workspaceClient, + RepoFileOptions: opts, } } -func (r *RepoFiles) remotePath(relativePath string) (string, error) { +func (r *RepoFiles) RemotePath(relativePath string) (string, error) { fullPath := path.Join(r.repoRoot, relativePath) cleanFullPath := path.Clean(fullPath) if !strings.HasPrefix(cleanFullPath, r.repoRoot) { @@ -58,12 +65,12 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten if err != nil { return err } - remotePath, err := r.remotePath(relativePath) + remotePath, err := r.RemotePath(relativePath) if err != nil { return err } escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=true", escapedPath) + apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=%t", escapedPath, r.OverwriteIfExists) err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) @@ -113,7 +120,7 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten } func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - remotePath, err := r.remotePath(relativePath) + remotePath, err := r.RemotePath(relativePath) if err != nil { return err } diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go index 2a881d90d0..e71f26abf2 100644 --- a/libs/sync/repofiles/repofiles_test.go +++ b/libs/sync/repofiles/repofiles_test.go @@ -10,68 +10,68 @@ import ( func TestRepoFilesRemotePath(t *testing.T) { repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) + repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil, nil) - remotePath, err := repoFiles.remotePath("a/b/c") + remotePath, err := repoFiles.RemotePath("a/b/c") assert.NoError(t, err) assert.Equal(t, repoRoot+"/a/b/c", remotePath) - remotePath, err = repoFiles.remotePath("a/b/../d") + remotePath, err = repoFiles.RemotePath("a/b/../d") assert.NoError(t, err) assert.Equal(t, repoRoot+"/a/d", remotePath) - remotePath, err = repoFiles.remotePath("a/../c") + remotePath, err = repoFiles.RemotePath("a/../c") assert.NoError(t, err) assert.Equal(t, repoRoot+"/c", remotePath) - remotePath, err = repoFiles.remotePath("a/b/c/.") + remotePath, err = repoFiles.RemotePath("a/b/c/.") assert.NoError(t, err) assert.Equal(t, repoRoot+"/a/b/c", remotePath) - remotePath, err = repoFiles.remotePath("a/b/c/d/./../../f/g") + remotePath, err = repoFiles.RemotePath("a/b/c/d/./../../f/g") assert.NoError(t, err) assert.Equal(t, repoRoot+"/a/b/f/g", remotePath) - _, err = repoFiles.remotePath("..") + _, err = repoFiles.RemotePath("..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ..`) - _, err = repoFiles.remotePath("a/../..") + _, err = repoFiles.RemotePath("a/../..") assert.ErrorContains(t, err, `relative file path is not inside repo root: a/../..`) - _, err = repoFiles.remotePath("./../.") + _, err = repoFiles.RemotePath("./../.") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - _, err = repoFiles.remotePath("/./.././..") + _, err = repoFiles.RemotePath("/./.././..") assert.ErrorContains(t, err, `relative file path is not inside repo root: /./.././..`) - _, err = repoFiles.remotePath("./../.") + _, err = repoFiles.RemotePath("./../.") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - _, err = repoFiles.remotePath("./..") + _, err = repoFiles.RemotePath("./..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./..`) - _, err = repoFiles.remotePath("./../../..") + _, err = repoFiles.RemotePath("./../../..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../../..`) - _, err = repoFiles.remotePath("./../a/./b../../..") + _, err = repoFiles.RemotePath("./../a/./b../../..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../a/./b../../..`) - _, err = repoFiles.remotePath("../..") + _, err = repoFiles.RemotePath("../..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ../..`) - _, err = repoFiles.remotePath(".//a/..//./b/..") + _, err = repoFiles.RemotePath(".//a/..//./b/..") assert.ErrorContains(t, err, `file path relative to repo root cannot be empty`) - _, err = repoFiles.remotePath("a/b/../..") + _, err = repoFiles.RemotePath("a/b/../..") assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - _, err = repoFiles.remotePath("") + _, err = repoFiles.RemotePath("") assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - _, err = repoFiles.remotePath(".") + _, err = repoFiles.RemotePath(".") assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - _, err = repoFiles.remotePath("/") + _, err = repoFiles.RemotePath("/") assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") } @@ -81,7 +81,7 @@ func TestRepoReadLocal(t *testing.T) { err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) assert.NoError(t, err) - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) + repoFiles := Create("/Repos/doraemon/bar", tempDir, nil, nil) bytes, err := repoFiles.readLocal("./a/../hello.txt") assert.NoError(t, err) assert.Equal(t, "my name is doraemon :P", string(bytes)) diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 54d0624e77..e19340fbac 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -24,6 +24,10 @@ type SyncOptions struct { WorkspaceClient *databricks.WorkspaceClient Host string + + // If set, sync will not be able to overwrite any existing paths on the + // workspace file system. + DisallowOverwrites bool } type Sync struct { @@ -76,8 +80,9 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, fmt.Errorf("unable to load sync snapshot: %w", err) } } - - repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) + repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient, &repofiles.RepoFileOptions{ + OverwriteIfExists: !opts.DisallowOverwrites, + }) return &Sync{ SyncOptions: &opts, @@ -125,6 +130,14 @@ func (s *Sync) notifyComplete(ctx context.Context, d diff) { s.seq++ } +func (s *Sync) RemotePath(localPath string) (string, error) { + relativePath, ok := s.snapshot.LocalToRemoteNames[localPath] + if !ok { + return "", fmt.Errorf("could not find remote path for %s", localPath) + } + return s.repoFiles.RemotePath(relativePath) +} + func (s *Sync) RunOnce(ctx context.Context) error { // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 From 2303f20f95235190599a20a7e31938cdcfd29c1b Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:04:18 +0200 Subject: [PATCH 02/26] sync options redefine better --- bundle/deploy/files/sync.go | 3 +++ cmd/bundle/sync.go | 3 +++ cmd/workspace/workspace/import_dir.go | 12 +++++++----- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/bundle/deploy/files/sync.go b/bundle/deploy/files/sync.go index 77c64e529d..6f792a1bc4 100644 --- a/bundle/deploy/files/sync.go +++ b/bundle/deploy/files/sync.go @@ -21,6 +21,9 @@ func getSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) { SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), + + PersistSnapshot: true, + AllowOverwrites: true, } return sync.New(ctx, opts) } diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 19adc2dd6a..c0687daeca 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -25,6 +25,9 @@ func syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOpti SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), + + PersistSnapshot: true, + AllowOverwrites: true, } return &opts, nil } diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index 385deca4ac..3d4657229f 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -31,11 +31,13 @@ var importDirCmd = &cobra.Command{ // Initialize syncer to do a full sync with the correct from source to target. // This will upload the local files opts := sync.SyncOptions{ - LocalPath: sourcePath, - RemotePath: targetPath, - Full: true, - WorkspaceClient: databricks.Must(databricks.NewWorkspaceClient()), - DisallowOverwrites: !importDirOverwrite, + LocalPath: sourcePath, + RemotePath: targetPath, + Full: true, + WorkspaceClient: databricks.Must(databricks.NewWorkspaceClient()), + + AllowOverwrites: importDirOverwrite, + PersistSnapshot: false, } s, err := sync.New(ctx, opts) if err != nil { From 0382c749c532df6058b045184d4eb962ae3b9784 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:04:42 +0200 Subject: [PATCH 03/26] - --- cmd/sync/sync.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index d13a85d033..510f859b9e 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -56,6 +56,9 @@ func syncOptionsFromArgs(cmd *cobra.Command, args []string) (*sync.SyncOptions, // exist and add it to the `.gitignore` file in the root. SnapshotBasePath: filepath.Join(args[0], ".databricks"), WorkspaceClient: databricks.Must(databricks.NewWorkspaceClient()), + + PersistSnapshot: true, + AllowOverwrites: true, } return &opts, nil } From 289dd0db32b70c2e7f7a258c9df7653a681f602c Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:05:17 +0200 Subject: [PATCH 04/26] - --- libs/sync/sync.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/libs/sync/sync.go b/libs/sync/sync.go index e19340fbac..de8cea65ec 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -25,9 +25,11 @@ type SyncOptions struct { Host string - // If set, sync will not be able to overwrite any existing paths on the - // workspace file system. - DisallowOverwrites bool + // Allow sync to overwrite existing files in the workspace + AllowOverwrites bool + + // Persist the snapshot on the local file systems for future sync runs + PersistSnapshot bool } type Sync struct { @@ -80,9 +82,12 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, fmt.Errorf("unable to load sync snapshot: %w", err) } } - repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient, &repofiles.RepoFileOptions{ - OverwriteIfExists: !opts.DisallowOverwrites, + repoFiles, err := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient, &repofiles.RepoFileOptions{ + OverwriteIfExists: opts.AllowOverwrites, }) + if err != nil { + return nil, err + } return &Sync{ SyncOptions: &opts, @@ -163,10 +168,12 @@ func (s *Sync) RunOnce(ctx context.Context) error { return err } - err = s.snapshot.Save(ctx) - if err != nil { - log.Errorf(ctx, "cannot store snapshot: %s", err) - return err + if s.PersistSnapshot { + err = s.snapshot.Save(ctx) + if err != nil { + log.Errorf(ctx, "cannot store snapshot: %s", err) + return err + } } s.notifyComplete(ctx, change) From fb5ea166cae205b904fffd218ad7a52787cd23a7 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:05:51 +0200 Subject: [PATCH 05/26] helper wsfs function --- internal/filer_test.go | 34 ++++------------------------------ internal/helpers.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/internal/filer_test.go b/internal/filer_test.go index b8fd63657d..80935220ad 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -15,7 +15,6 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/service/files" - "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -138,35 +137,10 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.Len(t, entries, 1) assert.Equal(t, "c", entries[0].Name()) assert.True(t, entries[0].IsDir()) -} - -func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { - ctx := context.Background() - me, err := w.CurrentUser.Me(ctx) - require.NoError(t, err) - - path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-filer-wsfs-")) - - // Ensure directory exists, but doesn't exist YET! - // Otherwise we could inadvertently remove a directory that already exists on cleanup. - t.Logf("mkdir %s", path) - err = w.Workspace.MkdirsByPath(ctx, path) - require.NoError(t, err) - - // Remove test directory on test completion. - t.Cleanup(func() { - t.Logf("rm -rf %s", path) - err := w.Workspace.Delete(ctx, workspace.Delete{ - Path: path, - Recursive: true, - }) - if err == nil || apierr.IsMissing(err) { - return - } - t.Logf("unable to remove temporary workspace directory %s: %#v", path, err) - }) - return path + // TODO: split into a separate PR + _, err = f.ReadDir(ctx, "/hello.txt") + assert.ErrorIs(t, err, filer.ErrNotADirectory) } func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { @@ -174,7 +148,7 @@ func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { ctx := context.Background() w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := temporaryWorkspaceDir(t, w) + tmpdir := TemporaryWorkspaceDir(t, w) f, err := filer.NewWorkspaceFilesClient(w, tmpdir) require.NoError(t, err) diff --git a/internal/helpers.go b/internal/helpers.go index b51d005b27..0f739429d4 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -14,6 +14,9 @@ import ( "github.com/databricks/cli/cmd/root" _ "github.com/databricks/cli/cmd/version" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/require" _ "github.com/databricks/cli/cmd/workspace" @@ -176,3 +179,32 @@ func writeFile(t *testing.T, name string, body string) string { f.Close() return f.Name() } + +func TemporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { + ctx := context.Background() + me, err := w.CurrentUser.Me(ctx) + require.NoError(t, err) + + path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-")) + + // Ensure directory exists, but doesn't exist YET! + // Otherwise we could inadvertently remove a directory that already exists on cleanup. + t.Logf("mkdir %s", path) + err = w.Workspace.MkdirsByPath(ctx, path) + require.NoError(t, err) + + // Remove test directory on test completion. + t.Cleanup(func() { + t.Logf("rm -rf %s", path) + err := w.Workspace.Delete(ctx, workspace.Delete{ + Path: path, + Recursive: true, + }) + if err == nil || apierr.IsMissing(err) { + return + } + t.Logf("unable to remove temporary workspace directory %s: %#v", path, err) + }) + + return path +} From 931fae1fd43ec88504ee67b75799e7606df4dc24 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:06:49 +0200 Subject: [PATCH 06/26] add check for isdir in readdir --- libs/filer/dbfs_client.go | 4 ++++ libs/filer/filer.go | 15 +++++++++++++++ libs/filer/workspace_files_client.go | 6 ++++++ 3 files changed, 25 insertions(+) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 7a3084ac65..0a9d5b086a 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -222,6 +222,10 @@ func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e return nil, err } + if len(res.Files) == 1 && !res.Files[0].IsDir && res.Files[0].Path == absPath { + return nil, NotADirectory{absPath} + } + info := make([]fs.DirEntry, len(res.Files)) for i, v := range res.Files { info[i] = dbfsDirEntry{dbfsFileInfo: dbfsFileInfo{fi: v}} diff --git a/libs/filer/filer.go b/libs/filer/filer.go index e54efb96c6..9d50539951 100644 --- a/libs/filer/filer.go +++ b/libs/filer/filer.go @@ -2,6 +2,7 @@ package filer import ( "context" + "errors" "fmt" "io" "io/fs" @@ -50,6 +51,20 @@ func (err NoSuchDirectoryError) Is(other error) bool { return other == fs.ErrNotExist } +var ErrNotADirectory = errors.New("not a directory") + +type NotADirectory struct { + path string +} + +func (err NotADirectory) Error() string { + return fmt.Sprintf("%s is not a directory", err.path) +} + +func (err NotADirectory) Is(other error) bool { + return other == ErrNotADirectory +} + // Filer is used to access files in a workspace. // It has implementations for accessing files in WSFS and in DBFS. type Filer interface { diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index b9f0f3db3c..594e1dbc80 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -222,6 +222,12 @@ func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.D objects, err := w.workspaceClient.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{ Path: absPath, }) + + // TODO: add integration test for this + if len(objects) == 1 && objects[0].Path == absPath { + return nil, NotADirectory{absPath} + } + if err != nil { // If we got an API error we deal with it below. var aerr *apierr.APIError From 57a95786f73510a143e1efd3d0adfb6c0f5001ff Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:07:16 +0200 Subject: [PATCH 07/26] added some intgegration test for repofiles and moved to filer --- libs/sync/repofiles/repofiles.go | 87 ++++++++++----------------- libs/sync/repofiles/repofiles_test.go | 7 ++- 2 files changed, 37 insertions(+), 57 deletions(-) diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go index 9771021732..9a6428a168 100644 --- a/libs/sync/repofiles/repofiles.go +++ b/libs/sync/repofiles/repofiles.go @@ -1,19 +1,18 @@ package repofiles import ( + "bytes" "context" "errors" "fmt" - "net/http" - "net/url" "os" "path" "path/filepath" "strings" + "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/service/workspace" ) @@ -22,22 +21,32 @@ type RepoFileOptions struct { } // RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and more robust methods to overwrite workspace files +// accidental deletion of repos and more robust methods to overwrite workspac e files type RepoFiles struct { *RepoFileOptions repoRoot string localRoot string workspaceClient *databricks.WorkspaceClient + f filer.Filer } -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient, opts *RepoFileOptions) *RepoFiles { +func Create(repoRoot, localRoot string, w *databricks.WorkspaceClient, opts *RepoFileOptions) (*RepoFiles, error) { + // override default timeout to support uploading larger files + w.Config.HTTPTimeoutSeconds = 600 + + // create filer to interact with WSFS + f, err := filer.NewWorkspaceFilesClient(w, repoRoot) + if err != nil { + return nil, err + } return &RepoFiles{ repoRoot: repoRoot, localRoot: localRoot, - workspaceClient: workspaceClient, + workspaceClient: w, RepoFileOptions: opts, - } + f: f, + }, nil } func (r *RepoFiles) RemotePath(relativePath string) (string, error) { @@ -59,36 +68,25 @@ func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { } func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - apiClientConfig := r.workspaceClient.Config - apiClientConfig.HTTPTimeoutSeconds = 600 - apiClient, err := client.New(apiClientConfig) - if err != nil { - return err + if !r.OverwriteIfExists { + return r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories) } - remotePath, err := r.RemotePath(relativePath) - if err != nil { - return err - } - escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=%t", escapedPath, r.OverwriteIfExists) - - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - - // Handling some edge cases when an upload might fail - // - // We cannot do more precise error scoping here because the API does not - // provide descriptive errors yet - // - // TODO: narrow down the error condition scope of this "if" block to only - // trigger for the specific edge cases instead of all errors once the API - // implements them + + err := r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories, filer.OverwriteIfExists) + + // TODO(pietern): Use the new FS interface to avoid needing to make a recursive + // delete call here. This call is dangerous if err != nil { // Delete any artifact files incase non overwriteable by the current file // type and thus are failing the PUT request. // files, folders and notebooks might not have been cleaned up and they // can't overwrite each other. If a folder `foo` exists, then attempts to // PUT a file `foo` will fail - err := r.workspaceClient.Workspace.Delete(ctx, + remotePath, err := r.RemotePath(relativePath) + if err != nil { + return err + } + err = r.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ Path: remotePath, Recursive: true, @@ -103,33 +101,15 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten return err } - // Mkdir parent dirs incase they are what's causing the PUT request to - // fail - err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return fmt.Errorf("could not mkdir to put file: %s", err) - } - - // Attempt to upload file again after cleanup/setup - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - if err != nil { - return err - } + // Attempt to write the file again, this time without the CreateParentDirectories and + // OverwriteIfExists flags + return r.f.Write(ctx, relativePath, bytes.NewReader(content)) } return nil } func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - remotePath, err := r.RemotePath(relativePath) - if err != nil { - return err - } - return r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: false, - }, - ) + return r.f.Delete(ctx, relativePath) } // The API calls for a python script foo.py would be @@ -161,6 +141,3 @@ func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error { } return nil } - -// TODO: write integration tests for all non happy path cases that rely on -// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go index e71f26abf2..ce2a14c2c0 100644 --- a/libs/sync/repofiles/repofiles_test.go +++ b/libs/sync/repofiles/repofiles_test.go @@ -6,11 +6,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestRepoFilesRemotePath(t *testing.T) { repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil, nil) + repoFiles, err := Create(repoRoot, "/doraemon/foo/bar", nil, nil) + require.NoError(t, err) remotePath, err := repoFiles.RemotePath("a/b/c") assert.NoError(t, err) @@ -81,7 +83,8 @@ func TestRepoReadLocal(t *testing.T) { err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) assert.NoError(t, err) - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil, nil) + repoFiles, err := Create("/Repos/doraemon/bar", tempDir, nil, nil) + require.NoError(t, err) bytes, err := repoFiles.readLocal("./a/../hello.txt") assert.NoError(t, err) assert.Equal(t, "my name is doraemon :P", string(bytes)) From dc0ff8925a72ef5ce9ffda95068e2d596eaa4dfd Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:07:35 +0200 Subject: [PATCH 08/26] repofiles integration test --- internal/repofiles_test.go | 155 +++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 internal/repofiles_test.go diff --git a/internal/repofiles_test.go b/internal/repofiles_test.go new file mode 100644 index 0000000000..055fe0c243 --- /dev/null +++ b/internal/repofiles_test.go @@ -0,0 +1,155 @@ +package internal + +import ( + "context" + "os" + "path" + "path/filepath" + "strings" + "testing" + + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/sync/repofiles" + "github.com/databricks/databricks-sdk-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TODO: skip if not cloud env, these are integration tests +// TODO: split into a separate PR + +func TestRepoFilesPutFile(t *testing.T) { + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + ctx := context.Background() + + // initialize client + wsfsTmpDir := TemporaryWorkspaceDir(t, w) + localTmpDir := t.TempDir() + r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ + OverwriteIfExists: true, + }) + require.NoError(t, err) + f, err := filer.NewWorkspaceFilesClient(w, wsfsTmpDir) + require.NoError(t, err) + + // create local file + err = os.WriteFile(filepath.Join(localTmpDir, "foo.txt"), []byte(`hello, world`), os.ModePerm) + require.NoError(t, err) + err = r.PutFile(ctx, "foo.txt") + require.NoError(t, err) + + require.NoError(t, f.Mkdir(ctx, "bar")) + + entries, err := f.ReadDir(ctx, "bar") + require.NoError(t, err) + require.Len(t, entries, 1) + + assertFileContains(t, ctx, f, "foo.txt", "hello, world") +} + +func TestRepoFilesFileOverwritesNotebook(t *testing.T) { + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + ctx := context.Background() + + // initialize client + wsfsTmpDir := TemporaryWorkspaceDir(t, w) + localTmpDir := t.TempDir() + r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ + OverwriteIfExists: true, + }) + require.NoError(t, err) + f, err := filer.NewWorkspaceFilesClient(w, wsfsTmpDir) + require.NoError(t, err) + + // create local notebook + err = os.WriteFile(filepath.Join(localTmpDir, "foo.py"), []byte("#Databricks notebook source\nprint(1)"), os.ModePerm) + require.NoError(t, err) + + // upload notebook + err = r.PutFile(ctx, "foo.py") + require.NoError(t, err) + assertNotebookExists(t, ctx, w, path.Join(wsfsTmpDir, "foo")) + + // upload file, and assert that it overwrites the notebook + err = os.WriteFile(filepath.Join(localTmpDir, "foo"), []byte("I am going to overwrite the notebook"), os.ModePerm) + require.NoError(t, err) + err = r.PutFile(ctx, "foo") + require.NoError(t, err) + assertFileContains(t, ctx, f, "foo", "I am going to overwrite the notebook") +} + +func TestRepoFilesFileOverwritesEmptyDirectoryTree(t *testing.T) { + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + ctx := context.Background() + + // initialize client + wsfsTmpDir := TemporaryWorkspaceDir(t, w) + localTmpDir := t.TempDir() + r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ + OverwriteIfExists: true, + }) + require.NoError(t, err) + f, err := filer.NewWorkspaceFilesClient(w, wsfsTmpDir) + require.NoError(t, err) + + // create local file + err = os.WriteFile(filepath.Join(localTmpDir, "foo"), []byte(`hello, world`), os.ModePerm) + require.NoError(t, err) + + // construct a directory tree without files in the workspace + err = f.Mkdir(ctx, "foo/a/b/c") + require.NoError(t, err) + err = f.Mkdir(ctx, "foo/a/b/d/e") + require.NoError(t, err) + err = f.Mkdir(ctx, "foo/f/g/i") + require.NoError(t, err) + + // assert the directories exist + entries, err := f.ReadDir(ctx, "foo") + require.NoError(t, err) + assert.Len(t, entries, 2) + assert.True(t, entries[0].IsDir()) + assert.True(t, entries[1].IsDir()) + + // upload file, and assert that it overwrites the empty directories + err = r.PutFile(ctx, "foo") + require.NoError(t, err) + assertFileContains(t, ctx, f, "foo", "hello, world") + + // assert the directories do not exist anymore + _, err = f.ReadDir(ctx, "foo") + assert.ErrorIs(t, err, filer.ErrNotADirectory) +} + +func TestRepoFilesFileInDirOverwritesExistingNotebook(t *testing.T) { + w, err := databricks.NewWorkspaceClient() + require.NoError(t, err) + ctx := context.Background() + + // initialize client + wsfsTmpDir := TemporaryWorkspaceDir(t, w) + localTmpDir := t.TempDir() + r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ + OverwriteIfExists: true, + }) + require.NoError(t, err) + f, err := filer.NewWorkspaceFilesClient(w, wsfsTmpDir) + require.NoError(t, err) + + // create local notebook + err = f.Write(ctx, "foo.py", strings.NewReader("#Databricks notebook source\nprint(1)")) + require.NoError(t, err) + assertNotebookExists(t, ctx, w, path.Join(wsfsTmpDir, "foo")) + + // upload file + err = os.Mkdir(filepath.Join(localTmpDir, "foo"), os.ModePerm) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(localTmpDir, "foo/bar.txt"), []byte("I am going to overwrite the notebook"), os.ModePerm) + require.NoError(t, err) + err = r.PutFile(ctx, "foo/bar.txt") + require.NoError(t, err) + assertFileContains(t, ctx, f, "foo/bar.txt", "I am going to overwrite the notebook") +} From 2be6b85beb4d384793d6bc67bf1280d39c41e614 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:08:29 +0200 Subject: [PATCH 09/26] wip import-dir acc tests --- internal/import_dir_test.go | 81 +++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 internal/import_dir_test.go diff --git a/internal/import_dir_test.go b/internal/import_dir_test.go new file mode 100644 index 0000000000..27a53d5f35 --- /dev/null +++ b/internal/import_dir_test.go @@ -0,0 +1,81 @@ +package internal + +import ( + "bytes" + "context" + "io" + "path" + "regexp" + "testing" + + "github.com/databricks/cli/libs/filer" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/workspace" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWorkspaceImportDir(t *testing.T) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := TemporaryWorkspaceDir(t, w) + + // run import_dir command + RequireSuccessfulRun(t, "workspace", "import_dir", "./testdata/import_dir/default", tmpdir) + + // assert files are uploaded + f, err := filer.NewWorkspaceFilesClient(w, tmpdir) + require.NoError(t, err) + assertFileContains(t, ctx, f, "foo.txt", "hello, world") + assertFileContains(t, ctx, f, ".gitignore", ".databricks") + assertFileContains(t, ctx, f, "bar/apple.py", "print(1)") + assertNotebookExists(t, ctx, w, path.Join(tmpdir, "bar/mango")) +} + +func TestWorkspaceImportDirOverwriteFlag(t *testing.T) { + // t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + // ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := TemporaryWorkspaceDir(t, w) + + // run import_dir command + RequireSuccessfulRun(t, "workspace", "import_dir", "./testdata/import_dir/override/a", tmpdir) + + // // assert files are uploaded + // f, err := filer.NewWorkspaceFilesClient(w, tmpdir) + // require.NoError(t, err) + // assertFileContains(t, ctx, f, "bar.txt", "from directory A") + + + // Assert another run fails with path already exists error from the server + _, _, err := RequireErrorRun(t, "workspace", "import_dir", "./testdata/import_dir/override/b", tmpdir) + assert.Regexp(t, regexp.MustCompile("Path (.*) already exists."), err.Error()) + + // // Succeeds with the overwrite flag + // RequireSuccessfulRun(t, "workspace", "import_dir", "./testdata/import_dir/override/b", tmpdir, "--overwrite") + // require.NoError(t, err) + // assertFileContains(t, ctx, f, "bar.txt", "from directory B") +} + +func assertFileContains(t *testing.T, ctx context.Context, f filer.Filer, name, contents string) { + r, err := f.Read(ctx, name) + require.NoError(t, err) + + var b bytes.Buffer + _, err = io.Copy(&b, r) + require.NoError(t, err) + + assert.Contains(t, b.String(), contents) +} + +func assertNotebookExists(t *testing.T, ctx context.Context, w *databricks.WorkspaceClient, path string) { + info, err := w.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{ + Path: path, + }) + require.NoError(t, err) + assert.Len(t, info, 1) + assert.Equal(t, info[0].ObjectType, workspace.ObjectTypeNotebook) +} From c1ccf8204f43faed4a7da75fd50e50520bebc8b0 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:08:40 +0200 Subject: [PATCH 10/26] testdata --- internal/foo.txt | 1 + internal/testdata/import_dir/default/bar/.gitignore | 2 ++ internal/testdata/import_dir/default/bar/apple.py | 1 + internal/testdata/import_dir/default/bar/foo.txt | 1 + internal/testdata/import_dir/default/bar/mango.py | 2 ++ internal/testdata/import_dir/override/a/.gitignore | 2 ++ internal/testdata/import_dir/override/a/bar.txt | 2 ++ internal/testdata/import_dir/override/b/.gitignore | 2 ++ internal/testdata/import_dir/override/b/bar.txt | 2 ++ 9 files changed, 15 insertions(+) create mode 100755 internal/foo.txt create mode 100644 internal/testdata/import_dir/default/bar/.gitignore create mode 100644 internal/testdata/import_dir/default/bar/apple.py create mode 100644 internal/testdata/import_dir/default/bar/foo.txt create mode 100644 internal/testdata/import_dir/default/bar/mango.py create mode 100644 internal/testdata/import_dir/override/a/.gitignore create mode 100644 internal/testdata/import_dir/override/a/bar.txt create mode 100644 internal/testdata/import_dir/override/b/.gitignore create mode 100644 internal/testdata/import_dir/override/b/bar.txt diff --git a/internal/foo.txt b/internal/foo.txt new file mode 100755 index 0000000000..8c01d89ae0 --- /dev/null +++ b/internal/foo.txt @@ -0,0 +1 @@ +hello, world \ No newline at end of file diff --git a/internal/testdata/import_dir/default/bar/.gitignore b/internal/testdata/import_dir/default/bar/.gitignore new file mode 100644 index 0000000000..de811f118b --- /dev/null +++ b/internal/testdata/import_dir/default/bar/.gitignore @@ -0,0 +1,2 @@ + +.databricks diff --git a/internal/testdata/import_dir/default/bar/apple.py b/internal/testdata/import_dir/default/bar/apple.py new file mode 100644 index 0000000000..b917a726c9 --- /dev/null +++ b/internal/testdata/import_dir/default/bar/apple.py @@ -0,0 +1 @@ +print(1) diff --git a/internal/testdata/import_dir/default/bar/foo.txt b/internal/testdata/import_dir/default/bar/foo.txt new file mode 100644 index 0000000000..4b5fa63702 --- /dev/null +++ b/internal/testdata/import_dir/default/bar/foo.txt @@ -0,0 +1 @@ +hello, world diff --git a/internal/testdata/import_dir/default/bar/mango.py b/internal/testdata/import_dir/default/bar/mango.py new file mode 100644 index 0000000000..4707166b28 --- /dev/null +++ b/internal/testdata/import_dir/default/bar/mango.py @@ -0,0 +1,2 @@ +#Databricks notebook source +print(2) diff --git a/internal/testdata/import_dir/override/a/.gitignore b/internal/testdata/import_dir/override/a/.gitignore new file mode 100644 index 0000000000..de811f118b --- /dev/null +++ b/internal/testdata/import_dir/override/a/.gitignore @@ -0,0 +1,2 @@ + +.databricks diff --git a/internal/testdata/import_dir/override/a/bar.txt b/internal/testdata/import_dir/override/a/bar.txt new file mode 100644 index 0000000000..8211f1031e --- /dev/null +++ b/internal/testdata/import_dir/override/a/bar.txt @@ -0,0 +1,2 @@ +#Databricks notebook source +print("from dir A") diff --git a/internal/testdata/import_dir/override/b/.gitignore b/internal/testdata/import_dir/override/b/.gitignore new file mode 100644 index 0000000000..de811f118b --- /dev/null +++ b/internal/testdata/import_dir/override/b/.gitignore @@ -0,0 +1,2 @@ + +.databricks diff --git a/internal/testdata/import_dir/override/b/bar.txt b/internal/testdata/import_dir/override/b/bar.txt new file mode 100644 index 0000000000..73e457003c --- /dev/null +++ b/internal/testdata/import_dir/override/b/bar.txt @@ -0,0 +1,2 @@ +#Databricks notebook source +print("from dir B") From efc72b5bbd697ec739b9b6c460e2f7a2ef1fb873 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Fri, 2 Jun 2023 00:15:02 +0200 Subject: [PATCH 11/26] - --- internal/filer_test.go | 2 +- internal/helpers.go | 2 +- internal/import_dir_test.go | 5 ++--- internal/repofiles_test.go | 8 ++++---- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/internal/filer_test.go b/internal/filer_test.go index 80935220ad..223f5d847b 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -148,7 +148,7 @@ func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { ctx := context.Background() w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := TemporaryWorkspaceDir(t, w) + tmpdir := temporaryWorkspaceDir(t, w) f, err := filer.NewWorkspaceFilesClient(w, tmpdir) require.NoError(t, err) diff --git a/internal/helpers.go b/internal/helpers.go index 0f739429d4..f1901f14e1 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -180,7 +180,7 @@ func writeFile(t *testing.T, name string, body string) string { return f.Name() } -func TemporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { +func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { ctx := context.Background() me, err := w.CurrentUser.Me(ctx) require.NoError(t, err) diff --git a/internal/import_dir_test.go b/internal/import_dir_test.go index 27a53d5f35..6120607ff1 100644 --- a/internal/import_dir_test.go +++ b/internal/import_dir_test.go @@ -20,7 +20,7 @@ func TestWorkspaceImportDir(t *testing.T) { ctx := context.Background() w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := TemporaryWorkspaceDir(t, w) + tmpdir := temporaryWorkspaceDir(t, w) // run import_dir command RequireSuccessfulRun(t, "workspace", "import_dir", "./testdata/import_dir/default", tmpdir) @@ -39,7 +39,7 @@ func TestWorkspaceImportDirOverwriteFlag(t *testing.T) { // ctx := context.Background() w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := TemporaryWorkspaceDir(t, w) + tmpdir := temporaryWorkspaceDir(t, w) // run import_dir command RequireSuccessfulRun(t, "workspace", "import_dir", "./testdata/import_dir/override/a", tmpdir) @@ -49,7 +49,6 @@ func TestWorkspaceImportDirOverwriteFlag(t *testing.T) { // require.NoError(t, err) // assertFileContains(t, ctx, f, "bar.txt", "from directory A") - // Assert another run fails with path already exists error from the server _, _, err := RequireErrorRun(t, "workspace", "import_dir", "./testdata/import_dir/override/b", tmpdir) assert.Regexp(t, regexp.MustCompile("Path (.*) already exists."), err.Error()) diff --git a/internal/repofiles_test.go b/internal/repofiles_test.go index 055fe0c243..07c488e043 100644 --- a/internal/repofiles_test.go +++ b/internal/repofiles_test.go @@ -24,7 +24,7 @@ func TestRepoFilesPutFile(t *testing.T) { ctx := context.Background() // initialize client - wsfsTmpDir := TemporaryWorkspaceDir(t, w) + wsfsTmpDir := temporaryWorkspaceDir(t, w) localTmpDir := t.TempDir() r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ OverwriteIfExists: true, @@ -54,7 +54,7 @@ func TestRepoFilesFileOverwritesNotebook(t *testing.T) { ctx := context.Background() // initialize client - wsfsTmpDir := TemporaryWorkspaceDir(t, w) + wsfsTmpDir := temporaryWorkspaceDir(t, w) localTmpDir := t.TempDir() r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ OverwriteIfExists: true, @@ -86,7 +86,7 @@ func TestRepoFilesFileOverwritesEmptyDirectoryTree(t *testing.T) { ctx := context.Background() // initialize client - wsfsTmpDir := TemporaryWorkspaceDir(t, w) + wsfsTmpDir := temporaryWorkspaceDir(t, w) localTmpDir := t.TempDir() r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ OverwriteIfExists: true, @@ -130,7 +130,7 @@ func TestRepoFilesFileInDirOverwritesExistingNotebook(t *testing.T) { ctx := context.Background() // initialize client - wsfsTmpDir := TemporaryWorkspaceDir(t, w) + wsfsTmpDir := temporaryWorkspaceDir(t, w) localTmpDir := t.TempDir() r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ OverwriteIfExists: true, From 0f1cb67c985ba024e981d5ec35d94cbafd6b99e4 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 5 Jun 2023 18:07:41 +0200 Subject: [PATCH 12/26] remove underscore --- cmd/workspace/workspace/import_dir.go | 2 +- internal/import_dir_test.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index 3d4657229f..33febe2bda 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -11,7 +11,7 @@ import ( // TODO: add some comments here var importDirCmd = &cobra.Command{ - Use: "import_dir SOURCE_PATH TARGET_PATH", + Use: "import-dir SOURCE_PATH TARGET_PATH", Short: `Recursively imports a directory from local to the Databricks workspace.`, Long: ` Imports directory to the workspace. diff --git a/internal/import_dir_test.go b/internal/import_dir_test.go index 6120607ff1..2c2966b794 100644 --- a/internal/import_dir_test.go +++ b/internal/import_dir_test.go @@ -22,8 +22,8 @@ func TestWorkspaceImportDir(t *testing.T) { w := databricks.Must(databricks.NewWorkspaceClient()) tmpdir := temporaryWorkspaceDir(t, w) - // run import_dir command - RequireSuccessfulRun(t, "workspace", "import_dir", "./testdata/import_dir/default", tmpdir) + // run import-dir command + RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/default", tmpdir) // assert files are uploaded f, err := filer.NewWorkspaceFilesClient(w, tmpdir) @@ -41,8 +41,8 @@ func TestWorkspaceImportDirOverwriteFlag(t *testing.T) { w := databricks.Must(databricks.NewWorkspaceClient()) tmpdir := temporaryWorkspaceDir(t, w) - // run import_dir command - RequireSuccessfulRun(t, "workspace", "import_dir", "./testdata/import_dir/override/a", tmpdir) + // run import-dir command + RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/override/a", tmpdir) // // assert files are uploaded // f, err := filer.NewWorkspaceFilesClient(w, tmpdir) @@ -50,11 +50,11 @@ func TestWorkspaceImportDirOverwriteFlag(t *testing.T) { // assertFileContains(t, ctx, f, "bar.txt", "from directory A") // Assert another run fails with path already exists error from the server - _, _, err := RequireErrorRun(t, "workspace", "import_dir", "./testdata/import_dir/override/b", tmpdir) + _, _, err := RequireErrorRun(t, "workspace", "import-dir", "./testdata/import_dir/override/b", tmpdir) assert.Regexp(t, regexp.MustCompile("Path (.*) already exists."), err.Error()) // // Succeeds with the overwrite flag - // RequireSuccessfulRun(t, "workspace", "import_dir", "./testdata/import_dir/override/b", tmpdir, "--overwrite") + // RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/override/b", tmpdir, "--overwrite") // require.NoError(t, err) // assertFileContains(t, ctx, f, "bar.txt", "from directory B") } From 49f7969363b2819399931ef6e8655b5c9330d117 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 5 Jun 2023 18:09:24 +0200 Subject: [PATCH 13/26] set template in annotation --- cmd/workspace/workspace/import_dir.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index 33febe2bda..266a6a5244 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -20,9 +20,16 @@ var importDirCmd = &cobra.Command{ .scala, .py, .sql, .r, .R, .ipynb are stripped of their extensions. `, - Annotations: map[string]string{}, - PreRunE: root.MustWorkspaceClient, - Args: cobra.ExactArgs(2), + Annotations: map[string]string{ + "template": cmdio.Heredoc(` + {{if eq .Type "IMPORT_STARTED"}}Import started + {{else if eq .Type "UPLOAD_COMPLETE"}}Uploaded {{.SourcePath}} -> {{.TargetPath}} + {{else if eq .Type "IMPORT_COMPLETE"}}Import completed + {{end}} + `), + }, + PreRunE: root.MustWorkspaceClient, + Args: cobra.ExactArgs(2), RunE: func(cmd *cobra.Command, args []string) (err error) { ctx := cmd.Context() sourcePath := args[0] @@ -75,12 +82,6 @@ var importDirCmd = &cobra.Command{ var importDirOverwrite bool func init() { - importDirCmd.Annotations["template"] = cmdio.Heredoc(` - {{if eq .Type "IMPORT_STARTED"}}Import started - {{else if eq .Type "UPLOAD_COMPLETE"}}Uploaded {{.SourcePath}} -> {{.TargetPath}} - {{else if eq .Type "IMPORT_COMPLETE"}}Import completed - {{end}} - `) importDirCmd.Flags().BoolVar(&importDirOverwrite, "overwrite", false, "Overwrite if file already exists in the workspace") Cmd.AddCommand(importDirCmd) } From 298fed28e523c8cf5f01a34cb1e87bd2eed0bcb1 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 5 Jun 2023 18:45:21 +0200 Subject: [PATCH 14/26] undo remote path public --- cmd/workspace/workspace/import_dir.go | 3 +- cmd/workspace/workspace/import_events.go | 11 +++---- libs/sync/repofiles/repofiles.go | 4 +-- libs/sync/repofiles/repofiles_test.go | 38 ++++++++++++------------ libs/sync/sync.go | 8 ----- 5 files changed, 29 insertions(+), 35 deletions(-) diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index 266a6a5244..c2c08bc392 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -54,9 +54,10 @@ var importDirCmd = &cobra.Command{ // Initialize error wait group, and spawn the progress event emitter inside // the error wait group group, ctx := errgroup.WithContext(ctx) + eventsChannel := s.Events() group.Go( func() error { - return renderSyncEvents(ctx, s.Events(), s) + return renderSyncEvents(ctx, eventsChannel, s) }) // Start Uploading local files diff --git a/cmd/workspace/workspace/import_events.go b/cmd/workspace/workspace/import_events.go index 7627691c86..74aebee220 100644 --- a/cmd/workspace/workspace/import_events.go +++ b/cmd/workspace/workspace/import_events.go @@ -53,11 +53,12 @@ func renderSyncEvents(ctx context.Context, eventChannel <-chan sync.Event, synce return nil } // TODO: test this works with windows paths - remotePath, err := syncer.RemotePath(progressEvent.Path) - if err != nil { - return err - } - err = cmdio.Render(ctx, newUploadCompleteEvent(progressEvent.Path, remotePath)) + // remotePath, err := syncer.RemotePath(progressEvent.Path) + // if err != nil { + // return err + // } + remotePath := "TODO" + err := cmdio.Render(ctx, newUploadCompleteEvent(progressEvent.Path, remotePath)) if err != nil { return err } diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go index 9a6428a168..6ea9f6bd75 100644 --- a/libs/sync/repofiles/repofiles.go +++ b/libs/sync/repofiles/repofiles.go @@ -49,7 +49,7 @@ func Create(repoRoot, localRoot string, w *databricks.WorkspaceClient, opts *Rep }, nil } -func (r *RepoFiles) RemotePath(relativePath string) (string, error) { +func (r *RepoFiles) remotePath(relativePath string) (string, error) { fullPath := path.Join(r.repoRoot, relativePath) cleanFullPath := path.Clean(fullPath) if !strings.HasPrefix(cleanFullPath, r.repoRoot) { @@ -82,7 +82,7 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten // files, folders and notebooks might not have been cleaned up and they // can't overwrite each other. If a folder `foo` exists, then attempts to // PUT a file `foo` will fail - remotePath, err := r.RemotePath(relativePath) + remotePath, err := r.remotePath(relativePath) if err != nil { return err } diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go index ce2a14c2c0..dc9abbcddf 100644 --- a/libs/sync/repofiles/repofiles_test.go +++ b/libs/sync/repofiles/repofiles_test.go @@ -14,66 +14,66 @@ func TestRepoFilesRemotePath(t *testing.T) { repoFiles, err := Create(repoRoot, "/doraemon/foo/bar", nil, nil) require.NoError(t, err) - remotePath, err := repoFiles.RemotePath("a/b/c") + remotePath, err := repoFiles.remotePath("a/b/c") assert.NoError(t, err) assert.Equal(t, repoRoot+"/a/b/c", remotePath) - remotePath, err = repoFiles.RemotePath("a/b/../d") + remotePath, err = repoFiles.remotePath("a/b/../d") assert.NoError(t, err) assert.Equal(t, repoRoot+"/a/d", remotePath) - remotePath, err = repoFiles.RemotePath("a/../c") + remotePath, err = repoFiles.remotePath("a/../c") assert.NoError(t, err) assert.Equal(t, repoRoot+"/c", remotePath) - remotePath, err = repoFiles.RemotePath("a/b/c/.") + remotePath, err = repoFiles.remotePath("a/b/c/.") assert.NoError(t, err) assert.Equal(t, repoRoot+"/a/b/c", remotePath) - remotePath, err = repoFiles.RemotePath("a/b/c/d/./../../f/g") + remotePath, err = repoFiles.remotePath("a/b/c/d/./../../f/g") assert.NoError(t, err) assert.Equal(t, repoRoot+"/a/b/f/g", remotePath) - _, err = repoFiles.RemotePath("..") + _, err = repoFiles.remotePath("..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ..`) - _, err = repoFiles.RemotePath("a/../..") + _, err = repoFiles.remotePath("a/../..") assert.ErrorContains(t, err, `relative file path is not inside repo root: a/../..`) - _, err = repoFiles.RemotePath("./../.") + _, err = repoFiles.remotePath("./../.") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - _, err = repoFiles.RemotePath("/./.././..") + _, err = repoFiles.remotePath("/./.././..") assert.ErrorContains(t, err, `relative file path is not inside repo root: /./.././..`) - _, err = repoFiles.RemotePath("./../.") + _, err = repoFiles.remotePath("./../.") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - _, err = repoFiles.RemotePath("./..") + _, err = repoFiles.remotePath("./..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./..`) - _, err = repoFiles.RemotePath("./../../..") + _, err = repoFiles.remotePath("./../../..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../../..`) - _, err = repoFiles.RemotePath("./../a/./b../../..") + _, err = repoFiles.remotePath("./../a/./b../../..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../a/./b../../..`) - _, err = repoFiles.RemotePath("../..") + _, err = repoFiles.remotePath("../..") assert.ErrorContains(t, err, `relative file path is not inside repo root: ../..`) - _, err = repoFiles.RemotePath(".//a/..//./b/..") + _, err = repoFiles.remotePath(".//a/..//./b/..") assert.ErrorContains(t, err, `file path relative to repo root cannot be empty`) - _, err = repoFiles.RemotePath("a/b/../..") + _, err = repoFiles.remotePath("a/b/../..") assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - _, err = repoFiles.RemotePath("") + _, err = repoFiles.remotePath("") assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - _, err = repoFiles.RemotePath(".") + _, err = repoFiles.remotePath(".") assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - _, err = repoFiles.RemotePath("/") + _, err = repoFiles.remotePath("/") assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") } diff --git a/libs/sync/sync.go b/libs/sync/sync.go index de8cea65ec..505c9d3f7d 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -135,14 +135,6 @@ func (s *Sync) notifyComplete(ctx context.Context, d diff) { s.seq++ } -func (s *Sync) RemotePath(localPath string) (string, error) { - relativePath, ok := s.snapshot.LocalToRemoteNames[localPath] - if !ok { - return "", fmt.Errorf("could not find remote path for %s", localPath) - } - return s.repoFiles.RemotePath(relativePath) -} - func (s *Sync) RunOnce(ctx context.Context) error { // tradeoff: doing portable monitoring only due to macOS max descriptor manual ulimit setting requirement // https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418 From 6cc2571add76aa5f9024fa581e4db0d7fdd09c01 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:07:21 +0200 Subject: [PATCH 15/26] address some comments --- cmd/workspace/workspace/import_dir.go | 4 +-- cmd/workspace/workspace/import_events.go | 45 ++++++++++++------------ libs/sync/event.go | 13 ------- 3 files changed, 24 insertions(+), 38 deletions(-) diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index c2c08bc392..5445cc3934 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -4,7 +4,6 @@ import ( "github.com/databricks/cli/cmd/root" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/sync" - "github.com/databricks/databricks-sdk-go" "github.com/spf13/cobra" "golang.org/x/sync/errgroup" ) @@ -21,6 +20,7 @@ var importDirCmd = &cobra.Command{ `, Annotations: map[string]string{ + // TODO: use render with template at individual call sites for these events. "template": cmdio.Heredoc(` {{if eq .Type "IMPORT_STARTED"}}Import started {{else if eq .Type "UPLOAD_COMPLETE"}}Uploaded {{.SourcePath}} -> {{.TargetPath}} @@ -41,7 +41,7 @@ var importDirCmd = &cobra.Command{ LocalPath: sourcePath, RemotePath: targetPath, Full: true, - WorkspaceClient: databricks.Must(databricks.NewWorkspaceClient()), + WorkspaceClient: root.WorkspaceClient(ctx), AllowOverwrites: importDirOverwrite, PersistSnapshot: false, diff --git a/cmd/workspace/workspace/import_events.go b/cmd/workspace/workspace/import_events.go index 74aebee220..4110fe07b4 100644 --- a/cmd/workspace/workspace/import_events.go +++ b/cmd/workspace/workspace/import_events.go @@ -7,31 +7,38 @@ import ( "github.com/databricks/cli/libs/sync" ) +// TODO: do not emit target directory in upload complete events. + type fileIOEvent struct { SourcePath string `json:"source_path,omitempty"` TargetPath string `json:"target_path,omitempty"` Type string `json:"type"` } +const ( + EventTypeImportStarted = "IMPORT_STARTED" + EventTypeImportComplete = "IMPORT_COMPLETE" + EventTypeUploadComplete = "UPLOAD_COMPLETE" +) + func newImportStartedEvent(sourcePath, targetPath string) fileIOEvent { return fileIOEvent{ SourcePath: sourcePath, TargetPath: targetPath, - Type: "IMPORT_STARTED", + Type: EventTypeImportStarted, } } func newImportCompleteEvent(sourcePath, targetPath string) fileIOEvent { return fileIOEvent{ - Type: "IMPORT_COMPLETE", + Type: EventTypeImportComplete, } } -func newUploadCompleteEvent(sourcePath, targetPath string) fileIOEvent { +func newUploadCompleteEvent(sourcePath string) fileIOEvent { return fileIOEvent{ SourcePath: sourcePath, - TargetPath: targetPath, - Type: "UPLOAD_COMPLETE", + Type: EventTypeUploadComplete, } } @@ -44,25 +51,17 @@ func renderSyncEvents(ctx context.Context, eventChannel <-chan sync.Event, synce if !ok { return nil } - - // We parse progress events from the sync to track when file uploads - // are complete and emit the corresponding events - if e.String() != "" && e.Type() == sync.EventTypeProgress { - progressEvent := e.(*sync.EventSyncProgress) - if progressEvent.Progress < 1 { - return nil - } - // TODO: test this works with windows paths - // remotePath, err := syncer.RemotePath(progressEvent.Path) - // if err != nil { - // return err - // } - remotePath := "TODO" - err := cmdio.Render(ctx, newUploadCompleteEvent(progressEvent.Path, remotePath)) - if err != nil { - return err - } + if e.String() == "" { + return nil } + switch v := e.(type) { + case *sync.EventSyncProgress: + // TODO: only emit this event if the the sync event has progress 1.o0 + // File upload has been completed. This renders the event for that + // on the console + return cmdio.Render(ctx, newUploadCompleteEvent(v.Path)) + } + } } } diff --git a/libs/sync/event.go b/libs/sync/event.go index dcba223275..8e5c0efa2e 100644 --- a/libs/sync/event.go +++ b/libs/sync/event.go @@ -24,7 +24,6 @@ const ( type Event interface { fmt.Stringer - Type() EventType } type EventBase struct { @@ -74,10 +73,6 @@ func (e *EventStart) String() string { return fmt.Sprintf("Action: %s", e.EventChanges.String()) } -func (e *EventStart) Type() EventType { - return EventTypeStart -} - func newEventStart(seq int, put []string, delete []string) Event { return &EventStart{ EventBase: newEventBase(seq, EventTypeStart), @@ -111,10 +106,6 @@ func (e *EventSyncProgress) String() string { } } -func (e *EventSyncProgress) Type() EventType { - return EventTypeProgress -} - func newEventProgress(seq int, action EventAction, path string, progress float32) Event { return &EventSyncProgress{ EventBase: newEventBase(seq, EventTypeProgress), @@ -142,10 +133,6 @@ func (e *EventSyncComplete) String() string { return "Complete" } -func (e *EventSyncComplete) Type() EventType { - return EventTypeComplete -} - func newEventComplete(seq int, put []string, delete []string) Event { return &EventSyncComplete{ EventBase: newEventBase(seq, EventTypeComplete), From bb175469f0cc7167ae39786f2e40ffdbd83b38ca Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:25:12 +0200 Subject: [PATCH 16/26] - --- libs/filer/dbfs_client.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 8229e97b11..c86a80b1e1 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -222,10 +222,6 @@ func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, e return nil, err } - if len(res.Files) == 1 && !res.Files[0].IsDir && res.Files[0].Path == absPath { - return nil, NotADirectory{absPath} - } - info := make([]fs.DirEntry, len(res.Files)) for i, v := range res.Files { info[i] = dbfsDirEntry{dbfsFileInfo: dbfsFileInfo{fi: v}} From ae09abf1f5935706c7cd681c6fcb8021a7043625 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:26:41 +0200 Subject: [PATCH 17/26] - --- libs/filer/workspace_files_client.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 7111d2678c..12e644ccdb 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -223,11 +223,6 @@ func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.D Path: absPath, }) - // TODO: add integration test for this - if len(objects) == 1 && objects[0].Path == absPath { - return nil, NotADirectory{absPath} - } - if err != nil { // If we got an API error we deal with it below. var aerr *apierr.APIError From 14b63d032688c006ce5df886b795f0e21ffef302 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:27:02 +0200 Subject: [PATCH 18/26] - --- libs/filer/workspace_files_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 12e644ccdb..967f9a1de5 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -222,7 +222,6 @@ func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.D objects, err := w.workspaceClient.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{ Path: absPath, }) - if err != nil { // If we got an API error we deal with it below. var aerr *apierr.APIError From 5fdd8a200cc40f369b07bad471b7907432d86885 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:27:42 +0200 Subject: [PATCH 19/26] - --- libs/cmdio/render.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/libs/cmdio/render.go b/libs/cmdio/render.go index c5abe2cbe8..8aff2b8d2c 100644 --- a/libs/cmdio/render.go +++ b/libs/cmdio/render.go @@ -49,10 +49,6 @@ func renderJson(w io.Writer, v any) error { return err } _, err = w.Write(pretty) - if err != nil { - return err - } - _, err = w.Write([]byte("\n")) return err } From 96290320b2ef7fbbee6cda292ed7ba44d5e53f65 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:32:51 +0200 Subject: [PATCH 20/26] - --- cmd/workspace/workspace/import_dir.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index 5445cc3934..052cbf2427 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -8,7 +8,8 @@ import ( "golang.org/x/sync/errgroup" ) -// TODO: add some comments here +// TODO: check whether we need mutex for any events been emitted since they are accessing +// state var importDirCmd = &cobra.Command{ Use: "import-dir SOURCE_PATH TARGET_PATH", Short: `Recursively imports a directory from local to the Databricks workspace.`, From e9a309e85dd7ce60291f2aa70c9cf7f6bf9c0a15 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:34:09 +0200 Subject: [PATCH 21/26] removes not a directory error --- libs/filer/filer.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/libs/filer/filer.go b/libs/filer/filer.go index 1525aba3a0..c63a89f6df 100644 --- a/libs/filer/filer.go +++ b/libs/filer/filer.go @@ -51,20 +51,6 @@ func (err NoSuchDirectoryError) Is(other error) bool { return other == fs.ErrNotExist } -var ErrNotADirectory = errors.New("not a directory") - -type NotADirectory struct { - path string -} - -func (err NotADirectory) Error() string { - return fmt.Sprintf("%s is not a directory", err.path) -} - -func (err NotADirectory) Is(other error) bool { - return other == ErrNotADirectory -} - // Filer is used to access files in a workspace. // It has implementations for accessing files in WSFS and in DBFS. type Filer interface { From 74cb87f1b9403cacfb7c7d6dbdf1262d6db23a3a Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 00:35:50 +0200 Subject: [PATCH 22/26] - --- internal/filer_test.go | 52 ++++++++++++++++++++++++-- internal/helpers.go | 85 +++++++++++++++++++++++++----------------- 2 files changed, 100 insertions(+), 37 deletions(-) diff --git a/internal/filer_test.go b/internal/filer_test.go index 02a8333985..81c3e4aea8 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/service/files" + "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -159,9 +160,54 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.Equal(t, "c", entries[0].Name()) assert.True(t, entries[0].IsDir()) - // TODO: split into a separate PR + // Expect an error trying to call ReadDir on a file _, err = f.ReadDir(ctx, "/hello.txt") - assert.ErrorIs(t, err, filer.ErrNotADirectory) + assert.ErrorIs(t, err, fs.ErrInvalid) + + // Expect 0 entries for an empty directory + err = f.Mkdir(ctx, "empty-dir") + require.NoError(t, err) + entries, err = f.ReadDir(ctx, "empty-dir") + assert.NoError(t, err) + assert.Len(t, entries, 0) + + // Expect one entry for a directory with a file in it + err = f.Write(ctx, "dir-with-one-file/my-file.txt", strings.NewReader("abc"), filer.CreateParentDirectories) + require.NoError(t, err) + entries, err = f.ReadDir(ctx, "dir-with-one-file") + assert.NoError(t, err) + assert.Len(t, entries, 1) + assert.Equal(t, entries[0].Name(), "my-file.txt") + assert.False(t, entries[0].IsDir()) +} + +func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { + ctx := context.Background() + me, err := w.CurrentUser.Me(ctx) + require.NoError(t, err) + + path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-")) + + // Ensure directory exists, but doesn't exist YET! + // Otherwise we could inadvertently remove a directory that already exists on cleanup. + t.Logf("mkdir %s", path) + err = w.Workspace.MkdirsByPath(ctx, path) + require.NoError(t, err) + + // Remove test directory on test completion. + t.Cleanup(func() { + t.Logf("rm -rf %s", path) + err := w.Workspace.Delete(ctx, workspace.Delete{ + Path: path, + Recursive: true, + }) + if err == nil || apierr.IsMissing(err) { + return + } + t.Logf("unable to remove temporary workspace directory %s: %#v", path, err) + }) + + return path } func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { @@ -195,7 +241,7 @@ func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { ctx := context.Background() - path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-filer-dbfs-")) + path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-dbfs-")) // This call fails if the path already exists. t.Logf("mkdir dbfs:%s", path) diff --git a/internal/helpers.go b/internal/helpers.go index f1901f14e1..1f75687199 100644 --- a/internal/helpers.go +++ b/internal/helpers.go @@ -5,18 +5,17 @@ import ( "bytes" "context" "fmt" + "io" "math/rand" "os" "path/filepath" "strings" + "sync" "testing" "time" "github.com/databricks/cli/cmd/root" _ "github.com/databricks/cli/cmd/version" - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/require" _ "github.com/databricks/cli/cmd/workspace" @@ -57,18 +56,54 @@ type cobraTestRunner struct { stdout bytes.Buffer stderr bytes.Buffer + // Line-by-line output. + // Background goroutines populate these channels by reading from stdout/stderr pipes. + stdoutLines <-chan string + stderrLines <-chan string + errch <-chan error } +func consumeLines(ctx context.Context, wg *sync.WaitGroup, r io.Reader) <-chan string { + ch := make(chan string, 1000) + wg.Add(1) + go func() { + defer close(ch) + defer wg.Done() + scanner := bufio.NewScanner(r) + for scanner.Scan() { + select { + case <-ctx.Done(): + return + case ch <- scanner.Text(): + } + } + }() + return ch +} + func (t *cobraTestRunner) RunBackground() { + var stdoutR, stderrR io.Reader + var stdoutW, stderrW io.WriteCloser + stdoutR, stdoutW = io.Pipe() + stderrR, stderrW = io.Pipe() root := root.RootCmd - root.SetOut(&t.stdout) - root.SetErr(&t.stderr) + root.SetOut(stdoutW) + root.SetErr(stderrW) root.SetArgs(t.args) errch := make(chan error) ctx, cancel := context.WithCancel(context.Background()) + // Tee stdout/stderr to buffers. + stdoutR = io.TeeReader(stdoutR, &t.stdout) + stderrR = io.TeeReader(stderrR, &t.stderr) + + // Consume stdout/stderr line-by-line. + var wg sync.WaitGroup + t.stdoutLines = consumeLines(ctx, &wg, stdoutR) + t.stderrLines = consumeLines(ctx, &wg, stderrR) + // Run command in background. go func() { cmd, err := root.ExecuteContextC(ctx) @@ -76,6 +111,14 @@ func (t *cobraTestRunner) RunBackground() { t.Logf("Error running command: %s", err) } + // Close pipes to signal EOF. + stdoutW.Close() + stderrW.Close() + + // Wait for the [consumeLines] routines to finish now that + // the pipes they're reading from have closed. + wg.Wait() + if t.stdout.Len() > 0 { // Make a copy of the buffer such that it remains "unread". scanner := bufio.NewScanner(bytes.NewBuffer(t.stdout.Bytes())) @@ -130,6 +173,9 @@ func (c *cobraTestRunner) Eventually(condition func() bool, waitFor time.Duratio ticker := time.NewTicker(tick) defer ticker.Stop() + // Kick off condition check immediately. + go func() { ch <- condition() }() + for tick := ticker.C; ; { select { case err := <-c.errch: @@ -179,32 +225,3 @@ func writeFile(t *testing.T, name string, body string) string { f.Close() return f.Name() } - -func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { - ctx := context.Background() - me, err := w.CurrentUser.Me(ctx) - require.NoError(t, err) - - path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-")) - - // Ensure directory exists, but doesn't exist YET! - // Otherwise we could inadvertently remove a directory that already exists on cleanup. - t.Logf("mkdir %s", path) - err = w.Workspace.MkdirsByPath(ctx, path) - require.NoError(t, err) - - // Remove test directory on test completion. - t.Cleanup(func() { - t.Logf("rm -rf %s", path) - err := w.Workspace.Delete(ctx, workspace.Delete{ - Path: path, - Recursive: true, - }) - if err == nil || apierr.IsMissing(err) { - return - } - t.Logf("unable to remove temporary workspace directory %s: %#v", path, err) - }) - - return path -} From 58acf64cf1464a80fe565a703703d4e39935a54e Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 01:01:42 +0200 Subject: [PATCH 23/26] some cleanup --- cmd/workspace/workspace/import_dir.go | 19 ++++++------------- cmd/workspace/workspace/import_events.go | 2 +- libs/filer/filer.go | 1 - 3 files changed, 7 insertions(+), 15 deletions(-) diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index 052cbf2427..cee1d8ec8e 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -10,25 +10,18 @@ import ( // TODO: check whether we need mutex for any events been emitted since they are accessing // state +// TODO: Error: path must be nested under /Users/shreyas.goenka@databricks.com or /Repos/shreyas.goenka@databricks.com. Should this validation be +// removed? Yes var importDirCmd = &cobra.Command{ Use: "import-dir SOURCE_PATH TARGET_PATH", - Short: `Recursively imports a directory from local to the Databricks workspace.`, + Short: `Import directory to a Databricks workspace.`, Long: ` - Imports directory to the workspace. + Recursively imports a directory from the local filesystem to a Databricks workspace. This command respects your git ignore configuration. Notebooks with extensions .scala, .py, .sql, .r, .R, .ipynb are stripped of their extensions. `, - Annotations: map[string]string{ - // TODO: use render with template at individual call sites for these events. - "template": cmdio.Heredoc(` - {{if eq .Type "IMPORT_STARTED"}}Import started - {{else if eq .Type "UPLOAD_COMPLETE"}}Uploaded {{.SourcePath}} -> {{.TargetPath}} - {{else if eq .Type "IMPORT_COMPLETE"}}Import completed - {{end}} - `), - }, PreRunE: root.MustWorkspaceClient, Args: cobra.ExactArgs(2), RunE: func(cmd *cobra.Command, args []string) (err error) { @@ -62,7 +55,7 @@ var importDirCmd = &cobra.Command{ }) // Start Uploading local files - cmdio.Render(ctx, newImportStartedEvent(sourcePath, targetPath)) + cmdio.RenderWithTemplate(ctx, newImportStartedEvent(sourcePath, targetPath), `Starting import {{.SourcePath}} -> {{TargetPath}}`) err = s.RunOnce(ctx) if err != nil { return err @@ -76,7 +69,7 @@ var importDirCmd = &cobra.Command{ } // Render import completetion event - cmdio.Render(ctx, newImportCompleteEvent(sourcePath, targetPath)) + cmdio.RenderWithTemplate(ctx, newImportCompleteEvent(sourcePath, targetPath), `Completed import. Files available at {{.TargetPath}}`) return nil }, } diff --git a/cmd/workspace/workspace/import_events.go b/cmd/workspace/workspace/import_events.go index 4110fe07b4..faacb83347 100644 --- a/cmd/workspace/workspace/import_events.go +++ b/cmd/workspace/workspace/import_events.go @@ -59,7 +59,7 @@ func renderSyncEvents(ctx context.Context, eventChannel <-chan sync.Event, synce // TODO: only emit this event if the the sync event has progress 1.o0 // File upload has been completed. This renders the event for that // on the console - return cmdio.Render(ctx, newUploadCompleteEvent(v.Path)) + return cmdio.RenderWithTemplate(ctx, newUploadCompleteEvent(v.Path), `Uploaded {{.SourcePath}}`) } } diff --git a/libs/filer/filer.go b/libs/filer/filer.go index 3e3e8fc416..58273fff36 100644 --- a/libs/filer/filer.go +++ b/libs/filer/filer.go @@ -2,7 +2,6 @@ package filer import ( "context" - "errors" "fmt" "io" "io/fs" From 8166aed6177e4975416deb345231999d448e3a66 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 02:00:30 +0200 Subject: [PATCH 24/26] revert changes to reposfile --- libs/sync/repofiles/repofiles.go | 88 +++++++++++++++++---------- libs/sync/repofiles/repofiles_test.go | 7 +-- 2 files changed, 57 insertions(+), 38 deletions(-) diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go index 6ea9f6bd75..cfbf762fd0 100644 --- a/libs/sync/repofiles/repofiles.go +++ b/libs/sync/repofiles/repofiles.go @@ -1,18 +1,19 @@ package repofiles import ( - "bytes" "context" "errors" "fmt" + "net/http" + "net/url" "os" "path" "path/filepath" "strings" - "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/service/workspace" ) @@ -21,32 +22,21 @@ type RepoFileOptions struct { } // RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and more robust methods to overwrite workspac e files +// accidental deletion of repos and more robust methods to overwrite workspace files type RepoFiles struct { *RepoFileOptions repoRoot string localRoot string workspaceClient *databricks.WorkspaceClient - f filer.Filer } -func Create(repoRoot, localRoot string, w *databricks.WorkspaceClient, opts *RepoFileOptions) (*RepoFiles, error) { - // override default timeout to support uploading larger files - w.Config.HTTPTimeoutSeconds = 600 - - // create filer to interact with WSFS - f, err := filer.NewWorkspaceFilesClient(w, repoRoot) - if err != nil { - return nil, err - } +func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { return &RepoFiles{ repoRoot: repoRoot, localRoot: localRoot, - workspaceClient: w, - RepoFileOptions: opts, - f: f, - }, nil + workspaceClient: workspaceClient, + } } func (r *RepoFiles) remotePath(relativePath string) (string, error) { @@ -68,25 +58,36 @@ func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { } func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - if !r.OverwriteIfExists { - return r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories) + apiClientConfig := r.workspaceClient.Config + apiClientConfig.HTTPTimeoutSeconds = 600 + apiClient, err := client.New(apiClientConfig) + if err != nil { + return err } - - err := r.f.Write(ctx, relativePath, bytes.NewReader(content), filer.CreateParentDirectories, filer.OverwriteIfExists) - - // TODO(pietern): Use the new FS interface to avoid needing to make a recursive - // delete call here. This call is dangerous + remotePath, err := r.remotePath(relativePath) + if err != nil { + return err + } + escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) + apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=%t", escapedPath, r.OverwriteIfExists) + + err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) + + // Handling some edge cases when an upload might fail + // + // We cannot do more precise error scoping here because the API does not + // provide descriptive errors yet + // + // TODO: narrow down the error condition scope of this "if" block to only + // trigger for the specific edge cases instead of all errors once the API + // implements them if err != nil { // Delete any artifact files incase non overwriteable by the current file // type and thus are failing the PUT request. // files, folders and notebooks might not have been cleaned up and they // can't overwrite each other. If a folder `foo` exists, then attempts to // PUT a file `foo` will fail - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - err = r.workspaceClient.Workspace.Delete(ctx, + err := r.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ Path: remotePath, Recursive: true, @@ -101,15 +102,33 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten return err } - // Attempt to write the file again, this time without the CreateParentDirectories and - // OverwriteIfExists flags - return r.f.Write(ctx, relativePath, bytes.NewReader(content)) + // Mkdir parent dirs incase they are what's causing the PUT request to + // fail + err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) + if err != nil { + return fmt.Errorf("could not mkdir to put file: %s", err) + } + + // Attempt to upload file again after cleanup/setup + err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) + if err != nil { + return err + } } return nil } func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - return r.f.Delete(ctx, relativePath) + remotePath, err := r.remotePath(relativePath) + if err != nil { + return err + } + return r.workspaceClient.Workspace.Delete(ctx, + workspace.Delete{ + Path: remotePath, + Recursive: false, + }, + ) } // The API calls for a python script foo.py would be @@ -141,3 +160,6 @@ func (r *RepoFiles) DeleteFile(ctx context.Context, relativePath string) error { } return nil } + +// TODO: write integration tests for all non happy path cases that rely on +// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go index dc9abbcddf..2a881d90d0 100644 --- a/libs/sync/repofiles/repofiles_test.go +++ b/libs/sync/repofiles/repofiles_test.go @@ -6,13 +6,11 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestRepoFilesRemotePath(t *testing.T) { repoRoot := "/Repos/doraemon/bar" - repoFiles, err := Create(repoRoot, "/doraemon/foo/bar", nil, nil) - require.NoError(t, err) + repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) remotePath, err := repoFiles.remotePath("a/b/c") assert.NoError(t, err) @@ -83,8 +81,7 @@ func TestRepoReadLocal(t *testing.T) { err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) assert.NoError(t, err) - repoFiles, err := Create("/Repos/doraemon/bar", tempDir, nil, nil) - require.NoError(t, err) + repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) bytes, err := repoFiles.readLocal("./a/../hello.txt") assert.NoError(t, err) assert.Equal(t, "my name is doraemon :P", string(bytes)) From f0f75ff98ff45934d704a12925d03a2a5b791ffe Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 02:03:04 +0200 Subject: [PATCH 25/26] - --- internal/repofiles_test.go | 155 ------------------------------------- 1 file changed, 155 deletions(-) delete mode 100644 internal/repofiles_test.go diff --git a/internal/repofiles_test.go b/internal/repofiles_test.go deleted file mode 100644 index 07c488e043..0000000000 --- a/internal/repofiles_test.go +++ /dev/null @@ -1,155 +0,0 @@ -package internal - -import ( - "context" - "os" - "path" - "path/filepath" - "strings" - "testing" - - "github.com/databricks/cli/libs/filer" - "github.com/databricks/cli/libs/sync/repofiles" - "github.com/databricks/databricks-sdk-go" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// TODO: skip if not cloud env, these are integration tests -// TODO: split into a separate PR - -func TestRepoFilesPutFile(t *testing.T) { - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) - ctx := context.Background() - - // initialize client - wsfsTmpDir := temporaryWorkspaceDir(t, w) - localTmpDir := t.TempDir() - r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ - OverwriteIfExists: true, - }) - require.NoError(t, err) - f, err := filer.NewWorkspaceFilesClient(w, wsfsTmpDir) - require.NoError(t, err) - - // create local file - err = os.WriteFile(filepath.Join(localTmpDir, "foo.txt"), []byte(`hello, world`), os.ModePerm) - require.NoError(t, err) - err = r.PutFile(ctx, "foo.txt") - require.NoError(t, err) - - require.NoError(t, f.Mkdir(ctx, "bar")) - - entries, err := f.ReadDir(ctx, "bar") - require.NoError(t, err) - require.Len(t, entries, 1) - - assertFileContains(t, ctx, f, "foo.txt", "hello, world") -} - -func TestRepoFilesFileOverwritesNotebook(t *testing.T) { - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) - ctx := context.Background() - - // initialize client - wsfsTmpDir := temporaryWorkspaceDir(t, w) - localTmpDir := t.TempDir() - r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ - OverwriteIfExists: true, - }) - require.NoError(t, err) - f, err := filer.NewWorkspaceFilesClient(w, wsfsTmpDir) - require.NoError(t, err) - - // create local notebook - err = os.WriteFile(filepath.Join(localTmpDir, "foo.py"), []byte("#Databricks notebook source\nprint(1)"), os.ModePerm) - require.NoError(t, err) - - // upload notebook - err = r.PutFile(ctx, "foo.py") - require.NoError(t, err) - assertNotebookExists(t, ctx, w, path.Join(wsfsTmpDir, "foo")) - - // upload file, and assert that it overwrites the notebook - err = os.WriteFile(filepath.Join(localTmpDir, "foo"), []byte("I am going to overwrite the notebook"), os.ModePerm) - require.NoError(t, err) - err = r.PutFile(ctx, "foo") - require.NoError(t, err) - assertFileContains(t, ctx, f, "foo", "I am going to overwrite the notebook") -} - -func TestRepoFilesFileOverwritesEmptyDirectoryTree(t *testing.T) { - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) - ctx := context.Background() - - // initialize client - wsfsTmpDir := temporaryWorkspaceDir(t, w) - localTmpDir := t.TempDir() - r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ - OverwriteIfExists: true, - }) - require.NoError(t, err) - f, err := filer.NewWorkspaceFilesClient(w, wsfsTmpDir) - require.NoError(t, err) - - // create local file - err = os.WriteFile(filepath.Join(localTmpDir, "foo"), []byte(`hello, world`), os.ModePerm) - require.NoError(t, err) - - // construct a directory tree without files in the workspace - err = f.Mkdir(ctx, "foo/a/b/c") - require.NoError(t, err) - err = f.Mkdir(ctx, "foo/a/b/d/e") - require.NoError(t, err) - err = f.Mkdir(ctx, "foo/f/g/i") - require.NoError(t, err) - - // assert the directories exist - entries, err := f.ReadDir(ctx, "foo") - require.NoError(t, err) - assert.Len(t, entries, 2) - assert.True(t, entries[0].IsDir()) - assert.True(t, entries[1].IsDir()) - - // upload file, and assert that it overwrites the empty directories - err = r.PutFile(ctx, "foo") - require.NoError(t, err) - assertFileContains(t, ctx, f, "foo", "hello, world") - - // assert the directories do not exist anymore - _, err = f.ReadDir(ctx, "foo") - assert.ErrorIs(t, err, filer.ErrNotADirectory) -} - -func TestRepoFilesFileInDirOverwritesExistingNotebook(t *testing.T) { - w, err := databricks.NewWorkspaceClient() - require.NoError(t, err) - ctx := context.Background() - - // initialize client - wsfsTmpDir := temporaryWorkspaceDir(t, w) - localTmpDir := t.TempDir() - r, err := repofiles.Create(wsfsTmpDir, localTmpDir, w, &repofiles.RepoFileOptions{ - OverwriteIfExists: true, - }) - require.NoError(t, err) - f, err := filer.NewWorkspaceFilesClient(w, wsfsTmpDir) - require.NoError(t, err) - - // create local notebook - err = f.Write(ctx, "foo.py", strings.NewReader("#Databricks notebook source\nprint(1)")) - require.NoError(t, err) - assertNotebookExists(t, ctx, w, path.Join(wsfsTmpDir, "foo")) - - // upload file - err = os.Mkdir(filepath.Join(localTmpDir, "foo"), os.ModePerm) - require.NoError(t, err) - err = os.WriteFile(filepath.Join(localTmpDir, "foo/bar.txt"), []byte("I am going to overwrite the notebook"), os.ModePerm) - require.NoError(t, err) - err = r.PutFile(ctx, "foo/bar.txt") - require.NoError(t, err) - assertFileContains(t, ctx, f, "foo/bar.txt", "I am going to overwrite the notebook") -} From 43aadba1f84eea8376d299d68e6769db7b10f2fa Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Tue, 6 Jun 2023 15:12:56 +0200 Subject: [PATCH 26/26] made events work and moved validation outside sync object --- cmd/sync/sync.go | 6 ++++++ cmd/workspace/workspace/import_dir.go | 8 +++++--- cmd/workspace/workspace/import_events.go | 10 +++++++--- internal/sync_test.go | 10 +++++----- libs/sync/path.go | 6 +++--- libs/sync/repofiles/repofiles.go | 3 ++- libs/sync/repofiles/repofiles_test.go | 6 +++--- libs/sync/sync.go | 11 +---------- 8 files changed, 32 insertions(+), 28 deletions(-) diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index 510f859b9e..d55429be75 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -99,6 +99,12 @@ var syncCmd = &cobra.Command{ return err } + // Verify that the remote path we're about to synchronize to is valid and allowed. + err = sync.EnsureRemotePathIsUserScoped(ctx, opts.WorkspaceClient, opts.RemotePath) + if err != nil { + return err + } + var outputFunc func(context.Context, <-chan sync.Event, io.Writer) switch output { case flags.OutputText: diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index cee1d8ec8e..1de3b24aa6 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -55,7 +55,10 @@ var importDirCmd = &cobra.Command{ }) // Start Uploading local files - cmdio.RenderWithTemplate(ctx, newImportStartedEvent(sourcePath, targetPath), `Starting import {{.SourcePath}} -> {{TargetPath}}`) + err = cmdio.RenderWithTemplate(ctx, newImportStartedEvent(sourcePath, targetPath), "Starting import {{.SourcePath}} -> {{.TargetPath}}\n") + if err != nil { + return err + } err = s.RunOnce(ctx) if err != nil { return err @@ -69,8 +72,7 @@ var importDirCmd = &cobra.Command{ } // Render import completetion event - cmdio.RenderWithTemplate(ctx, newImportCompleteEvent(sourcePath, targetPath), `Completed import. Files available at {{.TargetPath}}`) - return nil + return cmdio.RenderWithTemplate(ctx, newImportCompleteEvent(sourcePath, targetPath), "Completed import. Files available at {{.TargetPath}}\n") }, } diff --git a/cmd/workspace/workspace/import_events.go b/cmd/workspace/workspace/import_events.go index faacb83347..1663882ceb 100644 --- a/cmd/workspace/workspace/import_events.go +++ b/cmd/workspace/workspace/import_events.go @@ -31,7 +31,8 @@ func newImportStartedEvent(sourcePath, targetPath string) fileIOEvent { func newImportCompleteEvent(sourcePath, targetPath string) fileIOEvent { return fileIOEvent{ - Type: EventTypeImportComplete, + Type: EventTypeImportComplete, + TargetPath: targetPath, } } @@ -52,14 +53,17 @@ func renderSyncEvents(ctx context.Context, eventChannel <-chan sync.Event, synce return nil } if e.String() == "" { - return nil + continue } switch v := e.(type) { case *sync.EventSyncProgress: // TODO: only emit this event if the the sync event has progress 1.o0 // File upload has been completed. This renders the event for that // on the console - return cmdio.RenderWithTemplate(ctx, newUploadCompleteEvent(v.Path), `Uploaded {{.SourcePath}}`) + err := cmdio.RenderWithTemplate(ctx, newUploadCompleteEvent(v.Path), "Uploaded {{.SourcePath}}\n") + if err != nil { + return err + } } } diff --git a/internal/sync_test.go b/internal/sync_test.go index 89b318a970..fc2c45ff9e 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -459,12 +459,12 @@ func TestAccSyncEnsureRemotePathIsUsableIfRepoDoesntExist(t *testing.T) { // Hypothetical repo path doesn't exist. nonExistingRepoPath := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName("doesnt-exist-")) - err = sync.EnsureRemotePathIsUsable(ctx, wsc, nonExistingRepoPath) + err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nonExistingRepoPath) assert.ErrorContains(t, err, " does not exist; please create it first") // Paths nested under a hypothetical repo path should yield the same error. nestedPath := path.Join(nonExistingRepoPath, "nested/directory") - err = sync.EnsureRemotePathIsUsable(ctx, wsc, nestedPath) + err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nestedPath) assert.ErrorContains(t, err, " does not exist; please create it first") } @@ -476,12 +476,12 @@ func TestAccSyncEnsureRemotePathIsUsableIfRepoExists(t *testing.T) { _, remoteRepoPath := setupRepo(t, wsc, ctx) // Repo itself is usable. - err := sync.EnsureRemotePathIsUsable(ctx, wsc, remoteRepoPath) + err := sync.EnsureRemotePathIsUserScoped(ctx, wsc, remoteRepoPath) assert.NoError(t, err) // Path nested under repo path is usable. nestedPath := path.Join(remoteRepoPath, "nested/directory") - err = sync.EnsureRemotePathIsUsable(ctx, wsc, nestedPath) + err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nestedPath) assert.NoError(t, err) // Verify that the directory has been created. @@ -499,7 +499,7 @@ func TestAccSyncEnsureRemotePathIsUsableInWorkspace(t *testing.T) { require.NoError(t, err) remotePath := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("ensure-path-exists-test-")) - err = sync.EnsureRemotePathIsUsable(ctx, wsc, remotePath) + err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, remotePath) assert.NoError(t, err) // Clean up directory after test. diff --git a/libs/sync/path.go b/libs/sync/path.go index 7fd1b9a976..6982357c88 100644 --- a/libs/sync/path.go +++ b/libs/sync/path.go @@ -58,9 +58,9 @@ func repoPathForPath(me *iam.User, remotePath string) string { return remotePath } -// EnsureRemotePathIsUsable checks if the specified path is nested under -// expected base paths and if it is a directory or repository. -func EnsureRemotePathIsUsable(ctx context.Context, wsc *databricks.WorkspaceClient, remotePath string) error { +// EnsureRemotePathIsUserScoped checks if the specified path is nested under +// the caller's username and if it is a directory or repository. +func EnsureRemotePathIsUserScoped(ctx context.Context, wsc *databricks.WorkspaceClient, remotePath string) error { me, err := wsc.CurrentUser.Me(ctx) if err != nil { return err diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go index cfbf762fd0..daf7ef20f8 100644 --- a/libs/sync/repofiles/repofiles.go +++ b/libs/sync/repofiles/repofiles.go @@ -31,11 +31,12 @@ type RepoFiles struct { workspaceClient *databricks.WorkspaceClient } -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { +func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient, opts *RepoFileOptions) *RepoFiles { return &RepoFiles{ repoRoot: repoRoot, localRoot: localRoot, workspaceClient: workspaceClient, + RepoFileOptions: opts, } } diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go index 2a881d90d0..688730442a 100644 --- a/libs/sync/repofiles/repofiles_test.go +++ b/libs/sync/repofiles/repofiles_test.go @@ -10,7 +10,7 @@ import ( func TestRepoFilesRemotePath(t *testing.T) { repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) + repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil, nil) remotePath, err := repoFiles.remotePath("a/b/c") assert.NoError(t, err) @@ -81,8 +81,8 @@ func TestRepoReadLocal(t *testing.T) { err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) assert.NoError(t, err) - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) + repoFiles := Create("/Repos/doraemon/bar", tempDir, nil, nil) bytes, err := repoFiles.readLocal("./a/../hello.txt") assert.NoError(t, err) - assert.Equal(t, "my name is doraemon :P", string(bytes)) + assert.Equal(t, "my name is doraemon :P", string(bytes), nil) } diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 505c9d3f7d..a351322d0a 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -55,12 +55,6 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, err } - // Verify that the remote path we're about to synchronize to is valid and allowed. - err = EnsureRemotePathIsUsable(ctx, opts.WorkspaceClient, opts.RemotePath) - if err != nil { - return nil, err - } - // TODO: The host may be late-initialized in certain Azure setups where we // specify the workspace by its resource ID. tracked in: https://databricks.atlassian.net/browse/DECO-194 opts.Host = opts.WorkspaceClient.Config.Host @@ -82,12 +76,9 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, fmt.Errorf("unable to load sync snapshot: %w", err) } } - repoFiles, err := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient, &repofiles.RepoFileOptions{ + repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient, &repofiles.RepoFileOptions{ OverwriteIfExists: opts.AllowOverwrites, }) - if err != nil { - return nil, err - } return &Sync{ SyncOptions: &opts,