Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions bundle/config/mutator/configure_wsfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package mutator

import (
"context"
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/env"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/vfs"
)

const envDatabricksRuntimeVersion = "DATABRICKS_RUNTIME_VERSION"

type configureWSFS struct{}

func ConfigureWSFS() bundle.Mutator {
return &configureWSFS{}
}

func (m *configureWSFS) Name() string {
return "ConfigureWSFS"
}

func (m *configureWSFS) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
root := b.BundleRoot.Native()

// The bundle root must be located in /Workspace/
if !strings.HasPrefix(root, "/Workspace/") {
return nil
}

// The executable must be running on DBR.
if _, ok := env.Lookup(ctx, envDatabricksRuntimeVersion); !ok {
return nil
}

// If so, swap out vfs.Path instance of the sync root with one that
// makes all Workspace File System interactions extension aware.
p, err := vfs.NewFilerPath(ctx, root, func(path string) (filer.Filer, error) {
return filer.NewWorkspaceFilesExtensionsClient(b.WorkspaceClient(), path)
})
if err != nil {
return diag.FromErr(err)
}

b.BundleRoot = p
return nil
}
4 changes: 4 additions & 0 deletions bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func Initialize() bundle.Mutator {
mutator.ProcessTargetMode(),
mutator.DefaultQueueing(),
mutator.ExpandPipelineGlobPaths(),

// Configure use of WSFS for reads if the CLI is running on Databricks.
mutator.ConfigureWSFS(),

mutator.TranslatePaths(),
python.WrapperWarning(),
permissions.ApplyBundlePermissions(),
Expand Down
66 changes: 66 additions & 0 deletions libs/vfs/filer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package vfs

import (
"context"
"io/fs"
"path/filepath"

"github.com/databricks/cli/libs/filer"
)

type filerPath struct {
ctx context.Context
path string
fs FS

construct func(path string) (filer.Filer, error)
}

func NewFilerPath(ctx context.Context, path string, construct func(path string) (filer.Filer, error)) (Path, error) {
f, err := construct(path)
if err != nil {
return nil, err
}

return &filerPath{
ctx: ctx,
path: path,
fs: filer.NewFS(ctx, f).(FS),

construct: construct,
}, nil
}

func (f filerPath) Open(name string) (fs.File, error) {
return f.fs.Open(name)
}

func (f filerPath) Stat(name string) (fs.FileInfo, error) {
return f.fs.Stat(name)
}

func (f filerPath) ReadDir(name string) ([]fs.DirEntry, error) {
return f.fs.ReadDir(name)
}

func (f filerPath) ReadFile(name string) ([]byte, error) {
return f.fs.ReadFile(name)
}

func (f filerPath) Parent() Path {
if f.path == "/" {
return nil
}

dir := filepath.Dir(f.path)
nf, err := NewFilerPath(f.ctx, dir, f.construct)
if err != nil {
panic(err)
}

return nf
}

func (f filerPath) Native() string {
return f.path
}
79 changes: 79 additions & 0 deletions libs/vfs/filer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package vfs

import (
"context"
"errors"
"io/fs"
"os"
"path/filepath"
"strings"
"testing"

"github.com/databricks/cli/libs/filer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestFilerPath(t *testing.T) {
ctx := context.Background()
wd, err := os.Getwd()
require.NoError(t, err)

// Create a new filer-backed path.
p, err := NewFilerPath(ctx, filepath.FromSlash(wd), filer.NewLocalClient)
require.NoError(t, err)

// Open self.
f, err := p.Open("filer_test.go")
require.NoError(t, err)
defer f.Close()

// Run stat on self.
s, err := f.Stat()
require.NoError(t, err)
assert.Equal(t, "filer_test.go", s.Name())
assert.GreaterOrEqual(t, int(s.Size()), 128)

// Read some bytes.
buf := make([]byte, 1024)
_, err = f.Read(buf)
require.NoError(t, err)
assert.True(t, strings.HasPrefix(string(buf), "package vfs"))

// Open non-existent file.
_, err = p.Open("doesntexist_test.go")
assert.True(t, errors.Is(err, fs.ErrNotExist))

// Stat self.
s, err = p.Stat("filer_test.go")
require.NoError(t, err)
assert.Equal(t, "filer_test.go", s.Name())
assert.GreaterOrEqual(t, int(s.Size()), 128)

// Stat non-existent file.
_, err = p.Stat("doesntexist_test.go")
assert.True(t, errors.Is(err, fs.ErrNotExist))

// ReadDir self.
entries, err := p.ReadDir(".")
require.NoError(t, err)
assert.GreaterOrEqual(t, len(entries), 1)

// ReadDir non-existent directory.
_, err = p.ReadDir("doesntexist")
assert.True(t, errors.Is(err, fs.ErrNotExist))

// ReadFile self.
buf, err = p.ReadFile("filer_test.go")
require.NoError(t, err)
assert.True(t, strings.HasPrefix(string(buf), "package vfs"))

// ReadFile non-existent file.
_, err = p.ReadFile("doesntexist_test.go")
assert.True(t, errors.Is(err, fs.ErrNotExist))

// Parent self.
pp := p.Parent()
require.NotNil(t, pp)
assert.Equal(t, filepath.Join(pp.Native(), "vfs"), p.Native())
}