From f2471e4db90cc38b1239ec9e352652f257b871c2 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Thu, 26 Mar 2026 16:50:44 +0000 Subject: [PATCH 1/3] Integrate deployment metadata service for server-side locking and state Add client integration with the deployment metadata service API for server-side deployment locking and resource state tracking. Gated behind DATABRICKS_BUNDLE_DEPLOYMENT_SERVICE=true environment variable. Co-authored-by: Isaac --- bundle/deploy/metadata/service/client.go | 183 +++++++++++++ bundle/deploy/metadata/service/heartbeat.go | 37 +++ bundle/deploy/metadata/service/types.go | 189 ++++++++++++++ bundle/deploy/state_update.go | 6 + bundle/env/deployment_metadata.go | 15 ++ bundle/phases/deploy.go | 6 + bundle/phases/deploy_metadata.go | 269 ++++++++++++++++++++ bundle/phases/destroy.go | 6 + bundle/phases/destroy_metadata.go | 169 ++++++++++++ 9 files changed, 880 insertions(+) create mode 100644 bundle/deploy/metadata/service/client.go create mode 100644 bundle/deploy/metadata/service/heartbeat.go create mode 100644 bundle/deploy/metadata/service/types.go create mode 100644 bundle/env/deployment_metadata.go create mode 100644 bundle/phases/deploy_metadata.go create mode 100644 bundle/phases/destroy_metadata.go diff --git a/bundle/deploy/metadata/service/client.go b/bundle/deploy/metadata/service/client.go new file mode 100644 index 0000000000..ffe2fb36fc --- /dev/null +++ b/bundle/deploy/metadata/service/client.go @@ -0,0 +1,183 @@ +package service + +import ( + "context" + "fmt" + "net/http" + + "errors" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" +) + +const basePath = "/api/2.0/bundle" + +// Client wraps the Databricks API client for the deployment metadata service. +type Client struct { + api *client.DatabricksClient +} + +// NewClient creates a new deployment metadata service client from a workspace client. 
+func NewClient(w *databricks.WorkspaceClient) (*Client, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, fmt.Errorf("failed to create deployment metadata API client: %w", err) + } + return &Client{api: apiClient}, nil +} + +// CreateDeployment creates a new deployment. +func (c *Client) CreateDeployment(ctx context.Context, deploymentID string, deployment *Deployment) (*Deployment, error) { + resp := &Deployment{} + path := fmt.Sprintf("%s/deployments", basePath) + err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateDeploymentRequest{ + DeploymentID: deploymentID, + Deployment: deployment, + }, resp) + if err != nil { + return nil, mapError("create deployment", err) + } + return resp, nil +} + +// GetDeployment retrieves a deployment by ID. +func (c *Client) GetDeployment(ctx context.Context, deploymentID string) (*Deployment, error) { + resp := &Deployment{} + path := fmt.Sprintf("%s/deployments/%s", basePath, deploymentID) + err := c.api.Do(ctx, http.MethodGet, path, nil, nil, nil, resp) + if err != nil { + return nil, mapError("get deployment", err) + } + return resp, nil +} + +// DeleteDeployment soft-deletes a deployment. +func (c *Client) DeleteDeployment(ctx context.Context, deploymentID string) error { + path := fmt.Sprintf("%s/deployments/%s", basePath, deploymentID) + err := c.api.Do(ctx, http.MethodDelete, path, nil, nil, nil, nil) + if err != nil { + return mapError("delete deployment", err) + } + return nil +} + +// CreateVersion creates a new version (acquires the deployment lock). 
+func (c *Client) CreateVersion(ctx context.Context, deploymentID string, versionID string, version *Version) (*Version, error) { + resp := &Version{} + path := fmt.Sprintf("%s/deployments/%s/versions", basePath, deploymentID) + err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateVersionRequest{ + Parent: fmt.Sprintf("deployments/%s", deploymentID), + Version: version, + VersionID: versionID, + }, resp) + if err != nil { + return nil, mapError("create version", err) + } + return resp, nil +} + +// GetVersion retrieves a version. +func (c *Client) GetVersion(ctx context.Context, deploymentID, versionID string) (*Version, error) { + resp := &Version{} + path := fmt.Sprintf("%s/deployments/%s/versions/%s", basePath, deploymentID, versionID) + err := c.api.Do(ctx, http.MethodGet, path, nil, nil, nil, resp) + if err != nil { + return nil, mapError("get version", err) + } + return resp, nil +} + +// Heartbeat renews the lock lease for an in-progress version. +func (c *Client) Heartbeat(ctx context.Context, deploymentID, versionID string) (*HeartbeatResponse, error) { + resp := &HeartbeatResponse{} + path := fmt.Sprintf("%s/deployments/%s/versions/%s/heartbeat", basePath, deploymentID, versionID) + err := c.api.Do(ctx, http.MethodPost, path, nil, nil, struct{}{}, resp) + if err != nil { + return nil, mapError("heartbeat", err) + } + return resp, nil +} + +// CompleteVersion marks a version as completed (releases the deployment lock). 
+func (c *Client) CompleteVersion(ctx context.Context, deploymentID, versionID string, reason VersionComplete, force bool) (*Version, error) { + resp := &Version{} + path := fmt.Sprintf("%s/deployments/%s/versions/%s/complete", basePath, deploymentID, versionID) + err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CompleteVersionRequest{ + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + CompletionReason: reason, + Force: force, + }, resp) + if err != nil { + return nil, mapError("complete version", err) + } + return resp, nil +} + +// CreateOperation records a resource operation for a version. +func (c *Client) CreateOperation(ctx context.Context, deploymentID, versionID, resourceKey string, operation *Operation) (*Operation, error) { + resp := &Operation{} + path := fmt.Sprintf("%s/deployments/%s/versions/%s/operations", basePath, deploymentID, versionID) + err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateOperationRequest{ + Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + ResourceKey: resourceKey, + Operation: operation, + }, resp) + if err != nil { + return nil, mapError("create operation", err) + } + return resp, nil +} + +// ListResources lists all resources for a deployment. +func (c *Client) ListResources(ctx context.Context, deploymentID string) ([]Resource, error) { + var allResources []Resource + pageToken := "" + + for { + resp := &ListResourcesResponse{} + path := fmt.Sprintf("%s/deployments/%s/resources", basePath, deploymentID) + + q := map[string]any{ + "parent": fmt.Sprintf("deployments/%s", deploymentID), + "page_size": 1000, + } + if pageToken != "" { + q["page_token"] = pageToken + } + + err := c.api.Do(ctx, http.MethodGet, path, nil, q, nil, resp) + if err != nil { + return nil, mapError("list resources", err) + } + + allResources = append(allResources, resp.Resources...) 
+ if resp.NextPageToken == "" { + break + } + pageToken = resp.NextPageToken + } + + return allResources, nil +} + +// mapError translates API errors into user-friendly messages. +func mapError(operation string, err error) error { + var apiErr *apierr.APIError + if !errors.As(err, &apiErr) { + return fmt.Errorf("%s: %w", operation, err) + } + + switch apiErr.StatusCode { + case http.StatusConflict: + return fmt.Errorf("%s: deployment is locked by another active deployment. "+ + "Use --force-lock to override", operation) + case http.StatusNotFound: + return fmt.Errorf("%s: resource not found: %w", operation, err) + case http.StatusBadRequest: + return fmt.Errorf("%s: bad request: %s", operation, apiErr.Message) + default: + return fmt.Errorf("%s: %w", operation, err) + } +} diff --git a/bundle/deploy/metadata/service/heartbeat.go b/bundle/deploy/metadata/service/heartbeat.go new file mode 100644 index 0000000000..d32e0a24f0 --- /dev/null +++ b/bundle/deploy/metadata/service/heartbeat.go @@ -0,0 +1,37 @@ +package service + +import ( + "context" + "time" + + "github.com/databricks/cli/libs/log" +) + +const DefaultHeartbeatInterval = 2 * time.Minute + +// StartHeartbeat starts a background goroutine that sends heartbeats to keep +// the deployment lock alive. Returns a cancel function to stop the heartbeat. 
+func StartHeartbeat(ctx context.Context, client *Client, deploymentID, versionID string, interval time.Duration) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := client.Heartbeat(ctx, deploymentID, versionID) + if err != nil { + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + } else { + log.Debugf(ctx, "Deployment heartbeat sent for deployment=%s version=%s", deploymentID, versionID) + } + } + } + }() + + return cancel +} diff --git a/bundle/deploy/metadata/service/types.go b/bundle/deploy/metadata/service/types.go new file mode 100644 index 0000000000..05e0cf03b1 --- /dev/null +++ b/bundle/deploy/metadata/service/types.go @@ -0,0 +1,189 @@ +package service + +import "time" + +// Enum types matching the proto definitions. + +type DeploymentStatus int +type VersionStatus int +type VersionComplete int +type VersionType int +type OperationStatus int +type OperationActionType int +type DeploymentResourceType int + +const ( + DeploymentStatusUnspecified DeploymentStatus = 0 + DeploymentStatusActive DeploymentStatus = 1 + DeploymentStatusFailed DeploymentStatus = 2 + DeploymentStatusInProgress DeploymentStatus = 3 + DeploymentStatusDeleted DeploymentStatus = 4 +) + +const ( + VersionStatusUnspecified VersionStatus = 0 + VersionStatusInProgress VersionStatus = 1 + VersionStatusCompleted VersionStatus = 2 +) + +const ( + VersionCompleteUnspecified VersionComplete = 0 + VersionCompleteSuccess VersionComplete = 1 + VersionCompleteFailure VersionComplete = 2 + VersionCompleteForceAbort VersionComplete = 3 + VersionCompleteLeaseExpire VersionComplete = 4 +) + +const ( + VersionTypeUnspecified VersionType = 0 + VersionTypeDeploy VersionType = 1 + VersionTypeDestroy VersionType = 2 +) + +const ( + OperationStatusUnspecified OperationStatus = 0 + OperationStatusSucceeded OperationStatus = 1 + 
OperationStatusFailed OperationStatus = 2 +) + +const ( + OperationActionTypeUnspecified OperationActionType = 0 + OperationActionTypeResize OperationActionType = 1 + OperationActionTypeUpdate OperationActionType = 2 + OperationActionTypeUpdateWithID OperationActionType = 3 + OperationActionTypeCreate OperationActionType = 4 + OperationActionTypeRecreate OperationActionType = 5 + OperationActionTypeDelete OperationActionType = 6 + OperationActionTypeBind OperationActionType = 7 + OperationActionTypeBindAndUpdate OperationActionType = 8 + OperationActionTypeInitRegister OperationActionType = 9 +) + +const ( + ResourceTypeUnspecified DeploymentResourceType = 0 + ResourceTypeJob DeploymentResourceType = 1 + ResourceTypePipeline DeploymentResourceType = 2 + ResourceTypeModel DeploymentResourceType = 4 + ResourceTypeRegisteredModel DeploymentResourceType = 5 + ResourceTypeExperiment DeploymentResourceType = 6 + ResourceTypeServingEndpoint DeploymentResourceType = 7 + ResourceTypeQualityMonitor DeploymentResourceType = 8 + ResourceTypeSchema DeploymentResourceType = 9 + ResourceTypeVolume DeploymentResourceType = 10 + ResourceTypeCluster DeploymentResourceType = 11 + ResourceTypeDashboard DeploymentResourceType = 12 + ResourceTypeApp DeploymentResourceType = 13 + ResourceTypeCatalog DeploymentResourceType = 14 + ResourceTypeExternalLocation DeploymentResourceType = 15 + ResourceTypeSecretScope DeploymentResourceType = 16 + ResourceTypeAlert DeploymentResourceType = 17 + ResourceTypeSQLWarehouse DeploymentResourceType = 18 + ResourceTypeDatabaseInstance DeploymentResourceType = 19 + ResourceTypeDatabaseCatalog DeploymentResourceType = 20 + ResourceTypeSyncedDBTable DeploymentResourceType = 21 + ResourceTypePostgresProject DeploymentResourceType = 22 + ResourceTypePostgresBranch DeploymentResourceType = 23 + ResourceTypePostgresEndpoint DeploymentResourceType = 24 +) + +// Deployment represents a bundle deployment registered with the control plane. 
+type Deployment struct { + Name string `json:"name,omitempty"` + DisplayName string `json:"display_name,omitempty"` + TargetName string `json:"target_name,omitempty"` + Status DeploymentStatus `json:"status,omitempty"` + LastVersionID string `json:"last_version_id,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + UpdateTime *time.Time `json:"update_time,omitempty"` + DestroyTime *time.Time `json:"destroy_time,omitempty"` + DestroyedBy string `json:"destroyed_by,omitempty"` +} + +// Version represents a single invocation of deploy/destroy against a deployment. +type Version struct { + Name string `json:"name,omitempty"` + VersionID string `json:"version_id,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + CompleteTime *time.Time `json:"complete_time,omitempty"` + CliVersion string `json:"cli_version,omitempty"` + Status VersionStatus `json:"status,omitempty"` + VersionType VersionType `json:"version_type,omitempty"` + CompletionReason VersionComplete `json:"completion_reason,omitempty"` + CompletedBy string `json:"completed_by,omitempty"` + DisplayName string `json:"display_name,omitempty"` + TargetName string `json:"target_name,omitempty"` +} + +// Operation records the result of applying a resource change. +type Operation struct { + Name string `json:"name,omitempty"` + ResourceKey string `json:"resource_key,omitempty"` + ActionType OperationActionType `json:"action_type,omitempty"` + State any `json:"state,omitempty"` + ResourceID string `json:"resource_id,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + Status OperationStatus `json:"status,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` +} + +// Resource represents a resource managed by a deployment. 
+type Resource struct { + Name string `json:"name,omitempty"` + ResourceKey string `json:"resource_key,omitempty"` + State any `json:"state,omitempty"` + ResourceID string `json:"resource_id,omitempty"` + LastActionType OperationActionType `json:"last_action_type,omitempty"` + LastVersionID string `json:"last_version_id,omitempty"` + ResourceType DeploymentResourceType `json:"resource_type,omitempty"` +} + +// Request/Response types. + +type CreateDeploymentRequest struct { + DeploymentID string `json:"deployment_id"` + Deployment *Deployment `json:"deployment"` +} + +type ListDeploymentsResponse struct { + Deployments []Deployment `json:"deployments"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type CreateVersionRequest struct { + Parent string `json:"parent"` + Version *Version `json:"version"` + VersionID string `json:"version_id"` +} + +type ListVersionsResponse struct { + Versions []Version `json:"versions"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type HeartbeatResponse struct { + ExpireTime *time.Time `json:"expire_time,omitempty"` +} + +type CompleteVersionRequest struct { + Name string `json:"name"` + CompletionReason VersionComplete `json:"completion_reason"` + Force bool `json:"force,omitempty"` +} + +type CreateOperationRequest struct { + Parent string `json:"parent"` + ResourceKey string `json:"resource_key"` + Operation *Operation `json:"operation"` +} + +type ListOperationsResponse struct { + Operations []Operation `json:"operations"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type ListResourcesResponse struct { + Resources []Resource `json:"resources"` + NextPageToken string `json:"next_page_token,omitempty"` +} diff --git a/bundle/deploy/state_update.go b/bundle/deploy/state_update.go index 55cf2393bf..06326c8a93 100644 --- a/bundle/deploy/state_update.go +++ b/bundle/deploy/state_update.go @@ -81,6 +81,12 @@ func StateUpdate() bundle.Mutator { return &stateUpdate{} } +// LoadState 
loads the deployment state from the local cache directory. +// If no state file exists, a new default DeploymentState is returned. +func LoadState(ctx context.Context, b *bundle.Bundle) (*DeploymentState, error) { + return load(ctx, b) +} + func load(ctx context.Context, b *bundle.Bundle) (*DeploymentState, error) { // If the file does not exist, return a new DeploymentState. statePath, err := getPathToStateFile(ctx, b) diff --git a/bundle/env/deployment_metadata.go b/bundle/env/deployment_metadata.go new file mode 100644 index 0000000000..60e896c045 --- /dev/null +++ b/bundle/env/deployment_metadata.go @@ -0,0 +1,15 @@ +package env + +import "context" + +// deploymentServiceVariable names the environment variable that controls whether the +// deployment metadata service is used for locking and resource state management. +const deploymentServiceVariable = "DATABRICKS_BUNDLE_DEPLOYMENT_SERVICE" + +// DeploymentService returns the environment variable that controls whether the +// deployment metadata service is used for locking and resource state management. +func DeploymentService(ctx context.Context) (string, bool) { + return get(ctx, []string{ + deploymentServiceVariable, + }) +} diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 4613a7a211..7a1fa6e778 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -9,6 +9,7 @@ import ( "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle/deploy/files" "github.com/databricks/cli/bundle/deploy/lock" "github.com/databricks/cli/bundle/deploy/metadata" @@ -139,6 +140,11 @@ func uploadLibraries(ctx context.Context, b *bundle.Bundle, libs map[string][]li // The deploy phase deploys artifacts and resources. // If readPlanPath is provided, the plan is loaded from that file instead of being calculated. 
func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHandler, engine engine.EngineType, libs map[string][]libraries.LocationToUpdate, plan *deployplan.Plan) { + if v, _ := env.DeploymentService(ctx); v == "true" { + deployWithMetadataService(ctx, b, outputHandler, engine, libs, plan) + return + } + log.Info(ctx, "Phase: deploy") // Core mutators that CRUD resources and modify deployment state. These diff --git a/bundle/phases/deploy_metadata.go b/bundle/phases/deploy_metadata.go new file mode 100644 index 0000000000..bbe1197b5b --- /dev/null +++ b/bundle/phases/deploy_metadata.go @@ -0,0 +1,269 @@ +package phases + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/artifacts" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/engine" + "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/deploy/files" + "github.com/databricks/cli/bundle/deploy/metadata" + metadataservice "github.com/databricks/cli/bundle/deploy/metadata/service" + "github.com/databricks/cli/bundle/deploy/terraform" + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/bundle/libraries" + "github.com/databricks/cli/bundle/metrics" + "github.com/databricks/cli/bundle/permissions" + "github.com/databricks/cli/bundle/scripts" + "github.com/databricks/cli/bundle/statemgmt" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/log" + "github.com/databricks/cli/libs/logdiag" + "github.com/databricks/cli/libs/sync" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/google/uuid" +) + +func deployWithMetadataService(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHandler, targetEngine engine.EngineType, libs map[string][]libraries.LocationToUpdate, plan *deployplan.Plan) { + log.Info(ctx, 
"Phase: deploy (with metadata service)") + + bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPreDeploy)) + if logdiag.HasError(ctx) { + return + } + + // Create the metadata service client. + svc, err := metadataservice.NewClient(b.WorkspaceClient()) + if err != nil { + logdiag.LogError(ctx, fmt.Errorf("failed to create metadata service client: %w", err)) + return + } + + // Load local deployment state to get the deployment ID and sequence number. + state, err := deploy.LoadState(ctx, b) + if err != nil { + logdiag.LogError(ctx, fmt.Errorf("failed to load deployment state: %w", err)) + return + } + + // Generate a deployment ID if one doesn't exist yet. + if state.ID == uuid.Nil { + state.ID = uuid.New() + } + deploymentID := state.ID.String() + + // Ensure the deployment exists in the metadata service. + _, err = svc.CreateDeployment(ctx, deploymentID, &metadataservice.Deployment{ + TargetName: b.Config.Bundle.Target, + }) + if err != nil && !isAlreadyExists(err) { + logdiag.LogError(ctx, fmt.Errorf("failed to create deployment: %w", err)) + return + } + + // Create a version to acquire the deployment lock. + versionID := fmt.Sprintf("%d", state.Seq+1) + version, err := svc.CreateVersion(ctx, deploymentID, versionID, &metadataservice.Version{ + CliVersion: build.GetInfo().Version, + VersionType: metadataservice.VersionTypeDeploy, + TargetName: b.Config.Bundle.Target, + }) + if err != nil { + logdiag.LogError(ctx, fmt.Errorf("failed to acquire deployment lock: %w", err)) + return + } + + log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, version.VersionID) + + // Start heartbeat to keep the lock alive. + stopHeartbeat := metadataservice.StartHeartbeat(ctx, svc, deploymentID, versionID, metadataservice.DefaultHeartbeatInterval) + + // Ensure we always complete the version (release the lock) and stop heartbeat. 
+ var deployFailed bool + defer func() { + stopHeartbeat() + + reason := metadataservice.VersionCompleteSuccess + if deployFailed || logdiag.HasError(ctx) { + reason = metadataservice.VersionCompleteFailure + } + + _, completeErr := svc.CompleteVersion(ctx, deploymentID, versionID, reason, false) + if completeErr != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", completeErr) + } else { + log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%d", deploymentID, versionID, reason) + } + }() + + // Upload libraries. + bundle.ApplySeqContext(ctx, b, + artifacts.CleanUp(), + libraries.Upload(libs), + ) + if logdiag.HasError(ctx) { + deployFailed = true + return + } + + // Upload files, update state, apply permissions. + bundle.ApplySeqContext(ctx, b, + files.Upload(outputHandler), + deploy.StateUpdate(), + deploy.StatePush(), + permissions.ApplyWorkspaceRootPermissions(), + metrics.TrackUsedCompute(), + deploy.ResourcePathMkdir(), + ) + if logdiag.HasError(ctx) { + deployFailed = true + return + } + + // Calculate or load the deploy plan. + if plan != nil { + _, localPath := b.StateFilenameDirect(ctx) + err := b.DeploymentBundle.InitForApply(ctx, b.WorkspaceClient(), localPath, plan) + if err != nil { + logdiag.LogError(ctx, err) + deployFailed = true + return + } + } else { + plan = RunPlan(ctx, b, targetEngine) + } + if logdiag.HasError(ctx) { + deployFailed = true + return + } + + // Seek approval for potentially destructive changes. + haveApproval, err := approvalForDeploy(ctx, b, plan) + if err != nil { + logdiag.LogError(ctx, err) + deployFailed = true + return + } + if !haveApproval { + cmdio.LogString(ctx, "Deployment cancelled!") + return + } + + // Apply the deployment. 
+ deployCoreWithMetadata(ctx, b, plan, targetEngine, svc, deploymentID, versionID) + if logdiag.HasError(ctx) { + deployFailed = true + return + } + + logDeployTelemetry(ctx, b) + bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPostDeploy)) +} + +// deployCoreWithMetadata applies the deployment plan and reports operations to +// the metadata service. +func deployCoreWithMetadata(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, targetEngine engine.EngineType, svc *metadataservice.Client, deploymentID, versionID string) { + cmdio.LogString(ctx, "Deploying resources...") + + if targetEngine.IsDirect() { + b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false)) + } else { + bundle.ApplyContext(ctx, b, terraform.Apply()) + } + + // Push resource state even on failure. + statemgmt.PushResourcesState(ctx, b, targetEngine) + + // Report operations to the metadata service (best-effort). + reportOperations(ctx, b, svc, deploymentID, versionID, plan) + + if logdiag.HasError(ctx) { + return + } + + bundle.ApplySeqContext(ctx, b, + statemgmt.Load(targetEngine), + metadata.Compute(), + metadata.Upload(), + statemgmt.UploadStateForYamlSync(targetEngine), + ) + + if !logdiag.HasError(ctx) { + cmdio.LogString(ctx, "Deployment complete!") + } +} + +// reportOperations reports each resource operation to the metadata service. +// This is best-effort: failures are logged as warnings, not fatal errors. +func reportOperations(ctx context.Context, b *bundle.Bundle, svc *metadataservice.Client, deploymentID, versionID string, plan *deployplan.Plan) { + if plan == nil { + return + } + + // Fetch existing resources to determine if this is the first time we're + // tracking each resource in the metadata service. 
+ knownResources := map[string]bool{} + existing, err := svc.ListResources(ctx, deploymentID) + if err != nil { + log.Warnf(ctx, "Failed to list existing resources from metadata service, will use INITIAL_REGISTER for all: %v", err) + } else { + for _, r := range existing { + knownResources[r.ResourceKey] = true + } + } + + for resourceKey, entry := range plan.Plan { + var actionType metadataservice.OperationActionType + if knownResources[resourceKey] { + // Resource is already tracked; use the plan's action type. + actionType = planActionToOperationAction(entry.Action) + } else { + // First time tracking this resource in the service. + actionType = metadataservice.OperationActionTypeInitRegister + } + + if actionType == metadataservice.OperationActionTypeUnspecified { + continue + } + + _, err := svc.CreateOperation(ctx, deploymentID, versionID, resourceKey, &metadataservice.Operation{ + ResourceKey: resourceKey, + Status: metadataservice.OperationStatusSucceeded, + ActionType: actionType, + }) + if err != nil { + log.Warnf(ctx, "Failed to report operation for resource %s: %v", resourceKey, err) + } + } +} + +func planActionToOperationAction(action deployplan.ActionType) metadataservice.OperationActionType { + switch action { + case deployplan.Create: + return metadataservice.OperationActionTypeCreate + case deployplan.Update: + return metadataservice.OperationActionTypeUpdate + case deployplan.Delete: + return metadataservice.OperationActionTypeDelete + case deployplan.Recreate: + return metadataservice.OperationActionTypeRecreate + default: + return metadataservice.OperationActionTypeUnspecified + } +} + +// isAlreadyExists checks if an error indicates the resource already exists (HTTP 409). 
+func isAlreadyExists(err error) bool { + var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusConflict { + return true + } + return false +} diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index e6be00b579..374b533ca7 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -8,6 +8,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle/deploy/files" "github.com/databricks/cli/bundle/deploy/lock" "github.com/databricks/cli/bundle/deploy/terraform" @@ -115,6 +116,11 @@ func destroyCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, e // The destroy phase deletes artifacts and resources. func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { + if v, _ := env.DeploymentService(ctx); v == "true" { + destroyWithMetadataService(ctx, b, engine) + return + } + log.Info(ctx, "Phase: destroy") ok, err := assertRootPathExists(ctx, b) diff --git a/bundle/phases/destroy_metadata.go b/bundle/phases/destroy_metadata.go new file mode 100644 index 0000000000..acb776d540 --- /dev/null +++ b/bundle/phases/destroy_metadata.go @@ -0,0 +1,169 @@ +package phases + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config/engine" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/deploy/files" + metadataservice "github.com/databricks/cli/bundle/deploy/metadata/service" + "github.com/databricks/cli/bundle/deploy/terraform" + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/cmdio" + "github.com/databricks/cli/libs/log" + 
"github.com/databricks/cli/libs/logdiag" + "github.com/databricks/databricks-sdk-go/apierr" +) + +func destroyWithMetadataService(ctx context.Context, b *bundle.Bundle, targetEngine engine.EngineType) { + log.Info(ctx, "Phase: destroy (with metadata service)") + + ok, err := assertRootPathExists(ctx, b) + if err != nil { + logdiag.LogError(ctx, err) + return + } + if !ok { + cmdio.LogString(ctx, "No active deployment found to destroy!") + return + } + + // Create the metadata service client. + svc, err := metadataservice.NewClient(b.WorkspaceClient()) + if err != nil { + logdiag.LogError(ctx, fmt.Errorf("failed to create metadata service client: %w", err)) + return + } + + // Load local deployment state to get the deployment ID and sequence number. + state, err := deploy.LoadState(ctx, b) + if err != nil { + logdiag.LogError(ctx, fmt.Errorf("failed to load deployment state: %w", err)) + return + } + + deploymentID := state.ID.String() + + // Check that the deployment exists. + _, err = svc.GetDeployment(ctx, deploymentID) + if err != nil { + var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound { + log.Infof(ctx, "No deployment found in metadata service for %s, nothing to destroy", deploymentID) + cmdio.LogString(ctx, "No active deployment found to destroy!") + return + } + logdiag.LogError(ctx, fmt.Errorf("failed to get deployment: %w", err)) + return + } + + // Create a version to acquire the deployment lock. 
+ versionID := fmt.Sprintf("%d", state.Seq+1) + _, err = svc.CreateVersion(ctx, deploymentID, versionID, &metadataservice.Version{ + CliVersion: build.GetInfo().Version, + VersionType: metadataservice.VersionTypeDestroy, + TargetName: b.Config.Bundle.Target, + }) + if err != nil { + logdiag.LogError(ctx, fmt.Errorf("failed to acquire deployment lock: %w", err)) + return + } + + log.Infof(ctx, "Acquired deployment lock for destroy: deployment=%s version=%s", deploymentID, versionID) + + // Start heartbeat to keep the lock alive. + stopHeartbeat := metadataservice.StartHeartbeat(ctx, svc, deploymentID, versionID, metadataservice.DefaultHeartbeatInterval) + + var destroyFailed bool + defer func() { + stopHeartbeat() + + reason := metadataservice.VersionCompleteSuccess + if destroyFailed || logdiag.HasError(ctx) { + reason = metadataservice.VersionCompleteFailure + } + + _, completeErr := svc.CompleteVersion(ctx, deploymentID, versionID, reason, false) + if completeErr != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", completeErr) + } else { + log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%d", deploymentID, versionID, reason) + } + }() + + // Calculate the destroy plan. 
+ if !targetEngine.IsDirect() { + bundle.ApplySeqContext(ctx, b, + mutator.ResolveVariableReferencesWithoutResources("artifacts"), + mutator.ResolveVariableReferencesOnlyResources("artifacts"), + terraform.Interpolate(), + terraform.Write(), + terraform.Plan(terraform.PlanGoal("destroy")), + ) + } + + if logdiag.HasError(ctx) { + destroyFailed = true + return + } + + var plan *deployplan.Plan + if targetEngine.IsDirect() { + _, localPath := b.StateFilenameDirect(ctx) + plan, err = b.DeploymentBundle.CalculatePlan(ctx, b.WorkspaceClient(), nil, localPath) + if err != nil { + logdiag.LogError(ctx, err) + destroyFailed = true + return + } + } else { + tf := b.Terraform + if tf == nil { + logdiag.LogError(ctx, fmt.Errorf("terraform not initialized")) + destroyFailed = true + return + } + + plan, err = terraform.ShowPlanFile(ctx, tf, b.TerraformPlanPath) + if err != nil { + logdiag.LogError(ctx, err) + destroyFailed = true + return + } + } + + hasApproval, err := approvalForDestroy(ctx, b, plan) + if err != nil { + logdiag.LogError(ctx, err) + destroyFailed = true + return + } + + if hasApproval { + if targetEngine.IsDirect() { + b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false)) + } else { + bundle.ApplyContext(ctx, b, terraform.Apply()) + } + + if logdiag.HasError(ctx) { + destroyFailed = true + return + } + + bundle.ApplyContext(ctx, b, files.Delete()) + + if !logdiag.HasError(ctx) { + cmdio.LogString(ctx, "Destroy complete!") + } + } else { + cmdio.LogString(ctx, "Destroy cancelled!") + } +} From 9d055f4f1804260918ed553e6e5262933c6c13b2 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 30 Mar 2026 17:41:35 +0000 Subject: [PATCH 2/3] Fix correctness bugs and improve code quality from self-review - Use background context with timeout for CompleteVersion in defer blocks, so the lock is released even if the parent context is cancelled (e.g. 
Ctrl+C) - Add nil state.ID guard in destroy to avoid querying with zero UUID - Fix misleading --force-lock error message to explain lock expiry behavior - Fix import ordering Co-authored-by: Isaac --- bundle/deploy/metadata/service/client.go | 6 +++--- bundle/phases/deploy_metadata.go | 8 +++++++- bundle/phases/destroy_metadata.go | 13 ++++++++++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/bundle/deploy/metadata/service/client.go b/bundle/deploy/metadata/service/client.go index ffe2fb36fc..df25bb39ce 100644 --- a/bundle/deploy/metadata/service/client.go +++ b/bundle/deploy/metadata/service/client.go @@ -2,11 +2,10 @@ package service import ( "context" + "errors" "fmt" "net/http" - "errors" - "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/client" @@ -172,7 +171,8 @@ func mapError(operation string, err error) error { switch apiErr.StatusCode { case http.StatusConflict: return fmt.Errorf("%s: deployment is locked by another active deployment. "+ - "Use --force-lock to override", operation) + "If the prior deployment failed, the lock will expire automatically after 5 minutes. 
"+ + "You can also force-acquire the lock by running deploy with the --force-lock flag", operation) case http.StatusNotFound: return fmt.Errorf("%s: resource not found: %w", operation, err) case http.StatusBadRequest: diff --git a/bundle/phases/deploy_metadata.go b/bundle/phases/deploy_metadata.go index bbe1197b5b..e9f41a56cf 100644 --- a/bundle/phases/deploy_metadata.go +++ b/bundle/phases/deploy_metadata.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/artifacts" @@ -95,7 +96,12 @@ func deployWithMetadataService(ctx context.Context, b *bundle.Bundle, outputHand reason = metadataservice.VersionCompleteFailure } - _, completeErr := svc.CompleteVersion(ctx, deploymentID, versionID, reason, false) + // Use a separate context for cleanup so the lock is released even if the + // parent context was cancelled (e.g. user hit Ctrl+C). + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, completeErr := svc.CompleteVersion(cleanupCtx, deploymentID, versionID, reason, false) if completeErr != nil { log.Warnf(ctx, "Failed to release deployment lock: %v", completeErr) } else { diff --git a/bundle/phases/destroy_metadata.go b/bundle/phases/destroy_metadata.go index acb776d540..d7992e72cc 100644 --- a/bundle/phases/destroy_metadata.go +++ b/bundle/phases/destroy_metadata.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" @@ -20,6 +21,7 @@ import ( "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" "github.com/databricks/databricks-sdk-go/apierr" + "github.com/google/uuid" ) func destroyWithMetadataService(ctx context.Context, b *bundle.Bundle, targetEngine engine.EngineType) { @@ -49,6 +51,10 @@ func destroyWithMetadataService(ctx context.Context, b *bundle.Bundle, targetEng return } + if state.ID == uuid.Nil { + 
cmdio.LogString(ctx, "No active deployment found to destroy!") + return + } deploymentID := state.ID.String() // Check that the deployment exists. @@ -90,7 +96,12 @@ func destroyWithMetadataService(ctx context.Context, b *bundle.Bundle, targetEng reason = metadataservice.VersionCompleteFailure } - _, completeErr := svc.CompleteVersion(ctx, deploymentID, versionID, reason, false) + // Use a separate context for cleanup so the lock is released even if the + // parent context was cancelled (e.g. user hit Ctrl+C). + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, completeErr := svc.CompleteVersion(cleanupCtx, deploymentID, versionID, reason, false) if completeErr != nil { log.Warnf(ctx, "Failed to release deployment lock: %v", completeErr) } else { From 342fef82db47a5521bfd570a8bc5731190cdb805 Mon Sep 17 00:00:00 2001 From: Shreyas Goenka Date: Mon, 30 Mar 2026 19:57:23 +0000 Subject: [PATCH 3/3] Refactor to SDK-style tempdms package and unify deploy/destroy flows Move the deployment metadata service client from bundle/deploy/metadata/service to libs/tempdms with SDK-style method signatures (single request struct param). When the protos land in the Go SDK, migration is just an import path change. Unify deploy and destroy flows: instead of separate *WithMetadataService functions that duplicated all mutator calls, the core logic stays in Deploy() and Destroy() with conditional lock management based on the env var. 
Co-authored-by: Isaac --- bundle/deploy/metadata/service/client.go | 183 -------- bundle/phases/deploy.go | 102 +++-- bundle/phases/deploy_metadata.go | 234 +++------- bundle/phases/destroy.go | 70 ++- bundle/phases/destroy_metadata.go | 181 +------- .../metadata/service => phases}/heartbeat.go | 14 +- libs/tempdms/api.go | 164 +++++++ .../service => libs/tempdms}/types.go | 146 ++++--- libs/testserver/deployment_metadata.go | 400 ++++++++++++++++++ libs/testserver/fake_workspace.go | 3 + libs/testserver/handlers.go | 38 ++ 11 files changed, 893 insertions(+), 642 deletions(-) delete mode 100644 bundle/deploy/metadata/service/client.go rename bundle/{deploy/metadata/service => phases}/heartbeat.go (56%) create mode 100644 libs/tempdms/api.go rename {bundle/deploy/metadata/service => libs/tempdms}/types.go (58%) create mode 100644 libs/testserver/deployment_metadata.go diff --git a/bundle/deploy/metadata/service/client.go b/bundle/deploy/metadata/service/client.go deleted file mode 100644 index df25bb39ce..0000000000 --- a/bundle/deploy/metadata/service/client.go +++ /dev/null @@ -1,183 +0,0 @@ -package service - -import ( - "context" - "errors" - "fmt" - "net/http" - - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" -) - -const basePath = "/api/2.0/bundle" - -// Client wraps the Databricks API client for the deployment metadata service. -type Client struct { - api *client.DatabricksClient -} - -// NewClient creates a new deployment metadata service client from a workspace client. -func NewClient(w *databricks.WorkspaceClient) (*Client, error) { - apiClient, err := client.New(w.Config) - if err != nil { - return nil, fmt.Errorf("failed to create deployment metadata API client: %w", err) - } - return &Client{api: apiClient}, nil -} - -// CreateDeployment creates a new deployment. 
-func (c *Client) CreateDeployment(ctx context.Context, deploymentID string, deployment *Deployment) (*Deployment, error) { - resp := &Deployment{} - path := fmt.Sprintf("%s/deployments", basePath) - err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateDeploymentRequest{ - DeploymentID: deploymentID, - Deployment: deployment, - }, resp) - if err != nil { - return nil, mapError("create deployment", err) - } - return resp, nil -} - -// GetDeployment retrieves a deployment by ID. -func (c *Client) GetDeployment(ctx context.Context, deploymentID string) (*Deployment, error) { - resp := &Deployment{} - path := fmt.Sprintf("%s/deployments/%s", basePath, deploymentID) - err := c.api.Do(ctx, http.MethodGet, path, nil, nil, nil, resp) - if err != nil { - return nil, mapError("get deployment", err) - } - return resp, nil -} - -// DeleteDeployment soft-deletes a deployment. -func (c *Client) DeleteDeployment(ctx context.Context, deploymentID string) error { - path := fmt.Sprintf("%s/deployments/%s", basePath, deploymentID) - err := c.api.Do(ctx, http.MethodDelete, path, nil, nil, nil, nil) - if err != nil { - return mapError("delete deployment", err) - } - return nil -} - -// CreateVersion creates a new version (acquires the deployment lock). -func (c *Client) CreateVersion(ctx context.Context, deploymentID string, versionID string, version *Version) (*Version, error) { - resp := &Version{} - path := fmt.Sprintf("%s/deployments/%s/versions", basePath, deploymentID) - err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateVersionRequest{ - Parent: fmt.Sprintf("deployments/%s", deploymentID), - Version: version, - VersionID: versionID, - }, resp) - if err != nil { - return nil, mapError("create version", err) - } - return resp, nil -} - -// GetVersion retrieves a version. 
-func (c *Client) GetVersion(ctx context.Context, deploymentID, versionID string) (*Version, error) { - resp := &Version{} - path := fmt.Sprintf("%s/deployments/%s/versions/%s", basePath, deploymentID, versionID) - err := c.api.Do(ctx, http.MethodGet, path, nil, nil, nil, resp) - if err != nil { - return nil, mapError("get version", err) - } - return resp, nil -} - -// Heartbeat renews the lock lease for an in-progress version. -func (c *Client) Heartbeat(ctx context.Context, deploymentID, versionID string) (*HeartbeatResponse, error) { - resp := &HeartbeatResponse{} - path := fmt.Sprintf("%s/deployments/%s/versions/%s/heartbeat", basePath, deploymentID, versionID) - err := c.api.Do(ctx, http.MethodPost, path, nil, nil, struct{}{}, resp) - if err != nil { - return nil, mapError("heartbeat", err) - } - return resp, nil -} - -// CompleteVersion marks a version as completed (releases the deployment lock). -func (c *Client) CompleteVersion(ctx context.Context, deploymentID, versionID string, reason VersionComplete, force bool) (*Version, error) { - resp := &Version{} - path := fmt.Sprintf("%s/deployments/%s/versions/%s/complete", basePath, deploymentID, versionID) - err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CompleteVersionRequest{ - Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), - CompletionReason: reason, - Force: force, - }, resp) - if err != nil { - return nil, mapError("complete version", err) - } - return resp, nil -} - -// CreateOperation records a resource operation for a version. 
-func (c *Client) CreateOperation(ctx context.Context, deploymentID, versionID, resourceKey string, operation *Operation) (*Operation, error) { - resp := &Operation{} - path := fmt.Sprintf("%s/deployments/%s/versions/%s/operations", basePath, deploymentID, versionID) - err := c.api.Do(ctx, http.MethodPost, path, nil, nil, CreateOperationRequest{ - Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), - ResourceKey: resourceKey, - Operation: operation, - }, resp) - if err != nil { - return nil, mapError("create operation", err) - } - return resp, nil -} - -// ListResources lists all resources for a deployment. -func (c *Client) ListResources(ctx context.Context, deploymentID string) ([]Resource, error) { - var allResources []Resource - pageToken := "" - - for { - resp := &ListResourcesResponse{} - path := fmt.Sprintf("%s/deployments/%s/resources", basePath, deploymentID) - - q := map[string]any{ - "parent": fmt.Sprintf("deployments/%s", deploymentID), - "page_size": 1000, - } - if pageToken != "" { - q["page_token"] = pageToken - } - - err := c.api.Do(ctx, http.MethodGet, path, nil, q, nil, resp) - if err != nil { - return nil, mapError("list resources", err) - } - - allResources = append(allResources, resp.Resources...) - if resp.NextPageToken == "" { - break - } - pageToken = resp.NextPageToken - } - - return allResources, nil -} - -// mapError translates API errors into user-friendly messages. -func mapError(operation string, err error) error { - var apiErr *apierr.APIError - if !errors.As(err, &apiErr) { - return fmt.Errorf("%s: %w", operation, err) - } - - switch apiErr.StatusCode { - case http.StatusConflict: - return fmt.Errorf("%s: deployment is locked by another active deployment. "+ - "If the prior deployment failed, the lock will expire automatically after 5 minutes. 
"+ - "You can also force-acquire the lock by running deploy with the --force-lock flag", operation) - case http.StatusNotFound: - return fmt.Errorf("%s: resource not found: %w", operation, err) - case http.StatusBadRequest: - return fmt.Errorf("%s: bad request: %s", operation, apiErr.Message) - default: - return fmt.Errorf("%s: %w", operation, err) - } -} diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 7a1fa6e778..5ca8745f06 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -3,19 +3,20 @@ package phases import ( "context" "errors" + "fmt" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/artifacts" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" - "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle/deploy/files" "github.com/databricks/cli/bundle/deploy/lock" "github.com/databricks/cli/bundle/deploy/metadata" "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/bundle/metrics" "github.com/databricks/cli/bundle/permissions" @@ -25,6 +26,7 @@ import ( "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" "github.com/databricks/cli/libs/sync" + "github.com/databricks/cli/libs/tempdms" ) func approvalForDeploy(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan) (bool, error) { @@ -98,9 +100,11 @@ func approvalForDeploy(ctx context.Context, b *bundle.Bundle, plan *deployplan.P return approved, nil } -func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, targetEngine engine.EngineType) { - // Core mutators that CRUD resources and modify deployment state. These - // mutators need informed consent if they are potentially destructive. 
+// postApplyHook is called after the deployment plan is applied (terraform/direct Apply). +// It can be used for additional state reporting (e.g. to the metadata service). +type postApplyHook func(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan) + +func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, targetEngine engine.EngineType, hook postApplyHook) { cmdio.LogString(ctx, "Deploying resources...") if targetEngine.IsDirect() { @@ -109,8 +113,14 @@ func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, ta bundle.ApplyContext(ctx, b, terraform.Apply()) } - // Even if deployment failed, there might be updates in states that we need to upload + // Even if deployment failed, there might be updates in states that we need to upload. statemgmt.PushResourcesState(ctx, b, targetEngine) + + // Run any additional post-apply logic (e.g. metadata service operation reporting). + if hook != nil { + hook(ctx, b, plan) + } + if logdiag.HasError(ctx) { return } @@ -139,33 +149,55 @@ func uploadLibraries(ctx context.Context, b *bundle.Bundle, libs map[string][]li // The deploy phase deploys artifacts and resources. // If readPlanPath is provided, the plan is loaded from that file instead of being calculated. -func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHandler, engine engine.EngineType, libs map[string][]libraries.LocationToUpdate, plan *deployplan.Plan) { - if v, _ := env.DeploymentService(ctx); v == "true" { - deployWithMetadataService(ctx, b, outputHandler, engine, libs, plan) - return - } +func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHandler, targetEngine engine.EngineType, libs map[string][]libraries.LocationToUpdate, plan *deployplan.Plan) { + useMetadataService, _ := env.DeploymentService(ctx) - log.Info(ctx, "Phase: deploy") - - // Core mutators that CRUD resources and modify deployment state. 
These - // mutators need informed consent if they are potentially destructive. - bundle.ApplySeqContext(ctx, b, - scripts.Execute(config.ScriptPreDeploy), - lock.Acquire(), - ) + if useMetadataService == "true" { + log.Info(ctx, "Phase: deploy (with metadata service)") + } else { + log.Info(ctx, "Phase: deploy") + } + bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPreDeploy)) if logdiag.HasError(ctx) { - // lock is not acquired here return } - // lock is acquired here - defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) - }() + // Acquire the deployment lock. + var svc *tempdms.DeploymentMetadataAPI + var deploymentID, versionID string + var failed bool + + if useMetadataService == "true" { + var err error + svc, err = tempdms.NewDeploymentMetadataAPI(b.WorkspaceClient()) + if err != nil { + logdiag.LogError(ctx, fmt.Errorf("failed to create metadata service client: %w", err)) + return + } + + var cleanup func(failed bool) + deploymentID, versionID, cleanup, err = deployMetadataLock(ctx, b, svc, tempdms.VersionTypeDeploy) + if err != nil { + logdiag.LogError(ctx, err) + return + } + defer func() { + cleanup(failed || logdiag.HasError(ctx)) + }() + } else { + bundle.ApplyContext(ctx, b, lock.Acquire()) + if logdiag.HasError(ctx) { + return + } + defer func() { + bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) + }() + } uploadLibraries(ctx, b, libs) if logdiag.HasError(ctx) { + failed = true return } @@ -177,40 +209,50 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand metrics.TrackUsedCompute(), deploy.ResourcePathMkdir(), ) - if logdiag.HasError(ctx) { + failed = true return } if plan != nil { - // Initialize DeploymentBundle for applying the loaded plan + // Initialize DeploymentBundle for applying the loaded plan. 
_, localPath := b.StateFilenameDirect(ctx) err := b.DeploymentBundle.InitForApply(ctx, b.WorkspaceClient(), localPath, plan) if err != nil { logdiag.LogError(ctx, err) + failed = true return } } else { - plan = RunPlan(ctx, b, engine) + plan = RunPlan(ctx, b, targetEngine) } - if logdiag.HasError(ctx) { + failed = true return } haveApproval, err := approvalForDeploy(ctx, b, plan) if err != nil { logdiag.LogError(ctx, err) + failed = true return } - if haveApproval { - deployCore(ctx, b, plan, engine) - } else { + if !haveApproval { cmdio.LogString(ctx, "Deployment cancelled!") return } + // Build the post-apply hook for metadata service reporting (nil for file-based). + var hook postApplyHook + if useMetadataService == "true" { + hook = func(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan) { + reportOperations(ctx, svc, deploymentID, versionID, plan) + } + } + + deployCore(ctx, b, plan, targetEngine, hook) if logdiag.HasError(ctx) { + failed = true return } diff --git a/bundle/phases/deploy_metadata.go b/bundle/phases/deploy_metadata.go index e9f41a56cf..40d1d7d620 100644 --- a/bundle/phases/deploy_metadata.go +++ b/bundle/phases/deploy_metadata.go @@ -8,92 +8,72 @@ import ( "time" "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/artifacts" - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" - "github.com/databricks/cli/bundle/deploy/files" - "github.com/databricks/cli/bundle/deploy/metadata" - metadataservice "github.com/databricks/cli/bundle/deploy/metadata/service" - "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deployplan" - "github.com/databricks/cli/bundle/direct" - "github.com/databricks/cli/bundle/libraries" - "github.com/databricks/cli/bundle/metrics" - "github.com/databricks/cli/bundle/permissions" - "github.com/databricks/cli/bundle/scripts" - "github.com/databricks/cli/bundle/statemgmt" 
"github.com/databricks/cli/internal/build" - "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" - "github.com/databricks/cli/libs/sync" + "github.com/databricks/cli/libs/tempdms" "github.com/databricks/databricks-sdk-go/apierr" "github.com/google/uuid" ) -func deployWithMetadataService(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHandler, targetEngine engine.EngineType, libs map[string][]libraries.LocationToUpdate, plan *deployplan.Plan) { - log.Info(ctx, "Phase: deploy (with metadata service)") - - bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPreDeploy)) - if logdiag.HasError(ctx) { - return - } - - // Create the metadata service client. - svc, err := metadataservice.NewClient(b.WorkspaceClient()) - if err != nil { - logdiag.LogError(ctx, fmt.Errorf("failed to create metadata service client: %w", err)) - return - } - +// deployMetadataLock implements the lock acquire/release lifecycle using the +// deployment metadata service (CreateVersion / CompleteVersion). +// +// It returns a cleanup function that must be deferred by the caller to release +// the lock and stop the heartbeat, as well as any error from acquiring the lock. +func deployMetadataLock(ctx context.Context, b *bundle.Bundle, svc *tempdms.DeploymentMetadataAPI, versionType tempdms.VersionType) (deploymentID, versionID string, cleanup func(failed bool), err error) { // Load local deployment state to get the deployment ID and sequence number. - state, err := deploy.LoadState(ctx, b) - if err != nil { - logdiag.LogError(ctx, fmt.Errorf("failed to load deployment state: %w", err)) - return + state, loadErr := deploy.LoadState(ctx, b) + if loadErr != nil { + return "", "", nil, fmt.Errorf("failed to load deployment state: %w", loadErr) } // Generate a deployment ID if one doesn't exist yet. 
if state.ID == uuid.Nil { state.ID = uuid.New() } - deploymentID := state.ID.String() + deploymentID = state.ID.String() // Ensure the deployment exists in the metadata service. - _, err = svc.CreateDeployment(ctx, deploymentID, &metadataservice.Deployment{ - TargetName: b.Config.Bundle.Target, + _, createErr := svc.CreateDeployment(ctx, tempdms.CreateDeploymentRequest{ + DeploymentID: deploymentID, + Deployment: &tempdms.Deployment{ + TargetName: b.Config.Bundle.Target, + }, }) - if err != nil && !isAlreadyExists(err) { - logdiag.LogError(ctx, fmt.Errorf("failed to create deployment: %w", err)) - return + if createErr != nil && !isAlreadyExists(createErr) { + return "", "", nil, fmt.Errorf("failed to create deployment: %w", createErr) } // Create a version to acquire the deployment lock. - versionID := fmt.Sprintf("%d", state.Seq+1) - version, err := svc.CreateVersion(ctx, deploymentID, versionID, &metadataservice.Version{ - CliVersion: build.GetInfo().Version, - VersionType: metadataservice.VersionTypeDeploy, - TargetName: b.Config.Bundle.Target, + versionID = fmt.Sprintf("%d", state.Seq+1) + version, versionErr := svc.CreateVersion(ctx, tempdms.CreateVersionRequest{ + DeploymentID: deploymentID, + Parent: fmt.Sprintf("deployments/%s", deploymentID), + VersionID: versionID, + Version: &tempdms.Version{ + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + }, }) - if err != nil { - logdiag.LogError(ctx, fmt.Errorf("failed to acquire deployment lock: %w", err)) - return + if versionErr != nil { + return "", "", nil, fmt.Errorf("failed to acquire deployment lock: %w", versionErr) } log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, version.VersionID) // Start heartbeat to keep the lock alive. 
- stopHeartbeat := metadataservice.StartHeartbeat(ctx, svc, deploymentID, versionID, metadataservice.DefaultHeartbeatInterval) + stopHeartbeat := startHeartbeat(ctx, svc, deploymentID, versionID, defaultHeartbeatInterval) - // Ensure we always complete the version (release the lock) and stop heartbeat. - var deployFailed bool - defer func() { + cleanup = func(failed bool) { stopHeartbeat() - reason := metadataservice.VersionCompleteSuccess - if deployFailed || logdiag.HasError(ctx) { - reason = metadataservice.VersionCompleteFailure + reason := tempdms.VersionCompleteSuccess + if failed { + reason = tempdms.VersionCompleteFailure } // Use a separate context for cleanup so the lock is released even if the @@ -101,114 +81,25 @@ func deployWithMetadataService(ctx context.Context, b *bundle.Bundle, outputHand cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - _, completeErr := svc.CompleteVersion(cleanupCtx, deploymentID, versionID, reason, false) + _, completeErr := svc.CompleteVersion(cleanupCtx, tempdms.CompleteVersionRequest{ + DeploymentID: deploymentID, + VersionID: versionID, + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + CompletionReason: reason, + }) if completeErr != nil { log.Warnf(ctx, "Failed to release deployment lock: %v", completeErr) } else { log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%d", deploymentID, versionID, reason) } - }() - - // Upload libraries. - bundle.ApplySeqContext(ctx, b, - artifacts.CleanUp(), - libraries.Upload(libs), - ) - if logdiag.HasError(ctx) { - deployFailed = true - return - } - - // Upload files, update state, apply permissions. 
- bundle.ApplySeqContext(ctx, b, - files.Upload(outputHandler), - deploy.StateUpdate(), - deploy.StatePush(), - permissions.ApplyWorkspaceRootPermissions(), - metrics.TrackUsedCompute(), - deploy.ResourcePathMkdir(), - ) - if logdiag.HasError(ctx) { - deployFailed = true - return } - // Calculate or load the deploy plan. - if plan != nil { - _, localPath := b.StateFilenameDirect(ctx) - err := b.DeploymentBundle.InitForApply(ctx, b.WorkspaceClient(), localPath, plan) - if err != nil { - logdiag.LogError(ctx, err) - deployFailed = true - return - } - } else { - plan = RunPlan(ctx, b, targetEngine) - } - if logdiag.HasError(ctx) { - deployFailed = true - return - } - - // Seek approval for potentially destructive changes. - haveApproval, err := approvalForDeploy(ctx, b, plan) - if err != nil { - logdiag.LogError(ctx, err) - deployFailed = true - return - } - if !haveApproval { - cmdio.LogString(ctx, "Deployment cancelled!") - return - } - - // Apply the deployment. - deployCoreWithMetadata(ctx, b, plan, targetEngine, svc, deploymentID, versionID) - if logdiag.HasError(ctx) { - deployFailed = true - return - } - - logDeployTelemetry(ctx, b) - bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPostDeploy)) -} - -// deployCoreWithMetadata applies the deployment plan and reports operations to -// the metadata service. -func deployCoreWithMetadata(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, targetEngine engine.EngineType, svc *metadataservice.Client, deploymentID, versionID string) { - cmdio.LogString(ctx, "Deploying resources...") - - if targetEngine.IsDirect() { - b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false)) - } else { - bundle.ApplyContext(ctx, b, terraform.Apply()) - } - - // Push resource state even on failure. - statemgmt.PushResourcesState(ctx, b, targetEngine) - - // Report operations to the metadata service (best-effort). 
- reportOperations(ctx, b, svc, deploymentID, versionID, plan) - - if logdiag.HasError(ctx) { - return - } - - bundle.ApplySeqContext(ctx, b, - statemgmt.Load(targetEngine), - metadata.Compute(), - metadata.Upload(), - statemgmt.UploadStateForYamlSync(targetEngine), - ) - - if !logdiag.HasError(ctx) { - cmdio.LogString(ctx, "Deployment complete!") - } + return deploymentID, versionID, cleanup, nil } // reportOperations reports each resource operation to the metadata service. // This is best-effort: failures are logged as warnings, not fatal errors. -func reportOperations(ctx context.Context, b *bundle.Bundle, svc *metadataservice.Client, deploymentID, versionID string, plan *deployplan.Plan) { +func reportOperations(ctx context.Context, svc *tempdms.DeploymentMetadataAPI, deploymentID, versionID string, plan *deployplan.Plan) { if plan == nil { return } @@ -216,7 +107,10 @@ func reportOperations(ctx context.Context, b *bundle.Bundle, svc *metadataservic // Fetch existing resources to determine if this is the first time we're // tracking each resource in the metadata service. knownResources := map[string]bool{} - existing, err := svc.ListResources(ctx, deploymentID) + existing, err := svc.ListResources(ctx, tempdms.ListResourcesRequest{ + DeploymentID: deploymentID, + Parent: fmt.Sprintf("deployments/%s", deploymentID), + }) if err != nil { log.Warnf(ctx, "Failed to list existing resources from metadata service, will use INITIAL_REGISTER for all: %v", err) } else { @@ -226,23 +120,27 @@ func reportOperations(ctx context.Context, b *bundle.Bundle, svc *metadataservic } for resourceKey, entry := range plan.Plan { - var actionType metadataservice.OperationActionType + var actionType tempdms.OperationActionType if knownResources[resourceKey] { - // Resource is already tracked; use the plan's action type. actionType = planActionToOperationAction(entry.Action) } else { - // First time tracking this resource in the service. 
- actionType = metadataservice.OperationActionTypeInitRegister + actionType = tempdms.OperationActionTypeInitRegister } - if actionType == metadataservice.OperationActionTypeUnspecified { + if actionType == tempdms.OperationActionTypeUnspecified { continue } - _, err := svc.CreateOperation(ctx, deploymentID, versionID, resourceKey, &metadataservice.Operation{ - ResourceKey: resourceKey, - Status: metadataservice.OperationStatusSucceeded, - ActionType: actionType, + _, err := svc.CreateOperation(ctx, tempdms.CreateOperationRequest{ + DeploymentID: deploymentID, + VersionID: versionID, + Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + ResourceKey: resourceKey, + Operation: &tempdms.Operation{ + ResourceKey: resourceKey, + Status: tempdms.OperationStatusSucceeded, + ActionType: actionType, + }, }) if err != nil { log.Warnf(ctx, "Failed to report operation for resource %s: %v", resourceKey, err) @@ -250,18 +148,18 @@ func reportOperations(ctx context.Context, b *bundle.Bundle, svc *metadataservic } } -func planActionToOperationAction(action deployplan.ActionType) metadataservice.OperationActionType { +func planActionToOperationAction(action deployplan.ActionType) tempdms.OperationActionType { switch action { case deployplan.Create: - return metadataservice.OperationActionTypeCreate + return tempdms.OperationActionTypeCreate case deployplan.Update: - return metadataservice.OperationActionTypeUpdate + return tempdms.OperationActionTypeUpdate case deployplan.Delete: - return metadataservice.OperationActionTypeDelete + return tempdms.OperationActionTypeDelete case deployplan.Recreate: - return metadataservice.OperationActionTypeRecreate + return tempdms.OperationActionTypeRecreate default: - return metadataservice.OperationActionTypeUnspecified + return tempdms.OperationActionTypeUnspecified } } diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index 374b533ca7..81e52a3445 100644 --- a/bundle/phases/destroy.go +++ 
b/bundle/phases/destroy.go @@ -3,20 +3,22 @@ package phases import ( "context" "errors" + "fmt" "net/http" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/config/mutator" - "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle/deploy/files" "github.com/databricks/cli/bundle/deploy/lock" "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" + "github.com/databricks/cli/libs/tempdms" "github.com/databricks/databricks-sdk-go/apierr" ) @@ -95,11 +97,10 @@ func approvalForDestroy(ctx context.Context, b *bundle.Bundle, plan *deployplan. return approved, nil } -func destroyCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, engine engine.EngineType) { - if engine.IsDirect() { +func destroyCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, targetEngine engine.EngineType) { + if targetEngine.IsDirect() { b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false)) } else { - // Core destructive mutators for destroy. These require informed user consent. bundle.ApplyContext(ctx, b, terraform.Apply()) } @@ -115,35 +116,54 @@ func destroyCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, e } // The destroy phase deletes artifacts and resources. 
-func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { - if v, _ := env.DeploymentService(ctx); v == "true" { - destroyWithMetadataService(ctx, b, engine) - return - } +func Destroy(ctx context.Context, b *bundle.Bundle, targetEngine engine.EngineType) { + useMetadataService, _ := env.DeploymentService(ctx) - log.Info(ctx, "Phase: destroy") + if useMetadataService == "true" { + log.Info(ctx, "Phase: destroy (with metadata service)") + } else { + log.Info(ctx, "Phase: destroy") + } ok, err := assertRootPathExists(ctx, b) if err != nil { logdiag.LogError(ctx, err) return } - if !ok { cmdio.LogString(ctx, "No active deployment found to destroy!") return } - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { - return - } + // Acquire the deployment lock. + var failed bool - defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) - }() + if useMetadataService == "true" { + svc, svcErr := tempdms.NewDeploymentMetadataAPI(b.WorkspaceClient()) + if svcErr != nil { + logdiag.LogError(ctx, fmt.Errorf("failed to create metadata service client: %w", svcErr)) + return + } + + _, _, cleanup, lockErr := deployMetadataLock(ctx, b, svc, tempdms.VersionTypeDestroy) + if lockErr != nil { + logdiag.LogError(ctx, lockErr) + return + } + defer func() { + cleanup(failed || logdiag.HasError(ctx)) + }() + } else { + bundle.ApplyContext(ctx, b, lock.Acquire()) + if logdiag.HasError(ctx) { + return + } + defer func() { + bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) + }() + } - if !engine.IsDirect() { + if !targetEngine.IsDirect() { bundle.ApplySeqContext(ctx, b, // We need to resolve artifact variable (how we do it in build phase) // because some of the to-be-destroyed resource might use this variable. 
@@ -158,27 +178,31 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { } if logdiag.HasError(ctx) { + failed = true return } var plan *deployplan.Plan - if engine.IsDirect() { + if targetEngine.IsDirect() { _, localPath := b.StateFilenameDirect(ctx) plan, err = b.DeploymentBundle.CalculatePlan(ctx, b.WorkspaceClient(), nil, localPath) if err != nil { logdiag.LogError(ctx, err) + failed = true return } } else { tf := b.Terraform if tf == nil { logdiag.LogError(ctx, errors.New("terraform not initialized")) + failed = true return } plan, err = terraform.ShowPlanFile(ctx, tf, b.TerraformPlanPath) if err != nil { logdiag.LogError(ctx, err) + failed = true return } } @@ -186,11 +210,15 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { hasApproval, err := approvalForDestroy(ctx, b, plan) if err != nil { logdiag.LogError(ctx, err) + failed = true return } if hasApproval { - destroyCore(ctx, b, plan, engine) + destroyCore(ctx, b, plan, targetEngine) + if logdiag.HasError(ctx) { + failed = true + } } else { cmdio.LogString(ctx, "Destroy cancelled!") } diff --git a/bundle/phases/destroy_metadata.go b/bundle/phases/destroy_metadata.go index d7992e72cc..6cfa47ecc0 100644 --- a/bundle/phases/destroy_metadata.go +++ b/bundle/phases/destroy_metadata.go @@ -1,180 +1,3 @@ +// This file is intentionally left minimal. The destroy flow with metadata service +// support has been unified into destroy.go using the deployMetadataLock helper. 
package phases - -import ( - "context" - "errors" - "fmt" - "net/http" - "time" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/config/engine" - "github.com/databricks/cli/bundle/config/mutator" - "github.com/databricks/cli/bundle/deploy" - "github.com/databricks/cli/bundle/deploy/files" - metadataservice "github.com/databricks/cli/bundle/deploy/metadata/service" - "github.com/databricks/cli/bundle/deploy/terraform" - "github.com/databricks/cli/bundle/deployplan" - "github.com/databricks/cli/bundle/direct" - "github.com/databricks/cli/internal/build" - "github.com/databricks/cli/libs/cmdio" - "github.com/databricks/cli/libs/log" - "github.com/databricks/cli/libs/logdiag" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/google/uuid" -) - -func destroyWithMetadataService(ctx context.Context, b *bundle.Bundle, targetEngine engine.EngineType) { - log.Info(ctx, "Phase: destroy (with metadata service)") - - ok, err := assertRootPathExists(ctx, b) - if err != nil { - logdiag.LogError(ctx, err) - return - } - if !ok { - cmdio.LogString(ctx, "No active deployment found to destroy!") - return - } - - // Create the metadata service client. - svc, err := metadataservice.NewClient(b.WorkspaceClient()) - if err != nil { - logdiag.LogError(ctx, fmt.Errorf("failed to create metadata service client: %w", err)) - return - } - - // Load local deployment state to get the deployment ID and sequence number. - state, err := deploy.LoadState(ctx, b) - if err != nil { - logdiag.LogError(ctx, fmt.Errorf("failed to load deployment state: %w", err)) - return - } - - if state.ID == uuid.Nil { - cmdio.LogString(ctx, "No active deployment found to destroy!") - return - } - deploymentID := state.ID.String() - - // Check that the deployment exists. 
- _, err = svc.GetDeployment(ctx, deploymentID) - if err != nil { - var apiErr *apierr.APIError - if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusNotFound { - log.Infof(ctx, "No deployment found in metadata service for %s, nothing to destroy", deploymentID) - cmdio.LogString(ctx, "No active deployment found to destroy!") - return - } - logdiag.LogError(ctx, fmt.Errorf("failed to get deployment: %w", err)) - return - } - - // Create a version to acquire the deployment lock. - versionID := fmt.Sprintf("%d", state.Seq+1) - _, err = svc.CreateVersion(ctx, deploymentID, versionID, &metadataservice.Version{ - CliVersion: build.GetInfo().Version, - VersionType: metadataservice.VersionTypeDestroy, - TargetName: b.Config.Bundle.Target, - }) - if err != nil { - logdiag.LogError(ctx, fmt.Errorf("failed to acquire deployment lock: %w", err)) - return - } - - log.Infof(ctx, "Acquired deployment lock for destroy: deployment=%s version=%s", deploymentID, versionID) - - // Start heartbeat to keep the lock alive. - stopHeartbeat := metadataservice.StartHeartbeat(ctx, svc, deploymentID, versionID, metadataservice.DefaultHeartbeatInterval) - - var destroyFailed bool - defer func() { - stopHeartbeat() - - reason := metadataservice.VersionCompleteSuccess - if destroyFailed || logdiag.HasError(ctx) { - reason = metadataservice.VersionCompleteFailure - } - - // Use a separate context for cleanup so the lock is released even if the - // parent context was cancelled (e.g. user hit Ctrl+C). - cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - _, completeErr := svc.CompleteVersion(cleanupCtx, deploymentID, versionID, reason, false) - if completeErr != nil { - log.Warnf(ctx, "Failed to release deployment lock: %v", completeErr) - } else { - log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%d", deploymentID, versionID, reason) - } - }() - - // Calculate the destroy plan. 
- if !targetEngine.IsDirect() { - bundle.ApplySeqContext(ctx, b, - mutator.ResolveVariableReferencesWithoutResources("artifacts"), - mutator.ResolveVariableReferencesOnlyResources("artifacts"), - terraform.Interpolate(), - terraform.Write(), - terraform.Plan(terraform.PlanGoal("destroy")), - ) - } - - if logdiag.HasError(ctx) { - destroyFailed = true - return - } - - var plan *deployplan.Plan - if targetEngine.IsDirect() { - _, localPath := b.StateFilenameDirect(ctx) - plan, err = b.DeploymentBundle.CalculatePlan(ctx, b.WorkspaceClient(), nil, localPath) - if err != nil { - logdiag.LogError(ctx, err) - destroyFailed = true - return - } - } else { - tf := b.Terraform - if tf == nil { - logdiag.LogError(ctx, fmt.Errorf("terraform not initialized")) - destroyFailed = true - return - } - - plan, err = terraform.ShowPlanFile(ctx, tf, b.TerraformPlanPath) - if err != nil { - logdiag.LogError(ctx, err) - destroyFailed = true - return - } - } - - hasApproval, err := approvalForDestroy(ctx, b, plan) - if err != nil { - logdiag.LogError(ctx, err) - destroyFailed = true - return - } - - if hasApproval { - if targetEngine.IsDirect() { - b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false)) - } else { - bundle.ApplyContext(ctx, b, terraform.Apply()) - } - - if logdiag.HasError(ctx) { - destroyFailed = true - return - } - - bundle.ApplyContext(ctx, b, files.Delete()) - - if !logdiag.HasError(ctx) { - cmdio.LogString(ctx, "Destroy complete!") - } - } else { - cmdio.LogString(ctx, "Destroy cancelled!") - } -} diff --git a/bundle/deploy/metadata/service/heartbeat.go b/bundle/phases/heartbeat.go similarity index 56% rename from bundle/deploy/metadata/service/heartbeat.go rename to bundle/phases/heartbeat.go index d32e0a24f0..1f9b3d41d1 100644 --- a/bundle/deploy/metadata/service/heartbeat.go +++ b/bundle/phases/heartbeat.go @@ -1,17 +1,18 @@ -package service +package phases import ( "context" "time" "github.com/databricks/cli/libs/log" + 
"github.com/databricks/cli/libs/tempdms" ) -const DefaultHeartbeatInterval = 2 * time.Minute +const defaultHeartbeatInterval = 2 * time.Minute -// StartHeartbeat starts a background goroutine that sends heartbeats to keep +// startHeartbeat starts a background goroutine that sends heartbeats to keep // the deployment lock alive. Returns a cancel function to stop the heartbeat. -func StartHeartbeat(ctx context.Context, client *Client, deploymentID, versionID string, interval time.Duration) context.CancelFunc { +func startHeartbeat(ctx context.Context, svc *tempdms.DeploymentMetadataAPI, deploymentID, versionID string, interval time.Duration) context.CancelFunc { ctx, cancel := context.WithCancel(ctx) go func() { @@ -23,7 +24,10 @@ func StartHeartbeat(ctx context.Context, client *Client, deploymentID, versionID case <-ctx.Done(): return case <-ticker.C: - _, err := client.Heartbeat(ctx, deploymentID, versionID) + _, err := svc.Heartbeat(ctx, tempdms.HeartbeatRequest{ + DeploymentID: deploymentID, + VersionID: versionID, + }) if err != nil { log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) } else { diff --git a/libs/tempdms/api.go b/libs/tempdms/api.go new file mode 100644 index 0000000000..305633819e --- /dev/null +++ b/libs/tempdms/api.go @@ -0,0 +1,164 @@ +package tempdms + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" +) + +const basePath = "/api/2.0/bundle" + +// DeploymentMetadataAPI is a client for the Deployment Metadata Service. +// +// This is a temporary implementation that will be replaced by the SDK-generated +// client once the proto definitions land in the Go SDK. The method signatures +// and types are designed to match what the SDK will generate, so migration +// should be a straightforward import path change. 
+type DeploymentMetadataAPI struct { + api *client.DatabricksClient +} + +func NewDeploymentMetadataAPI(w *databricks.WorkspaceClient) (*DeploymentMetadataAPI, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, fmt.Errorf("failed to create deployment metadata API client: %w", err) + } + return &DeploymentMetadataAPI{api: apiClient}, nil +} + +func (a *DeploymentMetadataAPI) CreateDeployment(ctx context.Context, request CreateDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := fmt.Sprintf("%s/deployments", basePath) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, request, &resp) + if err != nil { + return nil, mapError("create deployment", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) GetDeployment(ctx context.Context, request GetDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := fmt.Sprintf("%s/deployments/%s", basePath, request.DeploymentID) + err := a.api.Do(ctx, http.MethodGet, path, nil, nil, nil, &resp) + if err != nil { + return nil, mapError("get deployment", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) DeleteDeployment(ctx context.Context, request DeleteDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := fmt.Sprintf("%s/deployments/%s", basePath, request.DeploymentID) + err := a.api.Do(ctx, http.MethodDelete, path, nil, nil, nil, &resp) + if err != nil { + return nil, mapError("delete deployment", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CreateVersion(ctx context.Context, request CreateVersionRequest) (*Version, error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions", basePath, request.DeploymentID) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, request, &resp) + if err != nil { + return nil, mapError("create version", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) GetVersion(ctx context.Context, request GetVersionRequest) (*Version, 
error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions/%s", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodGet, path, nil, nil, nil, &resp) + if err != nil { + return nil, mapError("get version", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) Heartbeat(ctx context.Context, request HeartbeatRequest) (*HeartbeatResponse, error) { + var resp HeartbeatResponse + path := fmt.Sprintf("%s/deployments/%s/versions/%s/heartbeat", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, struct{}{}, &resp) + if err != nil { + return nil, mapError("heartbeat", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CompleteVersion(ctx context.Context, request CompleteVersionRequest) (*Version, error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions/%s/complete", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, request, &resp) + if err != nil { + return nil, mapError("complete version", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CreateOperation(ctx context.Context, request CreateOperationRequest) (*Operation, error) { + var resp Operation + path := fmt.Sprintf("%s/deployments/%s/versions/%s/operations", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, request, &resp) + if err != nil { + return nil, mapError("create operation", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) ListResources(ctx context.Context, request ListResourcesRequest) ([]Resource, error) { + var allResources []Resource + pageToken := "" + + for { + var resp ListResourcesResponse + path := fmt.Sprintf("%s/deployments/%s/resources", basePath, request.DeploymentID) + + q := map[string]any{ + "parent": fmt.Sprintf("deployments/%s", request.DeploymentID), + "page_size": 1000, + } + if pageToken != 
"" { + q["page_token"] = pageToken + } + + err := a.api.Do(ctx, http.MethodGet, path, nil, q, nil, &resp) + if err != nil { + return nil, mapError("list resources", err) + } + + allResources = append(allResources, resp.Resources...) + if resp.NextPageToken == "" { + break + } + pageToken = resp.NextPageToken + } + + return allResources, nil +} + +// mapError translates API errors into user-friendly messages. +func mapError(operation string, err error) error { + var apiErr *apierr.APIError + if !errors.As(err, &apiErr) { + return fmt.Errorf("%s: %w", operation, err) + } + + switch apiErr.StatusCode { + case http.StatusConflict: + return fmt.Errorf("%s: deployment is locked by another active deployment. "+ + "If the prior deployment failed, the lock will expire automatically after 5 minutes. "+ + "You can also force-acquire the lock by running deploy with the --force-lock flag: %w", operation, err) + case http.StatusNotFound: + return fmt.Errorf("%s: resource not found: %w", operation, err) + case http.StatusBadRequest: + return fmt.Errorf("%s: bad request: %w", operation, err) + default: + return fmt.Errorf("%s: %w", operation, err) + } +} diff --git a/bundle/deploy/metadata/service/types.go b/libs/tempdms/types.go similarity index 58% rename from bundle/deploy/metadata/service/types.go rename to libs/tempdms/types.go index 05e0cf03b1..a5d8c0df92 100644 --- a/bundle/deploy/metadata/service/types.go +++ b/libs/tempdms/types.go @@ -1,4 +1,8 @@ -package service +// Package tempdms is a temporary client library for the Deployment Metadata Service. +// It mirrors the structure that the Databricks Go SDK will eventually generate from +// the service's proto definitions. When the protos land in the SDK, migration should +// be a straightforward import path change. 
+package tempdms import "time" @@ -60,47 +64,47 @@ const ( ) const ( - ResourceTypeUnspecified DeploymentResourceType = 0 - ResourceTypeJob DeploymentResourceType = 1 - ResourceTypePipeline DeploymentResourceType = 2 - ResourceTypeModel DeploymentResourceType = 4 - ResourceTypeRegisteredModel DeploymentResourceType = 5 - ResourceTypeExperiment DeploymentResourceType = 6 - ResourceTypeServingEndpoint DeploymentResourceType = 7 - ResourceTypeQualityMonitor DeploymentResourceType = 8 - ResourceTypeSchema DeploymentResourceType = 9 - ResourceTypeVolume DeploymentResourceType = 10 - ResourceTypeCluster DeploymentResourceType = 11 - ResourceTypeDashboard DeploymentResourceType = 12 - ResourceTypeApp DeploymentResourceType = 13 - ResourceTypeCatalog DeploymentResourceType = 14 - ResourceTypeExternalLocation DeploymentResourceType = 15 - ResourceTypeSecretScope DeploymentResourceType = 16 - ResourceTypeAlert DeploymentResourceType = 17 - ResourceTypeSQLWarehouse DeploymentResourceType = 18 - ResourceTypeDatabaseInstance DeploymentResourceType = 19 - ResourceTypeDatabaseCatalog DeploymentResourceType = 20 - ResourceTypeSyncedDBTable DeploymentResourceType = 21 - ResourceTypePostgresProject DeploymentResourceType = 22 - ResourceTypePostgresBranch DeploymentResourceType = 23 - ResourceTypePostgresEndpoint DeploymentResourceType = 24 + ResourceTypeUnspecified DeploymentResourceType = 0 + ResourceTypeJob DeploymentResourceType = 1 + ResourceTypePipeline DeploymentResourceType = 2 + ResourceTypeModel DeploymentResourceType = 4 + ResourceTypeRegisteredModel DeploymentResourceType = 5 + ResourceTypeExperiment DeploymentResourceType = 6 + ResourceTypeServingEndpoint DeploymentResourceType = 7 + ResourceTypeQualityMonitor DeploymentResourceType = 8 + ResourceTypeSchema DeploymentResourceType = 9 + ResourceTypeVolume DeploymentResourceType = 10 + ResourceTypeCluster DeploymentResourceType = 11 + ResourceTypeDashboard DeploymentResourceType = 12 + ResourceTypeApp 
DeploymentResourceType = 13 + ResourceTypeCatalog DeploymentResourceType = 14 + ResourceTypeExternalLocation DeploymentResourceType = 15 + ResourceTypeSecretScope DeploymentResourceType = 16 + ResourceTypeAlert DeploymentResourceType = 17 + ResourceTypeSQLWarehouse DeploymentResourceType = 18 + ResourceTypeDatabaseInstance DeploymentResourceType = 19 + ResourceTypeDatabaseCatalog DeploymentResourceType = 20 + ResourceTypeSyncedDBTable DeploymentResourceType = 21 + ResourceTypePostgresProject DeploymentResourceType = 22 + ResourceTypePostgresBranch DeploymentResourceType = 23 + ResourceTypePostgresEndpoint DeploymentResourceType = 24 ) -// Deployment represents a bundle deployment registered with the control plane. +// Resource types (proto message equivalents). + type Deployment struct { - Name string `json:"name,omitempty"` - DisplayName string `json:"display_name,omitempty"` - TargetName string `json:"target_name,omitempty"` - Status DeploymentStatus `json:"status,omitempty"` - LastVersionID string `json:"last_version_id,omitempty"` - CreatedBy string `json:"created_by,omitempty"` - CreateTime *time.Time `json:"create_time,omitempty"` - UpdateTime *time.Time `json:"update_time,omitempty"` - DestroyTime *time.Time `json:"destroy_time,omitempty"` - DestroyedBy string `json:"destroyed_by,omitempty"` -} - -// Version represents a single invocation of deploy/destroy against a deployment. 
+ Name string `json:"name,omitempty"` + DisplayName string `json:"display_name,omitempty"` + TargetName string `json:"target_name,omitempty"` + Status DeploymentStatus `json:"status,omitempty"` + LastVersionID string `json:"last_version_id,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + UpdateTime *time.Time `json:"update_time,omitempty"` + DestroyTime *time.Time `json:"destroy_time,omitempty"` + DestroyedBy string `json:"destroyed_by,omitempty"` +} + type Version struct { Name string `json:"name,omitempty"` VersionID string `json:"version_id,omitempty"` @@ -116,7 +120,6 @@ type Version struct { TargetName string `json:"target_name,omitempty"` } -// Operation records the result of applying a resource change. type Operation struct { Name string `json:"name,omitempty"` ResourceKey string `json:"resource_key,omitempty"` @@ -128,7 +131,6 @@ type Operation struct { ErrorMessage string `json:"error_message,omitempty"` } -// Resource represents a resource managed by a deployment. type Resource struct { Name string `json:"name,omitempty"` ResourceKey string `json:"resource_key,omitempty"` @@ -139,43 +141,75 @@ type Resource struct { ResourceType DeploymentResourceType `json:"resource_type,omitempty"` } -// Request/Response types. +// Request types. 
type CreateDeploymentRequest struct { DeploymentID string `json:"deployment_id"` Deployment *Deployment `json:"deployment"` } -type ListDeploymentsResponse struct { - Deployments []Deployment `json:"deployments"` - NextPageToken string `json:"next_page_token,omitempty"` +type GetDeploymentRequest struct { + DeploymentID string `json:"-"` +} + +type DeleteDeploymentRequest struct { + DeploymentID string `json:"-"` } type CreateVersionRequest struct { - Parent string `json:"parent"` - Version *Version `json:"version"` - VersionID string `json:"version_id"` + DeploymentID string `json:"-"` + Parent string `json:"parent"` + Version *Version `json:"version"` + VersionID string `json:"version_id"` } -type ListVersionsResponse struct { - Versions []Version `json:"versions"` - NextPageToken string `json:"next_page_token,omitempty"` +type GetVersionRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` } -type HeartbeatResponse struct { - ExpireTime *time.Time `json:"expire_time,omitempty"` +type HeartbeatRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` } type CompleteVersionRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` Name string `json:"name"` CompletionReason VersionComplete `json:"completion_reason"` Force bool `json:"force,omitempty"` } type CreateOperationRequest struct { - Parent string `json:"parent"` - ResourceKey string `json:"resource_key"` - Operation *Operation `json:"operation"` + DeploymentID string `json:"-"` + VersionID string `json:"-"` + Parent string `json:"parent"` + ResourceKey string `json:"resource_key"` + Operation *Operation `json:"operation"` +} + +type ListResourcesRequest struct { + DeploymentID string `json:"-"` + Parent string `json:"parent"` + PageSize int `json:"page_size,omitempty"` + PageToken string `json:"page_token,omitempty"` +} + +// Response types. 
+ +type HeartbeatResponse struct { + ExpireTime *time.Time `json:"expire_time,omitempty"` +} + +type ListDeploymentsResponse struct { + Deployments []Deployment `json:"deployments"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type ListVersionsResponse struct { + Versions []Version `json:"versions"` + NextPageToken string `json:"next_page_token,omitempty"` } type ListOperationsResponse struct { diff --git a/libs/testserver/deployment_metadata.go b/libs/testserver/deployment_metadata.go new file mode 100644 index 0000000000..1b0b3e9f90 --- /dev/null +++ b/libs/testserver/deployment_metadata.go @@ -0,0 +1,400 @@ +package testserver + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/databricks/cli/libs/tempdms" +) + +// deploymentMetadataState holds in-memory state for the deployment metadata service. +// Stored per-workspace inside FakeWorkspace. +type deploymentMetadataState struct { + // deployments keyed by deployment_id + deployments map[string]tempdms.Deployment + + // versions keyed by "deploymentId/versionId" + versions map[string]tempdms.Version + + // operations keyed by "deploymentId/versionId/resourceKey" + operations map[string]tempdms.Operation + + // resources keyed by "deploymentId/resourceKey" + resources map[string]tempdms.Resource + + // lock state per deployment: which version holds the lock and when it expires + lockHolder map[string]string // deploymentId -> "deployments/{id}/versions/{vid}" + lockExpiry map[string]time.Time // deploymentId -> expiry time +} + +func newDeploymentMetadataState() *deploymentMetadataState { + return &deploymentMetadataState{ + deployments: map[string]tempdms.Deployment{}, + versions: map[string]tempdms.Version{}, + operations: map[string]tempdms.Operation{}, + resources: map[string]tempdms.Resource{}, + lockHolder: map[string]string{}, + lockExpiry: map[string]time.Time{}, + } +} + +const lockDuration = 5 * time.Minute + +func (s *FakeWorkspace) 
DeploymentMetadataCreateDeployment(req Request) Response { + defer s.LockUnlock()() + + var createReq tempdms.CreateDeploymentRequest + if err := json.Unmarshal(req.Body, &createReq); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + + deploymentID := createReq.DeploymentID + if deploymentID == "" { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "deployment_id is required"}, + } + } + + state := s.deploymentMetadata + if _, exists := state.deployments[deploymentID]; exists { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ALREADY_EXISTS", "message": fmt.Sprintf("deployment %s already exists", deploymentID)}, + } + } + + now := time.Now().UTC() + deployment := tempdms.Deployment{ + Name: fmt.Sprintf("deployments/%s", deploymentID), + DisplayName: deploymentID, + Status: tempdms.DeploymentStatusActive, + CreatedBy: s.CurrentUser().UserName, + CreateTime: &now, + UpdateTime: &now, + } + if createReq.Deployment != nil { + if createReq.Deployment.TargetName != "" { + deployment.TargetName = createReq.Deployment.TargetName + } + } + + state.deployments[deploymentID] = deployment + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + 
deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + + now := time.Now().UTC() + deployment.Status = tempdms.DeploymentStatusDeleted + deployment.DestroyTime = &now + deployment.DestroyedBy = s.CurrentUser().UserName + deployment.UpdateTime = &now + state.deployments[deploymentID] = deployment + + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataCreateVersion(req Request, deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + + var createReq tempdms.CreateVersionRequest + if err := json.Unmarshal(req.Body, &createReq); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + + versionID := createReq.VersionID + if versionID == "" { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "version_id is required"}, + } + } + + // Validate version_id == last_version_id + 1 (matching server behavior). 
+ var expectedVersionID string + if deployment.LastVersionID == "" { + expectedVersionID = "1" + } else { + lastVersion, err := strconv.ParseInt(deployment.LastVersionID, 10, 64) + if err != nil { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{"error_code": "INTERNAL_ERROR", "message": fmt.Sprintf("stored last_version_id is not a valid number: %s", deployment.LastVersionID)}, + } + } + expectedVersionID = strconv.FormatInt(lastVersion+1, 10) + } + if versionID != expectedVersionID { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ABORTED", + "message": fmt.Sprintf("version_id must be %s (last_version_id + 1), got: %s", expectedVersionID, versionID), + }, + } + } + + // Check lock: if a lock is held and not expired, reject with 409. + now := time.Now().UTC() + if holder, hasLock := state.lockHolder[deploymentID]; hasLock { + if expiry, ok := state.lockExpiry[deploymentID]; ok && expiry.After(now) { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ABORTED", + "message": fmt.Sprintf("deployment is locked by %s until %s", holder, expiry.Format(time.RFC3339)), + }, + } + } + } + + versionKey := deploymentID + "/" + versionID + version := tempdms.Version{ + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + VersionID: versionID, + CreatedBy: s.CurrentUser().UserName, + CreateTime: &now, + Status: tempdms.VersionStatusInProgress, + } + if createReq.Version != nil { + version.CliVersion = createReq.Version.CliVersion + version.VersionType = createReq.Version.VersionType + } + + state.versions[versionKey] = version + + // Acquire the lock. + lockExpiry := now.Add(lockDuration) + state.lockHolder[deploymentID] = version.Name + state.lockExpiry[deploymentID] = lockExpiry + + // Update the deployment's last_version_id and status. 
+	deployment.LastVersionID = versionID
+	deployment.Status = tempdms.DeploymentStatusInProgress
+	deployment.UpdateTime = &now
+	state.deployments[deploymentID] = deployment
+
+	return Response{Body: version}
+}
+
+// DeploymentMetadataGetVersion returns the stored version identified by
+// deploymentID and versionID. It responds with 404 NOT_FOUND when no such
+// version has been recorded.
+func (s *FakeWorkspace) DeploymentMetadataGetVersion(deploymentID, versionID string) Response {
+	defer s.LockUnlock()()
+
+	state := s.deploymentMetadata
+	versionKey := deploymentID + "/" + versionID
+	version, ok := state.versions[versionKey]
+	if !ok {
+		return Response{
+			StatusCode: http.StatusNotFound,
+			Body:       map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)},
+		}
+	}
+	return Response{Body: version}
+}
+
+// DeploymentMetadataHeartbeat renews the deployment lock on behalf of the
+// given version. The request body is not read.
+//
+// It responds with 404 NOT_FOUND when the version is unknown, and with
+// 409 ABORTED when the version is no longer in progress or is not the current
+// lock holder. On success the lock expiry is moved to now + lockDuration and
+// returned in a HeartbeatResponse.
+func (s *FakeWorkspace) DeploymentMetadataHeartbeat(req Request, deploymentID, versionID string) Response {
+	defer s.LockUnlock()()
+
+	state := s.deploymentMetadata
+	versionKey := deploymentID + "/" + versionID
+	version, ok := state.versions[versionKey]
+	if !ok {
+		return Response{
+			StatusCode: http.StatusNotFound,
+			Body:       map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)},
+		}
+	}
+
+	if version.Status != tempdms.VersionStatusInProgress {
+		return Response{
+			StatusCode: http.StatusConflict,
+			Body:       map[string]string{"error_code": "ABORTED", "message": "version is no longer in progress"},
+		}
+	}
+
+	// Verify this version holds the lock.
+	expectedHolder := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID)
+	if state.lockHolder[deploymentID] != expectedHolder {
+		return Response{
+			StatusCode: http.StatusConflict,
+			Body:       map[string]string{"error_code": "ABORTED", "message": "lock is not held by this version"},
+		}
+	}
+
+	// Renew the lock.
+	now := time.Now().UTC()
+	newExpiry := now.Add(lockDuration)
+	state.lockExpiry[deploymentID] = newExpiry
+
+	return Response{Body: tempdms.HeartbeatResponse{ExpireTime: &newExpiry}}
+}
+
+// DeploymentMetadataCompleteVersion marks an in-progress version as completed
+// using the completion reason from the request body, and records the
+// completion time and the current user.
+//
+// It responds with 404 NOT_FOUND when the version is unknown, 409 ABORTED when
+// the version is not in progress, and 400 INVALID_PARAMETER_VALUE when the
+// body cannot be decoded. On success the deployment lock is released if this
+// version holds it, the deployment status is updated from the completion
+// reason, and the updated version is returned.
+func (s *FakeWorkspace) DeploymentMetadataCompleteVersion(req Request, deploymentID, versionID string) Response {
+	defer s.LockUnlock()()
+
+	state := s.deploymentMetadata
+	versionKey := deploymentID + "/" + versionID
+	version, ok := state.versions[versionKey]
+	if !ok {
+		return Response{
+			StatusCode: http.StatusNotFound,
+			Body:       map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)},
+		}
+	}
+
+	if version.Status != tempdms.VersionStatusInProgress {
+		return Response{
+			StatusCode: http.StatusConflict,
+			Body:       map[string]string{"error_code": "ABORTED", "message": "version is already completed"},
+		}
+	}
+
+	var completeReq tempdms.CompleteVersionRequest
+	if err := json.Unmarshal(req.Body, &completeReq); err != nil {
+		return Response{
+			StatusCode: http.StatusBadRequest,
+			Body:       map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)},
+		}
+	}
+
+	now := time.Now().UTC()
+	version.Status = tempdms.VersionStatusCompleted
+	version.CompleteTime = &now
+	version.CompletionReason = completeReq.CompletionReason
+	version.CompletedBy = s.CurrentUser().UserName
+	state.versions[versionKey] = version
+
+	// Release the lock, but only when this version is the recorded holder.
+	// The heartbeat handler treats "in progress but not the holder" as a
+	// possible state, so an unconditional delete here could drop a lock that
+	// belongs to a different version.
+	expectedHolder := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID)
+	if state.lockHolder[deploymentID] == expectedHolder {
+		delete(state.lockHolder, deploymentID)
+		delete(state.lockExpiry, deploymentID)
+	}
+
+	// Update deployment status based on completion reason. Reasons not listed
+	// below leave Status untouched; UpdateTime is refreshed either way.
+	if deployment, ok := state.deployments[deploymentID]; ok {
+		switch completeReq.CompletionReason {
+		case tempdms.VersionCompleteSuccess:
+			deployment.Status = tempdms.DeploymentStatusActive
+		case tempdms.VersionCompleteFailure, tempdms.VersionCompleteForceAbort, tempdms.VersionCompleteLeaseExpire:
+			deployment.Status = tempdms.DeploymentStatusFailed
+		}
+		deployment.UpdateTime = &now
+		state.deployments[deploymentID] = deployment
+	}
+
+	return Response{Body: version}
+}
+
+// DeploymentMetadataCreateOperation records an operation for a resource under
+// the given deployment version and upserts the matching deployment-level
+// resource entry.
+//
+// It responds with 400 INVALID_PARAMETER_VALUE when the body cannot be decoded
+// or resource_key is empty. The stored operation is returned on success.
+func (s *FakeWorkspace) DeploymentMetadataCreateOperation(req Request, deploymentID, versionID string) Response {
+	defer s.LockUnlock()()
+
+	state := s.deploymentMetadata
+
+	var createReq tempdms.CreateOperationRequest
+	if err := json.Unmarshal(req.Body, &createReq); err != nil {
+		return Response{
+			StatusCode: http.StatusBadRequest,
+			Body:       map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)},
+		}
+	}
+
+	resourceKey := createReq.ResourceKey
+	if resourceKey == "" {
+		return Response{
+			StatusCode: http.StatusBadRequest,
+			Body:       map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "resource_key is required"},
+		}
+	}
+
+	now := time.Now().UTC()
+	opKey := deploymentID + "/" + versionID + "/" + resourceKey
+	operation := tempdms.Operation{
+		Name:        fmt.Sprintf("deployments/%s/versions/%s/operations/%s", deploymentID, versionID, resourceKey),
+		ResourceKey: resourceKey,
+		CreateTime:  &now,
+	}
+	if createReq.Operation != nil {
+		operation.ActionType = createReq.Operation.ActionType
+		operation.State = createReq.Operation.State
+		operation.ResourceID = createReq.Operation.ResourceID
+		operation.Status = createReq.Operation.Status
+		operation.ErrorMessage = createReq.Operation.ErrorMessage
+	}
+
+	state.operations[opKey] = operation
+
+	// Upsert the deployment-level resource. The entry is rebuilt from scratch,
+	// so a request without an operation payload stores only the name and key.
+	resKey := deploymentID + "/" + resourceKey
+	resource := tempdms.Resource{
+		Name:        fmt.Sprintf("deployments/%s/resources/%s", deploymentID, resourceKey),
+		ResourceKey: resourceKey,
+	}
+	if createReq.Operation != nil {
+		resource.State = createReq.Operation.State
+		resource.ResourceID = createReq.Operation.ResourceID
+		resource.LastActionType = createReq.Operation.ActionType
+		resource.LastVersionID = versionID
+	}
+	state.resources[resKey] = resource
+
+	return Response{Body: operation}
+}
+
+// DeploymentMetadataListResources returns every resource recorded for
+// deploymentID, ordered by resource key so repeated calls produce the same
+// response. The list is empty (never nil) when nothing matches.
+func (s *FakeWorkspace) DeploymentMetadataListResources(deploymentID string) Response {
+	defer s.LockUnlock()()
+
+	state := s.deploymentMetadata
+	prefix := deploymentID + "/"
+	// Start from a non-nil slice: encoding/json renders a nil slice as null.
+	resources := []tempdms.Resource{}
+	for key, resource := range state.resources {
+		if !strings.HasPrefix(key, prefix) {
+			continue
+		}
+		// Map iteration order is random, so place each match at its sorted
+		// position instead of appending in visit order.
+		i := len(resources)
+		for i > 0 && resources[i-1].ResourceKey > resource.ResourceKey {
+			i--
+		}
+		resources = append(resources, resource)
+		copy(resources[i+1:], resources[i:])
+		resources[i] = resource
+	}
+	return Response{Body: tempdms.ListResourcesResponse{Resources: resources}}
+}
diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go
index b13aae069a..a2462c4c6d 100644
--- a/libs/testserver/fake_workspace.go
+++ b/libs/testserver/fake_workspace.go
@@ -173,6 +173,8 @@ type FakeWorkspace struct {
 	// clusterVenvs caches Python venvs per existing cluster ID,
 	// matching cloud behavior where libraries are cached on running clusters.
clusterVenvs map[string]*clusterEnv + + deploymentMetadata *deploymentMetadataState } func (s *FakeWorkspace) LockUnlock() func() { @@ -297,6 +299,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { PostgresEndpoints: map[string]postgres.Endpoint{}, PostgresOperations: map[string]postgres.Operation{}, clusterVenvs: map[string]*clusterEnv{}, + deploymentMetadata: newDeploymentMetadataState(), Alerts: map[string]sql.AlertV2{}, Experiments: map[string]ml.GetExperimentResponse{}, ModelRegistryModels: map[string]ml.Model{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index 9e30cb5f0c..904284ed51 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -905,4 +905,42 @@ func AddDefaultHandlers(server *Server) { }, } }) + + // Deployment Metadata Service: + + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateDeployment(req) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetDeployment(req.Vars["deployment_id"]) + }) + + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataDeleteDeployment(req.Vars["deployment_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateVersion(req, req.Vars["deployment_id"]) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetVersion(req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.DeploymentMetadataHeartbeat(req, req.Vars["deployment_id"], req.Vars["version_id"]) 
+ }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.DeploymentMetadataCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateOperation(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/resources", func(req Request) any { + return req.Workspace.DeploymentMetadataListResources(req.Vars["deployment_id"]) + }) }