From 99848871e6b10fd7c0320f7e641fd3fcdd5740e8 Mon Sep 17 00:00:00 2001 From: avallete Date: Fri, 24 Apr 2026 15:51:27 +0200 Subject: [PATCH] fix: improve error handling and output formatting in pg-delta apply process - Updated the `runDeclarativeSync` function to avoid wrapping SQL output with `utils.Bold`, preventing excessive whitespace in multi-line SQL. - Changed the result accumulation in `migra.ts` from string concatenation to an array for better performance and clarity. - Enhanced the `ApplyResult` struct to include `ValidationErrors` and `Diagnostics`, allowing for more detailed error reporting. - Modified the `formatApplyFailure` function to include validation errors and diagnostics in the output, improving user feedback on apply failures. - Added tests for validation error handling in `apply_test.go` to ensure robustness against various error scenarios. --- cmd/db_schema_declarative.go | 5 +- internal/db/diff/templates/migra.ts | 12 +- internal/pgdelta/apply.go | 157 +++++++++++- internal/pgdelta/apply_test.go | 231 +++++++++++++++++- .../templates/pgdelta_declarative_apply.ts | 7 + 5 files changed, 400 insertions(+), 12 deletions(-) diff --git a/cmd/db_schema_declarative.go b/cmd/db_schema_declarative.go index 4c68d04292..8cb98b9912 100644 --- a/cmd/db_schema_declarative.go +++ b/cmd/db_schema_declarative.go @@ -321,7 +321,10 @@ func runDeclarativeSync(cmd *cobra.Command, args []string) error { return nil } fmt.Fprintln(os.Stderr, "Generated migration SQL:") - fmt.Fprintln(os.Stderr, utils.Bold(result.DiffSQL)) + // Don't wrap with utils.Bold: lipgloss renders multi-line input as a block + // and pads every line with trailing spaces to match the widest line, which + // produces a wall of whitespace for long CREATE FUNCTION bodies. + fmt.Fprintln(os.Stderr, result.DiffSQL) // Step 4: Resolve migration name migrationName := resolveDeclarativeMigrationName(declarativeName, declarativeFile) diff --git a/internal/db/diff/templates/migra.ts b/internal/db/diff/templates/migra.ts index b2fd1ab526..fa44431a77 100644 --- a/internal/db/diff/templates/migra.ts +++ b/internal/db/diff/templates/migra.ts @@ -52,7 +52,7 @@ try { // Force schema qualified references for pg_get_expr await clientHead.query(sql`set search_path = ''`); await clientBase.query(sql`set search_path = ''`); - let result = ""; + const result: string[] = []; for (const schema of includedSchemas) { const m = await Migration.create(clientBase, clientHead, { schema, @@ -67,7 +67,7 @@ try { } else { m.add_all_changes(true); } - result += m.sql; + result.push(m.sql); } if (includedSchemas.length === 0) { // Migra does not ignore custom types and triggers created by extensions, so we diff @@ -80,7 +80,7 @@ try { e.set_safety(false); e.add(e.changes.schemas({ creations_only: true })); e.add_extension_changes(); - result += e.sql; + result.push(e.sql); } // Diff user defined entities in non-managed schemas, including extensions. const m = await Migration.create(clientBase, clientHead, { @@ -93,7 +93,7 @@ try { }); m.set_safety(false); m.add_all_changes(true); - result += m.sql; + result.push(m.sql); // For managed schemas, we want to include triggers and RLS policies only. for (const schema of managedSchemas) { const s = await Migration.create(clientBase, clientHead, { @@ -105,10 +105,10 @@ try { s.add(s.changes.rlspolicies({ drops_only: true })); s.add(s.changes.rlspolicies({ creations_only: true })); s.add(s.changes.triggers({ creations_only: true })); - result += s.sql; + result.push(s.sql); } } - console.log(result); + console.log(result.join("")); } catch (e) { if (sslDebug) { if (e instanceof Error) { diff --git a/internal/pgdelta/apply.go b/internal/pgdelta/apply.go index 3683d6269e..2c57eb506b 100644 --- a/internal/pgdelta/apply.go +++ b/internal/pgdelta/apply.go @@ -31,6 +31,11 @@ type ApplyResult struct { TotalSkipped int `json:"totalSkipped"` Errors []ApplyIssue `json:"errors"` StuckStatements []ApplyIssue `json:"stuckStatements"` + // ValidationErrors captures failures from pg-delta's final + // check_function_bodies=on pass. They are reported even when all + // statements applied cleanly, so must be surfaced explicitly. + ValidationErrors []ApplyIssue `json:"validationErrors,omitempty"` + Diagnostics []ApplyDiagnosis `json:"diagnostics,omitempty"` } // ApplyIssue models a pg-delta apply error or stuck statement. @@ -42,6 +47,71 @@ type ApplyIssue struct { Code string `json:"code,omitempty"` Message string `json:"message,omitempty"` IsDependencyError bool `json:"isDependencyError,omitempty"` + Position int `json:"position,omitempty"` + Detail string `json:"detail,omitempty"` + Hint string `json:"hint,omitempty"` +} + +// ApplyDiagnosis mirrors pg-topo's Diagnostic entries: static-analysis +// warnings that are surfaced alongside the apply result but don't cause +// failure on their own. Shape must stay in sync with the pg-topo package. +// +// UnmarshalJSON is implemented defensively so new or changed fields in +// pg-topo's Diagnostic do not break the whole apply result parse. Losing a +// diagnostic here would also swallow validationErrors and stuckStatements, +// leaving the user with a useless "failed to parse pg-delta apply output" +// message instead of the actual SQL error. +type ApplyDiagnosis struct { + Code string `json:"code,omitempty"` + Message string `json:"message,omitempty"` + StatementID *ApplyStatementLocation `json:"statementId,omitempty"` + SuggestedFix string `json:"suggestedFix,omitempty"` +} + +// ApplyStatementLocation matches pg-topo's StatementId shape. +type ApplyStatementLocation struct { + FilePath string `json:"filePath,omitempty"` + StatementIndex int `json:"statementIndex,omitempty"` + SourceOffset int `json:"sourceOffset,omitempty"` +} + +func (d *ApplyDiagnosis) UnmarshalJSON(data []byte) error { + trimmed := bytes.TrimSpace(data) + if bytes.Equal(trimmed, []byte("null")) { + *d = ApplyDiagnosis{} + return nil + } + // Unmarshal into a shadow type first so an unexpected statementId shape + // (string, missing fields, future additions) degrades gracefully instead + // of aborting the whole ApplyResult parse. + var raw struct { + Code string `json:"code"` + Message string `json:"message"` + StatementID json.RawMessage `json:"statementId"` + SuggestedFix string `json:"suggestedFix"` + } + if err := json.Unmarshal(trimmed, &raw); err != nil { + return err + } + d.Code = raw.Code + d.Message = raw.Message + d.SuggestedFix = raw.SuggestedFix + if len(bytes.TrimSpace(raw.StatementID)) == 0 || bytes.Equal(bytes.TrimSpace(raw.StatementID), []byte("null")) { + d.StatementID = nil + return nil + } + var loc ApplyStatementLocation + if err := json.Unmarshal(raw.StatementID, &loc); err == nil { + d.StatementID = &loc + return nil + } + // Fallback: accept a bare string (older pg-topo revisions) so we keep + // something printable instead of dropping the diagnostic entirely. + var asString string + if err := json.Unmarshal(raw.StatementID, &asString); err == nil { + d.StatementID = &ApplyStatementLocation{FilePath: asString} + } + return nil } type ApplyStatement struct { @@ -70,7 +140,13 @@ func (i *ApplyIssue) UnmarshalJSON(data []byte) error { return nil } -func formatApplyFailure(result ApplyResult) string { +// formatApplyFailure renders a human-readable summary of an unsuccessful +// pg-delta apply result. When verbose is false (the default CLI output), +// pg-topo diagnostics are collapsed to a single-line summary because they are +// static-analysis warnings – not fatal errors – and can number in the +// hundreds for large schemas. Passing verbose=true (set by --debug) expands +// them to the full per-diagnostic listing. +func formatApplyFailure(result ApplyResult, verbose bool) string { totalStatements := result.TotalStatements if totalStatements == 0 { totalStatements = result.TotalApplied + result.TotalSkipped + len(result.StuckStatements) @@ -91,6 +167,33 @@ func formatApplyFailure(result ApplyResult) string { lines = append(lines, formatApplyIssue(issue)) } } + if len(result.ValidationErrors) > 0 { + lines = append(lines, "Validation errors (from check_function_bodies=on pass):") + for _, issue := range result.ValidationErrors { + lines = append(lines, formatApplyIssue(issue)) + } + } + if len(result.Diagnostics) > 0 { + if verbose { + lines = append(lines, "Diagnostics:") + for _, d := range result.Diagnostics { + lines = append(lines, formatApplyDiagnosis(d)) + } + } else { + lines = append(lines, fmt.Sprintf("%d pg-topo diagnostic(s) omitted (re-run with --debug to view).", len(result.Diagnostics))) + } + } + // pg-delta may report status "error" without populating any issue arrays + // (e.g. an internal assertion in a future pg-delta release). Tell the user + // how to collect more information rather than leaving them with just the + // bare status line. + if len(result.Errors) == 0 && len(result.StuckStatements) == 0 && len(result.ValidationErrors) == 0 { + lines = append(lines, + "No per-statement diagnostics were reported by pg-delta.", + "Re-run with --debug to print the raw pg-delta payload, or open an issue at", + "https://github.com/supabase/pg-toolbelt/issues with the debug bundle attached.", + ) + } return strings.Join(lines, "\n") } @@ -104,6 +207,12 @@ func formatApplyIssue(issue ApplyIssue) string { } lines := []string{title} lines = append(lines, " "+formatApplyIssueMessage(issue)) + if detail := strings.TrimSpace(issue.Detail); detail != "" { + lines = append(lines, " Detail: "+detail) + } + if hint := strings.TrimSpace(issue.Hint); hint != "" { + lines = append(lines, " Hint: "+hint) + } if sql := formatStatementSQL(issue.Statement.SQL); sql != "" { lines = append(lines, " SQL: "+sql) } @@ -119,6 +228,9 @@ func formatApplyIssueMessage(issue ApplyIssue) string { if issue.Code != "" { metadata = append(metadata, "SQLSTATE "+issue.Code) } + if issue.Position > 0 { + metadata = append(metadata, fmt.Sprintf("position %d", issue.Position)) + } if issue.IsDependencyError { metadata = append(metadata, "dependency error") } @@ -128,6 +240,39 @@ func formatApplyIssueMessage(issue ApplyIssue) string { return fmt.Sprintf("%s (%s)", message, strings.Join(metadata, ", ")) } +func formatApplyDiagnosis(d ApplyDiagnosis) string { + message := strings.TrimSpace(d.Message) + if message == "" { + message = "unknown pg-delta diagnostic" + } + parts := []string{"- "} + if code := strings.TrimSpace(d.Code); code != "" { + parts = append(parts, "["+code+"] ") + } + parts = append(parts, message) + if loc := formatStatementLocation(d.StatementID); loc != "" { + parts = append(parts, " ("+loc+")") + } + if fix := strings.TrimSpace(d.SuggestedFix); fix != "" { + parts = append(parts, "\n Suggested fix: "+fix) + } + return strings.Join(parts, "") +} + +func formatStatementLocation(loc *ApplyStatementLocation) string { + if loc == nil { + return "" + } + path := strings.TrimSpace(loc.FilePath) + if path == "" { + return "" + } + if loc.StatementIndex > 0 { + return fmt.Sprintf("%s#%d", path, loc.StatementIndex) + } + return path +} + func formatStatementSQL(sql string) string { normalized := strings.Join(strings.Fields(sql), " ") const maxLen = 120 @@ -188,13 +333,17 @@ func ApplyDeclarative(ctx context.Context, config pgconn.Config, fsys afero.Fs) return errors.Errorf("failed to parse pg-delta apply output: %w", err) } if result.Status != "success" { - if viper.GetBool("DEBUG") { + // Always print the human-readable summary so failures are actionable + // even when --debug is set. In debug mode the summary also expands + // pg-topo diagnostics inline and we additionally dump the raw + // pg-delta payload so users can forward it when reporting bugs. + verbose := viper.GetBool("DEBUG") + fmt.Fprintln(os.Stderr, formatApplyFailure(result, verbose)) + if verbose { if debugJSON := formatDebugJSON(stdout.Bytes()); len(debugJSON) > 0 { fmt.Fprintln(os.Stderr, "pg-delta apply result:") fmt.Fprintln(os.Stderr, debugJSON) } - } else { - fmt.Fprintln(os.Stderr, formatApplyFailure(result)) } return errors.Errorf("pg-delta declarative apply failed with status: %s", result.Status) } diff --git a/internal/pgdelta/apply_test.go b/internal/pgdelta/apply_test.go index af9cfb0019..bef780269e 100644 --- a/internal/pgdelta/apply_test.go +++ b/internal/pgdelta/apply_test.go @@ -79,7 +79,7 @@ func TestFormatApplyFailure(t *testing.T) { }, } - formatted := formatApplyFailure(result) + formatted := formatApplyFailure(result, false) assertContains(t, formatted, `pg-delta apply returned status "stuck"`) assertContains(t, formatted, `29/34 statements applied in 2 round(s)`) assertContains(t, formatted, `cluster/extensions/pgmq.sql:0 [CREATE_EXTENSION]`) @@ -87,9 +87,238 @@ func TestFormatApplyFailure(t *testing.T) { assertContains(t, formatted, `SQL: CREATE EXTENSION pgmq WITH SCHEMA pgmq;`) } +// TestApplyResultUnmarshalValidationErrors reproduces the payload shape pg-delta +// emits when the final check_function_bodies=on pass fails: totalApplied +// matches totalStatements, errors and stuckStatements are empty, but status is +// "error" because validationErrors is non-empty. +func TestApplyResultUnmarshalValidationErrors(t *testing.T) { + raw := []byte(`{ + "status": "error", + "totalStatements": 1633, + "totalRounds": 1, + "totalApplied": 1633, + "totalSkipped": 0, + "errors": [], + "stuckStatements": [], + "validationErrors": [ + { + "statement": { + "id": "public/functions/my_function.sql:0", + "sql": "CREATE FUNCTION public.my_function() RETURNS integer LANGUAGE sql AS $$ SELECT missing_column FROM users $$;", + "statementClass": "CREATE_FUNCTION" + }, + "code": "42703", + "message": "column \"missing_column\" does not exist", + "isDependencyError": false, + "position": 8, + "hint": "Perhaps you meant to reference the column \"users.missing_column_renamed\"." + } + ] + }`) + + var result ApplyResult + if err := json.Unmarshal(raw, &result); err != nil { + t.Fatalf("json.Unmarshal() error = %v", err) + } + + if got, want := len(result.ValidationErrors), 1; got != want { + t.Fatalf("len(ValidationErrors) = %d, want %d", got, want) + } + + issue := result.ValidationErrors[0] + if issue.Statement == nil { + t.Fatal("expected structured statement details") + } + if got, want := issue.Statement.ID, "public/functions/my_function.sql:0"; got != want { + t.Fatalf("Statement.ID = %q, want %q", got, want) + } + if got, want := issue.Code, "42703"; got != want { + t.Fatalf("Code = %q, want %q", got, want) + } + if got, want := issue.Position, 8; got != want { + t.Fatalf("Position = %d, want %d", got, want) + } + if issue.Hint == "" { + t.Fatal("expected Hint to be preserved") + } +} + +func TestFormatApplyFailureValidationErrors(t *testing.T) { + result := ApplyResult{ + Status: "error", + TotalStatements: 1633, + TotalRounds: 1, + TotalApplied: 1633, + TotalSkipped: 0, + ValidationErrors: []ApplyIssue{ + { + Statement: &ApplyStatement{ + ID: "public/functions/my_function.sql:0", + SQL: "CREATE FUNCTION public.my_function() RETURNS integer LANGUAGE sql AS $$ SELECT missing_column FROM users $$;", + StatementClass: "CREATE_FUNCTION", + }, + Code: "42703", + Message: `column "missing_column" does not exist`, + Position: 8, + Hint: `Perhaps you meant to reference the column "users.missing_column_renamed".`, + }, + }, + } + + formatted := formatApplyFailure(result, false) + assertContains(t, formatted, `pg-delta apply returned status "error"`) + assertContains(t, formatted, `1633/1633 statements applied in 1 round(s)`) + assertContains(t, formatted, "Validation errors (from check_function_bodies=on pass):") + assertContains(t, formatted, "public/functions/my_function.sql:0 [CREATE_FUNCTION]") + assertContains(t, formatted, `column "missing_column" does not exist (SQLSTATE 42703, position 8)`) + assertContains(t, formatted, "Hint: Perhaps you meant to reference the column") +} + +// TestFormatApplyFailureNoDiagnostics exercises the fallback text we render +// when pg-delta returns status=error without any structured issues. The user +// originally reported seeing a bare error message in this situation. +func TestFormatApplyFailureNoDiagnostics(t *testing.T) { + result := ApplyResult{ + Status: "error", + TotalStatements: 1633, + TotalRounds: 1, + TotalApplied: 1633, + TotalSkipped: 0, + } + + formatted := formatApplyFailure(result, false) + assertContains(t, formatted, `pg-delta apply returned status "error"`) + assertContains(t, formatted, "No per-statement diagnostics were reported by pg-delta") + assertContains(t, formatted, "--debug") +} + +// TestApplyResultUnmarshalRealWorldPayload covers the full shape pg-delta emits +// in practice, including diagnostics whose statementId is an object. Before we +// made ApplyDiagnosis.UnmarshalJSON defensive, this payload caused the entire +// result parse to fail with "cannot unmarshal object into Go struct field +// ApplyDiagnosis.diagnostics.statementId of type string", which in turn hid +// the real validation error from the user. +func TestApplyResultUnmarshalRealWorldPayload(t *testing.T) { + raw := []byte(`{ + "status": "error", + "totalStatements": 1625, + "totalRounds": 1, + "totalApplied": 1625, + "totalSkipped": 0, + "errors": [], + "stuckStatements": [], + "validationErrors": [ + { + "statement": { + "id": "schemas/public/functions/create_device.sql:0", + "sql": "CREATE FUNCTION public.create_device () RETURNS void LANGUAGE plpgsql AS $function$BEGIN Invalid sql statement; END;$function$;", + "statementClass": "CREATE_FUNCTION" + }, + "code": "42601", + "message": "syntax error at or near \"Invalid\"", + "isDependencyError": false, + "position": 541 + } + ], + "diagnostics": [ + { + "code": "UNRESOLVED_DEPENDENCY", + "message": "No producer found for 'function:pgmq:delete:(unknown,unknown)'.", + "statementId": { + "filePath": "schemas/public/functions/pgmq_delete.sql", + "statementIndex": 0, + "sourceOffset": 0 + }, + "objectRefs": [ + {"kind": "function", "name": "delete", "schema": "pgmq", "signature": "(unknown,unknown)"} + ], + "suggestedFix": "Add the missing statement to your SQL set or declare an explicit pg-topo annotation.", + "details": { + "requiredObjectKey": "function:pgmq:delete:(unknown,unknown)", + "candidateObjectKeys": [] + } + } + ] + }`) + + var result ApplyResult + if err := json.Unmarshal(raw, &result); err != nil { + t.Fatalf("json.Unmarshal() error = %v", err) + } + + if got, want := len(result.ValidationErrors), 1; got != want { + t.Fatalf("len(ValidationErrors) = %d, want %d", got, want) + } + if got, want := result.ValidationErrors[0].Message, `syntax error at or near "Invalid"`; got != want { + t.Fatalf("ValidationErrors[0].Message = %q, want %q", got, want) + } + + if got, want := len(result.Diagnostics), 1; got != want { + t.Fatalf("len(Diagnostics) = %d, want %d", got, want) + } + diag := result.Diagnostics[0] + if diag.StatementID == nil { + t.Fatal("expected StatementID to be preserved as a structured location") + } + if got, want := diag.StatementID.FilePath, "schemas/public/functions/pgmq_delete.sql"; got != want { + t.Fatalf("StatementID.FilePath = %q, want %q", got, want) + } + if got, want := diag.Code, "UNRESOLVED_DEPENDENCY"; got != want { + t.Fatalf("Code = %q, want %q", got, want) + } + if diag.SuggestedFix == "" { + t.Fatal("expected SuggestedFix to be preserved") + } + + // Default (non-verbose) output collapses the diagnostics to a single line + // so the user isn't flooded with pg-topo warnings on large schemas. + formatted := formatApplyFailure(result, false) + assertContains(t, formatted, "Validation errors (from check_function_bodies=on pass):") + assertContains(t, formatted, "schemas/public/functions/create_device.sql:0 [CREATE_FUNCTION]") + assertContains(t, formatted, `syntax error at or near "Invalid" (SQLSTATE 42601, position 541)`) + assertContains(t, formatted, "1 pg-topo diagnostic(s) omitted (re-run with --debug to view).") + assertNotContains(t, formatted, "[UNRESOLVED_DEPENDENCY]") + + // Verbose mode (triggered by --debug) expands the diagnostics inline. + verbose := formatApplyFailure(result, true) + assertContains(t, verbose, "Diagnostics:") + assertContains(t, verbose, "[UNRESOLVED_DEPENDENCY]") + assertContains(t, verbose, "schemas/public/functions/pgmq_delete.sql") + assertNotContains(t, verbose, "pg-topo diagnostic(s) omitted") +} + +// TestApplyDiagnosisFallbackStatementIdString covers the defensive path where +// pg-topo emits statementId as a string (older revisions) so the diagnostic +// still survives the parse. +func TestApplyDiagnosisFallbackStatementIdString(t *testing.T) { + raw := []byte(`{ + "code": "LEGACY", + "message": "legacy diagnostic shape", + "statementId": "schemas/foo.sql:0" + }`) + + var d ApplyDiagnosis + if err := json.Unmarshal(raw, &d); err != nil { + t.Fatalf("json.Unmarshal() error = %v", err) + } + if d.StatementID == nil { + t.Fatal("expected StatementID to be populated from legacy string shape") + } + if got, want := d.StatementID.FilePath, "schemas/foo.sql:0"; got != want { + t.Fatalf("StatementID.FilePath = %q, want %q", got, want) + } +} + func assertContains(t *testing.T, text, want string) { t.Helper() if !strings.Contains(text, want) { t.Fatalf("expected %q to contain %q", text, want) } } + +func assertNotContains(t *testing.T, text, unwanted string) { + t.Helper() + if strings.Contains(text, unwanted) { + t.Fatalf("expected %q to NOT contain %q", text, unwanted) + } +} diff --git a/internal/pgdelta/templates/pgdelta_declarative_apply.ts b/internal/pgdelta/templates/pgdelta_declarative_apply.ts index 1cf19c29c1..a6589bf2b0 100644 --- a/internal/pgdelta/templates/pgdelta_declarative_apply.ts +++ b/internal/pgdelta/templates/pgdelta_declarative_apply.ts @@ -36,6 +36,13 @@ try { totalSkipped: apply.totalSkipped ?? 0, errors: apply.errors ?? [], stuckStatements: apply.stuckStatements ?? [], + // validationErrors is populated when the final + // check_function_bodies=on pass catches issues that didn't surface during + // the initial apply rounds (e.g. a function body that references a + // column whose type changed). Without surfacing this field, callers see + // status=error with empty errors/stuckStatements and no actionable info. + validationErrors: apply.validationErrors ?? [], + diagnostics: result.diagnostics ?? [], }; console.log(JSON.stringify(payload)); if (apply.status !== "success") {