Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions acceptance/bundle/dms/stale-plan/databricks.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
bundle:
name: dms-stale-plan

experimental:
record_deployment_history: true

resources:
jobs:
foo:
name: foo
3 changes: 3 additions & 0 deletions acceptance/bundle/dms/stale-plan/out.test.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions acceptance/bundle/dms/stale-plan/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

=== Deploy creates the DMS deployment at version 1
>>> [CLI] bundle deploy
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-stale-plan/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!

=== A plan records the DMS version it was generated against (version_id=1, no serial)
>>> [CLI] bundle plan -o json

>>> jq {version_id, serial} plan.json
{
"version_id": "1",
"serial": null
}

=== The plan is usable in DMS mode: applying it succeeds and advances the deployment to version 2
>>> [CLI] bundle deploy --plan plan.json
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-stale-plan/default/files...
Deploying resources...
Updating deployment state...
Deployment complete!

=== Re-applying the same plan is now rejected as stale: its version_id (1) no longer matches the live version (2)
>>> [CLI] bundle deploy --plan plan.json
Error: plan is outdated: it was generated against deployment version 1, but the current version is 2. Please run 'bundle plan' again


=== Cleanup: destroy the deployment
>>> [CLI] bundle destroy --auto-approve
The following resources will be deleted:
delete resources.jobs.foo

All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-stale-plan/default

Deleting files...
Destroy complete!

Exit code: 1
21 changes: 21 additions & 0 deletions acceptance/bundle/dms/stale-plan/script
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# cleanup destroys the bundle deployment and deletes the files generated
# during the run. It is registered as an EXIT trap, so it runs when the
# script's shell exits.
cleanup() {
title "Cleanup: destroy the deployment"
trace $CLI bundle destroy --auto-approve
# plan.json is written by this script; out.requests.txt is presumably
# produced by the test harness -- TODO confirm.
rm -f out.requests.txt plan.json
}
# Run cleanup on shell exit.
trap cleanup EXIT

# Step 1: initial deploy.
title "Deploy creates the DMS deployment at version 1"
trace $CLI bundle deploy
rm -f out.requests.txt

# Step 2: save the JSON plan to plan.json, then print only the two fields
# that identify what the plan was generated against.
title "A plan records the DMS version it was generated against (version_id=1, no serial)"
trace $CLI bundle plan -o json > plan.json
trace jq '{version_id, serial}' plan.json

# Step 3: apply the saved plan once.
title "The plan is usable in DMS mode: applying it succeeds and advances the deployment to version 2"
trace $CLI bundle deploy --plan plan.json
rm -f out.requests.txt

# Step 4: apply the same plan.json a second time. The recorded expected
# output shows this command failing with the "plan is outdated" error.
title "Re-applying the same plan is now rejected as stale: its version_id (1) no longer matches the live version (2)"
trace $CLI bundle deploy --plan plan.json
4 changes: 4 additions & 0 deletions acceptance/bundle/dms/stale-plan/test.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# plan.json is generated by the script; it is not a checked-in input.
# .databricks is the local state cache left by deploy and is not removed by
# destroy when the run ends on a rejected --plan apply.
Ignore = [".databricks", "plan.json"]
50 changes: 44 additions & 6 deletions bundle/deploy/lock/deployment_metadata_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/deployplan"
"github.com/databricks/cli/internal/build"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/apierr"
Expand Down Expand Up @@ -41,6 +42,12 @@ type DeploymentVersionRecorder struct {
goal Goal
enabled bool

// validateVersion is set when applying a pre-computed plan: the plan's
// version_id must match the deployment's current version, otherwise the plan
// is stale and is rejected.
validateVersion bool
expectedVersion string

// populated by CreateVersion
svc sdkbundle.BundleInterface
deploymentID string
Expand All @@ -50,10 +57,16 @@ type DeploymentVersionRecorder struct {

// NewDeploymentVersionRecorder returns a recorder for the given goal. The
// returned recorder is a no-op unless experimental.record_deployment_history
// is set.
func NewDeploymentVersionRecorder(b *bundle.Bundle, goal Goal) *DeploymentVersionRecorder {
// is set. When plan is non-nil (applying a pre-computed plan), the plan's
// version_id is validated against the live deployment version at lock time.
func NewDeploymentVersionRecorder(b *bundle.Bundle, goal Goal, plan *deployplan.Plan) *DeploymentVersionRecorder {
enabled := b.Config.Experimental != nil && b.Config.Experimental.RecordDeploymentHistory
return &DeploymentVersionRecorder{b: b, goal: goal, enabled: enabled}
r := &DeploymentVersionRecorder{b: b, goal: goal, enabled: enabled}
if plan != nil {
r.validateVersion = true
r.expectedVersion = plan.VersionId
}
return r
}

// CreateVersion registers a new deployment version with DMS, claiming it for the
Expand All @@ -68,13 +81,13 @@ func (r *DeploymentVersionRecorder) CreateVersion(ctx context.Context) error {
return fmt.Errorf("%s is not supported with the deployment metadata service", r.goal)
}

r.svc = r.b.WorkspaceClient(ctx).Bundle
svc := r.b.WorkspaceClient(ctx).Bundle

// The deployment ID is the state lineage. GetOrInitLineage generates one on
// the first deploy and stores it so the deploy persists the same value.
r.deploymentID = r.b.DeploymentBundle.StateDB.GetOrInitLineage()

versionID, err := createDeploymentVersion(ctx, r.b, r.svc, r.deploymentID, versionType)
versionID, err := createDeploymentVersion(ctx, r.b, svc, r.deploymentID, versionType, r.expectedVersion, r.validateVersion)
if err != nil {
return err
}
Expand All @@ -83,6 +96,11 @@ func (r *DeploymentVersionRecorder) CreateVersion(ctx context.Context) error {
if err != nil {
return fmt.Errorf("failed to parse version ID %q: %w", versionID, err)
}

// Set svc only after the version is created so CompleteVersion is a no-op
// when creation failed (e.g. a stale plan was rejected): there is no version
// to complete.
r.svc = svc
r.versionNum = versionNum
r.stopHeartbeat = startHeartbeat(ctx, r.svc, r.deploymentID, versionID)
return nil
Expand Down Expand Up @@ -133,12 +151,20 @@ func (r *DeploymentVersionRecorder) CompleteVersion(ctx context.Context, status
// createDeploymentVersion ensures the deployment record exists, then creates a
// new version. The deployment ID is the state lineage: we GetDeployment first
// and only CreateDeployment when it does not exist yet.
func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, deploymentID string, versionType sdkbundle.VersionType) (versionID string, err error) {
//
// When validateVersion is set (applying a pre-computed plan), the deployment's
// current version must equal expectedVersion — the version the plan was
// generated against. Otherwise the deployment moved since the plan was created
// and the plan is rejected as stale.
func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, deploymentID string, versionType sdkbundle.VersionType, expectedVersion string, validateVersion bool) (versionID string, err error) {
dep, getErr := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{
Name: "deployments/" + deploymentID,
})
switch {
case errors.Is(getErr, apierr.ErrNotFound):
if validateVersion && expectedVersion != "" {
return "", outdatedPlanErr(expectedVersion, "")
}
// Fresh deployment: create the record and start at version 1.
_, createErr := svc.CreateDeployment(ctx, sdkbundle.CreateDeploymentRequest{
DeploymentId: deploymentID,
Expand All @@ -153,6 +179,9 @@ func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundl
case getErr != nil:
return "", fmt.Errorf("failed to get deployment: %w", getErr)
default:
if validateVersion && dep.LastVersionId != expectedVersion {
return "", outdatedPlanErr(expectedVersion, dep.LastVersionId)
}
// Existing deployment: increment the last version to get the next one.
lastVersion, parseErr := strconv.ParseInt(dep.LastVersionId, 10, 64)
if parseErr != nil {
Expand Down Expand Up @@ -180,6 +209,15 @@ func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundl
return versionID, nil
}

// outdatedPlanErr builds the "plan is outdated" error for a pre-computed plan
// whose recorded deployment version does not match the live deployment.
//
// expectedVersion is the version the plan was generated against. An empty
// currentVersion selects the "deployment no longer exists" wording; otherwise
// the message reports currentVersion as the current version.
func outdatedPlanErr(expectedVersion, currentVersion string) error {
	// Only the middle clause of the message differs between the two cases.
	liveState := "the deployment no longer exists"
	if currentVersion != "" {
		liveState = "the current version is " + currentVersion
	}
	return fmt.Errorf("plan is outdated: it was generated against deployment version %s, but %s. Please run 'bundle plan' again", expectedVersion, liveState)
}

// startHeartbeat starts a background goroutine that sends heartbeats to keep
// the deployment version's lease alive. Returns a cancel function to stop it.
func startHeartbeat(ctx context.Context, svc sdkbundle.BundleInterface, deploymentID, versionID string) context.CancelFunc {
Expand Down
17 changes: 12 additions & 5 deletions bundle/deployplan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@ import (
const currentPlanVersion = 2

type Plan struct {
PlanVersion int `json:"plan_version,omitempty"`
CLIVersion string `json:"cli_version,omitempty"`
Lineage string `json:"lineage,omitempty"`
Serial int `json:"serial,omitempty"`
Plan map[string]*PlanEntry `json:"plan,omitzero"`
PlanVersion int `json:"plan_version,omitempty"`
CLIVersion string `json:"cli_version,omitempty"`
Lineage string `json:"lineage,omitempty"`
Serial int `json:"serial,omitempty"`
// VersionId is the Deployment Metadata Service (DMS) version the plan was
// generated against. It is set instead of Serial when
// experimental.record_deployment_history is enabled, and is validated
// against the live deployment version when the plan is applied. The two are
// mutually exclusive: serial tracks the local state file, version_id tracks
// the remote DMS deployment.
VersionId string `json:"version_id,omitempty"`
Plan map[string]*PlanEntry `json:"plan,omitzero"`

mutex sync.Mutex `json:"-"`
lockmap lockmap `json:"-"`
Expand Down
45 changes: 43 additions & 2 deletions bundle/direct/bundle_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/databricks/cli/libs/structs/structpath"
"github.com/databricks/cli/libs/structs/structvar"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle"
)

var errDelayed = errors.New("must be resolved after apply")
Expand All @@ -37,9 +39,15 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error {
return err
}

// ValidatePlanAgainstState validates that a plan's lineage and serial match the given state.
// ValidatePlanAgainstState validates that a plan's lineage matches the given
// state, and that its serial does too unless DMS recording is enabled.
// If the plan has no lineage (first deployment), validation is skipped.
func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan) error {
//
// When dmsEnabled is true the plan carries a version_id instead of a serial
// (the two are mutually exclusive); freshness is then validated against the
// live deployment version under the deploy lock, so the serial check is skipped
// here.
func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan, dmsEnabled bool) error {
if plan.Lineage == "" {
return nil
}
Expand All @@ -50,13 +58,33 @@ func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.
return fmt.Errorf("plan lineage %q does not match state lineage %q; the state may have been modified by another process", plan.Lineage, stateDB.Data.Lineage)
}

if dmsEnabled {
return nil
}

if plan.Serial != stateDB.Data.Serial {
return fmt.Errorf("plan serial %d does not match state serial %d; the state has been modified since the plan was created. Please run 'bundle plan' again", plan.Serial, stateDB.Data.Serial)
}

return nil
}

// getDeploymentVersion looks up the deployment named "deployments/<lineage>"
// and returns its LastVersionId.
//
// A not-found response (apierr.ErrNotFound) is not an error: it yields "" with
// a nil error. Any other lookup failure is wrapped and returned.
func getDeploymentVersion(ctx context.Context, svc sdkbundle.BundleInterface, lineage string) (string, error) {
	req := sdkbundle.GetDeploymentRequest{Name: "deployments/" + lineage}
	deployment, lookupErr := svc.GetDeployment(ctx, req)

	switch {
	case lookupErr == nil:
		return deployment.LastVersionId, nil
	case errors.Is(lookupErr, apierr.ErrNotFound):
		// No deployment record yet: report "no version" rather than failing.
		return "", nil
	default:
		return "", fmt.Errorf("failed to get deployment: %w", lookupErr)
	}
}

// InitForApply initializes the DeploymentBundle for applying a pre-computed plan.
// StateDB must already be open for write before calling this function.
func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.WorkspaceClient, plan *deployplan.Plan) error {
Expand Down Expand Up @@ -107,6 +135,19 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks
return nil, fmt.Errorf("reading config: %w", err)
}

// When DMS recording is enabled, the plan's freshness is tracked by the
// remote deployment version rather than the local state serial. Stamp the
// version the plan is generated against and clear the serial so the two stay
// mutually exclusive (see deployplan.Plan.VersionId).
if configRoot != nil && configRoot.Experimental != nil && configRoot.Experimental.RecordDeploymentHistory && plan.Lineage != "" {
versionID, err := getDeploymentVersion(ctx, client.Bundle, plan.Lineage)
if err != nil {
return nil, err
}
plan.VersionId = versionID
plan.Serial = 0
}

b.Plan = plan

g, err := makeGraph(plan)
Expand Down
59 changes: 59 additions & 0 deletions bundle/direct/bundle_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package direct
import (
"testing"

"github.com/databricks/cli/bundle/deployplan"
"github.com/databricks/cli/bundle/direct/dstate"
"github.com/databricks/cli/libs/dyn"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -35,3 +37,60 @@ func TestDynPathToStructPath(t *testing.T) {
assert.Equal(t, tc.expected, node.String())
}
}

// TestValidatePlanAgainstState is a table-driven check of
// ValidatePlanAgainstState: plans without lineage are accepted, lineage
// mismatches are rejected, and the serial comparison is applied only when
// dmsEnabled is false.
func TestValidatePlanAgainstState(t *testing.T) {
	// newState builds a deployment state carrying only the header fields the
	// validation reads.
	newState := func(lineage string, serial int) *dstate.DeploymentState {
		header := dstate.Header{Lineage: lineage, Serial: serial}
		return &dstate.DeploymentState{
			Path: "state.json",
			Data: dstate.Database{Header: header},
		}
	}

	type scenario struct {
		name       string
		plan       *deployplan.Plan
		stateDB    *dstate.DeploymentState
		dmsEnabled bool
		wantErr    string // empty means no error is expected
	}

	scenarios := []scenario{
		{
			name:    "no lineage skips validation",
			plan:    &deployplan.Plan{},
			stateDB: newState("abc", 5),
		},
		{
			name:    "lineage mismatch is rejected",
			plan:    &deployplan.Plan{Lineage: "abc", Serial: 5},
			stateDB: newState("xyz", 5),
			wantErr: "plan lineage \"abc\" does not match state lineage \"xyz\"",
		},
		{
			name:    "matching serial passes",
			plan:    &deployplan.Plan{Lineage: "abc", Serial: 5},
			stateDB: newState("abc", 5),
		},
		{
			name:    "serial mismatch is rejected when DMS is off",
			plan:    &deployplan.Plan{Lineage: "abc", Serial: 4},
			stateDB: newState("abc", 5),
			wantErr: "plan serial 4 does not match state serial 5",
		},
		{
			name:       "serial is ignored when DMS is on",
			plan:       &deployplan.Plan{Lineage: "abc", VersionId: "3"},
			stateDB:    newState("abc", 5),
			dmsEnabled: true,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			got := ValidatePlanAgainstState(sc.stateDB, sc.plan, sc.dmsEnabled)
			if sc.wantErr != "" {
				assert.ErrorContains(t, got, sc.wantErr)
				return
			}
			assert.NoError(t, got)
		})
	}
}
6 changes: 4 additions & 2 deletions bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand
}

// lock is acquired here. Record a DMS deployment version while the lock is
// held (no-op unless experimental.record_deployment_history is set).
recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDeploy)
// held (no-op unless experimental.record_deployment_history is set). When
// applying a pre-computed plan (plan != nil), the plan's version_id is
// validated against the live deployment version here, under the lock.
recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDeploy, plan)
defer func() {
status := lock.DeploymentSuccess
if logdiag.HasError(ctx) {
Expand Down
2 changes: 1 addition & 1 deletion bundle/phases/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) {

// Record a DMS deployment version while the lock is held (no-op unless
// experimental.record_deployment_history is set).
recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDestroy)
recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDestroy, nil)
defer func() {
status := lock.DeploymentSuccess
if logdiag.HasError(ctx) {
Expand Down
Loading
Loading