From 0c36933c62e29c94110ea2b1839fc57f1b5697eb Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 4 Jul 2024 14:07:11 +0200 Subject: [PATCH 1/8] Move bespoke status call to main workspace files filer --- libs/filer/workspace_files_client.go | 51 +++++++++--- libs/filer/workspace_files_client_test.go | 39 +++++++++ .../workspace_files_extensions_client.go | 81 ++++--------------- 3 files changed, 97 insertions(+), 74 deletions(-) diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 09f11b161e..d799c1f885 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -19,6 +19,7 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/client" + "github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/service/workspace" ) @@ -38,7 +39,7 @@ func (entry wsfsDirEntry) Info() (fs.FileInfo, error) { func wsfsDirEntriesFromObjectInfos(objects []workspace.ObjectInfo) []fs.DirEntry { info := make([]fs.DirEntry, len(objects)) for i, v := range objects { - info[i] = wsfsDirEntry{wsfsFileInfo{oi: v}} + info[i] = wsfsDirEntry{wsfsFileInfo{ObjectInfo: v}} } // Sort by name for parity with os.ReadDir. @@ -48,19 +49,22 @@ func wsfsDirEntriesFromObjectInfos(objects []workspace.ObjectInfo) []fs.DirEntry // Type that implements fs.FileInfo for WSFS. type wsfsFileInfo struct { - oi workspace.ObjectInfo + workspace.ObjectInfo + + // The export format of a notebook. This is not exposed by the SDK. + ReposExportFormat workspace.ExportFormat `json:"repos_export_format,omitempty"` } func (info wsfsFileInfo) Name() string { - return path.Base(info.oi.Path) + return path.Base(info.ObjectInfo.Path) } func (info wsfsFileInfo) Size() int64 { - return info.oi.Size + return info.ObjectInfo.Size } func (info wsfsFileInfo) Mode() fs.FileMode { - switch info.oi.ObjectType { + switch info.ObjectInfo.ObjectType { case workspace.ObjectTypeDirectory, workspace.ObjectTypeRepo: return fs.ModeDir default: @@ -69,7 +73,7 @@ func (info wsfsFileInfo) Mode() fs.FileMode { } func (info wsfsFileInfo) ModTime() time.Time { - return time.UnixMilli(info.oi.ModifiedAt) + return time.UnixMilli(info.ObjectInfo.ModifiedAt) } func (info wsfsFileInfo) IsDir() bool { @@ -77,7 +81,21 @@ func (info wsfsFileInfo) IsDir() bool { } func (info wsfsFileInfo) Sys() any { - return info.oi + return info.ObjectInfo +} + +// UnmarshalJSON is a custom unmarshaller for the wsfsFileInfo struct. +// It must be defined for this type because otherwise the implementation +// of the embedded ObjectInfo type will be used. +func (info *wsfsFileInfo) UnmarshalJSON(b []byte) error { + return marshal.Unmarshal(b, info) +} + +// MarshalJSON is a custom marshaller for the wsfsFileInfo struct. +// It must be defined for this type because otherwise the implementation +// of the embedded ObjectInfo type will be used. +func (info *wsfsFileInfo) MarshalJSON() ([]byte, error) { + return marshal.Marshal(info) } // WorkspaceFilesClient implements the files-in-workspace API. @@ -293,7 +311,22 @@ func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileIn return nil, err } - info, err := w.workspaceClient.Workspace.GetStatusByPath(ctx, absPath) + var stat wsfsFileInfo + + // Perform bespoke API call because "return_export_info" is not exposed by the SDK. + // We need "repos_export_format" to determine if the file is a py or a ipynb notebook. + // This is not exposed by the SDK so we need to make a direct API call. + err = w.apiClient.Do( + ctx, + http.MethodGet, + "/api/2.0/workspace/get-status", + nil, + map[string]string{ + "path": absPath, + "return_export_info": "true", + }, + &stat, + ) if err != nil { // If we got an API error we deal with it below. var aerr *apierr.APIError @@ -307,5 +340,5 @@ func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileIn } } - return wsfsFileInfo{*info}, nil + return stat, nil } diff --git a/libs/filer/workspace_files_client_test.go b/libs/filer/workspace_files_client_test.go index 4e9537641f..650b5be682 100644 --- a/libs/filer/workspace_files_client_test.go +++ b/libs/filer/workspace_files_client_test.go @@ -1,8 +1,10 @@ package filer import ( + "encoding/json" "io/fs" "testing" + "time" "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/assert" @@ -54,3 +56,40 @@ func TestWorkspaceFilesDirEntry(t *testing.T) { assert.False(t, i1.IsDir()) assert.True(t, i2.IsDir()) } + +func TestWorkspaceFilesClient_wsfsUnmarshal(t *testing.T) { + payload := ` + { + "created_at": 1671030805916, + "language": "PYTHON", + "modified_at": 1671032235392, + "object_id": 795822750063438, + "object_type": "NOTEBOOK", + "path": "/some/path/to/a/notebook", + "repos_export_format": "SOURCE", + "resource_id": "795822750063438" + } + ` + + var info wsfsFileInfo + err := json.Unmarshal([]byte(payload), &info) + require.NoError(t, err) + + // Fields in the object info. + assert.Equal(t, int64(1671030805916), info.CreatedAt) + assert.Equal(t, workspace.LanguagePython, info.Language) + assert.Equal(t, int64(1671032235392), info.ModifiedAt) + assert.Equal(t, int64(795822750063438), info.ObjectId) + assert.Equal(t, workspace.ObjectTypeNotebook, info.ObjectType) + assert.Equal(t, "/some/path/to/a/notebook", info.Path) + assert.Equal(t, workspace.ExportFormatSource, info.ReposExportFormat) + assert.Equal(t, "795822750063438", info.ResourceId) + + // Functions for fs.FileInfo. + assert.Equal(t, "notebook", info.Name()) + assert.Equal(t, int64(0), info.Size()) + assert.Equal(t, fs.ModePerm, info.Mode()) + assert.Equal(t, time.UnixMilli(1671032235392), info.ModTime()) + assert.False(t, info.IsDir()) + assert.NotNil(t, info.Sys()) +} diff --git a/libs/filer/workspace_files_extensions_client.go b/libs/filer/workspace_files_extensions_client.go index 3ce6913af2..a872dcc65a 100644 --- a/libs/filer/workspace_files_extensions_client.go +++ b/libs/filer/workspace_files_extensions_client.go @@ -6,22 +6,17 @@ import ( "fmt" "io" "io/fs" - "net/http" "path" "strings" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/notebook" "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" - "github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/service/workspace" ) type workspaceFilesExtensionsClient struct { workspaceClient *databricks.WorkspaceClient - apiClient *client.DatabricksClient wsfs Filer root string @@ -35,64 +30,20 @@ var extensionsToLanguages = map[string]workspace.Language{ ".ipynb": workspace.LanguagePython, } -// workspaceFileStatus defines a custom response body for the "/api/2.0/workspace/get-status" API. -// The "repos_export_format" field is not exposed by the SDK. type workspaceFileStatus struct { - *workspace.ObjectInfo - - // The export format of the notebook. This is not exposed by the SDK. - ReposExportFormat workspace.ExportFormat `json:"repos_export_format,omitempty"` + wsfsFileInfo // Name of the file to be used in any API calls made using the workspace files // filer. For notebooks this path does not include the extension. nameForWorkspaceAPI string } -// A custom unmarsaller for the workspaceFileStatus struct. This is needed because -// workspaceFileStatus embeds the workspace.ObjectInfo which itself has a custom -// unmarshaller. -// If a custom unmarshaller is not provided extra fields like ReposExportFormat -// will not have values set. -func (s *workspaceFileStatus) UnmarshalJSON(b []byte) error { - return marshal.Unmarshal(b, s) -} - -func (s *workspaceFileStatus) MarshalJSON() ([]byte, error) { - return marshal.Marshal(s) -} - -func (w *workspaceFilesExtensionsClient) stat(ctx context.Context, name string) (*workspaceFileStatus, error) { - stat := &workspaceFileStatus{ - nameForWorkspaceAPI: name, - } - - // Perform bespoke API call because "return_export_info" is not exposed by the SDK. - // We need "repos_export_format" to determine if the file is a py or a ipynb notebook. - // This is not exposed by the SDK so we need to make a direct API call. - err := w.apiClient.Do( - ctx, - http.MethodGet, - "/api/2.0/workspace/get-status", - nil, - map[string]string{ - "path": path.Join(w.root, name), - "return_export_info": "true", - }, - stat, - ) +func (w *workspaceFilesExtensionsClient) stat(ctx context.Context, name string) (wsfsFileInfo, error) { + info, err := w.wsfs.Stat(ctx, name) if err != nil { - // If we got an API error we deal with it below. - var aerr *apierr.APIError - if !errors.As(err, &aerr) { - return nil, err - } - - // This API returns a 404 if the specified path does not exist. - if aerr.StatusCode == http.StatusNotFound { - return nil, FileDoesNotExistError{path.Join(w.root, name)} - } + return wsfsFileInfo{}, err } - return stat, err + return info.(wsfsFileInfo), err } // This function returns the stat for the provided notebook. The stat object itself contains the path @@ -146,7 +97,10 @@ func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithExt(ctx contex // Modify the stat object path to include the extension. This stat object will be used // to return the fs.FileInfo object in the stat method. stat.Path = stat.Path + ext - return stat, nil + return &workspaceFileStatus{ + wsfsFileInfo: stat, + nameForWorkspaceAPI: nameWithoutExt, + }, nil } func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithoutExt(ctx context.Context, name string) (*workspaceFileStatus, error) { @@ -162,7 +116,7 @@ func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithoutExt(ctx con } // Get the extension for the notebook. - ext := notebook.GetExtensionByLanguage(stat.ObjectInfo) + ext := notebook.GetExtensionByLanguage(&stat.ObjectInfo) // If the notebook was exported as a Jupyter notebook, the extension should be .ipynb. if stat.Language == workspace.LanguagePython && stat.ReposExportFormat == workspace.ExportFormatJupyter { @@ -172,7 +126,10 @@ func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithoutExt(ctx con // Modify the stat object path to include the extension. This stat object will be used // to return the fs.DirEntry object in the ReadDir method. stat.Path = stat.Path + ext - return stat, nil + return &workspaceFileStatus{ + wsfsFileInfo: stat, + nameForWorkspaceAPI: name, + }, nil } type DuplicatePathError struct { @@ -200,11 +157,6 @@ func (e DuplicatePathError) Error() string { // errors for namespace clashes (e.g. a file and a notebook or a directory and a notebook). // Thus users of these methods should be careful to avoid such clashes. func NewWorkspaceFilesExtensionsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { - apiClient, err := client.New(w.Config) - if err != nil { - return nil, err - } - filer, err := NewWorkspaceFilesClient(w, root) if err != nil { return nil, err @@ -212,7 +164,6 @@ func NewWorkspaceFilesExtensionsClient(w *databricks.WorkspaceClient, root strin return &workspaceFilesExtensionsClient{ workspaceClient: w, - apiClient: apiClient, wsfs: filer, root: root, @@ -240,7 +191,7 @@ func (w *workspaceFilesExtensionsClient) ReadDir(ctx context.Context, name strin return nil, err } // Replace the entry with the new entry that includes the extension. - entries[i] = wsfsDirEntry{wsfsFileInfo{oi: *stat.ObjectInfo}} + entries[i] = wsfsDirEntry{wsfsFileInfo{ObjectInfo: stat.ObjectInfo}} } // Error if we have seen this path before in the current directory. @@ -331,7 +282,7 @@ func (w *workspaceFilesExtensionsClient) Stat(ctx context.Context, name string) return nil, err } - return wsfsFileInfo{oi: *stat.ObjectInfo}, nil + return wsfsFileInfo{ObjectInfo: stat.ObjectInfo}, nil } return info, err From 16a46243d1b5ee8ac95b3703031ecefaed4613db Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 5 Jul 2024 15:13:28 +0200 Subject: [PATCH 2/8] Let notebook detection code use underlying metadata if available --- libs/notebook/detect.go | 76 ++++++++++++++++++++++++++++++----- libs/notebook/detect_test.go | 18 +++++++++ libs/notebook/fakefs_test.go | 77 ++++++++++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 10 deletions(-) create mode 100644 libs/notebook/fakefs_test.go diff --git a/libs/notebook/detect.go b/libs/notebook/detect.go index 0b7c04d6d1..b8dea62133 100644 --- a/libs/notebook/detect.go +++ b/libs/notebook/detect.go @@ -12,27 +12,69 @@ import ( "github.com/databricks/databricks-sdk-go/service/workspace" ) +// FileInfoWithObjectInfo is an interface implemented by [fs.FileInfo] values that +// contain a file's underlying [workspace.ObjectInfo]. +// +// This may be the case when working with a [filer.Filer] backed by the workspace API. +// For these files we do not need to read a file's header to know if it is a notebook; +// we can use the [workspace.ObjectInfo] value directly. +type FileInfoWithObjectInfo interface { + ObjectInfo() workspace.ObjectInfo +} + // Maximum length in bytes of the notebook header. const headerLength = 32 -// readHeader reads the first N bytes from a file. -func readHeader(fsys fs.FS, name string) ([]byte, error) { +// file wraps an fs.File and implements a few helper methods such that +// they don't need to be inlined in the [DetectWithFS] function below. +type file struct { + f fs.File +} + +func openFile(fsys fs.FS, name string) (*file, error) { f, err := fsys.Open(name) if err != nil { return nil, err } - defer f.Close() + return &file{f: f}, nil +} + +func (f file) close() error { + return f.f.Close() +} +func (f file) readHeader() (string, error) { // Scan header line with some padding. var buf = make([]byte, headerLength) - n, err := f.Read([]byte(buf)) + n, err := f.f.Read([]byte(buf)) if err != nil && err != io.EOF { - return nil, err + return "", err } // Trim buffer to actual read bytes. - return buf[:n], nil + buf = buf[:n] + + // Read the first line from the buffer. + scanner := bufio.NewScanner(bytes.NewReader(buf)) + scanner.Scan() + return scanner.Text(), nil +} + +// getObjectInfo returns the [workspace.ObjectInfo] for the file if it is +// part of the [fs.FileInfo] value returned by the [fs.Stat] call. +func (f file) getObjectInfo() (oi workspace.ObjectInfo, ok bool, err error) { + stat, err := f.f.Stat() + if err != nil { + return workspace.ObjectInfo{}, false, err + } + + // Use object info if available. + if i, ok := stat.(FileInfoWithObjectInfo); ok { + return i.ObjectInfo(), true, nil + } + + return workspace.ObjectInfo{}, false, nil } // Detect returns whether the file at path is a Databricks notebook. @@ -40,13 +82,27 @@ func readHeader(fsys fs.FS, name string) ([]byte, error) { func DetectWithFS(fsys fs.FS, name string) (notebook bool, language workspace.Language, err error) { header := "" - buf, err := readHeader(fsys, name) + f, err := openFile(fsys, name) + if err != nil { + return false, "", err + } + + defer f.close() + + // Use object info if available. + oi, ok, err := f.getObjectInfo() + if err != nil { + return false, "", err + } + if ok { + return oi.ObjectType == workspace.ObjectTypeNotebook, oi.Language, nil + } + + // Read the first line of the file. + fileHeader, err := f.readHeader() if err != nil { return false, "", err } - scanner := bufio.NewScanner(bytes.NewReader(buf)) - scanner.Scan() - fileHeader := scanner.Text() // Determine which header to expect based on filename extension. ext := strings.ToLower(filepath.Ext(name)) diff --git a/libs/notebook/detect_test.go b/libs/notebook/detect_test.go index fd3337579c..ad89d6dd53 100644 --- a/libs/notebook/detect_test.go +++ b/libs/notebook/detect_test.go @@ -99,3 +99,21 @@ func TestDetectFileWithLongHeader(t *testing.T) { require.NoError(t, err) assert.False(t, nb) } + +func TestDetectWithObjectInfo(t *testing.T) { + fakeFS := &fakeFS{ + fakeFile{ + fakeFileInfo{ + workspace.ObjectInfo{ + ObjectType: workspace.ObjectTypeNotebook, + Language: workspace.LanguagePython, + }, + }, + }, + } + + nb, lang, err := DetectWithFS(fakeFS, "doesntmatter") + require.NoError(t, err) + assert.True(t, nb) + assert.Equal(t, workspace.LanguagePython, lang) +} diff --git a/libs/notebook/fakefs_test.go b/libs/notebook/fakefs_test.go new file mode 100644 index 0000000000..7b50f69d81 --- /dev/null +++ b/libs/notebook/fakefs_test.go @@ -0,0 +1,77 @@ +package notebook_test + +import ( + "fmt" + "io/fs" + "time" + + "github.com/databricks/databricks-sdk-go/service/workspace" +) + +type fakeFS struct { + fakeFile +} + +type fakeFile struct { + fakeFileInfo +} + +func (f fakeFile) Close() error { + return nil +} + +func (f fakeFile) Read(p []byte) (n int, err error) { + return 0, fmt.Errorf("not implemented") +} + +func (f fakeFile) Stat() (fs.FileInfo, error) { + return f.fakeFileInfo, nil +} + +type fakeFileInfo struct { + oi workspace.ObjectInfo +} + +func (f fakeFileInfo) ObjectInfo() workspace.ObjectInfo { + return f.oi +} + +func (f fakeFileInfo) Name() string { + return "" +} + +func (f fakeFileInfo) Size() int64 { + return 0 +} + +func (f fakeFileInfo) Mode() fs.FileMode { + return 0 +} + +func (f fakeFileInfo) ModTime() time.Time { + return time.Time{} +} + +func (f fakeFileInfo) IsDir() bool { + return false +} + +func (f fakeFileInfo) Sys() any { + return nil +} + +func (f fakeFS) Open(name string) (fs.File, error) { + return f.fakeFile, nil +} + +func (f fakeFS) Stat(name string) (fs.FileInfo, error) { + panic("not implemented") +} + +func (f fakeFS) ReadDir(name string) ([]fs.DirEntry, error) { + panic("not implemented") +} + +func (f fakeFS) ReadFile(name string) ([]byte, error) { + panic("not implemented") +} From 6f216b656cbdf127a01b230cd52d2a2f1ab5ea7f Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 5 Jul 2024 16:33:17 +0200 Subject: [PATCH 3/8] Fix --- libs/notebook/fakefs_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/notebook/fakefs_test.go b/libs/notebook/fakefs_test.go index 7b50f69d81..5729a1cefe 100644 --- a/libs/notebook/fakefs_test.go +++ b/libs/notebook/fakefs_test.go @@ -1,4 +1,4 @@ -package notebook_test +package notebook import ( "fmt" From bbc964d469980b0d1489f93fcf5a6520aeb8acb7 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 8 Jul 2024 13:44:43 +0200 Subject: [PATCH 4/8] Rename interface and interface method --- libs/notebook/detect.go | 10 +++++----- libs/notebook/fakefs_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libs/notebook/detect.go b/libs/notebook/detect.go index b8dea62133..582a88479f 100644 --- a/libs/notebook/detect.go +++ b/libs/notebook/detect.go @@ -12,14 +12,14 @@ import ( "github.com/databricks/databricks-sdk-go/service/workspace" ) -// FileInfoWithObjectInfo is an interface implemented by [fs.FileInfo] values that +// FileInfoWithWorkspaceObjectInfo is an interface implemented by [fs.FileInfo] values that // contain a file's underlying [workspace.ObjectInfo]. // // This may be the case when working with a [filer.Filer] backed by the workspace API. // For these files we do not need to read a file's header to know if it is a notebook; // we can use the [workspace.ObjectInfo] value directly. -type FileInfoWithObjectInfo interface { - ObjectInfo() workspace.ObjectInfo +type FileInfoWithWorkspaceObjectInfo interface { + WorkspaceObjectInfo() workspace.ObjectInfo } // Maximum length in bytes of the notebook header. @@ -70,8 +70,8 @@ func (f file) getObjectInfo() (oi workspace.ObjectInfo, ok bool, err error) { } // Use object info if available. - if i, ok := stat.(FileInfoWithObjectInfo); ok { - return i.ObjectInfo(), true, nil + if i, ok := stat.(FileInfoWithWorkspaceObjectInfo); ok { + return i.WorkspaceObjectInfo(), true, nil } return workspace.ObjectInfo{}, false, nil diff --git a/libs/notebook/fakefs_test.go b/libs/notebook/fakefs_test.go index 5729a1cefe..4ac135dd4a 100644 --- a/libs/notebook/fakefs_test.go +++ b/libs/notebook/fakefs_test.go @@ -32,7 +32,7 @@ type fakeFileInfo struct { oi workspace.ObjectInfo } -func (f fakeFileInfo) ObjectInfo() workspace.ObjectInfo { +func (f fakeFileInfo) WorkspaceObjectInfo() workspace.ObjectInfo { return f.oi } From 4043104786207d1a73b284f41868ef5adc64c20c Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 8 Jul 2024 22:12:37 +0200 Subject: [PATCH 5/8] Implement readahead cache for Workspace API calls --- libs/filer/workspace_files_cache.go | 388 ++++++++++++++++++ libs/filer/workspace_files_client.go | 4 + .../workspace_files_extensions_client.go | 3 +- 3 files changed, 394 insertions(+), 1 deletion(-) create mode 100644 libs/filer/workspace_files_cache.go diff --git a/libs/filer/workspace_files_cache.go b/libs/filer/workspace_files_cache.go new file mode 100644 index 0000000000..4808f4dc0f --- /dev/null +++ b/libs/filer/workspace_files_cache.go @@ -0,0 +1,388 @@ +package filer + +import ( + "context" + "fmt" + "io" + "io/fs" + "path" + "slices" + "sync" + "time" + + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/service/workspace" +) + +const kMaxQueueSize = 10_000 + +// queueFullError is returned when the queue is at capacity. +type queueFullError struct { + name string +} + +// Error returns the error message. +func (e queueFullError) Error() string { + return fmt.Sprintf("queue is at capacity (%d); cannot enqueue work for %q", kMaxQueueSize, e.name) +} + +// Common type for all cacheable calls. +type cacheEntry struct { + // Channel to signal that the operation has completed. + ch chan struct{} + + // The (cleaned) name of the file or directory being operated on. + name string + + // Return values of the operation. + err error +} + +// String returns the path of the file or directory being operated on. +func (e *cacheEntry) String() string { + return e.name +} + +// Mark this entry as errored. +func (e *cacheEntry) markError(err error) { + e.err = err + close(e.ch) +} + +// readDirEntry is the cache entry for a [ReadDir] call. +type readDirEntry struct { + cacheEntry + + // Return values of a [ReadDir] call. + entries []fs.DirEntry +} + +// Create a new readDirEntry. +func newReadDirEntry(name string) *readDirEntry { + return &readDirEntry{cacheEntry: cacheEntry{ch: make(chan struct{}), name: name}} +} + +// Execute the operation and signal completion. +func (e *readDirEntry) execute(ctx context.Context, c *cache) { + t1 := time.Now() + e.entries, e.err = c.f.ReadDir(ctx, e.name) + t2 := time.Now() + log.Tracef(ctx, "readdir for %s took %f", e.name, t2.Sub(t1).Seconds()) + + // Finalize the read call by adding all directory entries to the stat cache. + c.completeReadDir(e.name, e.entries) + + // Signal that the operation has completed. + // The return value can now be used by routines waiting on it. + close(e.ch) +} + +// Wait for the operation to complete and return the result. +func (e *readDirEntry) wait(ctx context.Context) ([]fs.DirEntry, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.ch: + // Note: return a copy of the slice to prevent the caller from modifying the cache. + // The underlying elements are values (see [wsfsDirEntry]) so a shallow copy is sufficient. + return slices.Clone(e.entries), e.err + } +} + +// statEntry is the cache entry for a [Stat] call. +type statEntry struct { + cacheEntry + + // Return values of a [Stat] call. + info fs.FileInfo +} + +// Create a new stat entry. +func newStatEntry(name string) *statEntry { + return &statEntry{cacheEntry: cacheEntry{ch: make(chan struct{}), name: name}} +} + +// Execute the operation and signal completion. +func (e *statEntry) execute(ctx context.Context, c *cache) { + t1 := time.Now() + e.info, e.err = c.f.Stat(ctx, e.name) + t2 := time.Now() + log.Tracef(ctx, "stat for %s took %f", e.name, t2.Sub(t1).Seconds()) + + // Signal that the operation has completed. + // The return value can now be used by routines waiting on it. + close(e.ch) +} + +// Wait for the operation to complete and return the result. +func (e *statEntry) wait(ctx context.Context) (fs.FileInfo, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-e.ch: + return e.info, e.err + } +} + +// Mark the stat entry as done. +func (e *statEntry) markDone(info fs.FileInfo, err error) { + e.info = info + e.err = err + close(e.ch) +} + +// executable is the interface all cacheable calls must implement. +type executable interface { + fmt.Stringer + + execute(ctx context.Context, c *cache) +} + +// cache stores all entries for cacheable Workspace File System calls. +// We care about caching only [ReadDir] and [Stat] calls. +type cache struct { + f Filer + m sync.Mutex + + readDir map[string]*readDirEntry + stat map[string]*statEntry + + // Queue of operations to execute. + queue chan executable + + // For tracking the number of active goroutines. + wg sync.WaitGroup +} + +func newCache(f Filer) *cache { + c := &cache{ + f: f, + + readDir: make(map[string]*readDirEntry), + stat: make(map[string]*statEntry), + + queue: make(chan executable, kMaxQueueSize), + } + + ctx := context.Background() + for range 10 { + c.wg.Add(1) + go c.work(ctx) + } + + return c +} + +// work until the queue is closed. +func (c *cache) work(ctx context.Context) { + defer c.wg.Done() + + for { + e, ok := <-c.queue + if !ok { + return + } + e.execute(ctx, c) + } +} + +// enqueue adds an operation to the queue. +// If the context is canceled, an error is returned. +// If the queue is full, an error is returned. +// +// Its caller is holding the lock so it cannot block. +func (c *cache) enqueue(ctx context.Context, e executable) error { + select { + case <-ctx.Done(): + return ctx.Err() + case c.queue <- e: + return nil + default: + return queueFullError{e.String()} + } +} + +func (c *cache) completeReadDirForDir(name string, dirEntry fs.DirEntry) { + // Add to the stat cache if not already present. + if _, ok := c.stat[name]; !ok { + e := newStatEntry(name) + e.markDone(dirEntry.Info()) + c.stat[name] = e + } + + // Queue a [ReadDir] call for the directory if not already present. + if _, ok := c.readDir[name]; !ok { + // Create a new cache entry and queue the operation. + e := newReadDirEntry(name) + err := c.enqueue(context.Background(), e) + if err != nil { + e.markError(err) + } + + // Add the entry to the cache, even if has an error. + c.readDir[name] = e + } +} + +func (c *cache) completeReadDirForFile(name string, dirEntry fs.DirEntry) { + // Skip if this entry is already in the cache. + if _, ok := c.stat[name]; ok { + return + } + + // Create a new cache entry. + e := newStatEntry(name) + + // Depending on the object type, we either have to perform a real + // stat call, or we can use the [fs.DirEntry] info directly. + switch dirEntry.(wsfsDirEntry).ObjectType { + case workspace.ObjectTypeNotebook: + // Queue a [Stat] call for the file. + err := c.enqueue(context.Background(), e) + if err != nil { + e.markError(err) + } + default: + // Use the [fs.DirEntry] info directly. + e.markDone(dirEntry.Info()) + } + + // Add the entry to the cache, even if has an error. + c.stat[name] = e +} + +func (c *cache) completeReadDir(dir string, entries []fs.DirEntry) { + c.m.Lock() + defer c.m.Unlock() + + for _, e := range entries { + name := path.Join(dir, e.Name()) + + if e.IsDir() { + c.completeReadDirForDir(name, e) + } else { + c.completeReadDirForFile(name, e) + } + } +} + +// Cleanup closes the queue and waits for all goroutines to exit. +func (c *cache) Cleanup() { + close(c.queue) + c.wg.Wait() +} + +// Write passes through to the underlying Filer. +func (c *cache) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { + return c.f.Write(ctx, name, reader, mode...) +} + +// Read passes through to the underlying Filer. +func (c *cache) Read(ctx context.Context, name string) (io.ReadCloser, error) { + return c.f.Read(ctx, name) +} + +// Delete passes through to the underlying Filer. +func (c *cache) Delete(ctx context.Context, name string, mode ...DeleteMode) error { + return c.f.Delete(ctx, name, mode...) +} + +// Mkdir passes through to the underlying Filer. +func (c *cache) Mkdir(ctx context.Context, name string) error { + return c.f.Mkdir(ctx, name) +} + +// ReadDir returns the entries in a directory. +// If the directory is already in the cache, the cached value is returned. +func (c *cache) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { + name = path.Clean(name) + + // Lock before R/W access to the cache. + c.m.Lock() + + // If the directory is already in the cache, wait for and return the cached value. + if e, ok := c.readDir[name]; ok { + c.m.Unlock() + return e.wait(ctx) + } + + // Otherwise, create a new cache entry and queue the operation. + e := newReadDirEntry(name) + err := c.enqueue(ctx, e) + if err != nil { + c.m.Unlock() + return nil, err + } + + c.readDir[name] = e + c.m.Unlock() + + // Wait for the operation to complete. + return e.wait(ctx) +} + +// statFromReadDir returns the file info for a file or directory. +// If the file info is already in the cache, the cached value is returned. +func (c *cache) statFromReadDir(ctx context.Context, name string, entry *readDirEntry) (fs.FileInfo, error) { + _, err := entry.wait(ctx) + if err != nil { + return nil, err + } + + // Upon completion of a [ReadDir] call, all directory entries are added to the stat cache and + // enqueue a [Stat] call if necessary (entries for notebooks are incomplete and require a + // real stat call). + // + // This means that the file or directory we're trying to stat, either + // + // - is present in the stat cache + // - doesn't exist. + // + c.m.Lock() + e, ok := c.stat[name] + c.m.Unlock() + if ok { + return e.wait(ctx) + } + + return nil, FileDoesNotExistError{name} +} + +// Stat returns the file info for a file or directory. +// If the file info is already in the cache, the cached value is returned. +func (c *cache) Stat(ctx context.Context, name string) (fs.FileInfo, error) { + name = path.Clean(name) + + // Lock before R/W access to the cache. + c.m.Lock() + + // If the file info is already in the cache, wait for and return the cached value. + if e, ok := c.stat[name]; ok { + c.m.Unlock() + return e.wait(ctx) + } + + // If the parent directory is in the cache (or queued to be read), + // wait for it to complete to avoid redundant stat calls. + dir := path.Dir(name) + if dir != name { + if e, ok := c.readDir[dir]; ok { + c.m.Unlock() + return c.statFromReadDir(ctx, name, e) + } + } + + // Otherwise, create a new cache entry and queue the operation. + e := newStatEntry(name) + err := c.enqueue(ctx, e) + if err != nil { + c.m.Unlock() + return nil, err + } + + c.stat[name] = e + c.m.Unlock() + + // Wait for the operation to complete. + return e.wait(ctx) +} diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index d799c1f885..e911f4409b 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -84,6 +84,10 @@ func (info wsfsFileInfo) Sys() any { return info.ObjectInfo } +func (info wsfsFileInfo) WorkspaceObjectInfo() workspace.ObjectInfo { + return info.ObjectInfo +} + // UnmarshalJSON is a custom unmarshaller for the wsfsFileInfo struct. // It must be defined for this type because otherwise the implementation // of the embedded ObjectInfo type will be used. diff --git a/libs/filer/workspace_files_extensions_client.go b/libs/filer/workspace_files_extensions_client.go index a872dcc65a..8f943dc21b 100644 --- a/libs/filer/workspace_files_extensions_client.go +++ b/libs/filer/workspace_files_extensions_client.go @@ -162,10 +162,11 @@ func NewWorkspaceFilesExtensionsClient(w *databricks.WorkspaceClient, root strin return nil, err } + cache := newCache(filer) return &workspaceFilesExtensionsClient{ workspaceClient: w, - wsfs: filer, + wsfs: cache, root: root, }, nil } From 56977a24ac67dcc9fe43f232fc09cb56c0438e87 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 15 Jul 2024 14:18:18 +0200 Subject: [PATCH 6/8] Add comment --- libs/filer/workspace_files_cache.go | 43 +++++++++++++++++-- .../workspace_files_extensions_client.go | 2 +- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/libs/filer/workspace_files_cache.go b/libs/filer/workspace_files_cache.go index 4808f4dc0f..532c7d3de9 100644 --- a/libs/filer/workspace_files_cache.go +++ b/libs/filer/workspace_files_cache.go @@ -14,7 +14,44 @@ import ( "github.com/databricks/databricks-sdk-go/service/workspace" ) -const kMaxQueueSize = 10_000 +// This readahead cache is designed to optimize file system operations by caching the results of +// directory reads (ReadDir) and file/directory metadata reads (Stat). This cache aims to eliminate +// redundant operations and improve performance by storing the results of these operations and +// reusing them when possible. Additionally, the cache performs readahead on ReadDir calls, +// proactively caching information about files and subdirectories to speed up future access. +// +// The cache maintains two primary maps: one for ReadDir results and another for Stat results. +// When a directory read or a stat operation is requested, the cache first checks if the result +// is already available. If it is, the cached result is returned immediately. If not, the +// operation is queued for execution, and the result is stored in the cache once the operation +// completes. In cases where the result is not immediately available, the caller may need to wait +// for the cache entry to be populated. However, because the queue is processed in order by a +// fixed number of worker goroutines, we are guaranteed that the required cache entry will be +// populated and available once the queue processes the corresponding task. +// +// The cache uses a worker pool to process the queued operations concurrently. This is +// implemented using a fixed number of worker goroutines that continually pull tasks from a +// queue and execute them. The queue itself is logically unbounded in the sense that it needs to +// accommodate all the new tasks that may be generated dynamically during the execution of ReadDir +// calls. Specifically, a single ReadDir call can add an unknown number of new Stat and ReadDir +// tasks to the queue because each directory entry may represent a file or subdirectory that +// requires further processing. +// +// For practical reasons, we are not using an unbounded queue but a channel with a maximum size +// of 10,000. This helps prevent excessive memory usage and ensures that the system remains +// responsive under load. If we encounter real examples of subtrees with more than 10,000 +// elements, we can consider addressing this limitation in the future. For now, this approach +// balances the need for readahead efficiency with practical constraints. +// +// It is crucial to note that each ReadDir and Stat call is executed only once. The result of a +// Stat call can be served from the cache if the information was already returned by an earlier +// ReadDir call. This helps to avoid redundant operations and ensures that the system remains +// efficient even under high load. + +const ( + kMaxQueueSize = 10_000 + kNumCacheWorkers = 10 +) // queueFullError is returned when the queue is at capacity. type queueFullError struct { @@ -154,7 +191,7 @@ type cache struct { wg sync.WaitGroup } -func newCache(f Filer) *cache { +func newWorkspaceFilesReadaheadCache(f Filer) *cache { c := &cache{ f: f, @@ -165,7 +202,7 @@ func newCache(f Filer) *cache { } ctx := context.Background() - for range 10 { + for range kNumCacheWorkers { c.wg.Add(1) go c.work(ctx) } diff --git a/libs/filer/workspace_files_extensions_client.go b/libs/filer/workspace_files_extensions_client.go index 8f943dc21b..d5d0ce554e 100644 --- a/libs/filer/workspace_files_extensions_client.go +++ b/libs/filer/workspace_files_extensions_client.go @@ -162,7 +162,7 @@ func NewWorkspaceFilesExtensionsClient(w *databricks.WorkspaceClient, root strin return nil, err } - cache := newCache(filer) + cache := newWorkspaceFilesReadaheadCache(filer) return &workspaceFilesExtensionsClient{ workspaceClient: w, From 3c93a7cd4caab282c98784db547729c523ddb29b Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 17 Jul 2024 10:59:56 +0200 Subject: [PATCH 7/8] Address comments --- libs/filer/workspace_files_cache.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/libs/filer/workspace_files_cache.go b/libs/filer/workspace_files_cache.go index 532c7d3de9..d19a8b0c21 100644 --- a/libs/filer/workspace_files_cache.go +++ b/libs/filer/workspace_files_cache.go @@ -66,7 +66,7 @@ func (e queueFullError) Error() string { // Common type for all cacheable calls. type cacheEntry struct { // Channel to signal that the operation has completed. - ch chan struct{} + done chan struct{} // The (cleaned) name of the file or directory being operated on. name string @@ -83,7 +83,7 @@ func (e *cacheEntry) String() string { // Mark this entry as errored. func (e *cacheEntry) markError(err error) { e.err = err - close(e.ch) + close(e.done) } // readDirEntry is the cache entry for a [ReadDir] call. @@ -96,7 +96,7 @@ type readDirEntry struct { // Create a new readDirEntry. func newReadDirEntry(name string) *readDirEntry { - return &readDirEntry{cacheEntry: cacheEntry{ch: make(chan struct{}), name: name}} + return &readDirEntry{cacheEntry: cacheEntry{done: make(chan struct{}), name: name}} } // Execute the operation and signal completion. @@ -111,7 +111,7 @@ func (e *readDirEntry) execute(ctx context.Context, c *cache) { // Signal that the operation has completed. // The return value can now be used by routines waiting on it. - close(e.ch) + close(e.done) } // Wait for the operation to complete and return the result. @@ -119,7 +119,7 @@ func (e *readDirEntry) wait(ctx context.Context) ([]fs.DirEntry, error) { select { case <-ctx.Done(): return nil, ctx.Err() - case <-e.ch: + case <-e.done: // Note: return a copy of the slice to prevent the caller from modifying the cache. // The underlying elements are values (see [wsfsDirEntry]) so a shallow copy is sufficient. return slices.Clone(e.entries), e.err @@ -136,7 +136,7 @@ type statEntry struct { // Create a new stat entry. func newStatEntry(name string) *statEntry { - return &statEntry{cacheEntry: cacheEntry{ch: make(chan struct{}), name: name}} + return &statEntry{cacheEntry: cacheEntry{done: make(chan struct{}), name: name}} } // Execute the operation and signal completion. @@ -148,7 +148,7 @@ func (e *statEntry) execute(ctx context.Context, c *cache) { // Signal that the operation has completed. // The return value can now be used by routines waiting on it. - close(e.ch) + close(e.done) } // Wait for the operation to complete and return the result. @@ -156,7 +156,7 @@ func (e *statEntry) wait(ctx context.Context) (fs.FileInfo, error) { select { case <-ctx.Done(): return nil, ctx.Err() - case <-e.ch: + case <-e.done: return e.info, e.err } } @@ -165,7 +165,7 @@ func (e *statEntry) wait(ctx context.Context) (fs.FileInfo, error) { func (e *statEntry) markDone(info fs.FileInfo, err error) { e.info = info e.err = err - close(e.ch) + close(e.done) } // executable is the interface all cacheable calls must implement. @@ -214,11 +214,7 @@ func newWorkspaceFilesReadaheadCache(f Filer) *cache { func (c *cache) work(ctx context.Context) { defer c.wg.Done() - for { - e, ok := <-c.queue - if !ok { - return - } + for e := range c.queue { e.execute(ctx, c) } } @@ -274,6 +270,9 @@ func (c *cache) completeReadDirForFile(name string, dirEntry fs.DirEntry) { // stat call, or we can use the [fs.DirEntry] info directly. switch dirEntry.(wsfsDirEntry).ObjectType { case workspace.ObjectTypeNotebook: + // Note: once the list API returns `repos_export_format` we can avoid this additional stat call. + // This is the only (?) case where this implementation is tied to the workspace filer. + // Queue a [Stat] call for the file. err := c.enqueue(context.Background(), e) if err != nil { From a6e04faed41be7097f10b35ad76664b2c50582ce Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 17 Jul 2024 14:57:46 +0200 Subject: [PATCH 8/8] Reduce number of cache workers to 1 --- libs/filer/workspace_files_cache.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/libs/filer/workspace_files_cache.go b/libs/filer/workspace_files_cache.go index d19a8b0c21..5837ad27dc 100644 --- a/libs/filer/workspace_files_cache.go +++ b/libs/filer/workspace_files_cache.go @@ -49,8 +49,12 @@ import ( // efficient even under high load. const ( - kMaxQueueSize = 10_000 - kNumCacheWorkers = 10 + kMaxQueueSize = 10_000 + + // Number of worker goroutines to process the queue. + // These workers share the same HTTP client and therefore the same rate limiter. + // If this number is increased, the rate limiter should be modified as well. + kNumCacheWorkers = 1 ) // queueFullError is returned when the queue is at capacity.