diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 5c885c6f0ac..f28f0e6d257 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -8,5 +8,6 @@ * `experimental open` now opens every DABs resource type that has a workspace URL, picking up `catalogs`, `schemas`, `volumes`, `database_instances`, `database_catalogs`, `synced_database_tables`, `postgres_catalogs`, `postgres_synced_tables`, `quality_monitors`, `vector_search_endpoints`, and `vector_search_indexes` ([#5346](https://github.com/databricks/cli/pull/5346)). ### Bundles +* Retry transient HTTP 504 Gateway Timeout errors in the direct deployment engine ([#5349](https://github.com/databricks/cli/pull/5349)). ### Dependency updates diff --git a/acceptance/bin/fault.py b/acceptance/bin/fault.py new file mode 100755 index 00000000000..30405fbc5a0 --- /dev/null +++ b/acceptance/bin/fault.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 +"""Set up a fault rule on the testserver for the current test token. + +Usage: fault.py PATTERN STATUS_CODE OFFSET TIMES + + PATTERN HTTP method and path, supports trailing * wildcard, + e.g. "PUT /api/2.0/permissions/pipelines/*" + STATUS_CODE HTTP status code to return, e.g. 504 + OFFSET number of requests to let through before fault starts + TIMES number of times to return the fault response + +The rule is scoped to the current DATABRICKS_TOKEN so it only affects +the test that registers it, even when tests share a server. 
+""" + +import json +import os +import sys +import urllib.request + +host = os.environ.get("DATABRICKS_HOST", "") +token = os.environ.get("DATABRICKS_TOKEN", "") + +if not host: + print("DATABRICKS_HOST not set", file=sys.stderr) + sys.exit(1) + +if len(sys.argv) != 5: + print(f"usage: {sys.argv[0]} PATTERN STATUS_CODE OFFSET TIMES", file=sys.stderr) + sys.exit(1) + +pattern, status_code, offset, times = sys.argv[1], int(sys.argv[2]), int(sys.argv[3]), int(sys.argv[4]) +body = '{"error_code":"INJECTED","message":"Fault injected by test."}' + +data = json.dumps( + { + "pattern": pattern, + "status_code": status_code, + "body": body, + "offset": offset, + "times": times, + } +).encode() + +req = urllib.request.Request( + f"{host}/__testserver/fault", + data=data, + headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"}, + method="POST", +) +urllib.request.urlopen(req) diff --git a/acceptance/bundle/resources/permissions/pipelines/504/create/databricks.yml b/acceptance/bundle/resources/permissions/pipelines/504/create/databricks.yml new file mode 100644 index 00000000000..05d0681c885 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/create/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: test-bundle + +resources: + pipelines: + foo: + name: foo + permissions: + - level: CAN_VIEW + user_name: viewer@example.com diff --git a/acceptance/bundle/resources/permissions/pipelines/504/create/out.test.toml b/acceptance/bundle/resources/permissions/pipelines/504/create/out.test.toml new file mode 100644 index 00000000000..7ffa3d15391 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/create/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.READPLAN = [] diff --git a/acceptance/bundle/resources/permissions/pipelines/504/create/output.txt b/acceptance/bundle/resources/permissions/pipelines/504/create/output.txt new file mode 100644 index 
00000000000..72ac55e7006 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/create/output.txt @@ -0,0 +1,9 @@ +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files... +Deploying resources... +Warn: deploying resources.pipelines.foo.permissions: retrying after 504 Gateway Timeout from PUT /api/2.0/permissions/pipelines/[UUID] +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/permissions/pipelines +"PUT /api/2.0/permissions/pipelines/[UUID]" +"PUT /api/2.0/permissions/pipelines/[UUID]" diff --git a/acceptance/bundle/resources/permissions/pipelines/504/create/script b/acceptance/bundle/resources/permissions/pipelines/504/create/script new file mode 100644 index 00000000000..616aaeb8034 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/create/script @@ -0,0 +1,8 @@ +# Inject a single 504 on the first permissions PUT to simulate a transient error. +# Permissions Set is idempotent, so DoCreate opts in via retrySafe and the deploy succeeds. +fault.py "PUT /api/2.0/permissions/pipelines/*" 504 0 1 + +$CLI bundle deploy + +# Two PUT requests should appear: the initial 504 and the successful retry. 
+trace print_requests.py //api/2.0/permissions/pipelines | jq '.method + " " + .path' diff --git a/acceptance/bundle/resources/permissions/pipelines/504/create/test.toml b/acceptance/bundle/resources/permissions/pipelines/504/create/test.toml new file mode 100644 index 00000000000..159efe02696 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/create/test.toml @@ -0,0 +1 @@ +RecordRequests = true diff --git a/acceptance/bundle/resources/permissions/pipelines/504/plan/databricks.yml b/acceptance/bundle/resources/permissions/pipelines/504/plan/databricks.yml new file mode 100644 index 00000000000..05d0681c885 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/plan/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: test-bundle + +resources: + pipelines: + foo: + name: foo + permissions: + - level: CAN_VIEW + user_name: viewer@example.com diff --git a/acceptance/bundle/resources/permissions/pipelines/504/plan/out.test.toml b/acceptance/bundle/resources/permissions/pipelines/504/plan/out.test.toml new file mode 100644 index 00000000000..7ffa3d15391 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/plan/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.READPLAN = [] diff --git a/acceptance/bundle/resources/permissions/pipelines/504/plan/output.txt b/acceptance/bundle/resources/permissions/pipelines/504/plan/output.txt new file mode 100644 index 00000000000..6588ab497c6 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/plan/output.txt @@ -0,0 +1,31 @@ +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! 
+ +>>> [CLI] bundle plan +Warn: planning resources.pipelines.foo.permissions: retrying after 504 Gateway Timeout from GET /api/2.0/permissions/pipelines/[UUID] +Plan: 0 to add, 0 to change, 0 to delete, 2 unchanged + +>>> print_requests.py //api/2.0/permissions/pipelines --get --oneline +2 {"method": "GET", "path": "/api/2.0/permissions/pipelines/[UUID]"} + +>>> [CLI] bundle plan +Warn: planning resources.pipelines.foo.permissions: retrying after 504 Gateway Timeout from GET /api/2.0/permissions/pipelines/[UUID] +Warn: planning resources.pipelines.foo.permissions: retrying after 504 Gateway Timeout from GET /api/2.0/permissions/pipelines/[UUID] +Plan: 0 to add, 0 to change, 0 to delete, 2 unchanged +3 {"method": "GET", "path": "/api/2.0/permissions/pipelines/[UUID]"} + +>>> musterr [CLI] bundle plan +Warn: planning resources.pipelines.foo.permissions: retrying after 504 Gateway Timeout from GET /api/2.0/permissions/pipelines/[UUID] +Warn: planning resources.pipelines.foo.permissions: retrying after 504 Gateway Timeout from GET /api/2.0/permissions/pipelines/[UUID] +Error: cannot plan resources.pipelines.foo.permissions: reading id="/pipelines/[UUID]": Fault injected by test. (504 INJECTED) + +Endpoint: GET [DATABRICKS_URL]/api/2.0/permissions/pipelines/[UUID]? +HTTP Status: 504 Gateway Timeout +API error_code: INJECTED +API message: Fault injected by test. + +Error: planning failed + +3 {"method": "GET", "path": "/api/2.0/permissions/pipelines/[UUID]"} diff --git a/acceptance/bundle/resources/permissions/pipelines/504/plan/script b/acceptance/bundle/resources/permissions/pipelines/504/plan/script new file mode 100644 index 00000000000..10225da19ce --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/plan/script @@ -0,0 +1,15 @@ +# Deploy first so the permissions resource exists; plan reads it on subsequent runs. 
+$CLI bundle deploy +rm -f out.requests.txt + +fault.py "GET /api/2.0/permissions/pipelines/*" 504 0 1 +trace $CLI bundle plan +trace print_requests.py //api/2.0/permissions/pipelines --get --oneline | uniq -c | sed 's/^ *//' | contains.py "2 " + +fault.py "GET /api/2.0/permissions/pipelines/*" 504 0 2 +trace $CLI bundle plan +print_requests.py //api/2.0/permissions/pipelines --get --oneline | uniq -c | sed 's/^ *//' | contains.py "3 " + +fault.py "GET /api/2.0/permissions/pipelines/*" 504 0 3 +trace musterr $CLI bundle plan +print_requests.py //api/2.0/permissions/pipelines --get --oneline | uniq -c | sed 's/^ *//' | contains.py "3 " diff --git a/acceptance/bundle/resources/permissions/pipelines/504/test.toml b/acceptance/bundle/resources/permissions/pipelines/504/test.toml new file mode 100644 index 00000000000..f08b1b09e8e --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/test.toml @@ -0,0 +1,4 @@ +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.READPLAN = [] +RecordRequests = true +Env.DATABRICKS_BUNDLE_RETRY_INTERVAL_MS = "100" diff --git a/acceptance/bundle/resources/permissions/pipelines/504/update/databricks.yml b/acceptance/bundle/resources/permissions/pipelines/504/update/databricks.yml new file mode 100644 index 00000000000..05d0681c885 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/update/databricks.yml @@ -0,0 +1,10 @@ +bundle: + name: test-bundle + +resources: + pipelines: + foo: + name: foo + permissions: + - level: CAN_VIEW + user_name: viewer@example.com diff --git a/acceptance/bundle/resources/permissions/pipelines/504/update/out.test.toml b/acceptance/bundle/resources/permissions/pipelines/504/update/out.test.toml new file mode 100644 index 00000000000..7ffa3d15391 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/update/out.test.toml @@ -0,0 +1,4 @@ +Local = true +Cloud = false +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.READPLAN = [] diff --git 
a/acceptance/bundle/resources/permissions/pipelines/504/update/output.txt b/acceptance/bundle/resources/permissions/pipelines/504/update/output.txt new file mode 100644 index 00000000000..5c4d07448f6 --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/update/output.txt @@ -0,0 +1,17 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/test-bundle/default/files... +Deploying resources... +Warn: deploying resources.pipelines.foo.permissions: retrying after 504 Gateway Timeout from PUT /api/2.0/permissions/pipelines/[UUID] +Updating deployment state... +Deployment complete! + +>>> print_requests.py //api/2.0/permissions/pipelines +"PUT /api/2.0/permissions/pipelines/[UUID]" +"PUT /api/2.0/permissions/pipelines/[UUID]" diff --git a/acceptance/bundle/resources/permissions/pipelines/504/update/script b/acceptance/bundle/resources/permissions/pipelines/504/update/script new file mode 100644 index 00000000000..ed50417534c --- /dev/null +++ b/acceptance/bundle/resources/permissions/pipelines/504/update/script @@ -0,0 +1,13 @@ +trace $CLI bundle deploy + +update_file.py databricks.yml CAN_VIEW CAN_MANAGE + +# Inject a single 504 on the first permissions PUT to simulate a transient error. +# DoUpdate is wrapped in retryOnTransient, so the deploy should retry after DATABRICKS_BUNDLE_RETRY_INTERVAL_MS and succeed. +fault.py "PUT /api/2.0/permissions/pipelines/*" 504 0 1 + +rm out.requests.txt +trace $CLI bundle deploy + +# Two PUT requests should appear: the initial 504 and the successful retry. 
+trace print_requests.py //api/2.0/permissions/pipelines | jq '.method + " " + .path' diff --git a/bundle/direct/apply.go b/bundle/direct/apply.go index 1843a83b08f..a4a61f727f2 100644 --- a/bundle/direct/apply.go +++ b/bundle/direct/apply.go @@ -8,12 +8,14 @@ import ( "reflect" "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/direct/dresources" "github.com/databricks/cli/bundle/direct/dstate" "github.com/databricks/cli/libs/log" "github.com/databricks/databricks-sdk-go/apierr" ) func (d *DeploymentUnit) Destroy(ctx context.Context, db *dstate.DeploymentState) error { + ctx = log.WithPrefix(ctx, "destroying "+d.ResourceKey) id := db.GetResourceID(d.ResourceKey) if id == "" { log.Infof(ctx, "Cannot delete %s: missing from state", d.ResourceKey) @@ -24,6 +26,7 @@ func (d *DeploymentUnit) Destroy(ctx context.Context, db *dstate.DeploymentState } func (d *DeploymentUnit) Deploy(ctx context.Context, db *dstate.DeploymentState, newState any, actionType deployplan.ActionType, planEntry *deployplan.PlanEntry) error { + ctx = log.WithPrefix(ctx, "deploying "+d.ResourceKey) if actionType == deployplan.Create { return d.Create(ctx, db, newState) } @@ -48,7 +51,18 @@ func (d *DeploymentUnit) Deploy(ctx context.Context, db *dstate.DeploymentState, } func (d *DeploymentUnit) Create(ctx context.Context, db *dstate.DeploymentState, newState any) error { - newID, remoteState, err := d.Adapter.DoCreate(ctx, newState) + var newID string + var remoteState any + _, err := retryWith(ctx, func(err error) bool { + // For DoCreate, retry feature is opt-in via retrySafe(err) error wrapper + _, ok := errors.AsType[*dresources.RetrySafeError](err) + return ok && isTransient(ctx, err) + }, func() (struct{}, error) { + var e error + newID, remoteState, e = d.Adapter.DoCreate(ctx, newState) + return struct{}{}, e + }) + err = dresources.UnwrapRetrySafe(err) if err != nil { // No need to prefix error, there is no ambiguity (only one operation - DoCreate) and no 
additional context (like id) return err @@ -66,7 +80,9 @@ func (d *DeploymentUnit) Create(ctx context.Context, db *dstate.DeploymentState, return fmt.Errorf("saving state after creating id=%s: %w", newID, err) } - waitRemoteState, err := d.Adapter.WaitAfterCreate(ctx, newID, newState) + waitRemoteState, err := retryOnTransient(ctx, func() (any, error) { + return d.Adapter.WaitAfterCreate(ctx, newID, newState) + }) if err != nil { return fmt.Errorf("waiting after creating id=%s: %w", newID, err) } @@ -89,7 +105,7 @@ func (d *DeploymentUnit) Recreate(ctx context.Context, db *dstate.DeploymentStat // MANAGED_BY_PARENT is still disregarded — the subsequent Create with // replace_existing=true will reconfigure the parent-managed resource in // place, matching the Terraform provider's recreate behaviour. - err = d.Adapter.DoDelete(ctx, oldID, oldState) + err = retryOnTransientErr(ctx, func() error { return d.Adapter.DoDelete(ctx, oldID, oldState) }) if err != nil && !isResourceGone(err) && !isManagedByParent(err) { return fmt.Errorf("deleting old id=%s: %w", oldID, err) } @@ -118,7 +134,9 @@ func (d *DeploymentUnit) Update(ctx context.Context, db *dstate.DeploymentState, return fmt.Errorf("internal error: DoUpdate not implemented for resource %s", d.ResourceKey) } - remoteState, err := d.Adapter.DoUpdate(ctx, id, newState, planEntry) + remoteState, err := retryOnTransient(ctx, func() (any, error) { + return d.Adapter.DoUpdate(ctx, id, newState, planEntry) + }) if err != nil { return fmt.Errorf("updating id=%s: %w", id, err) } @@ -133,7 +151,9 @@ func (d *DeploymentUnit) Update(ctx context.Context, db *dstate.DeploymentState, return fmt.Errorf("saving state id=%s: %w", id, err) } - waitRemoteState, err := d.Adapter.WaitAfterUpdate(ctx, id, newState) + waitRemoteState, err := retryOnTransient(ctx, func() (any, error) { + return d.Adapter.WaitAfterUpdate(ctx, id, newState) + }) if err != nil { return fmt.Errorf("waiting after updating id=%s: %w", id, err) } @@ -148,7 
+168,13 @@ func (d *DeploymentUnit) Update(ctx context.Context, db *dstate.DeploymentState, } func (d *DeploymentUnit) UpdateWithID(ctx context.Context, db *dstate.DeploymentState, oldID string, newState any) error { - newID, remoteState, err := d.Adapter.DoUpdateWithID(ctx, oldID, newState) + var newID string + var remoteState any + err := retryOnTransientErr(ctx, func() error { + var e error + newID, remoteState, e = d.Adapter.DoUpdateWithID(ctx, oldID, newState) + return e + }) if err != nil { return fmt.Errorf("updating id=%s: %w", oldID, err) } @@ -169,7 +195,9 @@ func (d *DeploymentUnit) UpdateWithID(ctx context.Context, db *dstate.Deployment return fmt.Errorf("saving state id=%s: %w", oldID, err) } - waitRemoteState, err := d.Adapter.WaitAfterUpdate(ctx, newID, newState) + waitRemoteState, err := retryOnTransient(ctx, func() (any, error) { + return d.Adapter.WaitAfterUpdate(ctx, newID, newState) + }) if err != nil { return fmt.Errorf("waiting after updating id=%s: %w", newID, err) } @@ -219,7 +247,7 @@ func (d *DeploymentUnit) Delete(ctx context.Context, db *dstate.DeploymentState, } func (d *DeploymentUnit) Resize(ctx context.Context, db *dstate.DeploymentState, id string, newState any) error { - err := d.Adapter.DoResize(ctx, id, newState) + err := retryOnTransientErr(ctx, func() error { return d.Adapter.DoResize(ctx, id, newState) }) if err != nil { return fmt.Errorf("resizing id=%s: %w", id, err) } @@ -263,7 +291,9 @@ func (d *DeploymentUnit) refreshRemoteState(ctx context.Context, id string) erro if d.RemoteState != nil { return nil } - remoteState, err := d.Adapter.DoRead(ctx, id) + remoteState, err := retryOnTransient(ctx, func() (any, error) { + return d.Adapter.DoRead(ctx, id) + }) if err != nil { return fmt.Errorf("failed to refresh remote state id=%s: %w", id, err) } diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go index e84078a4852..79ecb37f8dd 100644 --- a/bundle/direct/bundle_plan.go +++ b/bundle/direct/bundle_plan.go @@ 
-122,6 +122,7 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks // We're processing resources in DAG order because we're resolving references (that can be resolved at plan stage). g.Run(defaultParallelism, func(resourceKey string, failedDependency *string) bool { errorPrefix := "cannot plan " + resourceKey + ctx := log.WithPrefix(ctx, "planning "+resourceKey) entry, err := plan.WriteLockEntry(resourceKey) if err != nil { @@ -155,7 +156,9 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks return false } - remoteState, err := adapter.DoRead(ctx, id) + remoteState, err := retryOnTransient(ctx, func() (any, error) { + return adapter.DoRead(ctx, id) + }) if err != nil { if isResourceGone(err) { // no such resource @@ -210,7 +213,9 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks return false } - remoteState, err := adapter.DoRead(ctx, dbentry.ID) + remoteState, err := retryOnTransient(ctx, func() (any, error) { + return adapter.DoRead(ctx, dbentry.ID) + }) if err != nil { if isResourceGone(err) { remoteState = nil diff --git a/bundle/direct/dresources/grants.go b/bundle/direct/dresources/grants.go index d5f4c54cbe1..596346f1614 100644 --- a/bundle/direct/dresources/grants.go +++ b/bundle/direct/dresources/grants.go @@ -110,7 +110,8 @@ func (r *ResourceGrants) DoRead(ctx context.Context, id string) (*GrantsState, e func (r *ResourceGrants) DoCreate(ctx context.Context, state *GrantsState) (string, *GrantsState, error) { _, err := r.DoUpdate(ctx, "", state, nil) if err != nil { - return "", nil, err + // Grants Update is idempotent (additive PATCH), so retrying on transient errors is safe. 
+ return "", nil, retrySafe(err) } return state.SecurableType + "/" + state.FullName, nil, nil diff --git a/bundle/direct/dresources/permissions.go b/bundle/direct/dresources/permissions.go index 6755dac69ca..91ca9000aaf 100644 --- a/bundle/direct/dresources/permissions.go +++ b/bundle/direct/dresources/permissions.go @@ -219,7 +219,8 @@ func (r *ResourcePermissions) DoCreate(ctx context.Context, newState *Permission // should we remember the default here? _, err := r.DoUpdate(ctx, newState.ObjectID, newState, nil) if err != nil { - return "", nil, err + // Permissions Set is idempotent (PUT), so retrying on transient errors is safe. + return "", nil, retrySafe(err) } return newState.ObjectID, nil, nil diff --git a/bundle/direct/dresources/retry.go b/bundle/direct/dresources/retry.go new file mode 100644 index 00000000000..e7aa77fa956 --- /dev/null +++ b/bundle/direct/dresources/retry.go @@ -0,0 +1,29 @@ +package dresources + +import "errors" + +// RetrySafeError wraps an error to signal that the failed DoCreate is safe to retry. +type RetrySafeError struct { + err error +} + +func (e *RetrySafeError) Error() string { return e.err.Error() } +func (e *RetrySafeError) Unwrap() error { return e.err } + +// retrySafe wraps err to mark the operation as safe to retry from DoCreate. +// Use this when the create is idempotent (e.g. a PUT that can be repeated safely). +func retrySafe(err error) error { + if err == nil { + return nil + } + return &RetrySafeError{err: err} +} + +// UnwrapRetrySafe removes the retrySafe wrapper, returning the underlying error. +// If err is not retrySafe-wrapped, it returns err unchanged. 
+func UnwrapRetrySafe(err error) error { + if safe, ok := errors.AsType[*RetrySafeError](err); ok { + return safe.err + } + return err +} diff --git a/bundle/direct/retry.go b/bundle/direct/retry.go new file mode 100644 index 00000000000..bbe066c6fee --- /dev/null +++ b/bundle/direct/retry.go @@ -0,0 +1,85 @@ +package direct + +import ( + "context" + "errors" + "fmt" + "net/http" + "strconv" + "time" + + bundleenv "github.com/databricks/cli/bundle/env" + "github.com/databricks/cli/libs/log" + "github.com/databricks/databricks-sdk-go/apierr" +) + +const maxRetries = 2 + +var defaultRetryInterval = 15 * time.Second + +func retryInterval(ctx context.Context) time.Duration { + v, ok := bundleenv.RetryIntervalMs(ctx) + if !ok { + return defaultRetryInterval + } + ms, err := strconv.Atoi(v) + if err != nil { + return defaultRetryInterval + } + return time.Duration(ms) * time.Millisecond +} + +// isTransient returns true for 504 errors that the SDK did not already retry. +// The SDK retries 504s matching its allTransientErrors patterns (e.g. "deadline exceeded"); +// this covers the remaining 504s like TEMPORARILY_UNAVAILABLE. +func isTransient(ctx context.Context, err error) bool { + apiErr, ok := errors.AsType[*apierr.APIError](err) + if !ok { + return false + } + if apiErr.IsRetriable(ctx) { + // Already handled by SDK + return false + } + return apiErr.StatusCode == http.StatusGatewayTimeout +} + +// retryWith retries fn while check returns true for the error, up to maxRetries times. 
+func retryWith[T any](ctx context.Context, check func(error) bool, fn func() (T, error)) (T, error) { + interval := retryInterval(ctx) + for attempt := 0; ; attempt++ { + result, err := fn() + if err == nil || attempt >= maxRetries || !check(err) { + return result, err + } + msg := "retrying" + if apiErr, ok := errors.AsType[*apierr.APIError](err); ok { + endpoint := "" + if rw := apiErr.ResponseWrapper; rw != nil && rw.Response != nil && rw.Response.Request != nil { + req := rw.Response.Request + endpoint = fmt.Sprintf(" from %s %s", req.Method, req.URL.Path) + } + msg = fmt.Sprintf("retrying after %d %s%s", apiErr.StatusCode, http.StatusText(apiErr.StatusCode), endpoint) + } + log.Warnf(ctx, "%s", msg) + select { + case <-ctx.Done(): + var zero T + return zero, ctx.Err() + case <-time.After(interval): + } + } +} + +// retryOnTransient retries fn on transient 504 errors that the SDK did not already handle. +func retryOnTransient[T any](ctx context.Context, fn func() (T, error)) (T, error) { + return retryWith(ctx, func(err error) bool { return isTransient(ctx, err) }, fn) +} + +// retryOnTransientErr wraps retryOnTransient for functions that return only an error. +func retryOnTransientErr(ctx context.Context, fn func() error) error { + _, err := retryOnTransient(ctx, func() (struct{}, error) { + return struct{}{}, fn() + }) + return err +} diff --git a/bundle/env/retry_interval.go b/bundle/env/retry_interval.go new file mode 100644 index 00000000000..d00d88e0b25 --- /dev/null +++ b/bundle/env/retry_interval.go @@ -0,0 +1,11 @@ +package env + +import "context" + +// RetryIntervalMsVariable names the environment variable that overrides the retry interval for bundle operations. +const RetryIntervalMsVariable = "DATABRICKS_BUNDLE_RETRY_INTERVAL_MS" + +// RetryIntervalMs returns the retry interval override (in milliseconds) for bundle operations. 
+func RetryIntervalMs(ctx context.Context) (string, bool) { + return get(ctx, []string{RetryIntervalMsVariable}) +} diff --git a/libs/log/context.go b/libs/log/context.go index 5e3e8ccb652..457b918b149 100644 --- a/libs/log/context.go +++ b/libs/log/context.go @@ -9,6 +9,22 @@ type logger int var loggerKey logger +type prefixKeyType struct{} + +// WithPrefix returns a context that prepends prefix to every log message emitted from it. +// Calling WithPrefix on a context that already has a prefix appends with ": ". +func WithPrefix(ctx context.Context, prefix string) context.Context { + if existing, _ := ctx.Value(prefixKeyType{}).(string); existing != "" { + prefix = existing + ": " + prefix + } + return context.WithValue(ctx, prefixKeyType{}, prefix) +} + +func getPrefix(ctx context.Context) string { + prefix, _ := ctx.Value(prefixKeyType{}).(string) + return prefix +} + // NewContext returns a new Context that carries the specified logger. // // Discussion why this is not part of slog itself: https://github.com/golang/go/issues/58243. diff --git a/libs/log/logger.go b/libs/log/logger.go index 74347b1ab19..0687634ec73 100644 --- a/libs/log/logger.go +++ b/libs/log/logger.go @@ -20,6 +20,9 @@ func GetLogger(ctx context.Context) *slog.Logger { // helper function to abstract logging a string message. func log(ctx context.Context, logger *slog.Logger, level slog.Level, msg string) { + if prefix := getPrefix(ctx); prefix != "" { + msg = prefix + ": " + msg + } var pcs [1]uintptr // skip [runtime.Callers, this function, this function's caller]. 
runtime.Callers(3, pcs[:]) diff --git a/libs/testserver/fault.go b/libs/testserver/fault.go new file mode 100644 index 00000000000..49b98b0f25f --- /dev/null +++ b/libs/testserver/fault.go @@ -0,0 +1,102 @@ +package testserver + +import ( + "encoding/json" + "strings" + "sync" +) + +type faultRuleKey struct { + token string + pattern string +} + +// FaultRule describes a single injected fault: HTTP status, body, and remaining fire count. +type FaultRule struct { + StatusCode int + Body string + offset int + times int +} + +// FaultRules holds the active fault injection rules for a test server. +type FaultRules struct { + mu sync.Mutex + rules map[faultRuleKey]*FaultRule +} + +// NewFaultRules returns an empty FaultRules. +func NewFaultRules() *FaultRules { + return &FaultRules{rules: make(map[faultRuleKey]*FaultRule)} +} + +// Set registers or replaces a fault rule for the given token and pattern. +func (fr *FaultRules) Set(token, pattern string, statusCode int, body string, offset, times int) { + fr.mu.Lock() + defer fr.mu.Unlock() + fr.rules[faultRuleKey{token: token, pattern: pattern}] = &FaultRule{ + StatusCode: statusCode, + Body: body, + offset: offset, + times: times, + } +} + +// Check returns a matching fault rule and advances its counters, or nil if no rule matches. +// Pattern supports a trailing "*" wildcard, e.g. "PUT /api/2.0/permissions/pipelines/*". 
+func (fr *FaultRules) Check(method, path, token string) *FaultRule { + requestPattern := method + " " + path + + fr.mu.Lock() + defer fr.mu.Unlock() + + for key, rule := range fr.rules { + if key.token != token { + continue + } + rulePattern := key.pattern + var matched bool + if strings.HasSuffix(rulePattern, "*") { + matched = strings.HasPrefix(requestPattern, rulePattern[:len(rulePattern)-1]) + } else { + matched = requestPattern == rulePattern + } + if !matched { + continue + } + if rule.offset > 0 { + rule.offset-- + return nil + } + if rule.times <= 0 { + delete(fr.rules, key) + return nil + } + rule.times-- + if rule.times == 0 { + delete(fr.rules, key) + } + result := *rule + return &result + } + + return nil +} + +// faultEndpointHandler handles POST /__testserver/fault. +func faultEndpointHandler(fr *FaultRules) HandlerFunc { + return func(req Request) any { + var body struct { + Pattern string `json:"pattern"` + StatusCode int `json:"status_code"` + Body string `json:"body"` + Offset int `json:"offset"` + Times int `json:"times"` + } + if err := json.Unmarshal(req.Body, &body); err != nil { + return Response{StatusCode: 400, Body: map[string]string{"error": err.Error()}} + } + fr.Set(req.Token, body.Pattern, body.StatusCode, body.Body, body.Offset, body.Times) + return Response{StatusCode: 200} + } +} diff --git a/libs/testserver/fault_test.go b/libs/testserver/fault_test.go new file mode 100644 index 00000000000..2366fb733c1 --- /dev/null +++ b/libs/testserver/fault_test.go @@ -0,0 +1,57 @@ +package testserver_test + +import ( + "testing" + + "github.com/databricks/cli/libs/testserver" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestFaultRulesNoMatch(t *testing.T) { + fr := testserver.NewFaultRules() + fr.Set("tok", "GET /foo", 504, "body", 0, 1) + + assert.Nil(t, fr.Check("POST", "/foo", "tok")) + assert.Nil(t, fr.Check("GET", "/bar", "tok")) + assert.Nil(t, fr.Check("GET", "/foo", "other")) +} + +func 
TestFaultRulesExactMatch(t *testing.T) { + fr := testserver.NewFaultRules() + fr.Set("tok", "PUT /api/2.0/jobs/123", 504, "body", 0, 1) + + rule := fr.Check("PUT", "/api/2.0/jobs/123", "tok") + require.NotNil(t, rule) + assert.Equal(t, 504, rule.StatusCode) + assert.Equal(t, "body", rule.Body) +} + +func TestFaultRulesWildcardMatch(t *testing.T) { + fr := testserver.NewFaultRules() + fr.Set("tok", "PUT /api/2.0/permissions/pipelines/*", 504, "body", 0, 2) + + assert.NotNil(t, fr.Check("PUT", "/api/2.0/permissions/pipelines/abc", "tok")) + assert.NotNil(t, fr.Check("PUT", "/api/2.0/permissions/pipelines/xyz", "tok")) + assert.Nil(t, fr.Check("PUT", "/api/2.0/permissions/pipelines/xyz", "tok")) // exhausted +} + +func TestFaultRulesOffset(t *testing.T) { + fr := testserver.NewFaultRules() + fr.Set("tok", "GET /foo", 504, "body", 2, 1) + + assert.Nil(t, fr.Check("GET", "/foo", "tok")) // offset 2→1 + assert.Nil(t, fr.Check("GET", "/foo", "tok")) // offset 1→0 + assert.NotNil(t, fr.Check("GET", "/foo", "tok")) // fires + assert.Nil(t, fr.Check("GET", "/foo", "tok")) // exhausted +} + +func TestFaultRulesTimes(t *testing.T) { + fr := testserver.NewFaultRules() + fr.Set("tok", "GET /foo", 504, "body", 0, 3) + + for range 3 { + assert.NotNil(t, fr.Check("GET", "/foo", "tok")) + } + assert.Nil(t, fr.Check("GET", "/foo", "tok")) // exhausted +} diff --git a/libs/testserver/server.go b/libs/testserver/server.go index 7d4111e7e18..eb0268d694f 100644 --- a/libs/testserver/server.go +++ b/libs/testserver/server.go @@ -46,7 +46,8 @@ type Server struct { fakeOidc *FakeOidc mu sync.Mutex - kills *killRules + kills *killRules + faults *FaultRules RequestCallback func(request *Request) ResponseCallback func(request *Request, response *EncodedResponse) @@ -204,6 +205,7 @@ func getHeaders(value []byte) http.Header { func New(t testutil.TestingT) *Server { router := NewRouter() kills := newKillRules() + faults := NewFaultRules() // Wrap the router so kill rules fire for ALL requests, 
including those with // no registered handler that would otherwise bypass serve() entirely. @@ -225,6 +227,7 @@ func New(t testutil.TestingT) *Server { fakeWorkspaces: map[string]*FakeWorkspace{}, fakeOidc: &FakeOidc{url: server.URL}, kills: kills, + faults: faults, } router.Dispatch = s.serve @@ -274,8 +277,9 @@ Response.Body = '' }) router.NotFound = notFoundFunc - // Register a test-only endpoint for setting up kill rules from scripts. + // Register test-only endpoints for setting up kill and fault rules from scripts. s.Handle("POST", "/__testserver/kill", killEndpointHandler(s.kills)) + s.Handle("POST", "/__testserver/fault", faultEndpointHandler(s.faults)) // Register a default handler for the SDK's host metadata discovery endpoint. // The SDK resolves this during config initialization (as of v0.126.0) to @@ -324,7 +328,13 @@ func (s *Server) serve(w http.ResponseWriter, r *http.Request, handler HandlerFu var resp EncodedResponse - if bytes.Contains(request.Body, []byte("INJECT_ERROR")) { + if rule := s.faults.Check(r.Method, r.URL.Path, token); rule != nil { + resp = EncodedResponse{ + StatusCode: rule.StatusCode, + Body: []byte(rule.Body), + Headers: getJsonHeaders(), + } + } else if bytes.Contains(request.Body, []byte("INJECT_ERROR")) { resp = EncodedResponse{ StatusCode: 500, Body: []byte("INJECTED"),