diff --git a/bundle/deploy/files/sync.go b/bundle/deploy/files/sync.go index 77c64e529d..6f792a1bc4 100644 --- a/bundle/deploy/files/sync.go +++ b/bundle/deploy/files/sync.go @@ -21,6 +21,9 @@ func getSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) { SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), + + PersistSnapshot: true, + AllowOverwrites: true, } return sync.New(ctx, opts) } diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 19adc2dd6a..c0687daeca 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -25,6 +25,9 @@ func syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOpti SnapshotBasePath: cacheDir, WorkspaceClient: b.WorkspaceClient(), + + PersistSnapshot: true, + AllowOverwrites: true, } return &opts, nil } diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index d13a85d033..d55429be75 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -56,6 +56,9 @@ func syncOptionsFromArgs(cmd *cobra.Command, args []string) (*sync.SyncOptions, // exist and add it to the `.gitignore` file in the root. SnapshotBasePath: filepath.Join(args[0], ".databricks"), WorkspaceClient: databricks.Must(databricks.NewWorkspaceClient()), + + PersistSnapshot: true, + AllowOverwrites: true, } return &opts, nil } @@ -96,6 +99,12 @@ var syncCmd = &cobra.Command{ return err } + // Verify that the remote path we're about to synchronize to is valid and allowed. 
+ err = sync.EnsureRemotePathIsUserScoped(ctx, opts.WorkspaceClient, opts.RemotePath) + if err != nil { + return err + } + var outputFunc func(context.Context, <-chan sync.Event, io.Writer) switch output { case flags.OutputText: diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go new file mode 100644 index 0000000000..1de3b24aa6 --- /dev/null +++ b/cmd/workspace/workspace/import_dir.go @@ -0,0 +1,84 @@ +package workspace + +import ( + "github.com/databricks/cli/cmd/root" + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/sync" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" +) + +// TODO: check whether we need a mutex for any events being emitted, since they are accessing +// state +// TODO: Error: path must be nested under /Users/shreyas.goenka@databricks.com or /Repos/shreyas.goenka@databricks.com. Should this validation be +// removed? Yes +var importDirCmd = &cobra.Command{ + Use: "import-dir SOURCE_PATH TARGET_PATH", + Short: `Import directory to a Databricks workspace.`, + Long: ` + Recursively imports a directory from the local filesystem to a Databricks workspace. + + This command respects your git ignore configuration. Notebooks with extensions + .scala, .py, .sql, .r, .R, .ipynb are stripped of their extensions. +`, + + PreRunE: root.MustWorkspaceClient, + Args: cobra.ExactArgs(2), + RunE: func(cmd *cobra.Command, args []string) (err error) { + ctx := cmd.Context() + sourcePath := args[0] + targetPath := args[1] + + // Initialize the syncer to do a full sync from source to target. 
+ // This will upload the local files + opts := sync.SyncOptions{ + LocalPath: sourcePath, + RemotePath: targetPath, + Full: true, + WorkspaceClient: root.WorkspaceClient(ctx), + + AllowOverwrites: importDirOverwrite, + PersistSnapshot: false, + } + s, err := sync.New(ctx, opts) + if err != nil { + return err + } + + // Initialize error wait group, and spawn the progress event emitter inside + // the error wait group + group, ctx := errgroup.WithContext(ctx) + eventsChannel := s.Events() + group.Go( + func() error { + return renderSyncEvents(ctx, eventsChannel, s) + }) + + // Start uploading local files + err = cmdio.RenderWithTemplate(ctx, newImportStartedEvent(sourcePath, targetPath), "Starting import {{.SourcePath}} -> {{.TargetPath}}\n") + if err != nil { + return err + } + err = s.RunOnce(ctx) + if err != nil { + return err + } + // Upload completed, close the syncer. NOTE(review): the early error returns above skip s.Close() and group.Wait() - confirm the renderer goroutine cannot be left blocked + s.Close() + + // Wait for any inflight progress events to be emitted + if err := group.Wait(); err != nil { + return err + } + + // Render import completion event + return cmdio.RenderWithTemplate(ctx, newImportCompleteEvent(sourcePath, targetPath), "Completed import. Files available at {{.TargetPath}}\n") + }, +} + +var importDirOverwrite bool + +func init() { + importDirCmd.Flags().BoolVar(&importDirOverwrite, "overwrite", false, "Overwrite if file already exists in the workspace") + Cmd.AddCommand(importDirCmd) +} diff --git a/cmd/workspace/workspace/import_events.go b/cmd/workspace/workspace/import_events.go new file mode 100644 index 0000000000..1663882ceb --- /dev/null +++ b/cmd/workspace/workspace/import_events.go @@ -0,0 +1,71 @@ +package workspace + +import ( + "context" + + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/sync" +) + +// TODO: do not emit target directory in upload complete events. 
+ +type fileIOEvent struct { + SourcePath string `json:"source_path,omitempty"` + TargetPath string `json:"target_path,omitempty"` + Type string `json:"type"` +} + +const ( + EventTypeImportStarted = "IMPORT_STARTED" + EventTypeImportComplete = "IMPORT_COMPLETE" + EventTypeUploadComplete = "UPLOAD_COMPLETE" +) + +func newImportStartedEvent(sourcePath, targetPath string) fileIOEvent { + return fileIOEvent{ + SourcePath: sourcePath, + TargetPath: targetPath, + Type: EventTypeImportStarted, + } +} + +func newImportCompleteEvent(sourcePath, targetPath string) fileIOEvent { + return fileIOEvent{ + Type: EventTypeImportComplete, + TargetPath: targetPath, + } +} + +func newUploadCompleteEvent(sourcePath string) fileIOEvent { + return fileIOEvent{ + SourcePath: sourcePath, + Type: EventTypeUploadComplete, + } +} + +func renderSyncEvents(ctx context.Context, eventChannel <-chan sync.Event, syncer *sync.Sync) error { + for { + select { + case <-ctx.Done(): + return nil + case e, ok := <-eventChannel: + if !ok { + return nil + } + if e.String() == "" { + continue + } + switch v := e.(type) { + case *sync.EventSyncProgress: + // TODO: only emit this event if the sync event has progress 1.0 + // File upload has been completed. 
This renders the event for that + // on the console + err := cmdio.RenderWithTemplate(ctx, newUploadCompleteEvent(v.Path), "Uploaded {{.SourcePath}}\n") + if err != nil { + return err + } + } + + } + } +} diff --git a/internal/foo.txt b/internal/foo.txt new file mode 100755 index 0000000000..8c01d89ae0 --- /dev/null +++ b/internal/foo.txt @@ -0,0 +1 @@ +hello, world \ No newline at end of file diff --git a/internal/import_dir_test.go b/internal/import_dir_test.go new file mode 100644 index 0000000000..2c2966b794 --- /dev/null +++ b/internal/import_dir_test.go @@ -0,0 +1,80 @@ +package internal + +import ( + "bytes" + "context" + "io" + "path" + "regexp" + "testing" + + "github.com/databricks/cli/libs/filer" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/workspace" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWorkspaceImportDir(t *testing.T) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := temporaryWorkspaceDir(t, w) + + // run import-dir command + RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/default", tmpdir) + + // assert files are uploaded; every fixture lives under the bar/ subdirectory + f, err := filer.NewWorkspaceFilesClient(w, tmpdir) + require.NoError(t, err) + assertFileContains(t, ctx, f, "bar/foo.txt", "hello, world") + assertFileContains(t, ctx, f, "bar/.gitignore", ".databricks") + assertFileContains(t, ctx, f, "bar/apple.py", "print(1)") + assertNotebookExists(t, ctx, w, path.Join(tmpdir, "bar/mango")) +} + +func TestWorkspaceImportDirOverwriteFlag(t *testing.T) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + // ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := temporaryWorkspaceDir(t, w) + + // run import-dir command + RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/override/a", tmpdir) + + // // assert files are 
uploaded + // f, err := filer.NewWorkspaceFilesClient(w, tmpdir) + // require.NoError(t, err) + // assertFileContains(t, ctx, f, "bar.txt", "from dir A") + + // Assert another run fails with path already exists error from the server + _, _, err := RequireErrorRun(t, "workspace", "import-dir", "./testdata/import_dir/override/b", tmpdir) + assert.Regexp(t, regexp.MustCompile("Path (.*) already exists."), err.Error()) + + // // Succeeds with the overwrite flag + // RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/override/b", tmpdir, "--overwrite") + // require.NoError(t, err) + // assertFileContains(t, ctx, f, "bar.txt", "from dir B") +} + +func assertFileContains(t *testing.T, ctx context.Context, f filer.Filer, name, contents string) { + r, err := f.Read(ctx, name) + require.NoError(t, err) + + var b bytes.Buffer + _, err = io.Copy(&b, r) + require.NoError(t, err) + + assert.Contains(t, b.String(), contents) +} + +func assertNotebookExists(t *testing.T, ctx context.Context, w *databricks.WorkspaceClient, path string) { + info, err := w.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{ + Path: path, + }) + require.NoError(t, err) + require.Len(t, info, 1) + assert.Equal(t, workspace.ObjectTypeNotebook, info[0].ObjectType) +} diff --git a/internal/sync_test.go b/internal/sync_test.go index 89b318a970..fc2c45ff9e 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -459,12 +459,12 @@ func TestAccSyncEnsureRemotePathIsUsableIfRepoDoesntExist(t *testing.T) { // Hypothetical repo path doesn't exist. nonExistingRepoPath := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName("doesnt-exist-")) - err = sync.EnsureRemotePathIsUsable(ctx, wsc, nonExistingRepoPath) + err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nonExistingRepoPath) assert.ErrorContains(t, err, " does not exist; please create it first") // Paths nested under a hypothetical repo path should yield the same error. 
nestedPath := path.Join(nonExistingRepoPath, "nested/directory") - err = sync.EnsureRemotePathIsUsable(ctx, wsc, nestedPath) + err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nestedPath) assert.ErrorContains(t, err, " does not exist; please create it first") } @@ -476,12 +476,12 @@ func TestAccSyncEnsureRemotePathIsUsableIfRepoExists(t *testing.T) { _, remoteRepoPath := setupRepo(t, wsc, ctx) // Repo itself is usable. - err := sync.EnsureRemotePathIsUsable(ctx, wsc, remoteRepoPath) + err := sync.EnsureRemotePathIsUserScoped(ctx, wsc, remoteRepoPath) assert.NoError(t, err) // Path nested under repo path is usable. nestedPath := path.Join(remoteRepoPath, "nested/directory") - err = sync.EnsureRemotePathIsUsable(ctx, wsc, nestedPath) + err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nestedPath) assert.NoError(t, err) // Verify that the directory has been created. @@ -499,7 +499,7 @@ func TestAccSyncEnsureRemotePathIsUsableInWorkspace(t *testing.T) { require.NoError(t, err) remotePath := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("ensure-path-exists-test-")) - err = sync.EnsureRemotePathIsUsable(ctx, wsc, remotePath) + err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, remotePath) assert.NoError(t, err) // Clean up directory after test. 
diff --git a/internal/testdata/import_dir/default/bar/.gitignore b/internal/testdata/import_dir/default/bar/.gitignore new file mode 100644 index 0000000000..de811f118b --- /dev/null +++ b/internal/testdata/import_dir/default/bar/.gitignore @@ -0,0 +1,2 @@ + +.databricks diff --git a/internal/testdata/import_dir/default/bar/apple.py b/internal/testdata/import_dir/default/bar/apple.py new file mode 100644 index 0000000000..b917a726c9 --- /dev/null +++ b/internal/testdata/import_dir/default/bar/apple.py @@ -0,0 +1 @@ +print(1) diff --git a/internal/testdata/import_dir/default/bar/foo.txt b/internal/testdata/import_dir/default/bar/foo.txt new file mode 100644 index 0000000000..4b5fa63702 --- /dev/null +++ b/internal/testdata/import_dir/default/bar/foo.txt @@ -0,0 +1 @@ +hello, world diff --git a/internal/testdata/import_dir/default/bar/mango.py b/internal/testdata/import_dir/default/bar/mango.py new file mode 100644 index 0000000000..4707166b28 --- /dev/null +++ b/internal/testdata/import_dir/default/bar/mango.py @@ -0,0 +1,2 @@ +#Databricks notebook source +print(2) diff --git a/internal/testdata/import_dir/override/a/.gitignore b/internal/testdata/import_dir/override/a/.gitignore new file mode 100644 index 0000000000..de811f118b --- /dev/null +++ b/internal/testdata/import_dir/override/a/.gitignore @@ -0,0 +1,2 @@ + +.databricks diff --git a/internal/testdata/import_dir/override/a/bar.txt b/internal/testdata/import_dir/override/a/bar.txt new file mode 100644 index 0000000000..8211f1031e --- /dev/null +++ b/internal/testdata/import_dir/override/a/bar.txt @@ -0,0 +1,2 @@ +#Databricks notebook source +print("from dir A") diff --git a/internal/testdata/import_dir/override/b/.gitignore b/internal/testdata/import_dir/override/b/.gitignore new file mode 100644 index 0000000000..de811f118b --- /dev/null +++ b/internal/testdata/import_dir/override/b/.gitignore @@ -0,0 +1,2 @@ + +.databricks diff --git a/internal/testdata/import_dir/override/b/bar.txt 
b/internal/testdata/import_dir/override/b/bar.txt new file mode 100644 index 0000000000..73e457003c --- /dev/null +++ b/internal/testdata/import_dir/override/b/bar.txt @@ -0,0 +1,2 @@ +#Databricks notebook source +print("from dir B") diff --git a/libs/sync/path.go b/libs/sync/path.go index 7fd1b9a976..6982357c88 100644 --- a/libs/sync/path.go +++ b/libs/sync/path.go @@ -58,9 +58,9 @@ func repoPathForPath(me *iam.User, remotePath string) string { return remotePath } -// EnsureRemotePathIsUsable checks if the specified path is nested under -// expected base paths and if it is a directory or repository. -func EnsureRemotePathIsUsable(ctx context.Context, wsc *databricks.WorkspaceClient, remotePath string) error { +// EnsureRemotePathIsUserScoped checks if the specified path is nested under +// the caller's username and if it is a directory or repository. +func EnsureRemotePathIsUserScoped(ctx context.Context, wsc *databricks.WorkspaceClient, remotePath string) error { me, err := wsc.CurrentUser.Me(ctx) if err != nil { return err diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go index 8fcabc113e..daf7ef20f8 100644 --- a/libs/sync/repofiles/repofiles.go +++ b/libs/sync/repofiles/repofiles.go @@ -17,19 +17,26 @@ import ( "github.com/databricks/databricks-sdk-go/service/workspace" ) +type RepoFileOptions struct { + OverwriteIfExists bool +} + // RepoFiles wraps reading and writing into a remote repo with safeguards to prevent // accidental deletion of repos and more robust methods to overwrite workspace files type RepoFiles struct { + *RepoFileOptions + repoRoot string localRoot string workspaceClient *databricks.WorkspaceClient } -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { +func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient, opts *RepoFileOptions) *RepoFiles { return &RepoFiles{ repoRoot: repoRoot, localRoot: localRoot, workspaceClient: workspaceClient, 
+ RepoFileOptions: opts, } } @@ -63,7 +70,7 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten return err } escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=true", escapedPath) + apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=%t", escapedPath, r.RepoFileOptions != nil && r.OverwriteIfExists) err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go index 2a881d90d0..688730442a 100644 --- a/libs/sync/repofiles/repofiles_test.go +++ b/libs/sync/repofiles/repofiles_test.go @@ -10,7 +10,7 @@ import ( func TestRepoFilesRemotePath(t *testing.T) { repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) + repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil, nil) remotePath, err := repoFiles.remotePath("a/b/c") assert.NoError(t, err) @@ -81,8 +81,8 @@ func TestRepoReadLocal(t *testing.T) { err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) assert.NoError(t, err) - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) + repoFiles := Create("/Repos/doraemon/bar", tempDir, nil, nil) bytes, err := repoFiles.readLocal("./a/../hello.txt") assert.NoError(t, err) assert.Equal(t, "my name is doraemon :P", string(bytes)) } diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 54d0624e77..a351322d0a 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -24,6 +24,12 @@ type SyncOptions struct { WorkspaceClient *databricks.WorkspaceClient Host string + + // Allow sync to overwrite existing files in the workspace + AllowOverwrites bool + + // Persist the snapshot on the local file systems for future sync runs + PersistSnapshot bool } type Sync struct { @@ -49,12 +55,6 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, 
error) { return nil, err } - // Verify that the remote path we're about to synchronize to is valid and allowed. - err = EnsureRemotePathIsUsable(ctx, opts.WorkspaceClient, opts.RemotePath) - if err != nil { - return nil, err - } - // TODO: The host may be late-initialized in certain Azure setups where we // specify the workspace by its resource ID. tracked in: https://databricks.atlassian.net/browse/DECO-194 opts.Host = opts.WorkspaceClient.Config.Host @@ -76,8 +76,9 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { return nil, fmt.Errorf("unable to load sync snapshot: %w", err) } } - - repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) + repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient, &repofiles.RepoFileOptions{ + OverwriteIfExists: opts.AllowOverwrites, + }) return &Sync{ SyncOptions: &opts, @@ -150,10 +151,12 @@ func (s *Sync) RunOnce(ctx context.Context) error { return err } - err = s.snapshot.Save(ctx) - if err != nil { - log.Errorf(ctx, "cannot store snapshot: %s", err) - return err + if s.PersistSnapshot { + err = s.snapshot.Save(ctx) + if err != nil { + log.Errorf(ctx, "cannot store snapshot: %s", err) + return err + } } s.notifyComplete(ctx, change)