Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions libs/filer/workspace_files_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/workspace"
)

Expand All @@ -38,7 +39,7 @@ func (entry wsfsDirEntry) Info() (fs.FileInfo, error) {
func wsfsDirEntriesFromObjectInfos(objects []workspace.ObjectInfo) []fs.DirEntry {
info := make([]fs.DirEntry, len(objects))
for i, v := range objects {
info[i] = wsfsDirEntry{wsfsFileInfo{oi: v}}
info[i] = wsfsDirEntry{wsfsFileInfo{ObjectInfo: v}}
}

// Sort by name for parity with os.ReadDir.
Expand All @@ -48,19 +49,22 @@ func wsfsDirEntriesFromObjectInfos(objects []workspace.ObjectInfo) []fs.DirEntry

// Type that implements fs.FileInfo for WSFS.
type wsfsFileInfo struct {
oi workspace.ObjectInfo
workspace.ObjectInfo

// The export format of a notebook. This is not exposed by the SDK.
ReposExportFormat workspace.ExportFormat `json:"repos_export_format,omitempty"`
}

func (info wsfsFileInfo) Name() string {
return path.Base(info.oi.Path)
return path.Base(info.ObjectInfo.Path)
}

func (info wsfsFileInfo) Size() int64 {
return info.oi.Size
return info.ObjectInfo.Size
}

func (info wsfsFileInfo) Mode() fs.FileMode {
switch info.oi.ObjectType {
switch info.ObjectInfo.ObjectType {
case workspace.ObjectTypeDirectory, workspace.ObjectTypeRepo:
return fs.ModeDir
default:
Expand All @@ -69,15 +73,29 @@ func (info wsfsFileInfo) Mode() fs.FileMode {
}

func (info wsfsFileInfo) ModTime() time.Time {
return time.UnixMilli(info.oi.ModifiedAt)
return time.UnixMilli(info.ObjectInfo.ModifiedAt)
}

func (info wsfsFileInfo) IsDir() bool {
return info.Mode() == fs.ModeDir
}

func (info wsfsFileInfo) Sys() any {
return info.oi
return info.ObjectInfo
}

// UnmarshalJSON is a custom unmarshaller for the wsfsFileInfo struct.
// It must be defined for this type because otherwise the implementation
// of the embedded ObjectInfo type will be used.
func (info *wsfsFileInfo) UnmarshalJSON(b []byte) error {
return marshal.Unmarshal(b, info)
}

// MarshalJSON is a custom marshaller for the wsfsFileInfo struct.
// It must be defined for this type because otherwise the implementation
// of the embedded ObjectInfo type will be used.
func (info *wsfsFileInfo) MarshalJSON() ([]byte, error) {
return marshal.Marshal(info)
}

// WorkspaceFilesClient implements the files-in-workspace API.
Expand Down Expand Up @@ -293,7 +311,22 @@ func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileIn
return nil, err
}

info, err := w.workspaceClient.Workspace.GetStatusByPath(ctx, absPath)
var stat wsfsFileInfo

// Perform bespoke API call because "return_export_info" is not exposed by the SDK.
// We need "repos_export_format" to determine if the file is a py or a ipynb notebook.
// This is not exposed by the SDK so we need to make a direct API call.
err = w.apiClient.Do(
ctx,
http.MethodGet,
"/api/2.0/workspace/get-status",
nil,
map[string]string{
"path": absPath,
"return_export_info": "true",
},
&stat,
)
if err != nil {
// If we got an API error we deal with it below.
var aerr *apierr.APIError
Expand All @@ -307,5 +340,5 @@ func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileIn
}
}

return wsfsFileInfo{*info}, nil
return stat, nil
}
39 changes: 39 additions & 0 deletions libs/filer/workspace_files_client_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package filer

import (
"encoding/json"
"io/fs"
"testing"
"time"

"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -54,3 +56,40 @@ func TestWorkspaceFilesDirEntry(t *testing.T) {
assert.False(t, i1.IsDir())
assert.True(t, i2.IsDir())
}

func TestWorkspaceFilesClient_wsfsUnmarshal(t *testing.T) {
payload := `
{
"created_at": 1671030805916,
"language": "PYTHON",
"modified_at": 1671032235392,
"object_id": 795822750063438,
"object_type": "NOTEBOOK",
"path": "/some/path/to/a/notebook",
"repos_export_format": "SOURCE",
"resource_id": "795822750063438"
}
`

var info wsfsFileInfo
err := json.Unmarshal([]byte(payload), &info)
require.NoError(t, err)

// Fields in the object info.
assert.Equal(t, int64(1671030805916), info.CreatedAt)
assert.Equal(t, workspace.LanguagePython, info.Language)
assert.Equal(t, int64(1671032235392), info.ModifiedAt)
assert.Equal(t, int64(795822750063438), info.ObjectId)
assert.Equal(t, workspace.ObjectTypeNotebook, info.ObjectType)
assert.Equal(t, "/some/path/to/a/notebook", info.Path)
assert.Equal(t, workspace.ExportFormatSource, info.ReposExportFormat)
assert.Equal(t, "795822750063438", info.ResourceId)

// Functions for fs.FileInfo.
assert.Equal(t, "notebook", info.Name())
assert.Equal(t, int64(0), info.Size())
assert.Equal(t, fs.ModePerm, info.Mode())
assert.Equal(t, time.UnixMilli(1671032235392), info.ModTime())
assert.False(t, info.IsDir())
assert.NotNil(t, info.Sys())
}
81 changes: 16 additions & 65 deletions libs/filer/workspace_files_extensions_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,17 @@ import (
"fmt"
"io"
"io/fs"
"net/http"
"path"
"strings"

"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/notebook"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/workspace"
)

type workspaceFilesExtensionsClient struct {
workspaceClient *databricks.WorkspaceClient
apiClient *client.DatabricksClient

wsfs Filer
root string
Expand All @@ -35,64 +30,20 @@ var extensionsToLanguages = map[string]workspace.Language{
".ipynb": workspace.LanguagePython,
}

// workspaceFileStatus defines a custom response body for the "/api/2.0/workspace/get-status" API.
// The "repos_export_format" field is not exposed by the SDK.
type workspaceFileStatus struct {
*workspace.ObjectInfo

// The export format of the notebook. This is not exposed by the SDK.
ReposExportFormat workspace.ExportFormat `json:"repos_export_format,omitempty"`
wsfsFileInfo

// Name of the file to be used in any API calls made using the workspace files
// filer. For notebooks this path does not include the extension.
nameForWorkspaceAPI string
}

// A custom unmarsaller for the workspaceFileStatus struct. This is needed because
// workspaceFileStatus embeds the workspace.ObjectInfo which itself has a custom
// unmarshaller.
// If a custom unmarshaller is not provided extra fields like ReposExportFormat
// will not have values set.
func (s *workspaceFileStatus) UnmarshalJSON(b []byte) error {
return marshal.Unmarshal(b, s)
}

func (s *workspaceFileStatus) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s)
}

func (w *workspaceFilesExtensionsClient) stat(ctx context.Context, name string) (*workspaceFileStatus, error) {
stat := &workspaceFileStatus{
nameForWorkspaceAPI: name,
}

// Perform bespoke API call because "return_export_info" is not exposed by the SDK.
// We need "repos_export_format" to determine if the file is a py or a ipynb notebook.
// This is not exposed by the SDK so we need to make a direct API call.
err := w.apiClient.Do(
ctx,
http.MethodGet,
"/api/2.0/workspace/get-status",
nil,
map[string]string{
"path": path.Join(w.root, name),
"return_export_info": "true",
},
stat,
)
func (w *workspaceFilesExtensionsClient) stat(ctx context.Context, name string) (wsfsFileInfo, error) {
info, err := w.wsfs.Stat(ctx, name)
if err != nil {
// If we got an API error we deal with it below.
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}

// This API returns a 404 if the specified path does not exist.
if aerr.StatusCode == http.StatusNotFound {
return nil, FileDoesNotExistError{path.Join(w.root, name)}
}
return wsfsFileInfo{}, err
}
return stat, err
return info.(wsfsFileInfo), err
}

// This function returns the stat for the provided notebook. The stat object itself contains the path
Expand Down Expand Up @@ -146,7 +97,10 @@ func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithExt(ctx contex
// Modify the stat object path to include the extension. This stat object will be used
// to return the fs.FileInfo object in the stat method.
stat.Path = stat.Path + ext
return stat, nil
return &workspaceFileStatus{
wsfsFileInfo: stat,
nameForWorkspaceAPI: nameWithoutExt,
}, nil
}

func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithoutExt(ctx context.Context, name string) (*workspaceFileStatus, error) {
Expand All @@ -162,7 +116,7 @@ func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithoutExt(ctx con
}

// Get the extension for the notebook.
ext := notebook.GetExtensionByLanguage(stat.ObjectInfo)
ext := notebook.GetExtensionByLanguage(&stat.ObjectInfo)

// If the notebook was exported as a Jupyter notebook, the extension should be .ipynb.
if stat.Language == workspace.LanguagePython && stat.ReposExportFormat == workspace.ExportFormatJupyter {
Expand All @@ -172,7 +126,10 @@ func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithoutExt(ctx con
// Modify the stat object path to include the extension. This stat object will be used
// to return the fs.DirEntry object in the ReadDir method.
stat.Path = stat.Path + ext
return stat, nil
return &workspaceFileStatus{
wsfsFileInfo: stat,
nameForWorkspaceAPI: name,
}, nil
}

type DuplicatePathError struct {
Expand Down Expand Up @@ -200,19 +157,13 @@ func (e DuplicatePathError) Error() string {
// errors for namespace clashes (e.g. a file and a notebook or a directory and a notebook).
// Thus users of these methods should be careful to avoid such clashes.
func NewWorkspaceFilesExtensionsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
apiClient, err := client.New(w.Config)
if err != nil {
return nil, err
}

filer, err := NewWorkspaceFilesClient(w, root)
if err != nil {
return nil, err
}

return &workspaceFilesExtensionsClient{
workspaceClient: w,
apiClient: apiClient,

wsfs: filer,
root: root,
Expand Down Expand Up @@ -240,7 +191,7 @@ func (w *workspaceFilesExtensionsClient) ReadDir(ctx context.Context, name strin
return nil, err
}
// Replace the entry with the new entry that includes the extension.
entries[i] = wsfsDirEntry{wsfsFileInfo{oi: *stat.ObjectInfo}}
entries[i] = wsfsDirEntry{wsfsFileInfo{ObjectInfo: stat.ObjectInfo}}
}

// Error if we have seen this path before in the current directory.
Expand Down Expand Up @@ -331,7 +282,7 @@ func (w *workspaceFilesExtensionsClient) Stat(ctx context.Context, name string)
return nil, err
}

return wsfsFileInfo{oi: *stat.ObjectInfo}, nil
return wsfsFileInfo{ObjectInfo: stat.ObjectInfo}, nil
}

return info, err
Expand Down