Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions bundle/deploy/lock/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package lock

import (
"context"

"github.com/databricks/cli/libs/filer"
)

// IncrementDeploymentVersion is exported for testing.
var IncrementDeploymentVersion = func(ctx context.Context, f filer.Filer) (int64, error) {
return incrementDeploymentVersion(ctx, f)
}
2 changes: 2 additions & 0 deletions bundle/deploy/lock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/permissions"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
)

// Goal describes the purpose of a deployment operation.
Expand Down Expand Up @@ -60,6 +61,7 @@ func NewDeploymentManager(ctx context.Context, b *bundle.Bundle) DeploymentManag
reportPermissionError: func(ctx context.Context, path string) diag.Diagnostics {
return permissions.ReportPossiblePermissionDenied(ctx, b, path)
},
newStateFiler: filer.NewWorkspaceFilesClient,
}
if enabled {
l.client = b.WorkspaceClient(ctx)
Expand Down
57 changes: 56 additions & 1 deletion bundle/deploy/lock/workspace_filesystem.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
package lock

import (
"bytes"
"context"
"encoding/json"
"errors"
"io/fs"

"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/locker"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
)

const versionFileName = "deployment_version.json"

type versionRecord struct {
Version int64 `json:"version"`
}

// workspaceFilesystemLock implements DeploymentManager using a lock file in the
// bundle's workspace state path. Holds only the primitives it needs from the
// bundle.
Expand All @@ -26,6 +35,10 @@ type workspaceFilesystemLock struct {
// Lifted to a callback so this struct does not pin a *bundle.Bundle.
reportPermissionError func(ctx context.Context, path string) diag.Diagnostics

// newStateFiler creates the filer used to read/write deployment state files.
// Overridable in tests.
newStateFiler func(client *databricks.WorkspaceClient, path string) (filer.Filer, error)

locker *locker.Locker
goal Goal
}
Expand Down Expand Up @@ -60,7 +73,19 @@ func (l *workspaceFilesystemLock) CreateVersion(ctx context.Context, goal Goal)
return 0, err
}

return 0, nil
f, err := l.newStateFiler(l.client, l.statePath)
if err != nil {
_ = lk.Unlock(ctx)
return 0, err
}

version, err := incrementDeploymentVersion(ctx, f)
if err != nil {
_ = lk.Unlock(ctx)
return 0, err
}

return version, nil
}

func (l *workspaceFilesystemLock) CompleteVersion(ctx context.Context, _ int64, _ DeploymentStatus) error {
Expand All @@ -85,3 +110,33 @@ func (l *workspaceFilesystemLock) CompleteVersion(ctx context.Context, _ int64,
}
return l.locker.Unlock(ctx)
}

// incrementDeploymentVersion reads the current version from versionFileName,
// increments it, writes the new value back, and returns the new version.
// The first call (no file yet) creates version 1.
func incrementDeploymentVersion(ctx context.Context, f filer.Filer) (int64, error) {
var current versionRecord

r, err := f.Read(ctx, versionFileName)
if err == nil {
defer r.Close()
if err := json.NewDecoder(r).Decode(&current); err != nil {
return 0, err
}
} else if !errors.Is(err, fs.ErrNotExist) {
return 0, err
}

next := current.Version + 1
data, err := json.Marshal(versionRecord{Version: next})
if err != nil {
return 0, err
}

err = f.Write(ctx, versionFileName, bytes.NewReader(data), filer.OverwriteIfExists, filer.CreateParentDirectories)
if err != nil {
return 0, err
}

return next, nil
}
81 changes: 81 additions & 0 deletions bundle/deploy/lock/workspace_filesystem_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package lock_test

import (
"bytes"
"context"
"io"
"io/fs"
"sync"
"testing"

"github.com/databricks/cli/bundle/deploy/lock"
"github.com/databricks/cli/libs/filer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// memFiler is a simple in-memory filer for testing.
type memFiler struct {
mu sync.Mutex
files map[string][]byte
}

func newMemFiler() *memFiler {
return &memFiler{files: make(map[string][]byte)}
}

func (m *memFiler) Read(_ context.Context, path string) (io.ReadCloser, error) {
m.mu.Lock()
defer m.mu.Unlock()
data, ok := m.files[path]
if !ok {
return nil, fs.ErrNotExist
}
return io.NopCloser(bytes.NewReader(data)), nil
}

func (m *memFiler) Write(_ context.Context, path string, r io.Reader, _ ...filer.WriteMode) error {
m.mu.Lock()
defer m.mu.Unlock()
data, err := io.ReadAll(r)
if err != nil {
return err
}
m.files[path] = data
return nil
}

func (m *memFiler) Delete(_ context.Context, _ string, _ ...filer.DeleteMode) error {
return nil
}

func (m *memFiler) ReadDir(_ context.Context, _ string) ([]fs.DirEntry, error) {
return nil, nil
}

func (m *memFiler) Mkdir(_ context.Context, _ string) error {
return nil
}

func (m *memFiler) Stat(_ context.Context, _ string) (fs.FileInfo, error) {
return nil, fs.ErrNotExist
}

func TestIncrementDeploymentVersion_FirstDeploy(t *testing.T) {
ctx := t.Context()
f := newMemFiler()
version, err := lock.IncrementDeploymentVersion(ctx, f)
require.NoError(t, err)
assert.Equal(t, int64(1), version)
}

func TestIncrementDeploymentVersion_IncrementsOnEachCall(t *testing.T) {
ctx := t.Context()
f := newMemFiler()

for i := range int64(5) {
version, err := lock.IncrementDeploymentVersion(ctx, f)
require.NoError(t, err)
assert.Equal(t, i+1, version)
}
}
Loading