diff --git a/acceptance/bundle/dms/stale-plan/databricks.yml b/acceptance/bundle/dms/stale-plan/databricks.yml new file mode 100644 index 00000000000..750f47f2ce8 --- /dev/null +++ b/acceptance/bundle/dms/stale-plan/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: dms-stale-plan + +experimental: + record_deployment_history: true + +resources: + jobs: + foo: + name: foo diff --git a/acceptance/bundle/dms/stale-plan/out.test.toml b/acceptance/bundle/dms/stale-plan/out.test.toml new file mode 100644 index 00000000000..e90b6d5d1ba --- /dev/null +++ b/acceptance/bundle/dms/stale-plan/out.test.toml @@ -0,0 +1,3 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/dms/stale-plan/output.txt b/acceptance/bundle/dms/stale-plan/output.txt new file mode 100644 index 00000000000..a2192cf3070 --- /dev/null +++ b/acceptance/bundle/dms/stale-plan/output.txt @@ -0,0 +1,40 @@ + +=== Deploy creates the DMS deployment at version 1 +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-stale-plan/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== A plan records the DMS version it was generated against (version_id=1, no serial) +>>> [CLI] bundle plan -o json + +>>> jq {version_id, serial} plan.json +{ + "version_id": "1", + "serial": null +} + +=== The plan is usable in DMS mode: applying it succeeds and advances the deployment to version 2 +>>> [CLI] bundle deploy --plan plan.json +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-stale-plan/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== Re-applying the same plan is now rejected as stale: its version_id (1) no longer matches the live version (2) +>>> [CLI] bundle deploy --plan plan.json +Error: plan is outdated: it was generated against deployment version 1, but the current version is 2. Please run 'bundle plan' again + + +=== Cleanup: destroy the deployment +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.foo + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/dms-stale-plan/default + +Deleting files... +Destroy complete! + +Exit code: 1 diff --git a/acceptance/bundle/dms/stale-plan/script b/acceptance/bundle/dms/stale-plan/script new file mode 100644 index 00000000000..a8b0b566cae --- /dev/null +++ b/acceptance/bundle/dms/stale-plan/script @@ -0,0 +1,21 @@ +cleanup() { + title "Cleanup: destroy the deployment" + trace $CLI bundle destroy --auto-approve + rm -f out.requests.txt plan.json +} +trap cleanup EXIT + +title "Deploy creates the DMS deployment at version 1" +trace $CLI bundle deploy +rm -f out.requests.txt + +title "A plan records the DMS version it was generated against (version_id=1, no serial)" +trace $CLI bundle plan -o json > plan.json +trace jq '{version_id, serial}' plan.json + +title "The plan is usable in DMS mode: applying it succeeds and advances the deployment to version 2" +trace $CLI bundle deploy --plan plan.json +rm -f out.requests.txt + +title "Re-applying the same plan is now rejected as stale: its version_id (1) no longer matches the live version (2)" +trace $CLI bundle deploy --plan plan.json diff --git a/acceptance/bundle/dms/stale-plan/test.toml b/acceptance/bundle/dms/stale-plan/test.toml new file mode 100644 index 00000000000..40ed3ad4198 --- /dev/null +++ b/acceptance/bundle/dms/stale-plan/test.toml @@ -0,0 +1,4 @@ +# plan.json is generated by the script; it is not a checked-in input. +# .databricks is the local state cache left by deploy and is not removed by +# destroy when the run ends on a rejected --plan apply. +Ignore = [".databricks", "plan.json"] diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go index 1c5159da5a2..d1c0a65bfb6 100644 --- a/bundle/deploy/lock/deployment_metadata_service.go +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -9,6 +9,7 @@ import ( "time" "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/internal/build" "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/apierr" @@ -41,6 +42,12 @@ type DeploymentVersionRecorder struct { goal Goal enabled bool + // validateVersion is set when applying a pre-computed plan: the plan's + // version_id must match the deployment's current version, otherwise the plan + // is stale and is rejected. + validateVersion bool + expectedVersion string + // populated by CreateVersion svc sdkbundle.BundleInterface deploymentID string @@ -50,10 +57,16 @@ type DeploymentVersionRecorder struct { // NewDeploymentVersionRecorder returns a recorder for the given goal. The // returned recorder is a no-op unless experimental.record_deployment_history -// is set. -func NewDeploymentVersionRecorder(b *bundle.Bundle, goal Goal) *DeploymentVersionRecorder { +// is set. When plan is non-nil (applying a pre-computed plan), the plan's +// version_id is validated against the live deployment version at lock time. +func NewDeploymentVersionRecorder(b *bundle.Bundle, goal Goal, plan *deployplan.Plan) *DeploymentVersionRecorder { enabled := b.Config.Experimental != nil && b.Config.Experimental.RecordDeploymentHistory - return &DeploymentVersionRecorder{b: b, goal: goal, enabled: enabled} + r := &DeploymentVersionRecorder{b: b, goal: goal, enabled: enabled} + if plan != nil { + r.validateVersion = true + r.expectedVersion = plan.VersionId + } + return r } // CreateVersion registers a new deployment version with DMS, claiming it for the @@ -68,13 +81,13 @@ func (r *DeploymentVersionRecorder) CreateVersion(ctx context.Context) error { return fmt.Errorf("%s is not supported with the deployment metadata service", r.goal) } - r.svc = r.b.WorkspaceClient(ctx).Bundle + svc := r.b.WorkspaceClient(ctx).Bundle // The deployment ID is the state lineage. GetOrInitLineage generates one on // the first deploy and stores it so the deploy persists the same value. r.deploymentID = r.b.DeploymentBundle.StateDB.GetOrInitLineage() - versionID, err := createDeploymentVersion(ctx, r.b, r.svc, r.deploymentID, versionType) + versionID, err := createDeploymentVersion(ctx, r.b, svc, r.deploymentID, versionType, r.expectedVersion, r.validateVersion) if err != nil { return err } @@ -83,6 +96,11 @@ func (r *DeploymentVersionRecorder) CreateVersion(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to parse version ID %q: %w", versionID, err) } + + // Set svc only after the version is created so CompleteVersion is a no-op + // when creation failed (e.g. a stale plan was rejected): there is no version + // to complete. + r.svc = svc r.versionNum = versionNum r.stopHeartbeat = startHeartbeat(ctx, r.svc, r.deploymentID, versionID) return nil @@ -133,12 +151,20 @@ func (r *DeploymentVersionRecorder) CompleteVersion(ctx context.Context, status // createDeploymentVersion ensures the deployment record exists, then creates a // new version. The deployment ID is the state lineage: we GetDeployment first // and only CreateDeployment when it does not exist yet. -func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, deploymentID string, versionType sdkbundle.VersionType) (versionID string, err error) { +// +// When validateVersion is set (applying a pre-computed plan), the deployment's +// current version must equal expectedVersion — the version the plan was +// generated against. Otherwise the deployment moved since the plan was created +// and the plan is rejected as stale. +func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundle.BundleInterface, deploymentID string, versionType sdkbundle.VersionType, expectedVersion string, validateVersion bool) (versionID string, err error) { dep, getErr := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{ Name: "deployments/" + deploymentID, }) switch { case errors.Is(getErr, apierr.ErrNotFound): + if validateVersion && expectedVersion != "" { + return "", outdatedPlanErr(expectedVersion, "") + } // Fresh deployment: create the record and start at version 1. _, createErr := svc.CreateDeployment(ctx, sdkbundle.CreateDeploymentRequest{ DeploymentId: deploymentID, @@ -153,6 +179,9 @@ func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundl case getErr != nil: return "", fmt.Errorf("failed to get deployment: %w", getErr) default: + if validateVersion && dep.LastVersionId != expectedVersion { + return "", outdatedPlanErr(expectedVersion, dep.LastVersionId) + } // Existing deployment: increment the last version to get the next one. lastVersion, parseErr := strconv.ParseInt(dep.LastVersionId, 10, 64) if parseErr != nil { @@ -180,6 +209,15 @@ func createDeploymentVersion(ctx context.Context, b *bundle.Bundle, svc sdkbundl return versionID, nil } +// outdatedPlanErr returns the error reported when a pre-computed plan was +// generated against a deployment version that no longer matches the live one. +func outdatedPlanErr(expectedVersion, currentVersion string) error { + if currentVersion == "" { + return fmt.Errorf("plan is outdated: it was generated against deployment version %s, but the deployment no longer exists. Please run 'bundle plan' again", expectedVersion) + } + return fmt.Errorf("plan is outdated: it was generated against deployment version %s, but the current version is %s. Please run 'bundle plan' again", expectedVersion, currentVersion) +} + // startHeartbeat starts a background goroutine that sends heartbeats to keep // the deployment version's lease alive. Returns a cancel function to stop it. func startHeartbeat(ctx context.Context, svc sdkbundle.BundleInterface, deploymentID, versionID string) context.CancelFunc { diff --git a/bundle/deployplan/plan.go b/bundle/deployplan/plan.go index 2d06b5fdd25..36b94f8b3da 100644 --- a/bundle/deployplan/plan.go +++ b/bundle/deployplan/plan.go @@ -16,11 +16,18 @@ import ( const currentPlanVersion = 2 type Plan struct { - PlanVersion int `json:"plan_version,omitempty"` - CLIVersion string `json:"cli_version,omitempty"` - Lineage string `json:"lineage,omitempty"` - Serial int `json:"serial,omitempty"` - Plan map[string]*PlanEntry `json:"plan,omitzero"` + PlanVersion int `json:"plan_version,omitempty"` + CLIVersion string `json:"cli_version,omitempty"` + Lineage string `json:"lineage,omitempty"` + Serial int `json:"serial,omitempty"` + // VersionId is the Deployment Metadata Service (DMS) version the plan was + // generated against. It is set instead of Serial when + // experimental.record_deployment_history is enabled, and is validated + // against the live deployment version when the plan is applied. The two are + // mutually exclusive: serial tracks the local state file, version_id tracks + // the remote DMS deployment. + VersionId string `json:"version_id,omitempty"` + Plan map[string]*PlanEntry `json:"plan,omitzero"` mutex sync.Mutex `json:"-"` lockmap lockmap `json:"-"` diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go index 79ecb37f8dd..a69582b0816 100644 --- a/bundle/direct/bundle_plan.go +++ b/bundle/direct/bundle_plan.go @@ -24,6 +24,8 @@ import ( "github.com/databricks/cli/libs/structs/structpath" "github.com/databricks/cli/libs/structs/structvar" "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" + sdkbundle "github.com/databricks/databricks-sdk-go/service/bundle" ) var errDelayed = errors.New("must be resolved after apply") @@ -37,9 +39,15 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error { return err } -// ValidatePlanAgainstState validates that a plan's lineage and serial match the given state. +// ValidatePlanAgainstState validates that a plan's lineage matches the given +// state, and that its serial does too unless DMS recording is enabled. // If the plan has no lineage (first deployment), validation is skipped. -func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan) error { +// +// When dmsEnabled is true the plan carries a version_id instead of a serial +// (the two are mutually exclusive); freshness is then validated against the +// live deployment version under the deploy lock, so the serial check is skipped +// here. +func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan.Plan, dmsEnabled bool) error { if plan.Lineage == "" { return nil } @@ -50,6 +58,10 @@ func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan. return fmt.Errorf("plan lineage %q does not match state lineage %q; the state may have been modified by another process", plan.Lineage, stateDB.Data.Lineage) } + if dmsEnabled { + return nil + } + if plan.Serial != stateDB.Data.Serial { return fmt.Errorf("plan serial %d does not match state serial %d; the state has been modified since the plan was created. Please run 'bundle plan' again", plan.Serial, stateDB.Data.Serial) } @@ -57,6 +69,22 @@ func ValidatePlanAgainstState(stateDB *dstate.DeploymentState, plan *deployplan. return nil } +// getDeploymentVersion returns the DMS deployment's current last_version_id, or +// "" if the deployment does not exist yet. This is the version a plan is +// generated against, mirroring the state serial for the file-state engine. +func getDeploymentVersion(ctx context.Context, svc sdkbundle.BundleInterface, lineage string) (string, error) { + dep, err := svc.GetDeployment(ctx, sdkbundle.GetDeploymentRequest{ + Name: "deployments/" + lineage, + }) + if errors.Is(err, apierr.ErrNotFound) { + return "", nil + } + if err != nil { + return "", fmt.Errorf("failed to get deployment: %w", err) + } + return dep.LastVersionId, nil +} + // InitForApply initializes the DeploymentBundle for applying a pre-computed plan. // StateDB must already be open for write before calling this function. func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.WorkspaceClient, plan *deployplan.Plan) error { @@ -107,6 +135,19 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks return nil, fmt.Errorf("reading config: %w", err) } + // When DMS recording is enabled, the plan's freshness is tracked by the + // remote deployment version rather than the local state serial. Stamp the + // version the plan is generated against and clear the serial so the two stay + // mutually exclusive (see deployplan.Plan.VersionId). + if configRoot != nil && configRoot.Experimental != nil && configRoot.Experimental.RecordDeploymentHistory && plan.Lineage != "" { + versionID, err := getDeploymentVersion(ctx, client.Bundle, plan.Lineage) + if err != nil { + return nil, err + } + plan.VersionId = versionID + plan.Serial = 0 + } + b.Plan = plan g, err := makeGraph(plan) diff --git a/bundle/direct/bundle_plan_test.go b/bundle/direct/bundle_plan_test.go index ccfb7cb517f..3b1379a6c31 100644 --- a/bundle/direct/bundle_plan_test.go +++ b/bundle/direct/bundle_plan_test.go @@ -3,6 +3,8 @@ package direct import ( "testing" + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/direct/dstate" "github.com/databricks/cli/libs/dyn" "github.com/stretchr/testify/assert" ) @@ -35,3 +37,60 @@ func TestDynPathToStructPath(t *testing.T) { assert.Equal(t, tc.expected, node.String()) } } + +func TestValidatePlanAgainstState(t *testing.T) { + stateDB := func(lineage string, serial int) *dstate.DeploymentState { + return &dstate.DeploymentState{ + Path: "state.json", + Data: dstate.Database{Header: dstate.Header{Lineage: lineage, Serial: serial}}, + } + } + + tests := []struct { + name string + plan *deployplan.Plan + stateDB *dstate.DeploymentState + dmsEnabled bool + wantErr string + }{ + { + name: "no lineage skips validation", + plan: &deployplan.Plan{}, + stateDB: stateDB("abc", 5), + }, + { + name: "lineage mismatch is rejected", + plan: &deployplan.Plan{Lineage: "abc", Serial: 5}, + stateDB: stateDB("xyz", 5), + wantErr: "plan lineage \"abc\" does not match state lineage \"xyz\"", + }, + { + name: "matching serial passes", + plan: &deployplan.Plan{Lineage: "abc", Serial: 5}, + stateDB: stateDB("abc", 5), + }, + { + name: "serial mismatch is rejected when DMS is off", + plan: &deployplan.Plan{Lineage: "abc", Serial: 4}, + stateDB: stateDB("abc", 5), + wantErr: "plan serial 4 does not match state serial 5", + }, + { + name: "serial is ignored when DMS is on", + plan: &deployplan.Plan{Lineage: "abc", VersionId: "3"}, + stateDB: stateDB("abc", 5), + dmsEnabled: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := ValidatePlanAgainstState(tc.stateDB, tc.plan, tc.dmsEnabled) + if tc.wantErr == "" { + assert.NoError(t, err) + } else { + assert.ErrorContains(t, err, tc.wantErr) + } + }) + } +} diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index b3c6f3c682d..dd509db7605 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -137,8 +137,10 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand } // lock is acquired here. Record a DMS deployment version while the lock is - // held (no-op unless experimental.record_deployment_history is set). - recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDeploy) + // held (no-op unless experimental.record_deployment_history is set). When + // applying a pre-computed plan (plan != nil), the plan's version_id is + // validated against the live deployment version here, under the lock. + recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDeploy, plan) defer func() { status := lock.DeploymentSuccess if logdiag.HasError(ctx) { diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 7a078f6ed6a..241bcfa47b1 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -127,7 +127,7 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { // Record a DMS deployment version while the lock is held (no-op unless // experimental.record_deployment_history is set). - recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDestroy) + recorder := lock.NewDeploymentVersionRecorder(b, lock.GoalDestroy, nil) defer func() { status := lock.DeploymentSuccess if logdiag.HasError(ctx) { diff --git a/cmd/bundle/utils/process.go b/cmd/bundle/utils/process.go index 5f43cff6acd..d62158a6a92 100644 --- a/cmd/bundle/utils/process.go +++ b/cmd/bundle/utils/process.go @@ -247,9 +247,12 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (b *bundle.Bundle log.Warnf(ctx, "Plan was created with CLI version %s but current version is %s", plan.CLIVersion, currentVersion) } - // Validate that the plan's lineage and serial match the current state - // This must happen before any file operations - err = direct.ValidatePlanAgainstState(&b.DeploymentBundle.StateDB, plan) + // Validate that the plan's lineage and serial match the current state. + // This must happen before any file operations. When DMS recording is + // enabled the serial is replaced by the deployment version, which is + // validated under the deploy lock instead (see DeploymentVersionRecorder). + dmsEnabled := b.Config.Experimental != nil && b.Config.Experimental.RecordDeploymentHistory + err = direct.ValidatePlanAgainstState(&b.DeploymentBundle.StateDB, plan, dmsEnabled) if err != nil { logdiag.LogError(ctx, err) return b, stateDesc, root.ErrAlreadyPrinted