From a7a662ec0e06c06c69c2d1886832e2a142ac6843 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 14 Dec 2022 23:51:32 +0100 Subject: [PATCH 01/10] WIP for DBFS filer --- internal/filer_test.go | 54 ++++++++- libs/filer/dbfs_client.go | 246 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 298 insertions(+), 2 deletions(-) create mode 100644 libs/filer/dbfs_client.go diff --git a/internal/filer_test.go b/internal/filer_test.go index 2dbb8ae662..ee21a6257e 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -1,6 +1,7 @@ package internal import ( + "bytes" "context" "errors" "fmt" @@ -28,12 +29,13 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str return } - body, err := io.ReadAll(reader) + var body bytes.Buffer + _, err = io.Copy(&body, reader) if !assert.NoError(f, err) { return } - assert.Equal(f, contents, string(body)) + assert.Equal(f, contents, body.String()) } func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { @@ -111,3 +113,51 @@ func TestAccFilerWorkspaceFiles(t *testing.T) { err = f.Delete(ctx, "/foo/bar") assert.NoError(t, err) } + +func TestAccFilerDbfs(t *testing.T) { + t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) + + ctx := context.Background() + w := databricks.Must(databricks.NewWorkspaceClient()) + + // TODO RANDOM PATH + tmpdir := "/tmp/.integration-test/" + f, err := filer.NewDbfsClient(w, tmpdir) + require.NoError(t, err) + + // Check if we can use this API here, skip test if we cannot. + _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") + if apierr, ok := err.(apierr.APIError); ok && apierr.StatusCode == http.StatusBadRequest { + t.Skip(apierr.Message) + } + + // Write should fail because the root path doesn't yet exist. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello world"`)) + assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) + + // Read should fail because the root path doesn't yet exist. + _, err = f.Read(ctx, "/foo/bar") + assert.True(t, apierr.IsMissing(err)) + + // Write with CreateParentDirectories flag should succeed. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello world"`), filer.CreateParentDirectories) + assert.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "/foo/bar", `"hello world"`) + + // Write should fail because there is an existing file at the specified path. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello universe"`)) + assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) + + // Write with OverwriteIfExists should succeed. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello universe"`), filer.OverwriteIfExists) + assert.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "/foo/bar", `"hello universe"`) + + // Delete should fail if the file doesn't exist. + err = f.Delete(ctx, "/doesnt_exist") + assert.True(t, apierr.IsMissing(err)) + + // Delete should succeed for file that does exist. + err = f.Delete(ctx, "/foo/bar") + assert.NoError(t, err) +} diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go new file mode 100644 index 0000000000..45480b3cdd --- /dev/null +++ b/libs/filer/dbfs_client.go @@ -0,0 +1,246 @@ +package filer + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + "io" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/dbfs" +) + +var b64 = base64.StdEncoding + +// Maximum read or write length for the DBFS API. +const maxDbfsBlockSize = 1024 * 1024 + +type dbfsReader struct { + size int64 + offset int64 +} + +type dbfsWriter struct { + handle int64 +} + +type dbfsHandle struct { + ctx context.Context + api dbfs.DbfsAPI + path string + + *dbfsReader + *dbfsWriter +} + +// Implement the [io.Reader] interface. +func (h *dbfsHandle) Read(p []byte) (int, error) { + r := h.dbfsReader + if r == nil { + return 0, fmt.Errorf("dbfs file not open for reading") + } + + if r.offset >= r.size { + return 0, io.EOF + } + + res, err := h.api.Read(h.ctx, dbfs.Read{ + Path: h.path, + Length: len(p), + Offset: int(r.offset), // TODO: make int32/in64 work properly + }) + if err != nil { + return 0, fmt.Errorf("dbfs read: %w", err) + } + + // The guard against offset >= size happens above, so this can only happen + // if the file is modified or truncated while reading. If this happens, + // the read contents will likely be corrupted, so we return an error. + if res.BytesRead == 0 { + return 0, fmt.Errorf("dbfs read: unexpected EOF at offset %d (size %d)", r.offset, r.size) + } + + r.offset += res.BytesRead + return b64.Decode(p, []byte(res.Data)) +} + +// Implement the [io.WriterTo] interface. +func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { + r := h.dbfsReader + if r == nil { + return 0, fmt.Errorf("dbfs file not open for reading") + } + + buf := make([]byte, maxDbfsBlockSize) + ntotal := int64(0) + for { + nread, err := h.Read(buf) + if err != nil { + // EOF on read means we're done. + // For writers being done means returning a nil error. + if err == io.EOF { + err = nil + } + return ntotal, err + } + nwritten, err := io.Copy(w, bytes.NewReader(buf[:nread])) + ntotal += nwritten + if err != nil { + return ntotal, err + } + } +} + +// Implement the [io.Writer] interface. +func (h *dbfsHandle) Write(p []byte) (int, error) { + w := h.dbfsWriter + if w == nil { + return 0, fmt.Errorf("dbfs file not open for writing") + } + + err := h.api.AddBlock(h.ctx, dbfs.AddBlock{ + Data: b64.EncodeToString(p), + Handle: w.handle, + }) + if err != nil { + return 0, fmt.Errorf("dbfs add block: %w", err) + } + return len(p), nil +} + +// Implement the [io.ReaderFrom] interface. +func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { + w := h.dbfsWriter + if w == nil { + return 0, fmt.Errorf("dbfs file not open for writing") + } + + buf := make([]byte, maxDbfsBlockSize) + ntotal := int64(0) + for { + nread, err := r.Read(buf) + if err != nil { + // EOF on read means we're done. + // For writers being done means returning a nil error. + if err == io.EOF { + err = nil + } + return ntotal, err + } + + nwritten, err := h.Write(buf[:nread]) + ntotal += int64(nwritten) + if err != nil { + return ntotal, err + } + } +} + +// Implement the [io.Closer] interface. +func (h *dbfsHandle) Close() error { + w := h.dbfsWriter + if w == nil { + return fmt.Errorf("dbfs file not open for writing") + } + + err := h.api.CloseByHandle(h.ctx, w.handle) + if err != nil { + return fmt.Errorf("dbfs close: %w", err) + } + + return nil +} + +func newDbfsHandleForReading(ctx context.Context, w *databricks.WorkspaceClient, path string) (io.Reader, error) { + info, err := w.Dbfs.GetStatusByPath(ctx, path) + if err != nil { + return nil, err + } + + return &dbfsHandle{ + ctx: ctx, + api: *w.Dbfs, + path: path, + + dbfsReader: &dbfsReader{ + size: info.FileSize, + }, + }, nil +} + +func newDbfsHandleForWriting(ctx context.Context, w *databricks.WorkspaceClient, path string) (io.WriteCloser, error) { + res, err := w.Dbfs.Create(ctx, dbfs.Create{ + Path: path, + Overwrite: false, + }) + if err != nil { + return nil, err + } + + return &dbfsHandle{ + ctx: ctx, + api: *w.Dbfs, + path: path, + + dbfsWriter: &dbfsWriter{ + handle: res.Handle, + }, + }, nil +} + +// DbfsClient implements a +type DbfsClient struct { + workspaceClient *databricks.WorkspaceClient + + // File operations will be relative to this path. + root RootPath +} + +func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { + return &DbfsClient{ + workspaceClient: w, + + root: NewRootPath(root), + }, nil +} + +func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { + absPath, err := w.root.Join(name) + if err != nil { + return err + } + + dbfsHandle, err := newDbfsHandleForWriting(ctx, w.workspaceClient, absPath) + if err != nil { + return err + } + + _, err = io.Copy(dbfsHandle, reader) + cerr := dbfsHandle.Close() + if err == nil { + err = cerr + } + return err +} + +func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) { + absPath, err := w.root.Join(name) + if err != nil { + return nil, err + } + + return newDbfsHandleForReading(ctx, w.workspaceClient, absPath) +} + +func (w *DbfsClient) Delete(ctx context.Context, name string) error { + absPath, err := w.root.Join(name) + if err != nil { + return err + } + + return w.workspaceClient.Dbfs.Delete(ctx, dbfs.Delete{ + Path: absPath, + Recursive: false, + }) +} From e487d040d0427550dda8323f1f3a5007bb23d07d Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 15 Dec 2022 11:23:15 +0100 Subject: [PATCH 02/10] Work on DBFS filer; add FileDoesNotExistError --- internal/filer_test.go | 52 +++--- libs/filer/dbfs_client.go | 247 +++++++-------------------- libs/filer/dbfs_handle.go | 217 +++++++++++++++++++++++ libs/filer/filer.go | 8 + libs/filer/workspace_files_client.go | 63 ++++--- 5 files changed, 361 insertions(+), 226 deletions(-) create mode 100644 libs/filer/dbfs_handle.go diff --git a/internal/filer_test.go b/internal/filer_test.go index ee21a6257e..eba39b26bd 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -13,6 +13,7 @@ import ( "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/service/dbfs" "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -43,7 +44,7 @@ func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { me, err := w.CurrentUser.Me(ctx) require.NoError(t, err) - path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("wsfs-files-")) + path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-filer-wsfs-")) // Ensure directory exists, but doesn't exist YET! // Otherwise we could inadvertently remove a directory that already exists on cleanup. @@ -61,7 +62,7 @@ func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { if err == nil || apierr.IsMissing(err) { return } - t.Logf("unable to remove temporary workspace path %s: %#v", path, err) + t.Logf("unable to remove temporary workspace directory %s: %#v", path, err) }) return path @@ -89,7 +90,7 @@ func TestAccFilerWorkspaceFiles(t *testing.T) { // Read should fail because the root path doesn't yet exist. _, err = f.Read(ctx, "/foo/bar") - assert.True(t, apierr.IsMissing(err)) + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) // Write with CreateParentDirectories flag should succeed. err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) @@ -107,37 +108,50 @@ func TestAccFilerWorkspaceFiles(t *testing.T) { // Delete should fail if the file doesn't exist. err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, apierr.IsMissing(err)) + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) // Delete should succeed for file that does exist. err = f.Delete(ctx, "/foo/bar") assert.NoError(t, err) } +func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { + ctx := context.Background() + path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-filer-dbfs-")) + + // This call fails if the path already exists. + t.Logf("mkdir dbfs:%s", path) + err := w.Dbfs.MkdirsByPath(ctx, path) + require.NoError(t, err) + + // Remove test directory on test completion. + t.Cleanup(func() { + t.Logf("rm -rf dbfs:%s", path) + err := w.Dbfs.Delete(ctx, dbfs.Delete{ + Path: path, + Recursive: true, + }) + if err == nil || apierr.IsMissing(err) { + return + } + t.Logf("unable to remove temporary dbfs directory %s: %#v", path, err) + }) + + return path +} + func TestAccFilerDbfs(t *testing.T) { t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) ctx := context.Background() w := databricks.Must(databricks.NewWorkspaceClient()) - - // TODO RANDOM PATH - tmpdir := "/tmp/.integration-test/" + tmpdir := temporaryDbfsDir(t, w) f, err := filer.NewDbfsClient(w, tmpdir) require.NoError(t, err) - // Check if we can use this API here, skip test if we cannot. - _, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled") - if apierr, ok := err.(apierr.APIError); ok && apierr.StatusCode == http.StatusBadRequest { - t.Skip(apierr.Message) - } - - // Write should fail because the root path doesn't yet exist. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello world"`)) - assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) - // Read should fail because the root path doesn't yet exist. _, err = f.Read(ctx, "/foo/bar") - assert.True(t, apierr.IsMissing(err)) + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) // Write with CreateParentDirectories flag should succeed. err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello world"`), filer.CreateParentDirectories) @@ -155,7 +169,7 @@ func TestAccFilerDbfs(t *testing.T) { // Delete should fail if the file doesn't exist. err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, apierr.IsMissing(err)) + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) // Delete should succeed for file that does exist. err = f.Delete(ctx, "/foo/bar") diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 45480b3cdd..aa80fdc47e 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -1,195 +1,18 @@ package filer import ( - "bytes" "context" - "encoding/base64" - "fmt" + "errors" "io" + "net/http" "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/service/dbfs" + "golang.org/x/exp/slices" ) -var b64 = base64.StdEncoding - -// Maximum read or write length for the DBFS API. -const maxDbfsBlockSize = 1024 * 1024 - -type dbfsReader struct { - size int64 - offset int64 -} - -type dbfsWriter struct { - handle int64 -} - -type dbfsHandle struct { - ctx context.Context - api dbfs.DbfsAPI - path string - - *dbfsReader - *dbfsWriter -} - -// Implement the [io.Reader] interface. -func (h *dbfsHandle) Read(p []byte) (int, error) { - r := h.dbfsReader - if r == nil { - return 0, fmt.Errorf("dbfs file not open for reading") - } - - if r.offset >= r.size { - return 0, io.EOF - } - - res, err := h.api.Read(h.ctx, dbfs.Read{ - Path: h.path, - Length: len(p), - Offset: int(r.offset), // TODO: make int32/in64 work properly - }) - if err != nil { - return 0, fmt.Errorf("dbfs read: %w", err) - } - - // The guard against offset >= size happens above, so this can only happen - // if the file is modified or truncated while reading. If this happens, - // the read contents will likely be corrupted, so we return an error. - if res.BytesRead == 0 { - return 0, fmt.Errorf("dbfs read: unexpected EOF at offset %d (size %d)", r.offset, r.size) - } - - r.offset += res.BytesRead - return b64.Decode(p, []byte(res.Data)) -} - -// Implement the [io.WriterTo] interface. -func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { - r := h.dbfsReader - if r == nil { - return 0, fmt.Errorf("dbfs file not open for reading") - } - - buf := make([]byte, maxDbfsBlockSize) - ntotal := int64(0) - for { - nread, err := h.Read(buf) - if err != nil { - // EOF on read means we're done. - // For writers being done means returning a nil error. - if err == io.EOF { - err = nil - } - return ntotal, err - } - nwritten, err := io.Copy(w, bytes.NewReader(buf[:nread])) - ntotal += nwritten - if err != nil { - return ntotal, err - } - } -} - -// Implement the [io.Writer] interface. -func (h *dbfsHandle) Write(p []byte) (int, error) { - w := h.dbfsWriter - if w == nil { - return 0, fmt.Errorf("dbfs file not open for writing") - } - - err := h.api.AddBlock(h.ctx, dbfs.AddBlock{ - Data: b64.EncodeToString(p), - Handle: w.handle, - }) - if err != nil { - return 0, fmt.Errorf("dbfs add block: %w", err) - } - return len(p), nil -} - -// Implement the [io.ReaderFrom] interface. -func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { - w := h.dbfsWriter - if w == nil { - return 0, fmt.Errorf("dbfs file not open for writing") - } - - buf := make([]byte, maxDbfsBlockSize) - ntotal := int64(0) - for { - nread, err := r.Read(buf) - if err != nil { - // EOF on read means we're done. - // For writers being done means returning a nil error. - if err == io.EOF { - err = nil - } - return ntotal, err - } - - nwritten, err := h.Write(buf[:nread]) - ntotal += int64(nwritten) - if err != nil { - return ntotal, err - } - } -} - -// Implement the [io.Closer] interface. -func (h *dbfsHandle) Close() error { - w := h.dbfsWriter - if w == nil { - return fmt.Errorf("dbfs file not open for writing") - } - - err := h.api.CloseByHandle(h.ctx, w.handle) - if err != nil { - return fmt.Errorf("dbfs close: %w", err) - } - - return nil -} - -func newDbfsHandleForReading(ctx context.Context, w *databricks.WorkspaceClient, path string) (io.Reader, error) { - info, err := w.Dbfs.GetStatusByPath(ctx, path) - if err != nil { - return nil, err - } - - return &dbfsHandle{ - ctx: ctx, - api: *w.Dbfs, - path: path, - - dbfsReader: &dbfsReader{ - size: info.FileSize, - }, - }, nil -} - -func newDbfsHandleForWriting(ctx context.Context, w *databricks.WorkspaceClient, path string) (io.WriteCloser, error) { - res, err := w.Dbfs.Create(ctx, dbfs.Create{ - Path: path, - Overwrite: false, - }) - if err != nil { - return nil, err - } - - return &dbfsHandle{ - ctx: ctx, - api: *w.Dbfs, - path: path, - - dbfsWriter: &dbfsWriter{ - handle: res.Handle, - }, - }, nil -} - -// DbfsClient implements a +// DbfsClient implements the [Filer] interface for the DBFS backend. type DbfsClient struct { workspaceClient *databricks.WorkspaceClient @@ -211,8 +34,24 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m return err } - dbfsHandle, err := newDbfsHandleForWriting(ctx, w.workspaceClient, absPath) + dbfsMode := DbfsWrite + if slices.Contains(mode, OverwriteIfExists) { + dbfsMode |= DbfsOverwrite + } + dbfsHandle, err := OpenFile(ctx, w.workspaceClient.Dbfs, absPath, dbfsMode) if err != nil { + var aerr apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 400 if the file already exists. + if aerr.StatusCode == http.StatusBadRequest { + if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" { + return FileAlreadyExistsError{absPath} + } + } + return err } @@ -221,6 +60,7 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m if err == nil { err = cerr } + return err } @@ -230,7 +70,24 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) { return nil, err } - return newDbfsHandleForReading(ctx, w.workspaceClient, absPath) + dbfsHandle, err := OpenFile(ctx, w.workspaceClient.Dbfs, absPath, DbfsRead) + if err != nil { + var aerr apierr.APIError + if !errors.As(err, &aerr) { + return nil, err + } + + // This API returns a 404 if the file doesn't exist. + if aerr.StatusCode == http.StatusNotFound { + if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { + return nil, FileDoesNotExistError{absPath} + } + } + + return nil, err + } + + return dbfsHandle, nil } func (w *DbfsClient) Delete(ctx context.Context, name string) error { @@ -239,6 +96,28 @@ func (w *DbfsClient) Delete(ctx context.Context, name string) error { return err } + // Issue info call before delete because delete succeeds if the specified path doesn't exist. + // + // For discussion: we could decide this is actually convenient, remove the call below, + // and apply the same semantics for the WSFS filer. + // + _, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, absPath) + if err != nil { + var aerr apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 404 if the file doesn't exist. + if aerr.StatusCode == http.StatusNotFound { + if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { + return FileDoesNotExistError{absPath} + } + } + + return err + } + return w.workspaceClient.Dbfs.Delete(ctx, dbfs.Delete{ Path: absPath, Recursive: false, diff --git a/libs/filer/dbfs_handle.go b/libs/filer/dbfs_handle.go new file mode 100644 index 0000000000..1dd9038c4c --- /dev/null +++ b/libs/filer/dbfs_handle.go @@ -0,0 +1,217 @@ +package filer + +import ( + "bytes" + "context" + "encoding/base64" + "fmt" + "io" + + "github.com/databricks/databricks-sdk-go/service/dbfs" +) + +// DbfsFileMode conveys user intent when opening a file. +type DbfsFileMode int + +const ( + // Exactly one of DbfsRead or DbfsWrite must be specified. + DbfsRead DbfsFileMode = 1 << iota + DbfsWrite + DbfsOverwrite +) + +// Maximum read or write length for the DBFS API. +const maxDbfsBlockSize = 1024 * 1024 + +type dbfsReader struct { + size int64 + offset int64 +} + +type dbfsWriter struct { + handle int64 +} + +type dbfsHandle struct { + ctx context.Context + api *dbfs.DbfsAPI + path string + + *dbfsReader + *dbfsWriter +} + +// Implement the [io.Reader] interface. +func (h *dbfsHandle) Read(p []byte) (int, error) { + r := h.dbfsReader + if r == nil { + return 0, fmt.Errorf("dbfs file not open for reading") + } + + if r.offset >= r.size { + return 0, io.EOF + } + + res, err := h.api.Read(h.ctx, dbfs.Read{ + Path: h.path, + Length: len(p), + Offset: int(r.offset), // TODO: make int32/in64 work properly + }) + if err != nil { + return 0, fmt.Errorf("dbfs read: %w", err) + } + + // The guard against offset >= size happens above, so this can only happen + // if the file is modified or truncated while reading. If this happens, + // the read contents will likely be corrupted, so we return an error. + if res.BytesRead == 0 { + return 0, fmt.Errorf("dbfs read: unexpected EOF at offset %d (size %d)", r.offset, r.size) + } + + r.offset += res.BytesRead + return base64.StdEncoding.Decode(p, []byte(res.Data)) +} + +// Implement the [io.WriterTo] interface. +func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { + r := h.dbfsReader + if r == nil { + return 0, fmt.Errorf("dbfs file not open for reading") + } + + buf := make([]byte, maxDbfsBlockSize) + ntotal := int64(0) + for { + nread, err := h.Read(buf) + if err != nil { + // EOF on read means we're done. + // For writers being done means returning a nil error. + if err == io.EOF { + err = nil + } + return ntotal, err + } + nwritten, err := io.Copy(w, bytes.NewReader(buf[:nread])) + ntotal += nwritten + if err != nil { + return ntotal, err + } + } +} + +// Implement the [io.Writer] interface. +func (h *dbfsHandle) Write(p []byte) (int, error) { + w := h.dbfsWriter + if w == nil { + return 0, fmt.Errorf("dbfs: file not open for writing") + } + + err := h.api.AddBlock(h.ctx, dbfs.AddBlock{ + Data: base64.StdEncoding.EncodeToString(p), + Handle: w.handle, + }) + if err != nil { + return 0, fmt.Errorf("dbfs: add block: %w", err) + } + return len(p), nil +} + +// Implement the [io.ReaderFrom] interface. +func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { + w := h.dbfsWriter + if w == nil { + return 0, fmt.Errorf("dbfs: file not open for writing") + } + + buf := make([]byte, maxDbfsBlockSize) + ntotal := int64(0) + for { + nread, err := r.Read(buf) + if err != nil { + // EOF on read means we're done. + // For writers being done means returning a nil error. + if err == io.EOF { + err = nil + } + return ntotal, err + } + + nwritten, err := h.Write(buf[:nread]) + ntotal += int64(nwritten) + if err != nil { + return ntotal, err + } + } +} + +// Implement the [io.Closer] interface. +func (h *dbfsHandle) Close() error { + w := h.dbfsWriter + if w == nil { + return fmt.Errorf("dbfs: file not open for writing") + } + + err := h.api.CloseByHandle(h.ctx, w.handle) + if err != nil { + return fmt.Errorf("dbfs: close: %w", err) + } + + return nil +} + +func (h *dbfsHandle) openForRead(mode DbfsFileMode) (*dbfsHandle, error) { + res, err := h.api.GetStatusByPath(h.ctx, h.path) + if err != nil { + return nil, err + } + h.dbfsReader = &dbfsReader{ + size: res.FileSize, + } + return h, nil +} + +func (h *dbfsHandle) openForWrite(mode DbfsFileMode) (*dbfsHandle, error) { + res, err := h.api.Create(h.ctx, dbfs.Create{ + Path: h.path, + Overwrite: (mode & DbfsOverwrite) != 0, + }) + if err != nil { + return nil, err + } + h.dbfsWriter = &dbfsWriter{ + handle: res.Handle, + } + return h, nil +} + +// OpenFile opens a remote DBFS file for reading or writing. +// The returned object implements relevant [io] interfaces for convenient +// integration with other code that reads or writes bytes. +// +// The [io.WriterTo] interface is provided and maximizes throughput for +// bulk reads by reading data with the DBFS maximum read chunk size of 1MB. +// Similarly, the [io.ReaderFrom] interface is provided for bulk writing. +// +// A file opened for writing must always be closed. +func OpenFile(ctx context.Context, api *dbfs.DbfsAPI, path string, mode DbfsFileMode) (*dbfsHandle, error) { + h := &dbfsHandle{ + ctx: ctx, + api: api, + path: path, + } + + isRead := (mode & DbfsRead) != 0 + isWrite := (mode & DbfsWrite) != 0 + if isRead && isWrite { + return nil, fmt.Errorf("dbfs: cannot open file for reading and writing at the same time") + } + if isRead { + return h.openForRead(mode) + } + if isWrite { + return h.openForWrite(mode) + } + + // No mode specifed. The caller should be explicit so we return an error. + return nil, fmt.Errorf("dbfs: must specify DbfsRead or DbfsWrite") +} diff --git a/libs/filer/filer.go b/libs/filer/filer.go index 92de6e128c..1d07095788 100644 --- a/libs/filer/filer.go +++ b/libs/filer/filer.go @@ -21,6 +21,14 @@ func (err FileAlreadyExistsError) Error() string { return fmt.Sprintf("file already exists: %s", err.path) } +type FileDoesNotExistError struct { + path string +} + +func (err FileDoesNotExistError) Error() string { + return fmt.Sprintf("file does not exist: %s", err.path) +} + type NoSuchDirectoryError struct { path string } diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index ff813f0913..87a018bd93 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -66,31 +66,29 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io err = w.apiClient.Do(ctx, http.MethodPost, urlPath, body, nil) - // If we got an API error we deal with it below. - var aerr *apierr.APIError - if !errors.As(err, &aerr) { - return err - } - - // This API returns a 404 if the parent directory does not exist. - if aerr.StatusCode == http.StatusNotFound { - if !slices.Contains(mode, CreateParentDirectories) { - return NoSuchDirectoryError{path.Dir(absPath)} + // Transform some API errors into filer specific errors. + var aerr apierr.APIError + if errors.As(err, &aerr) { + // This API returns a 404 if the parent directory does not exist. + if aerr.StatusCode == http.StatusNotFound { + if !slices.Contains(mode, CreateParentDirectories) { + return NoSuchDirectoryError{path.Dir(absPath)} + } + + // Create parent directory. + err = w.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(absPath)) + if err != nil { + return fmt.Errorf("unable to mkdir to write file %s: %w", absPath, err) + } + + // Retry without CreateParentDirectories mode flag. + return w.Write(ctx, name, bytes.NewReader(body), sliceWithout(mode, CreateParentDirectories)...) } - // Create parent directory. - err = w.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(absPath)) - if err != nil { - return fmt.Errorf("unable to mkdir to write file %s: %w", absPath, err) + // This API returns 409 if the file already exists. + if aerr.StatusCode == http.StatusConflict { + return FileAlreadyExistsError{absPath} } - - // Retry without CreateParentDirectories mode flag. - return w.Write(ctx, name, bytes.NewReader(body), sliceWithout(mode, CreateParentDirectories)...) - } - - // This API returns 409 if the file already exists. - if aerr.StatusCode == http.StatusConflict { - return FileAlreadyExistsError{absPath} } return err @@ -110,6 +108,15 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader var res []byte err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &res) + + // Transform some API errors into filer specific errors. + var aerr apierr.APIError + if errors.As(err, &aerr) { + if aerr.StatusCode == http.StatusNotFound { + return nil, FileDoesNotExistError{absPath} + } + } + if err != nil { return nil, err } @@ -123,8 +130,18 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error { return err } - return w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ + err = w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ Path: absPath, Recursive: false, }) + + // Transform some API errors into filer specific errors. + var aerr apierr.APIError + if errors.As(err, &aerr) { + if aerr.StatusCode == http.StatusNotFound { + return FileDoesNotExistError{absPath} + } + } + + return err } From dbfdc74613b11837adc45bcc8adef9162e0bdd31 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 15 Dec 2022 11:25:53 +0100 Subject: [PATCH 03/10] Less nesting --- libs/filer/workspace_files_client.go | 38 +++++++++++++++------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 87a018bd93..8522514e5e 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -68,27 +68,29 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io // Transform some API errors into filer specific errors. var aerr apierr.APIError - if errors.As(err, &aerr) { - // This API returns a 404 if the parent directory does not exist. - if aerr.StatusCode == http.StatusNotFound { - if !slices.Contains(mode, CreateParentDirectories) { - return NoSuchDirectoryError{path.Dir(absPath)} - } - - // Create parent directory. - err = w.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(absPath)) - if err != nil { - return fmt.Errorf("unable to mkdir to write file %s: %w", absPath, err) - } - - // Retry without CreateParentDirectories mode flag. - return w.Write(ctx, name, bytes.NewReader(body), sliceWithout(mode, CreateParentDirectories)...) + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 404 if the parent directory does not exist. + if aerr.StatusCode == http.StatusNotFound { + if !slices.Contains(mode, CreateParentDirectories) { + return NoSuchDirectoryError{path.Dir(absPath)} } - // This API returns 409 if the file already exists. - if aerr.StatusCode == http.StatusConflict { - return FileAlreadyExistsError{absPath} + // Create parent directory. + err = w.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(absPath)) + if err != nil { + return fmt.Errorf("unable to mkdir to write file %s: %w", absPath, err) } + + // Retry without CreateParentDirectories mode flag. + return w.Write(ctx, name, bytes.NewReader(body), sliceWithout(mode, CreateParentDirectories)...) + } + + // This API returns 409 if the file already exists. + if aerr.StatusCode == http.StatusConflict { + return FileAlreadyExistsError{absPath} } return err From e48c89cb61f0f5ac1e125d395795b7e35015ed64 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 15 Dec 2022 11:31:02 +0100 Subject: [PATCH 04/10] Structure --- libs/filer/workspace_files_client.go | 43 +++++++++++++++++++--------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 8522514e5e..bf02e7c957 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -66,7 +66,12 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io err = w.apiClient.Do(ctx, http.MethodPost, urlPath, body, nil) - // Transform some API errors into filer specific errors. + // Return early on success. + if err == nil { + return nil + } + + // Special handling of this error only if it is an API error. var aerr apierr.APIError if !errors.As(err, &aerr) { return err @@ -111,19 +116,22 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader var res []byte err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, &res) - // Transform some API errors into filer specific errors. - var aerr apierr.APIError - if errors.As(err, &aerr) { - if aerr.StatusCode == http.StatusNotFound { - return nil, FileDoesNotExistError{absPath} - } + // Return early on success. + if err == nil { + return bytes.NewReader(res), nil } - if err != nil { + // Special handling of this error only if it is an API error. + var aerr apierr.APIError + if !errors.As(err, &aerr) { return nil, err } - return bytes.NewReader(res), nil + if aerr.StatusCode == http.StatusNotFound { + return nil, FileDoesNotExistError{absPath} + } + + return nil, err } func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error { @@ -137,12 +145,19 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error { Recursive: false, }) - // Transform some API errors into filer specific errors. + // Return early on success. + if err == nil { + return nil + } + + // Special handling of this error only if it is an API error. var aerr apierr.APIError - if errors.As(err, &aerr) { - if aerr.StatusCode == http.StatusNotFound { - return FileDoesNotExistError{absPath} - } + if !errors.As(err, &aerr) { + return err + } + + if aerr.StatusCode == http.StatusNotFound { + return FileDoesNotExistError{absPath} } return err From f75b9ec79c2221a0b2d4821bf4d8663e437790ea Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 22 Dec 2022 09:24:02 +0100 Subject: [PATCH 05/10] Update to use DBFS handles in SDK --- libs/filer/dbfs_client.go | 14 +-- libs/filer/dbfs_handle.go | 217 -------------------------------------- 2 files changed, 7 insertions(+), 224 deletions(-) delete mode 100644 libs/filer/dbfs_handle.go diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index aa80fdc47e..3763cf3197 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -34,11 +34,11 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m return err } - dbfsMode := DbfsWrite + fileMode := dbfs.FileModeWrite if slices.Contains(mode, OverwriteIfExists) { - dbfsMode |= DbfsOverwrite + fileMode |= dbfs.FileModeOverwrite } - dbfsHandle, err := OpenFile(ctx, w.workspaceClient.Dbfs, absPath, dbfsMode) + handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode) if err != nil { var aerr apierr.APIError if !errors.As(err, &aerr) { @@ -55,8 +55,8 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m return err } - _, err = io.Copy(dbfsHandle, reader) - cerr := dbfsHandle.Close() + _, err = io.Copy(handle, reader) + cerr := handle.Close() if err == nil { err = cerr } @@ -70,7 +70,7 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) { return nil, err } - dbfsHandle, err := OpenFile(ctx, w.workspaceClient.Dbfs, absPath, DbfsRead) + handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, dbfs.FileModeRead) if err != nil { var aerr apierr.APIError if !errors.As(err, &aerr) { @@ -87,7 +87,7 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) { return nil, err } - return dbfsHandle, nil + return handle, nil } func (w *DbfsClient) Delete(ctx context.Context, name string) error { diff --git a/libs/filer/dbfs_handle.go b/libs/filer/dbfs_handle.go deleted file mode 100644 index 1dd9038c4c..0000000000 --- a/libs/filer/dbfs_handle.go +++ /dev/null @@ -1,217 +0,0 @@ -package filer - -import ( - "bytes" - "context" - "encoding/base64" - "fmt" - "io" - - "github.com/databricks/databricks-sdk-go/service/dbfs" -) - -// DbfsFileMode conveys user intent when opening a file. -type DbfsFileMode int - -const ( - // Exactly one of DbfsRead or DbfsWrite must be specified. - DbfsRead DbfsFileMode = 1 << iota - DbfsWrite - DbfsOverwrite -) - -// Maximum read or write length for the DBFS API. -const maxDbfsBlockSize = 1024 * 1024 - -type dbfsReader struct { - size int64 - offset int64 -} - -type dbfsWriter struct { - handle int64 -} - -type dbfsHandle struct { - ctx context.Context - api *dbfs.DbfsAPI - path string - - *dbfsReader - *dbfsWriter -} - -// Implement the [io.Reader] interface. -func (h *dbfsHandle) Read(p []byte) (int, error) { - r := h.dbfsReader - if r == nil { - return 0, fmt.Errorf("dbfs file not open for reading") - } - - if r.offset >= r.size { - return 0, io.EOF - } - - res, err := h.api.Read(h.ctx, dbfs.Read{ - Path: h.path, - Length: len(p), - Offset: int(r.offset), // TODO: make int32/in64 work properly - }) - if err != nil { - return 0, fmt.Errorf("dbfs read: %w", err) - } - - // The guard against offset >= size happens above, so this can only happen - // if the file is modified or truncated while reading. If this happens, - // the read contents will likely be corrupted, so we return an error. - if res.BytesRead == 0 { - return 0, fmt.Errorf("dbfs read: unexpected EOF at offset %d (size %d)", r.offset, r.size) - } - - r.offset += res.BytesRead - return base64.StdEncoding.Decode(p, []byte(res.Data)) -} - -// Implement the [io.WriterTo] interface. -func (h *dbfsHandle) WriteTo(w io.Writer) (int64, error) { - r := h.dbfsReader - if r == nil { - return 0, fmt.Errorf("dbfs file not open for reading") - } - - buf := make([]byte, maxDbfsBlockSize) - ntotal := int64(0) - for { - nread, err := h.Read(buf) - if err != nil { - // EOF on read means we're done. - // For writers being done means returning a nil error. - if err == io.EOF { - err = nil - } - return ntotal, err - } - nwritten, err := io.Copy(w, bytes.NewReader(buf[:nread])) - ntotal += nwritten - if err != nil { - return ntotal, err - } - } -} - -// Implement the [io.Writer] interface. -func (h *dbfsHandle) Write(p []byte) (int, error) { - w := h.dbfsWriter - if w == nil { - return 0, fmt.Errorf("dbfs: file not open for writing") - } - - err := h.api.AddBlock(h.ctx, dbfs.AddBlock{ - Data: base64.StdEncoding.EncodeToString(p), - Handle: w.handle, - }) - if err != nil { - return 0, fmt.Errorf("dbfs: add block: %w", err) - } - return len(p), nil -} - -// Implement the [io.ReaderFrom] interface. -func (h *dbfsHandle) ReadFrom(r io.Reader) (int64, error) { - w := h.dbfsWriter - if w == nil { - return 0, fmt.Errorf("dbfs: file not open for writing") - } - - buf := make([]byte, maxDbfsBlockSize) - ntotal := int64(0) - for { - nread, err := r.Read(buf) - if err != nil { - // EOF on read means we're done. - // For writers being done means returning a nil error. - if err == io.EOF { - err = nil - } - return ntotal, err - } - - nwritten, err := h.Write(buf[:nread]) - ntotal += int64(nwritten) - if err != nil { - return ntotal, err - } - } -} - -// Implement the [io.Closer] interface. -func (h *dbfsHandle) Close() error { - w := h.dbfsWriter - if w == nil { - return fmt.Errorf("dbfs: file not open for writing") - } - - err := h.api.CloseByHandle(h.ctx, w.handle) - if err != nil { - return fmt.Errorf("dbfs: close: %w", err) - } - - return nil -} - -func (h *dbfsHandle) openForRead(mode DbfsFileMode) (*dbfsHandle, error) { - res, err := h.api.GetStatusByPath(h.ctx, h.path) - if err != nil { - return nil, err - } - h.dbfsReader = &dbfsReader{ - size: res.FileSize, - } - return h, nil -} - -func (h *dbfsHandle) openForWrite(mode DbfsFileMode) (*dbfsHandle, error) { - res, err := h.api.Create(h.ctx, dbfs.Create{ - Path: h.path, - Overwrite: (mode & DbfsOverwrite) != 0, - }) - if err != nil { - return nil, err - } - h.dbfsWriter = &dbfsWriter{ - handle: res.Handle, - } - return h, nil -} - -// OpenFile opens a remote DBFS file for reading or writing. -// The returned object implements relevant [io] interfaces for convenient -// integration with other code that reads or writes bytes. -// -// The [io.WriterTo] interface is provided and maximizes throughput for -// bulk reads by reading data with the DBFS maximum read chunk size of 1MB. -// Similarly, the [io.ReaderFrom] interface is provided for bulk writing. -// -// A file opened for writing must always be closed. -func OpenFile(ctx context.Context, api *dbfs.DbfsAPI, path string, mode DbfsFileMode) (*dbfsHandle, error) { - h := &dbfsHandle{ - ctx: ctx, - api: api, - path: path, - } - - isRead := (mode & DbfsRead) != 0 - isWrite := (mode & DbfsWrite) != 0 - if isRead && isWrite { - return nil, fmt.Errorf("dbfs: cannot open file for reading and writing at the same time") - } - if isRead { - return h.openForRead(mode) - } - if isWrite { - return h.openForWrite(mode) - } - - // No mode specifed. The caller should be explicit so we return an error. - return nil, fmt.Errorf("dbfs: must specify DbfsRead or DbfsWrite") -} From a0e3d250ed1d64eb193eea8fb5f7ce1029d56ddc Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 20 Mar 2023 14:32:52 +0100 Subject: [PATCH 06/10] Pointer apierr --- libs/filer/workspace_files_client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index bf02e7c957..e37c75a80a 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -72,7 +72,7 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io } // Special handling of this error only if it is an API error. - var aerr apierr.APIError + var aerr *apierr.APIError if !errors.As(err, &aerr) { return err } @@ -122,7 +122,7 @@ func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.Reader } // Special handling of this error only if it is an API error. - var aerr apierr.APIError + var aerr *apierr.APIError if !errors.As(err, &aerr) { return nil, err } @@ -151,7 +151,7 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error { } // Special handling of this error only if it is an API error. - var aerr apierr.APIError + var aerr *apierr.APIError if !errors.As(err, &aerr) { return err } From 535b207c98331e8399b3234ae5457062d7c0ab75 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 23 May 2023 13:49:55 +0200 Subject: [PATCH 07/10] Rebase --- internal/filer_test.go | 4 ++-- libs/filer/dbfs_client.go | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/internal/filer_test.go b/internal/filer_test.go index eba39b26bd..f82f554363 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -13,7 +13,7 @@ import ( "github.com/databricks/cli/libs/filer" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/service/dbfs" + "github.com/databricks/databricks-sdk-go/service/files" "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -127,7 +127,7 @@ func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { // Remove test directory on test completion. t.Cleanup(func() { t.Logf("rm -rf dbfs:%s", path) - err := w.Dbfs.Delete(ctx, dbfs.Delete{ + err := w.Dbfs.Delete(ctx, files.Delete{ Path: path, Recursive: true, }) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 3763cf3197..64fe906e21 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -8,7 +8,7 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/service/dbfs" + "github.com/databricks/databricks-sdk-go/service/files" "golang.org/x/exp/slices" ) @@ -34,13 +34,13 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m return err } - fileMode := dbfs.FileModeWrite + fileMode := files.FileModeWrite if slices.Contains(mode, OverwriteIfExists) { - fileMode |= dbfs.FileModeOverwrite + fileMode |= files.FileModeOverwrite } handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode) if err != nil { - var aerr apierr.APIError + var aerr *apierr.APIError if !errors.As(err, &aerr) { return err } @@ -70,9 +70,9 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) { return nil, err } - handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, dbfs.FileModeRead) + handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead) if err != nil { - var aerr apierr.APIError + var aerr *apierr.APIError if !errors.As(err, &aerr) { return nil, err } @@ -118,7 +118,7 @@ func (w *DbfsClient) Delete(ctx context.Context, name string) error { return err } - return w.workspaceClient.Dbfs.Delete(ctx, dbfs.Delete{ + return w.workspaceClient.Dbfs.Delete(ctx, files.Delete{ Path: absPath, Recursive: false, }) From 177ee9024e7312775c4fdbf39b5c0ff0a861924a Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 31 May 2023 09:30:39 +0200 Subject: [PATCH 08/10] Add Mkdir and ReadDir functions to filer.Filer interface This cherry-picks the filer changes from #408. --- internal/filer_test.go | 46 +++++++++++++++++++++++++++- libs/filer/filer.go | 23 ++++++++++++++ libs/filer/root_path.go | 5 --- libs/filer/root_path_test.go | 35 ++++++++++++--------- libs/filer/workspace_files_client.go | 40 ++++++++++++++++++++++++ 5 files changed, 128 insertions(+), 21 deletions(-) diff --git a/internal/filer_test.go b/internal/filer_test.go index 2dbb8ae662..79279579c1 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -65,7 +65,7 @@ func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { return path } -func TestAccFilerWorkspaceFiles(t *testing.T) { +func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) ctx := context.Background() @@ -81,6 +81,14 @@ func TestAccFilerWorkspaceFiles(t *testing.T) { t.Skip(aerr.Message) } + return ctx, f +} + +func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) { + var err error + + ctx, f := setupWorkspaceFilesTest(t) + // Write should fail because the root path doesn't yet exist. err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) @@ -111,3 +119,39 @@ func TestAccFilerWorkspaceFiles(t *testing.T) { err = f.Delete(ctx, "/foo/bar") assert.NoError(t, err) } + +func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { + var err error + + ctx, f := setupWorkspaceFilesTest(t) + + // We start with an empty directory. + entries, err := f.ReadDir(ctx, ".") + require.NoError(t, err) + assert.Len(t, entries, 0) + + // Write a file. + err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`)) + require.NoError(t, err) + + // Create a directory. + err = f.Mkdir(ctx, "/dir") + require.NoError(t, err) + + // Write a file. + err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`)) + require.NoError(t, err) + + // Expect two entries in the root. + entries, err = f.ReadDir(ctx, ".") + require.NoError(t, err) + assert.Len(t, entries, 2) + assert.Equal(t, "dir", entries[0].Name) + assert.Equal(t, "hello.txt", entries[1].Name) + + // Expect a single entry in the directory. + entries, err = f.ReadDir(ctx, "/dir") + require.NoError(t, err) + assert.Len(t, entries, 1) + assert.Equal(t, "world.txt", entries[0].Name) +} diff --git a/libs/filer/filer.go b/libs/filer/filer.go index 92de6e128c..841d1b8314 100644 --- a/libs/filer/filer.go +++ b/libs/filer/filer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "time" ) type WriteMode int @@ -13,6 +14,22 @@ const ( CreateParentDirectories = iota << 1 ) +// FileInfo abstracts over file information from different file systems. +// Inspired by https://pkg.go.dev/io/fs#FileInfo. +type FileInfo struct { + // The type of the file in workspace. + Type string + + // Base name. + Name string + + // Size in bytes. + Size int64 + + // Modification time. + ModTime time.Time +} + type FileAlreadyExistsError struct { path string } @@ -41,4 +58,10 @@ type Filer interface { // Delete file at `path`. Delete(ctx context.Context, path string) error + + // Return contents of directory at `path`. + ReadDir(ctx context.Context, path string) ([]FileInfo, error) + + // Creates directory at `path`, creating any intermediate directories as required. + Mkdir(ctx context.Context, path string) error } diff --git a/libs/filer/root_path.go b/libs/filer/root_path.go index 65b26d5310..bdeff5d73c 100644 --- a/libs/filer/root_path.go +++ b/libs/filer/root_path.go @@ -30,10 +30,5 @@ func (p *RootPath) Join(name string) (string, error) { return "", fmt.Errorf("relative path escapes root: %s", name) } - // Don't allow name to resolve to the root path. - if strings.TrimPrefix(absPath, p.rootPath) == "" { - return "", fmt.Errorf("relative path resolves to root: %s", name) - } - return absPath, nil } diff --git a/libs/filer/root_path_test.go b/libs/filer/root_path_test.go index 3787ef36bf..965842d030 100644 --- a/libs/filer/root_path_test.go +++ b/libs/filer/root_path_test.go @@ -31,6 +31,26 @@ func testRootPath(t *testing.T, uncleanRoot string) { assert.NoError(t, err) assert.Equal(t, cleanRoot+"/a/b/f/g", remotePath) + remotePath, err = rp.Join(".//a/..//./b/..") + assert.NoError(t, err) + assert.Equal(t, cleanRoot, remotePath) + + remotePath, err = rp.Join("a/b/../..") + assert.NoError(t, err) + assert.Equal(t, cleanRoot, remotePath) + + remotePath, err = rp.Join("") + assert.NoError(t, err) + assert.Equal(t, cleanRoot, remotePath) + + remotePath, err = rp.Join(".") + assert.NoError(t, err) + assert.Equal(t, cleanRoot, remotePath) + + remotePath, err = rp.Join("/") + assert.NoError(t, err) + assert.Equal(t, cleanRoot, remotePath) + _, err = rp.Join("..") assert.ErrorContains(t, err, `relative path escapes root: ..`) @@ -57,21 +77,6 @@ func testRootPath(t *testing.T, uncleanRoot string) { _, err = rp.Join("../..") assert.ErrorContains(t, err, `relative path escapes root: ../..`) - - _, err = rp.Join(".//a/..//./b/..") - assert.ErrorContains(t, err, `relative path resolves to root: .//a/..//./b/..`) - - _, err = rp.Join("a/b/../..") - assert.ErrorContains(t, err, "relative path resolves to root: a/b/../..") - - _, err = rp.Join("") - assert.ErrorContains(t, err, "relative path resolves to root: ") - - _, err = rp.Join(".") - assert.ErrorContains(t, err, "relative path resolves to root: .") - - _, err = rp.Join("/") - assert.ErrorContains(t, err, "relative path resolves to root: /") } func TestRootPathClean(t *testing.T) { diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index ff813f0913..6412a36302 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -9,7 +9,9 @@ import ( "net/http" "net/url" "path" + "sort" "strings" + "time" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" @@ -128,3 +130,41 @@ func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string) error { Recursive: false, }) } + +func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]FileInfo, error) { + absPath, err := w.root.Join(name) + if err != nil { + return nil, err + } + + objects, err := w.workspaceClient.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{ + Path: absPath, + }) + if err != nil { + return nil, err + } + + info := make([]FileInfo, len(objects)) + for i, v := range objects { + info[i] = FileInfo{ + Type: string(v.ObjectType), + Name: path.Base(v.Path), + Size: v.Size, + ModTime: time.UnixMilli(v.ModifiedAt), + } + } + + // Sort by name for parity with os.ReadDir. + sort.Slice(info, func(i, j int) bool { return info[i].Name < info[j].Name }) + return info, nil +} + +func (w *WorkspaceFilesClient) Mkdir(ctx context.Context, name string) error { + dirPath, err := w.root.Join(name) + if err != nil { + return err + } + return w.workspaceClient.Workspace.Mkdirs(ctx, workspace.Mkdirs{ + Path: dirPath, + }) +} From 2ec82c74f73ff2923024b6f1f4020050887f74b9 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 31 May 2023 10:30:36 +0200 Subject: [PATCH 09/10] Make DBFS filer work with #414 --- internal/filer_test.go | 173 ++++++++++++++++++-------------------- libs/filer/dbfs_client.go | 64 +++++++++++++- 2 files changed, 145 insertions(+), 92 deletions(-) diff --git a/internal/filer_test.go b/internal/filer_test.go index 472e0a68d6..d65d0d7033 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -39,6 +39,74 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str assert.Equal(f, contents, body.String()) } +func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { + var err error + + // Write should fail because the root path doesn't yet exist. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) + assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) + + // Read should fail because the root path doesn't yet exist. + _, err = f.Read(ctx, "/foo/bar") + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + + // Write with CreateParentDirectories flag should succeed. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) + assert.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`) + + // Write should fail because there is an existing file at the specified path. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`)) + assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) + + // Write with OverwriteIfExists should succeed. + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists) + assert.NoError(t, err) + filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`) + + // Delete should fail if the file doesn't exist. + err = f.Delete(ctx, "/doesnt_exist") + assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) + + // Delete should succeed for file that does exist. + err = f.Delete(ctx, "/foo/bar") + assert.NoError(t, err) +} + +func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { + var err error + + // We start with an empty directory. + entries, err := f.ReadDir(ctx, ".") + require.NoError(t, err) + assert.Len(t, entries, 0) + + // Write a file. + err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`)) + require.NoError(t, err) + + // Create a directory. + err = f.Mkdir(ctx, "/dir") + require.NoError(t, err) + + // Write a file. + err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`)) + require.NoError(t, err) + + // Expect two entries in the root. + entries, err = f.ReadDir(ctx, ".") + require.NoError(t, err) + require.Len(t, entries, 2) + assert.Equal(t, "dir", entries[0].Name) + assert.Equal(t, "hello.txt", entries[1].Name) + + // Expect a single entry in the directory. + entries, err = f.ReadDir(ctx, "/dir") + require.NoError(t, err) + require.Len(t, entries, 1) + assert.Equal(t, "world.txt", entries[0].Name) +} + func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string { ctx := context.Background() me, err := w.CurrentUser.Me(ctx) @@ -88,39 +156,13 @@ func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) { } func TestAccFilerWorkspaceFilesReadWrite(t *testing.T) { - var err error - ctx, f := setupWorkspaceFilesTest(t) + runFilerReadWriteTest(t, ctx, f) +} - // Write should fail because the root path doesn't yet exist. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) - assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) - - // Read should fail because the root path doesn't yet exist. - _, err = f.Read(ctx, "/foo/bar") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - - // Write with CreateParentDirectories flag should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) - assert.NoError(t, err) - filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`) - - // Write should fail because there is an existing file at the specified path. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`)) - assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) - - // Write with OverwriteIfExists should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists) - assert.NoError(t, err) - filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`) - - // Delete should fail if the file doesn't exist. - err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - - // Delete should succeed for file that does exist. - err = f.Delete(ctx, "/foo/bar") - assert.NoError(t, err) +func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { + ctx, f := setupWorkspaceFilesTest(t) + runFilerReadDirTest(t, ctx, f) } func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { @@ -148,7 +190,7 @@ func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string { return path } -func TestAccFilerDbfs(t *testing.T) { +func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) { t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV")) ctx := context.Background() @@ -156,66 +198,15 @@ func TestAccFilerDbfs(t *testing.T) { tmpdir := temporaryDbfsDir(t, w) f, err := filer.NewDbfsClient(w, tmpdir) require.NoError(t, err) - - // Read should fail because the root path doesn't yet exist. - _, err = f.Read(ctx, "/foo/bar") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - - // Write with CreateParentDirectories flag should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello world"`), filer.CreateParentDirectories) - assert.NoError(t, err) - filerTest{t, f}.assertContents(ctx, "/foo/bar", `"hello world"`) - - // Write should fail because there is an existing file at the specified path. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello universe"`)) - assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) - - // Write with OverwriteIfExists should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`"hello universe"`), filer.OverwriteIfExists) - assert.NoError(t, err) - filerTest{t, f}.assertContents(ctx, "/foo/bar", `"hello universe"`) - - // Delete should fail if the file doesn't exist. - err = f.Delete(ctx, "/doesnt_exist") - assert.True(t, errors.As(err, &filer.FileDoesNotExistError{})) - - // Delete should succeed for file that does exist. - err = f.Delete(ctx, "/foo/bar") - assert.NoError(t, err) + return ctx, f } -func TestAccFilerWorkspaceFilesReadDir(t *testing.T) { - var err error - - ctx, f := setupWorkspaceFilesTest(t) - - // We start with an empty directory. - entries, err := f.ReadDir(ctx, ".") - require.NoError(t, err) - assert.Len(t, entries, 0) - - // Write a file. - err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`)) - require.NoError(t, err) - - // Create a directory. - err = f.Mkdir(ctx, "/dir") - require.NoError(t, err) - - // Write a file. - err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`)) - require.NoError(t, err) - - // Expect two entries in the root. - entries, err = f.ReadDir(ctx, ".") - require.NoError(t, err) - assert.Len(t, entries, 2) - assert.Equal(t, "dir", entries[0].Name) - assert.Equal(t, "hello.txt", entries[1].Name) +func TestAccFilerDbfsReadWrite(t *testing.T) { + ctx, f := setupFilerDbfsTest(t) + runFilerReadWriteTest(t, ctx, f) +} - // Expect a single entry in the directory. - entries, err = f.ReadDir(ctx, "/dir") - require.NoError(t, err) - assert.Len(t, entries, 1) - assert.Equal(t, "world.txt", entries[0].Name) +func TestAccFilerDbfsReadDir(t *testing.T) { + ctx, f := setupFilerDbfsTest(t) + runFilerReadDirTest(t, ctx, f) } diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 64fe906e21..cc125a10bb 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -5,6 +5,9 @@ import ( "errors" "io" "net/http" + "path" + "sort" + "time" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" @@ -38,6 +41,31 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m if slices.Contains(mode, OverwriteIfExists) { fileMode |= files.FileModeOverwrite } + + // Issue info call before write because it automatically creates parent directories. + // + // For discussion: we could decide this is actually convenient, remove the call below, + // and apply the same semantics for the WSFS filer. + // + if !slices.Contains(mode, CreateParentDirectories) { + _, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, path.Dir(absPath)) + if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return err + } + + // This API returns a 404 if the file doesn't exist. + if aerr.StatusCode == http.StatusNotFound { + if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { + return NoSuchDirectoryError{path.Dir(absPath)} + } + } + + return err + } + } + handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode) if err != nil { var aerr *apierr.APIError @@ -103,7 +131,7 @@ func (w *DbfsClient) Delete(ctx context.Context, name string) error { // _, err = w.workspaceClient.Dbfs.GetStatusByPath(ctx, absPath) if err != nil { - var aerr apierr.APIError + var aerr *apierr.APIError if !errors.As(err, &aerr) { return err } @@ -123,3 +151,37 @@ func (w *DbfsClient) Delete(ctx context.Context, name string) error { Recursive: false, }) } + +func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]FileInfo, error) { + absPath, err := w.root.Join(name) + if err != nil { + return nil, err + } + + res, err := w.workspaceClient.Dbfs.ListByPath(ctx, absPath) + if err != nil { + return nil, err + } + + info := make([]FileInfo, len(res.Files)) + for i, v := range res.Files { + info[i] = FileInfo{ + Name: path.Base(v.Path), + Size: v.FileSize, + ModTime: time.UnixMilli(v.ModificationTime), + } + } + + // Sort by name for parity with os.ReadDir. + sort.Slice(info, func(i, j int) bool { return info[i].Name < info[j].Name }) + return info, nil +} + +func (w *DbfsClient) Mkdir(ctx context.Context, name string) error { + dirPath, err := w.root.Join(name) + if err != nil { + return err + } + + return w.workspaceClient.Dbfs.MkdirsByPath(ctx, dirPath) +} From 4aa55c654eb6e4c9e26c540507d5f866e5e9ab8c Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 31 May 2023 11:19:08 +0200 Subject: [PATCH 10/10] Correct error from ReadDir if directory does not exist --- internal/filer_test.go | 2 +- libs/filer/dbfs_client.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/internal/filer_test.go b/internal/filer_test.go index 8103e9af42..6244a5c474 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -99,7 +99,7 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { // Expect an error if the path doesn't exist. _, err = f.ReadDir(ctx, "/dir/a/b/c/d/e") - assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) + assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{}), err) // Expect two entries in the root. entries, err = f.ReadDir(ctx, ".") diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index cc125a10bb..2e32421074 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ -160,6 +160,18 @@ func (w *DbfsClient) ReadDir(ctx context.Context, name string) ([]FileInfo, erro res, err := w.workspaceClient.Dbfs.ListByPath(ctx, absPath) if err != nil { + var aerr *apierr.APIError + if !errors.As(err, &aerr) { + return nil, err + } + + // This API returns a 404 if the file doesn't exist. + if aerr.StatusCode == http.StatusNotFound { + if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { + return nil, NoSuchDirectoryError{absPath} + } + } + return nil, err }