diff --git a/internal/filer_test.go b/internal/filer_test.go index 45227b225e..6244a5c474 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -1,6 +1,7 @@ package internal import ( + "bytes" "context" "errors" "fmt" @@ -12,6 +13,7 @@ import ( "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/files" "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,74 +30,25 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str return } - body, err := io.ReadAll(reader) + var body bytes.Buffer + _, err = io.Copy(&body, reader) if !assert.NoError(f, err) { return } - assert.Equal(f, contents, string(body)) + assert.Equal(f, contents, body.String()) } -func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { - ctx := context.Background() - me, err := w.CurrentUser.Me(ctx) - require.NoError(t, err) - - path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("wsfs-files-")) - - // Ensure directory exists, but doesn't exist YET! - // Otherwise we could inadvertently remove a directory that already exists on cleanup. - t.Logf("mkdir %s", path) - err = w.Workspace.MkdirsByPath(ctx, path) - require.NoError(t, err) - - // Remove test directory on test completion. 
- t.Cleanup(func() { - t.Logf("rm -rf %s", path) - err := w.Workspace.Delete(ctx, workspace.Delete{ - Path: path, - Recursive: true, - }) - if err == nil || apierr.IsMissing(err) { - return - } - t.Logf("unable to remove temporary workspace path %s: %#v", path, err) - }) - - return path -} - -func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { - t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) - - ctx := context.Background() - w := databricks.Must(databricks.NewWorkspaceClient()) - tmpdir := temporaryWorkspaceDir(t, w) - f, err := filer.NewWorkspaceFilesClient(w, tmpdir) - require.NoError(t, err) - - // Check if we can use this API here, skip test if we cannot. - _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest { - t.Skip(aerr.Message) - } - - return ctx, f -} - -func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) { +func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { var err error - ctx, f := setupWorkspaceFilesTest(t) - // Write should fail because the root path doesn't yet exist. err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) // Read should fail because the root path doesn't yet exist. _, err = f.Read(ctx, "/foo/bar") - assert.True(t, apierr.IsMissing(err)) + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) // Write with CreateParentDirectories flag should succeed. err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) @@ -113,18 +66,16 @@ func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) { // Delete should fail if the file doesn't exist. err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, apierr.IsMissing(err)) + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) // Delete should succeed for file that does exist. 
err = f.Delete(ctx, "/foo/bar") assert.NoError(t, err) } -func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { +func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { var err error - ctx, f := setupWorkspaceFilesTest(t) - // We start with an empty directory. entries, err := f.ReadDir(ctx, ".") require.NoError(t, err) @@ -148,7 +99,7 @@ func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { // Expect an error if the path doesn't exist. _, err = f.ReadDir(ctx, "/dir/a/b/c/d/e") - assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) + assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}), err) // Expect two entries in the root. entries, err = f.ReadDir(ctx, ".") @@ -172,3 +123,107 @@ func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { assert.Len(t, entries, 1) assert.Equal(t, "c", entries[0].Name) } + +func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { + ctx := context.Background() + me, err := w.CurrentUser.Me(ctx) + require.NoError(t, err) + + path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-filer-wsfs-")) + + // Ensure directory exists, but doesn't exist YET! + // Otherwise we could inadvertently remove a directory that already exists on cleanup. + t.Logf("mkdir %s", path) + err = w.Workspace.MkdirsByPath(ctx, path) + require.NoError(t, err) + + // Remove test directory on test completion. 
+ t.Cleanup(func() { + t.Logf("rm -rf %s", path) + err := w.Workspace.Delete(ctx, workspace.Delete{ + Path: path, + Recursive: true, + }) + if err == nil || apierr.IsMissing(err) { + return + } + t.Logf("unable to remove temporary workspace directory %s: %#v", path, err) + }) + + return path +} + +func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := temporaryWorkspaceDir(t, w) + f, err := filer.NewWorkspaceFilesClient(w, tmpdir) + require.NoError(t, err) + + // Check if we can use this API here, skip test if we cannot. + _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") + var aerr *apierr.APIError + if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest { + t.Skip(aerr.Message) + } + + return ctx, f +} + +func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) { + ctx, f := setupWorkspaceFilesTest(t) + runFilerReadWriteTest(t, ctx, f) +} + +func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { + ctx, f := setupWorkspaceFilesTest(t) + runFilerReadDirTest(t, ctx, f) +} + +func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { + ctx := context.Background() + path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-filer-dbfs-")) + + // This call fails if the path already exists. + t.Logf("mkdir dbfs:%s", path) + err := w.Dbfs.MkdirsByPath(ctx, path) + require.NoError(t, err) + + // Remove test directory on test completion. 
+ t.Cleanup(func() { + t.Logf("rm -rf dbfs:%s", path) + err := w.Dbfs.Delete(ctx, files.Delete{ + Path: path, + Recursive: true, + }) + if err == nil || apierr.IsMissing(err) { + return + } + t.Logf("unable to remove temporary dbfs directory %s: %#v", path, err) + }) + + return path +} + +func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + tmpdir := temporaryDbfsDir(t, w) + f, err := filer.NewDbfsClient(w, tmpdir) + require.NoError(t, err) + return ctx, f +} + +func TestAccFilerDbfsReadWrite(t *testing.T) { + ctx, f := setupFilerDbfsTest(t) + runFilerReadWriteTest(t, ctx, f) +} + +func TestAccFilerDbfsReadDir(t *testing.T) { + ctx, f := setupFilerDbfsTest(t) + runFilerReadDirTest(t, ctx, f) +} diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go new file mode 100644 index 0000000000..2e32421074 --- /dev/null +++ b/libs/filer/dbfs_client.go @@ -0,0 +1,199 @@ +package filer + +import ( + "context" + "errors" + "io" + "net/http" + "path" + "sort" + "time" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/files" + "golang.org/x/exp/slices" +) + +// DbfsClient implements the [Filer] interface for the DBFS backend. +type DbfsClient struct { + workspaceClient *databricks.WorkspaceClient + + // File operations will be relative to this path. 
+ root RootPath +} + +func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { + return &DbfsClient{ + workspaceClient: w, + + root: NewRootPath(root), + }, nil +} + +func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { + absPath, err := w.root.Join(name) + if err != nil { + return err + } + + fileMode := files.FileModeWrite + if slices.Contains(mode, OverwriteIfExists) { + fileMode |= files.FileModeOverwrite + } + + // Issue info call before write because it automatically creates parent directories. + // + // For discussion: we could decide this is actually convenient, remove the call below, + // and apply the same semantics for the WSFS filer. + // + if !slices.Contains(mode, CreateParentDirectories) { + _, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, path.Dir(absPath)) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 404 if the parent directory doesn't exist. + if aerr.StatusCode == http.StatusNotFound { + if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { + return NoSuchDirectoryError{path.Dir(absPath)} + } + } + + return err + } + } + + handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 400 if the file already exists. 
+ if aerr.StatusCode == http.StatusBadRequest { + if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { + return FileAlreadyExistsError{absPath} + } + } + + return err + } + + _, err = io.Copy(handle, reader) + cerr := handle.Close() + if err == nil { + err = cerr + } + + return err +} + +func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) { + absPath, err := w.root.Join(name) + if err != nil { + return nil, err + } + + handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return nil, err + } + + // This API returns a 404 if the file doesn't exist. + if aerr.StatusCode == http.StatusNotFound { + if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { + return nil, FileDoesNotExistError{absPath} + } + } + + return nil, err + } + + return handle, nil +} + +func (w *DbfsClient) Delete(ctx context.Context, name string) error { + absPath, err := w.root.Join(name) + if err != nil { + return err + } + + // Issue info call before delete because delete succeeds if the specified path doesn't exist. + // + // For discussion: we could decide this is actually convenient, remove the call below, + // and apply the same semantics for the WSFS filer. + // + _, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, absPath) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 404 if the file doesn't exist. 
+ if aerr.StatusCode == http.StatusNotFound { + if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { + return FileDoesNotExistError{absPath} + } + } + + return err + } + + return w.workspaceClient.Dbfs.Delete(ctx, files.Delete{ + Path: absPath, + Recursive: false, + }) +} + +func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]FileInfo, error) { + absPath, err := w.root.Join(name) + if err != nil { + return nil, err + } + + res, err := w.workspaceClient.Dbfs.ListByPath(ctx, absPath) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return nil, err + } + + // This API returns a 404 if the directory doesn't exist. + if aerr.StatusCode == http.StatusNotFound { + if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { + return nil, NoSuchDirectoryError{absPath} + } + } + + return nil, err + } + + info := make([]FileInfo, len(res.Files)) + for i, v := range res.Files { + info[i] = FileInfo{ + Name: path.Base(v.Path), + Size: v.FileSize, + ModTime: time.UnixMilli(v.ModificationTime), + } + } + + // Sort by name for parity with os.ReadDir. 
+ sort.Slice(info, func(i, j int) bool { return info[i].Name < info[j].Name }) + return info, nil +} + +func (w *DbfsClient) Mkdir(ctx context.Context, name string) error { + dirPath, err := w.root.Join(name) + if err != nil { + return err + } + + return w.workspaceClient.Dbfs.MkdirsByPath(ctx, dirPath) +} diff --git a/libs/filer/filer.go b/libs/filer/filer.go index 841d1b8314..88de7e466a 100644 --- a/libs/filer/filer.go +++ b/libs/filer/filer.go @@ -38,6 +38,14 @@ func (err FileAlreadyExistsError) Error() string { return fmt.Sprintf("file already exists: %s", err.path) } +type FileDoesNotExistError struct { + path string +} + +func (err FileDoesNotExistError) Error() string { + return fmt.Sprintf("file does not exist: %s", err.path) +} + type NoSuchDirectoryError struct { path string } diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 33fefc3d2f..df2c0bdbb9 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -68,7 +68,12 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io err = w.apiClient.Do(ctx, http.MethodPost, urlPath, body, nil) - // If we got an API error we deal with it below. + // Return early on success. + if err == nil { + return nil + } + + // Special handling of this error only if it is an API error. var aerr *apierr.APIError if !errors.As(err, &aerr) { return err @@ -112,11 +117,23 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader var res []byte err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &res) - if err != nil { + + // Return early on success. + if err == nil { + return bytes.NewReader(res), nil + } + + // Special handling of this error only if it is an API error. 
+ var aerr *apierr.APIError + if !errors.As(err, &aerr) { return nil, err } - return bytes.NewReader(res), nil + if aerr.StatusCode == http.StatusNotFound { + return nil, FileDoesNotExistError{absPath} + } + + return nil, err } func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error { @@ -125,10 +142,27 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error { return err } - return w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ + err = w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ Path: absPath, Recursive: false, }) + + // Return early on success. + if err == nil { + return nil + } + + // Special handling of this error only if it is an API error. + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + if aerr.StatusCode == http.StatusNotFound { + return FileDoesNotExistError{absPath} + } + + return err } func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]FileInfo, error) {