Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9f4bf12
Add workspace import_dir command to the CLI
shreyas-goenka Jun 1, 2023
1244b3a
Merge remote-tracking branch 'origin' into import_dir
shreyas-goenka Jun 1, 2023
2303f20
sync options redefine better
shreyas-goenka Jun 1, 2023
0382c74
-
shreyas-goenka Jun 1, 2023
289dd0d
-
shreyas-goenka Jun 1, 2023
fb5ea16
helper wsfs function
shreyas-goenka Jun 1, 2023
931fae1
add check for isdir in readdir
shreyas-goenka Jun 1, 2023
57a9578
added some intgegration test for repofiles and moved to filer
shreyas-goenka Jun 1, 2023
dc0ff89
repofiles integration test
shreyas-goenka Jun 1, 2023
2be6b85
wip import-dir acc tests
shreyas-goenka Jun 1, 2023
c1ccf82
testdata
shreyas-goenka Jun 1, 2023
efc72b5
-
shreyas-goenka Jun 1, 2023
7099874
Merge remote-tracking branch 'origin' into import_dir
shreyas-goenka Jun 1, 2023
0f1cb67
remove underscore
shreyas-goenka Jun 5, 2023
49f7969
set template in annotation
shreyas-goenka Jun 5, 2023
298fed2
undo remote path public
shreyas-goenka Jun 5, 2023
6cc2571
address some comments
shreyas-goenka Jun 5, 2023
bb17546
-
shreyas-goenka Jun 5, 2023
ae09abf
-
shreyas-goenka Jun 5, 2023
14b63d0
-
shreyas-goenka Jun 5, 2023
5fdd8a2
-
shreyas-goenka Jun 5, 2023
9629032
-
shreyas-goenka Jun 5, 2023
e9a309e
removes not a directory error
shreyas-goenka Jun 5, 2023
74cb87f
-
shreyas-goenka Jun 5, 2023
59081f7
Merge remote-tracking branch 'origin' into import_dir
shreyas-goenka Jun 5, 2023
58acf64
some cleanup
shreyas-goenka Jun 5, 2023
f1e99d3
Merge remote-tracking branch 'origin' into import_dir
shreyas-goenka Jun 5, 2023
764d9e9
Merge remote-tracking branch 'origin' into import_dir
shreyas-goenka Jun 5, 2023
8166aed
revert changes to reposfile
shreyas-goenka Jun 6, 2023
f0f75ff
-
shreyas-goenka Jun 6, 2023
43aadba
made events work and moved validation outside sync object
shreyas-goenka Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bundle/deploy/files/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func getSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) {

SnapshotBasePath: cacheDir,
WorkspaceClient: b.WorkspaceClient(),

PersistSnapshot: true,
AllowOverwrites: true,
}
return sync.New(ctx, opts)
}
3 changes: 3 additions & 0 deletions cmd/bundle/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOpti

SnapshotBasePath: cacheDir,
WorkspaceClient: b.WorkspaceClient(),

PersistSnapshot: true,
AllowOverwrites: true,
}
return &opts, nil
}
Expand Down
9 changes: 9 additions & 0 deletions cmd/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func syncOptionsFromArgs(cmd *cobra.Command, args []string) (*sync.SyncOptions,
// exist and add it to the `.gitignore` file in the root.
SnapshotBasePath: filepath.Join(args[0], ".databricks"),
WorkspaceClient: databricks.Must(databricks.NewWorkspaceClient()),

PersistSnapshot: true,
AllowOverwrites: true,
}
return &opts, nil
}
Expand Down Expand Up @@ -96,6 +99,12 @@ var syncCmd = &cobra.Command{
return err
}

// Verify that the remote path we're about to synchronize to is valid and allowed.
err = sync.EnsureRemotePathIsUserScoped(ctx, opts.WorkspaceClient, opts.RemotePath)
if err != nil {
return err
}

var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch output {
case flags.OutputText:
Expand Down
84 changes: 84 additions & 0 deletions cmd/workspace/workspace/import_dir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package workspace

import (
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/sync"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)

// TODO: check whether we need mutex for any events been emitted since they are accessing
// state
// TODO: Error: path must be nested under /Users/shreyas.goenka@databricks.com or /Repos/shreyas.goenka@databricks.com. Should this validation be
// removed? Yes
var importDirCmd = &cobra.Command{
Use: "import-dir SOURCE_PATH TARGET_PATH",
Short: `Import directory to a Databricks workspace.`,
Long: `
Recursively imports a directory from the local filesystem to a Databricks workspace.

This command respects your git ignore configuration. Notebooks with extensions
.scala, .py, .sql, .r, .R, .ipynb are stripped of their extensions.
`,

PreRunE: root.MustWorkspaceClient,
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()
sourcePath := args[0]
targetPath := args[1]

// Initialize syncer to do a full sync with the correct from source to target.
// This will upload the local files
opts := sync.SyncOptions{
LocalPath: sourcePath,
RemotePath: targetPath,
Full: true,
WorkspaceClient: root.WorkspaceClient(ctx),

AllowOverwrites: importDirOverwrite,
PersistSnapshot: false,
}
s, err := sync.New(ctx, opts)
if err != nil {
return err
}

// Initialize error wait group, and spawn the progress event emitter inside
// the error wait group
group, ctx := errgroup.WithContext(ctx)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the behavior of the returned context?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned context is cancelled if there's an error in any of the subroutines. see: https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext

eventsChannel := s.Events()
group.Go(
func() error {
return renderSyncEvents(ctx, eventsChannel, s)
})

// Start Uploading local files
err = cmdio.RenderWithTemplate(ctx, newImportStartedEvent(sourcePath, targetPath), "Starting import {{.SourcePath}} -> {{.TargetPath}}\n")
if err != nil {
return err
}
err = s.RunOnce(ctx)
if err != nil {
return err
}
// Upload completed, close the syncer
s.Close()

// Wait for any inflight progress events to be emitted
if err := group.Wait(); err != nil {
return err
}

// Render import completetion event
return cmdio.RenderWithTemplate(ctx, newImportCompleteEvent(sourcePath, targetPath), "Completed import. Files available at {{.TargetPath}}\n")
},
}

var importDirOverwrite bool

func init() {
importDirCmd.Flags().BoolVar(&importDirOverwrite, "overwrite", false, "Overwrite if file already exists in the workspace")
Cmd.AddCommand(importDirCmd)
}
71 changes: 71 additions & 0 deletions cmd/workspace/workspace/import_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package workspace

import (
"context"

"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/sync"
)

// TODO: do not emit target directory in upload complete events.

type fileIOEvent struct {
SourcePath string `json:"source_path,omitempty"`
TargetPath string `json:"target_path,omitempty"`
Type string `json:"type"`
}

const (
EventTypeImportStarted = "IMPORT_STARTED"
EventTypeImportComplete = "IMPORT_COMPLETE"
EventTypeUploadComplete = "UPLOAD_COMPLETE"
)

func newImportStartedEvent(sourcePath, targetPath string) fileIOEvent {
return fileIOEvent{
SourcePath: sourcePath,
TargetPath: targetPath,
Type: EventTypeImportStarted,
}
}

func newImportCompleteEvent(sourcePath, targetPath string) fileIOEvent {
return fileIOEvent{
Type: EventTypeImportComplete,
TargetPath: targetPath,
}
}

func newUploadCompleteEvent(sourcePath string) fileIOEvent {
return fileIOEvent{
SourcePath: sourcePath,
Type: EventTypeUploadComplete,
}
}

func renderSyncEvents(ctx context.Context, eventChannel <-chan sync.Event, syncer *sync.Sync) error {
for {
select {
case <-ctx.Done():
return nil
case e, ok := <-eventChannel:
if !ok {
return nil
}
if e.String() == "" {
continue
}
switch v := e.(type) {
case *sync.EventSyncProgress:
// TODO: only emit this event if the the sync event has progress 1.o0
// File upload has been completed. This renders the event for that
// on the console
err := cmdio.RenderWithTemplate(ctx, newUploadCompleteEvent(v.Path), "Uploaded {{.SourcePath}}\n")
if err != nil {
return err
}
}

}
}
}
1 change: 1 addition & 0 deletions internal/foo.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello, world
80 changes: 80 additions & 0 deletions internal/import_dir_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package internal

import (
"bytes"
"context"
"io"
"path"
"regexp"
"testing"

"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWorkspaceImportDir(t *testing.T) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := temporaryWorkspaceDir(t, w)

// run import-dir command
RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/default", tmpdir)

// assert files are uploaded
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
require.NoError(t, err)
assertFileContains(t, ctx, f, "foo.txt", "hello, world")
assertFileContains(t, ctx, f, ".gitignore", ".databricks")
assertFileContains(t, ctx, f, "bar/apple.py", "print(1)")
assertNotebookExists(t, ctx, w, path.Join(tmpdir, "bar/mango"))
}

func TestWorkspaceImportDirOverwriteFlag(t *testing.T) {
// t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

// ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := temporaryWorkspaceDir(t, w)

// run import-dir command
RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/override/a", tmpdir)

// // assert files are uploaded
// f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
// require.NoError(t, err)
// assertFileContains(t, ctx, f, "bar.txt", "from directory A")

// Assert another run fails with path already exists error from the server
_, _, err := RequireErrorRun(t, "workspace", "import-dir", "./testdata/import_dir/override/b", tmpdir)
assert.Regexp(t, regexp.MustCompile("Path (.*) already exists."), err.Error())

// // Succeeds with the overwrite flag
// RequireSuccessfulRun(t, "workspace", "import-dir", "./testdata/import_dir/override/b", tmpdir, "--overwrite")
// require.NoError(t, err)
// assertFileContains(t, ctx, f, "bar.txt", "from directory B")
}

func assertFileContains(t *testing.T, ctx context.Context, f filer.Filer, name, contents string) {
r, err := f.Read(ctx, name)
require.NoError(t, err)

var b bytes.Buffer
_, err = io.Copy(&b, r)
require.NoError(t, err)

assert.Contains(t, b.String(), contents)
}

func assertNotebookExists(t *testing.T, ctx context.Context, w *databricks.WorkspaceClient, path string) {
info, err := w.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{
Path: path,
})
require.NoError(t, err)
assert.Len(t, info, 1)
assert.Equal(t, info[0].ObjectType, workspace.ObjectTypeNotebook)
}
10 changes: 5 additions & 5 deletions internal/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,12 @@ func TestAccSyncEnsureRemotePathIsUsableIfRepoDoesntExist(t *testing.T) {

// Hypothetical repo path doesn't exist.
nonExistingRepoPath := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName("doesnt-exist-"))
err = sync.EnsureRemotePathIsUsable(ctx, wsc, nonExistingRepoPath)
err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nonExistingRepoPath)
assert.ErrorContains(t, err, " does not exist; please create it first")

// Paths nested under a hypothetical repo path should yield the same error.
nestedPath := path.Join(nonExistingRepoPath, "nested/directory")
err = sync.EnsureRemotePathIsUsable(ctx, wsc, nestedPath)
err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nestedPath)
assert.ErrorContains(t, err, " does not exist; please create it first")
}

Expand All @@ -476,12 +476,12 @@ func TestAccSyncEnsureRemotePathIsUsableIfRepoExists(t *testing.T) {
_, remoteRepoPath := setupRepo(t, wsc, ctx)

// Repo itself is usable.
err := sync.EnsureRemotePathIsUsable(ctx, wsc, remoteRepoPath)
err := sync.EnsureRemotePathIsUserScoped(ctx, wsc, remoteRepoPath)
assert.NoError(t, err)

// Path nested under repo path is usable.
nestedPath := path.Join(remoteRepoPath, "nested/directory")
err = sync.EnsureRemotePathIsUsable(ctx, wsc, nestedPath)
err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, nestedPath)
assert.NoError(t, err)

// Verify that the directory has been created.
Expand All @@ -499,7 +499,7 @@ func TestAccSyncEnsureRemotePathIsUsableInWorkspace(t *testing.T) {
require.NoError(t, err)

remotePath := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("ensure-path-exists-test-"))
err = sync.EnsureRemotePathIsUsable(ctx, wsc, remotePath)
err = sync.EnsureRemotePathIsUserScoped(ctx, wsc, remotePath)
assert.NoError(t, err)

// Clean up directory after test.
Expand Down
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/default/bar/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

.databricks
1 change: 1 addition & 0 deletions internal/testdata/import_dir/default/bar/apple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
print(1)
1 change: 1 addition & 0 deletions internal/testdata/import_dir/default/bar/foo.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
hello, world
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/default/bar/mango.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#Databricks notebook source
print(2)
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/override/a/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

.databricks
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/override/a/bar.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#Databricks notebook source
print("from dir A")
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/override/b/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

.databricks
2 changes: 2 additions & 0 deletions internal/testdata/import_dir/override/b/bar.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#Databricks notebook source
print("from dir B")
6 changes: 3 additions & 3 deletions libs/sync/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ func repoPathForPath(me *iam.User, remotePath string) string {
return remotePath
}

// EnsureRemotePathIsUsable checks if the specified path is nested under
// expected base paths and if it is a directory or repository.
func EnsureRemotePathIsUsable(ctx context.Context, wsc *databricks.WorkspaceClient, remotePath string) error {
// EnsureRemotePathIsUserScoped checks if the specified path is nested under
// the caller's username and if it is a directory or repository.
func EnsureRemotePathIsUserScoped(ctx context.Context, wsc *databricks.WorkspaceClient, remotePath string) error {
me, err := wsc.CurrentUser.Me(ctx)
if err != nil {
return err
Expand Down
11 changes: 9 additions & 2 deletions libs/sync/repofiles/repofiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,26 @@ import (
"github.com/databricks/databricks-sdk-go/service/workspace"
)

type RepoFileOptions struct {
OverwriteIfExists bool
}

// RepoFiles wraps reading and writing into a remote repo with safeguards to prevent
// accidental deletion of repos and more robust methods to overwrite workspace files
type RepoFiles struct {
*RepoFileOptions

repoRoot string
localRoot string
workspaceClient *databricks.WorkspaceClient
}

func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles {
func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient, opts *RepoFileOptions) *RepoFiles {
return &RepoFiles{
repoRoot: repoRoot,
localRoot: localRoot,
workspaceClient: workspaceClient,
RepoFileOptions: opts,
}
}

Expand Down Expand Up @@ -63,7 +70,7 @@ func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, conten
return err
}
escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/"))
apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=true", escapedPath)
apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=%t", escapedPath, r.OverwriteIfExists)

err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil)

Expand Down
Loading