From f18d53a56937573d22a921ec199d5113ca072761 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 6 May 2026 17:25:55 +0000 Subject: [PATCH 1/5] feat(pool): add lock_timeout configuration --- .schema/pgdog.schema.json | 9 ++++ .schema/users.schema.json | 9 ++++ pgdog-config/src/database.rs | 6 +++ pgdog-config/src/users.rs | 13 +++++ pgdog-stats/src/pool.rs | 3 ++ pgdog/src/backend/pool/config.rs | 4 ++ pgdog/src/backend/pool/pool_impl.rs | 7 +++ .../frontend/client/query_engine/test/set.rs | 53 +++++++++++++++++++ 8 files changed, 104 insertions(+) diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 881c45852..0409883c7 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -407,6 +407,15 @@ "maximum": 255, "minimum": 0 }, + "lock_timeout": { + "description": "This setting configures the `lock_timeout` connection parameter on all connections to Postgres for this database.\nAborts any statement that waits longer than the specified duration to acquire a lock.\nUnlike `statement_timeout`, this only counts time spent waiting for locks, not execution time.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/databases/#lock_timeout", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0 + }, "min_pool_size": { "description": "Overrides the `min_pool_size` setting. The connection pool will maintain at minimum this many connections.\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/databases/#min_pool_size", "type": [ diff --git a/.schema/users.schema.json b/.schema/users.schema.json index b5dbb7104..ebe188a33 100644 --- a/.schema/users.schema.json +++ b/.schema/users.schema.json @@ -126,6 +126,15 @@ "format": "uint64", "minimum": 0 }, + "lock_timeout": { + "description": "Lock timeout.\n\nSets the `lock_timeout` on all server connections at connection creation.\nAborts any statement that waits longer than the specified duration to acquire a lock.\nUnlike `statement_timeout`, this only counts time spent waiting for locks, not execution time.\nRecommended for replication destination connections to prevent cross-shard deadlocks\nfrom hanging indefinitely.\n\n**Note:** Nothing is preventing the user from manually changing this setting at runtime,\ne.g., by running `SET lock_timeout TO 0`;\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#lock_timeout", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0 + }, "min_pool_size": { "description": "Overrides [`min_pool_size`](https://docs.pgdog.dev/configuration/pgdog.toml/general/#min_pool_size) for this user. Opens at least this many connections on pooler startup and keeps them open despite [`idle_timeout`](https://docs.pgdog.dev/configuration/pgdog.toml/general/#idle_timeout).\n\nhttps://docs.pgdog.dev/configuration/users.toml/users/#min_pool_size", "type": [ diff --git a/pgdog-config/src/database.rs b/pgdog-config/src/database.rs index fe8435e28..947b27da0 100644 --- a/pgdog-config/src/database.rs +++ b/pgdog-config/src/database.rs @@ -173,6 +173,12 @@ pub struct Database { /// /// https://docs.pgdog.dev/configuration/pgdog.toml/databases/#statement_timeout pub statement_timeout: Option, + /// This setting configures the `lock_timeout` connection parameter on all connections to Postgres for this database. + /// Aborts any statement that waits longer than the specified duration to acquire a lock. + /// Unlike `statement_timeout`, this only counts time spent waiting for locks, not execution time. + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/databases/#lock_timeout + pub lock_timeout: Option, /// Overrides the `idle_timeout` setting. Idle server connections exceeding this timeout will be closed automatically. /// /// https://docs.pgdog.dev/configuration/pgdog.toml/databases/#idle_timeout diff --git a/pgdog-config/src/users.rs b/pgdog-config/src/users.rs index 750e149ee..9628d6509 100644 --- a/pgdog-config/src/users.rs +++ b/pgdog-config/src/users.rs @@ -254,6 +254,19 @@ pub struct User { /// /// https://docs.pgdog.dev/configuration/users.toml/users/#statement_timeout pub statement_timeout: Option, + /// Lock timeout. + /// + /// Sets the `lock_timeout` on all server connections at connection creation. + /// Aborts any statement that waits longer than the specified duration to acquire a lock. + /// Unlike `statement_timeout`, this only counts time spent waiting for locks, not execution time. + /// Recommended for replication destination connections to prevent cross-shard deadlocks + /// from hanging indefinitely. + /// + /// **Note:** Nothing is preventing the user from manually changing this setting at runtime, + /// e.g., by running `SET lock_timeout TO 0`; + /// + /// https://docs.pgdog.dev/configuration/users.toml/users/#lock_timeout + pub lock_timeout: Option, /// Sets the `replication=database` parameter on user connections to Postgres. Allows this user to use replication commands. /// /// _Default:_ `false` diff --git a/pgdog-stats/src/pool.rs b/pgdog-stats/src/pool.rs index 24d7bc9f7..cb481bcc8 100644 --- a/pgdog-stats/src/pool.rs +++ b/pgdog-stats/src/pool.rs @@ -316,6 +316,8 @@ pub struct Config { pub rollback_timeout: Duration, /// Statement timeout pub statement_timeout: Option, + /// Lock timeout + pub lock_timeout: Option, /// Replication mode. pub replication_mode: bool, /// Pooler mode. @@ -367,6 +369,7 @@ impl Default for Config { ban_timeout: Duration::from_secs(300), rollback_timeout: Duration::from_secs(5), statement_timeout: None, + lock_timeout: None, replication_mode: false, pooler_mode: PoolerMode::default(), read_only: false, diff --git a/pgdog/src/backend/pool/config.rs b/pgdog/src/backend/pool/config.rs index fdcfa3110..c4bf0056f 100644 --- a/pgdog/src/backend/pool/config.rs +++ b/pgdog/src/backend/pool/config.rs @@ -126,6 +126,10 @@ impl Config { .statement_timeout .or(database.statement_timeout) .map(Duration::from_millis), + lock_timeout: user + .lock_timeout + .or(database.lock_timeout) + .map(Duration::from_millis), replication_mode: user.replication_mode, pooler_mode: user .pooler_mode diff --git a/pgdog/src/backend/pool/pool_impl.rs b/pgdog/src/backend/pool/pool_impl.rs index 883b8c7a9..6c923f740 100644 --- a/pgdog/src/backend/pool/pool_impl.rs +++ b/pgdog/src/backend/pool/pool_impl.rs @@ -427,6 +427,13 @@ impl Pool { }); } + if let Some(lock_timeout) = config.lock_timeout { + params.push(Parameter { + name: "lock_timeout".into(), + value: lock_timeout.as_millis().to_string().into(), + }); + } + if config.replication_mode { params.push(Parameter { name: "replication".into(), diff --git a/pgdog/src/frontend/client/query_engine/test/set.rs b/pgdog/src/frontend/client/query_engine/test/set.rs index 26a4cfeb0..3ea4c05d4 100644 --- a/pgdog/src/frontend/client/query_engine/test/set.rs +++ b/pgdog/src/frontend/client/query_engine/test/set.rs @@ -213,6 +213,12 @@ async fn test_reset_all() { expect_message!(test_client.read().await, CommandComplete); expect_message!(test_client.read().await, ReadyForQuery); + test_client + .send_simple(Query::new("SET lock_timeout TO 5000")) + .await; + expect_message!(test_client.read().await, CommandComplete); + expect_message!(test_client.read().await, ReadyForQuery); + assert!(test_client .client() .params @@ -223,6 +229,7 @@ async fn test_reset_all() { .params .get("statement_timeout") .is_some()); + assert!(test_client.client().params.get("lock_timeout").is_some()); // Reset all test_client.send_simple(Query::new("RESET ALL")).await; @@ -252,6 +259,10 @@ async fn test_reset_all() { .is_none(), "statement_timeout should be reset" ); + assert!( + test_client.client().params.get("lock_timeout").is_none(), + "lock_timeout should be reset" + ); } #[tokio::test] @@ -362,3 +373,45 @@ async fn test_reset_inside_transaction_rollback() { "application_name should be restored after rollback" ); } + +#[tokio::test] +async fn test_lock_timeout() { + let mut test_client = TestClient::new_sharded(Parameters::default()).await; + + test_client + .send_simple(Query::new("SET lock_timeout TO 3000")) + .await; + + assert_eq!( + expect_message!(test_client.read().await, CommandComplete).command(), + "SET" + ); + assert_eq!( + expect_message!(test_client.read().await, ReadyForQuery).status, + 'I' + ); + + assert!( + test_client.client().params.get("lock_timeout").is_some(), + "lock_timeout should be tracked after SET" + ); + + // Reset clears it. + test_client + .send_simple(Query::new("RESET lock_timeout")) + .await; + + assert_eq!( + expect_message!(test_client.read().await, CommandComplete).command(), + "RESET" + ); + assert_eq!( + expect_message!(test_client.read().await, ReadyForQuery).status, + 'I' + ); + + assert!( + test_client.client().params.get("lock_timeout").is_none(), + "lock_timeout should be cleared after RESET" + ); +} From 926ae7cd8a710d2124f0b570e04b6ea4c58b8059 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 6 May 2026 18:05:35 +0000 Subject: [PATCH 2/5] feat(replication): add retry logic --- .schema/pgdog.schema.json | 16 ++ docs/REPLICATION.md | 84 ++++++++- integration/resharding/pgdog.toml | 3 + integration/resharding/users.toml | 1 + pgdog-config/src/general.rs | 30 ++++ pgdog/src/backend/pool/cluster.rs | 24 +++ pgdog/src/backend/pool/config.rs | 3 + .../src/backend/replication/logical/error.rs | 32 ++++ .../logical/publisher/publisher_impl.rs | 166 +++++++++++++++--- .../replication/logical/publisher/slot.rs | 12 +- .../replication/logical/subscriber/stream.rs | 44 +++-- .../replication/logical/subscriber/tests.rs | 28 +++ pgdog/src/net/messages/error_response.rs | 20 +++ 13 files changed, 418 insertions(+), 45 deletions(-) diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index 0409883c7..ad38f09ff 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -97,6 +97,8 @@ "resharding_copy_retry_max_attempts": 5, "resharding_copy_retry_min_delay": 1000, "resharding_parallel_copies": 1, + "resharding_replication_retry_max_attempts": 5, + "resharding_replication_retry_min_delay": 1000, "rollback_timeout": 5000, "server_lifetime": 86400000, "server_lifetime_jitter": 0, @@ -1041,6 +1043,20 @@ "default": 1, "minimum": 0 }, + "resharding_replication_retry_max_attempts": { + "description": "Maximum number of consecutive replication-subscriber errors tolerated before\nthe source error is propagated. Each failure triggers `slot.reconnect()`,\nafter which Postgres re-streams every event since the last acked commit.\n`0` retries indefinitely.\n\n_Default:_ `5`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_max_attempts", + "type": "integer", + "format": "uint64", + "default": 5, + "minimum": 0 + }, + "resharding_replication_retry_min_delay": { + "description": "Delay in milliseconds between replication subscriber retry attempts.\n\n_Default:_ `1000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_min_delay", + "type": "integer", + "format": "uint64", + "default": 1000, + "minimum": 0 + }, "rollback_timeout": { "description": "How long to allow for `ROLLBACK` queries to run on server connections with unfinished transactions.\n\n_Default:_ `5000`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#rollback_timeout", "type": "integer", diff --git a/docs/REPLICATION.md b/docs/REPLICATION.md index 698a7f5fc..91cd25759 100644 --- a/docs/REPLICATION.md +++ b/docs/REPLICATION.md @@ -360,4 +360,86 @@ Postgres re-emits after reconnect. `Sync` would not work: it commits when Postgres saw no error, but PgDog raises some errors (e.g. `FullIdentityMissingOld` on a missing OLD tuple) *after* a successful `CommandComplete`. FATAL disconnect is the only signal that rolls back regardless. Connections come from -`Pool::standalone`, so dropping them closes the socket instead of returning to a pool. \ No newline at end of file +`Pool::standalone`, so dropping them closes the socket instead of returning to a pool. + +--- + +## Retry and reconnect + +The publisher loop retries transient errors without restarting the whole resharding pipeline. + +### Configuration + +| Parameter | Env var | Default | Meaning | +|---|---|---|---| +|`resharding_replication_retry_max_attempts` | `PGDOG_RESHARDING_REPLICATION_RETRY_MAX_ATTEMPTS` | `5` | Maximum retry attempts; `0` = retry indefinitely | +|`resharding_replication_retry_min_delay` | `PGDOG_RESHARDING_REPLICATION_RETRY_MIN_DELAY` | `1000` ms | Fixed sleep between attempts | + +### What is retried + +An error is retried if `err.is_retryable()` returns `true`. The full classification lives in the +source: + +- [`pgdog/src/backend/replication/logical/error.rs`](../pgdog/src/backend/replication/logical/error.rs) — top-level replication error +- [`pgdog/src/backend/pool/error.rs`](../pgdog/src/backend/pool/error.rs) — pool layer +- [`pgdog/src/net/error.rs`](../pgdog/src/net/error.rs) — network/IO layer +- [`pgdog/src/net/messages/error_response.rs`](../pgdog/src/net/messages/error_response.rs) — Postgres error codes + +### Two-step reconnect + +A single retry runs two reconnects **in parallel** (`try_join!`): + +- **`slot.reconnect()`** — drops and re-establishes the source replication connection. + `slot.lsn` is preserved; Postgres re-streams from that position via + `START_REPLICATION SLOT … LOGICAL `. + +- **`stream.reconnect()`** — tears down every destination `Server` handle. + Dropping each handle sends `Terminate`; Postgres rolls back any open implicit transaction + on that shard. Clears `in_transaction`, `changed_tables`, and the per-session caches + (`relations`, `statements`, `keys`). Then calls `connect()` to open fresh connections and + re-prepare named statements. + +The destination reconnect is essential when a failure occurs mid-transaction. Without it, destination shards hold +an open implicit transaction containing partial DML. Postgres re-delivers the same `Begin` + +DML + `Commit` sequence on the new source connection, and those rows would be appended to the +already-dirty destination transaction — producing duplicates or constraint errors. + +**If `reconnect()` itself fails**: a retryable reconnect error produces a warning log and falls +back into the retry loop; a non-retryable reconnect error aborts the replication task immediately. + +### LSN tracking + +`StreamSubscriber` keeps two positions. `lsn` advances on `Begin` (to the future commit LSN) and is used for in-flight deduplication. `committed_lsn` advances only after `commit()` confirms all destination shards — this is what `status_update()` reports to Postgres. + +The split matters for KeepAlive: if the reply used `lsn`, Postgres would record a future commit LSN as `confirmed_flush_lsn` while the transaction is still open. On reconnect that would skip the open transaction entirely. `committed_lsn` ensures re-delivery always starts from the last safely committed position. + +### Full retry flow + +```mermaid +sequenceDiagram + participant PG as Source Postgres + participant PUB as Publisher + participant SS as StreamSubscriber + participant DS as Destination shards + + PG->>PUB: Begin (final_lsn=200) + PUB->>SS: handle(Begin) + SS->>DS: Bind/Execute row DML (implicit txn open) + Note over PUB: transient error + + Note over PUB: retry: sleep(delay) + PUB->>PG: slot.reconnect() → START_REPLICATION from committed_lsn=100 + PUB->>SS: stream.reconnect() + SS->>DS: Terminate (implicit txn rolled back) + SS->>DS: connect() — fresh connections, named stmts re-prepared + + PG->>PUB: Begin (final_lsn=200) [re-delivered] + PUB->>SS: handle(Begin) + SS->>DS: Bind/Execute row DML + PG->>PUB: Commit (end_lsn=200) + PUB->>SS: handle(Commit) + SS->>DS: Sync + DS-->>SS: ReadyForQuery + SS->>PUB: StatusUpdate(committed_lsn=200) + PUB->>PG: StandbyStatusUpdate(last_flushed=200) +``` \ No newline at end of file diff --git a/integration/resharding/pgdog.toml b/integration/resharding/pgdog.toml index 5aa5e7bf4..bb5b78d01 100644 --- a/integration/resharding/pgdog.toml +++ b/integration/resharding/pgdog.toml @@ -1,3 +1,6 @@ +[general] +resharding_replication_retry_min_delay = 100 + [[databases]] name = "source" host = "127.0.0.1" diff --git a/integration/resharding/users.toml b/integration/resharding/users.toml index 67142d309..fa324f5d2 100644 --- a/integration/resharding/users.toml +++ b/integration/resharding/users.toml @@ -9,3 +9,4 @@ database = "destination" name = "pgdog" password = "pgdog" schema_admin = true +lock_timeout = 100 diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index dc5019764..420e20743 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -688,6 +688,25 @@ pub struct General { #[serde(default = "General::resharding_copy_retry_min_delay")] pub resharding_copy_retry_min_delay: u64, + /// Maximum number of consecutive replication-subscriber errors tolerated before + /// the source error is propagated. Each failure triggers `slot.reconnect()`, + /// after which Postgres re-streams every event since the last acked commit. + /// `0` retries indefinitely. + /// + /// _Default:_ `5` + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_max_attempts + #[serde(default = "General::resharding_replication_retry_max_attempts")] + pub resharding_replication_retry_max_attempts: usize, + + /// Delay in milliseconds between replication subscriber retry attempts. + /// + /// _Default:_ `1000` + /// + /// https://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_min_delay + #[serde(default = "General::resharding_replication_retry_min_delay")] + pub resharding_replication_retry_min_delay: u64, + /// Automatically reload the schema cache used by PgDog to route queries upon detecting DDL statements. /// /// **Note:** This setting requires PgDog Enterprise Edition to work as expected. If using the open source edition, it will only work with single-node PgDog deployments, e.g., in local development or CI. @@ -845,6 +864,9 @@ impl Default for General { resharding_parallel_copies: Self::resharding_parallel_copies(), resharding_copy_retry_max_attempts: Self::resharding_copy_retry_max_attempts(), resharding_copy_retry_min_delay: Self::resharding_copy_retry_min_delay(), + resharding_replication_retry_max_attempts: + Self::resharding_replication_retry_max_attempts(), + resharding_replication_retry_min_delay: Self::resharding_replication_retry_min_delay(), reload_schema_on_ddl: Self::reload_schema_on_ddl(), load_schema: Self::load_schema(), cutover_replication_lag_threshold: Self::cutover_replication_lag_threshold(), @@ -1090,6 +1112,14 @@ impl General { 1000 } + fn resharding_replication_retry_max_attempts() -> usize { + Self::env_or_default("PGDOG_RESHARDING_REPLICATION_RETRY_MAX_ATTEMPTS", 5) + } + + fn resharding_replication_retry_min_delay() -> u64 { + Self::env_or_default("PGDOG_RESHARDING_REPLICATION_RETRY_MIN_DELAY", 1000) + } + fn default_shutdown_termination_timeout() -> Option { Self::env_option("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT") } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 792469497..9e5231169 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -83,6 +83,8 @@ pub struct Cluster { resharding_parallel_copies: usize, resharding_copy_retry_max_attempts: usize, resharding_copy_retry_min_delay: Duration, + resharding_replication_retry_max_attempts: usize, + resharding_replication_retry_min_delay: Duration, regex_parser: RegexParser, mutual_tls: bool, } @@ -167,6 +169,8 @@ pub struct ClusterConfig<'a> { pub resharding_parallel_copies: usize, pub resharding_copy_retry_max_attempts: usize, pub resharding_copy_retry_min_delay: u64, + pub resharding_replication_retry_max_attempts: usize, + pub resharding_replication_retry_min_delay: u64, pub regex_parser_limit: usize, pub pub_sub_enabled: bool, pub mutual_tls: bool, @@ -227,6 +231,9 @@ impl<'a> ClusterConfig<'a> { resharding_parallel_copies: general.resharding_parallel_copies, resharding_copy_retry_max_attempts: general.resharding_copy_retry_max_attempts, resharding_copy_retry_min_delay: general.resharding_copy_retry_min_delay, + resharding_replication_retry_max_attempts: general + .resharding_replication_retry_max_attempts, + resharding_replication_retry_min_delay: general.resharding_replication_retry_min_delay, regex_parser_limit: general.regex_parser_limit, pub_sub_enabled: general.pub_sub_enabled(), mutual_tls: config.general.tls_client_validate_cn, @@ -271,6 +278,8 @@ impl Cluster { resharding_parallel_copies, resharding_copy_retry_max_attempts, resharding_copy_retry_min_delay, + resharding_replication_retry_max_attempts, + resharding_replication_retry_min_delay, regex_parser_limit, pub_sub_enabled, mutual_tls, @@ -328,6 +337,10 @@ impl Cluster { resharding_parallel_copies, resharding_copy_retry_max_attempts, resharding_copy_retry_min_delay: Duration::from_millis(resharding_copy_retry_min_delay), + resharding_replication_retry_max_attempts, + resharding_replication_retry_min_delay: Duration::from_millis( + resharding_replication_retry_min_delay, + ), regex_parser: RegexParser::new(regex_parser_limit, query_parser), mutual_tls, } @@ -597,6 +610,17 @@ impl Cluster { &self.resharding_copy_retry_min_delay } + /// Maximum consecutive replication-subscriber errors before the error is propagated. + /// `0` means retry indefinitely. + pub fn resharding_replication_retry_max_attempts(&self) -> usize { + self.resharding_replication_retry_max_attempts + } + + /// Base delay between replication-subscriber retry attempts. + pub fn resharding_replication_retry_min_delay(&self) -> Duration { + self.resharding_replication_retry_min_delay + } + /// Launch the connection pools. pub(crate) fn launch(&self) { for shard in self.shards() { diff --git a/pgdog/src/backend/pool/config.rs b/pgdog/src/backend/pool/config.rs index c4bf0056f..26a8363d1 100644 --- a/pgdog/src/backend/pool/config.rs +++ b/pgdog/src/backend/pool/config.rs @@ -198,6 +198,7 @@ mod test { server_lifetime: Some(5), server_lifetime_jitter: Some(1), statement_timeout: Some(5), + lock_timeout: Some(7), pooler_mode: Some(PoolerMode::Session), idle_timeout: Some(5), read_only: Some(true), @@ -210,6 +211,7 @@ mod test { server_lifetime: Some(10), server_lifetime_jitter: Some(2), statement_timeout: Some(10), + lock_timeout: Some(11), pooler_mode: Some(PoolerMode::Transaction), idle_timeout: Some(10), read_only: Some(false), @@ -223,6 +225,7 @@ mod test { assert_eq!(Duration::from_millis(5), config.max_age); assert_eq!(Duration::from_millis(1), config.max_age_jitter); assert_eq!(Some(Duration::from_millis(5)), config.statement_timeout); + assert_eq!(Some(Duration::from_millis(7)), config.lock_timeout); assert_eq!(PoolerMode::Session, config.pooler_mode); assert_eq!(Duration::from_millis(5), config.idle_timeout); assert!(config.read_only); diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index 9948bfc36..59a9a1995 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -247,6 +247,8 @@ impl Error { Self::NotConnected | Self::NoPrimary => true, // Replication stalled; temporary slot is gone, next attempt starts fresh. Self::ReplicationTimeout => true, + // Postgres sent a transient error (e.g. admin_shutdown, cannot_connect_now). + Self::PgError(inner) => inner.is_retryable(), // TODO: escape-hatch when using ParallelConnection wrapper // the underlying error could be anything and to handler it properly // either the ParallelConnection wrapper should be removed or @@ -276,6 +278,36 @@ mod tests { assert!(Error::ReplicationTimeout.is_retryable()); } + #[test] + fn pg_error_retryable() { + use crate::net::messages::ErrorResponse; + let retryable_codes = [ + "08000", "08001", "08003", "08004", "08006", "08007", "57P01", "57P02", "57P03", + "53300", "55P03", + ]; + for code in retryable_codes { + let err = Error::PgError(Box::new(ErrorResponse { + code: code.into(), + ..Default::default() + })); + assert!(err.is_retryable(), "expected {code} to be retryable"); + } + } + + #[test] + fn pg_error_not_retryable() { + use crate::net::messages::ErrorResponse; + // 08P01 protocol_violation \ constraint/schema errors \ data errors. + let not_retryable = ["08P01", "23505", "42P01", "42501", ""]; + for code in not_retryable { + let err = Error::PgError(Box::new(ErrorResponse { + code: code.into(), + ..Default::default() + })); + assert!(!err.is_retryable(), "expected {code} to NOT be retryable"); + } + } + #[test] fn retryable_via_backend_wrapper() { use crate::backend::Error as BE; diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 8241f9c28..4a289c300 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -6,7 +6,8 @@ use parking_lot::Mutex; use pgdog_config::QueryParserEngine; use tokio::sync::Notify; use tokio::task::JoinHandle; -use tokio::time::Instant; +use tokio::time::{sleep, Instant}; +use tokio::try_join; use tokio::{select, spawn, time::interval}; use tracing::{debug, info, warn}; @@ -230,7 +231,9 @@ impl Publisher { let handle = spawn(async move { slot.start_replication().await?; let progress = Progress::new_stream(); - + let max_attempts = dest.resharding_replication_retry_max_attempts(); + let delay = dest.resharding_replication_retry_min_delay(); + let mut attempt = 0usize; loop { select! { _ = stop.notified() => { @@ -239,36 +242,59 @@ impl Publisher { // This is cancellation-safe. replication_data = slot.replicate(Duration::MAX) => { - let replication_data = replication_data?; - - match replication_data { - Some(ReplicationData::CopyData(data)) => { - let lsn = if let Some(ReplicationMeta::KeepAlive(ka)) = - data.replication_meta() - { - if ka.reply() { - slot.status_update(stream.status_update()).await?; - } - debug!( - "origin at lsn {} [{}]", - Lsn::from_i64(ka.wal_end), - slot.server()?.addr() - ); - ka.wal_end - } else { - if let Some(status_update) = stream.handle(data).await? { - slot.status_update(status_update).await?; - *last_transaction.lock() = Some(Instant::now()); + // Returns Ok(true) when the slot is drained and the loop + // should break; Ok(false) to continue. All errors bubble up + // to the single retry/abort site below. + let done: Result = async { + let Some(replication_data) = replication_data? else { + slot.drop_slot().await?; + return Ok(true); + }; + match replication_data { + ReplicationData::CopyData(data) => { + if let Some(ReplicationMeta::KeepAlive(ka)) = + data.replication_meta() + { + if ka.reply() { + slot.status_update(stream.status_update()).await?; + } + debug!( + "origin at lsn {} [{}]", + Lsn::from_i64(ka.wal_end), + slot.server()?.addr() + ); + progress.update(stream.bytes_sharded(), ka.wal_end); + } else { + if let Some(su) = stream.handle(data).await? { + slot.status_update(su).await?; + *last_transaction.lock() = Some(Instant::now()); + } + attempt = 0; + progress.update(stream.bytes_sharded(), stream.lsn()); } - stream.lsn() - }; - progress.update(stream.bytes_sharded(), lsn); + Ok(false) + } + ReplicationData::CopyDone => Ok(false), } - Some(ReplicationData::CopyDone) => (), - None => { - slot.drop_slot().await?; - break; + } + .await; + + match done { + Ok(true) => break, + Ok(false) => {} + Err(err) + if err.is_retryable() + && (max_attempts == 0 || attempt < max_attempts) => + { + attempt += 1; + warn!( + "[replication] error ({attempt}/{max_attempts}): {err}, reconnecting in {}ms", + delay.as_millis() + ); + sleep(delay).await; + try_join!(slot.reconnect(), stream.reconnect())?; } + Err(err) => return Err(err), } } @@ -641,4 +667,86 @@ mod test { server.execute(*ddl).await.unwrap(); } } + + // ── Helpers ───────────────────────────────────────────────────────────── + + use crate::net::{ + replication::{ + logical::{begin::Begin, commit::Commit}, + XLogData, + }, + CopyData, ToBytes, + }; + /// Wrap a Begin payload in an XLogData CopyData message. + fn begin_copy_data(lsn: i64) -> CopyData { + let xlog = XLogData { + starting_point: lsn, + current_end: lsn, + system_clock: 0, + bytes: Begin { + final_transaction_lsn: lsn, + commit_timestamp: 0, + xid: 1, + } + .to_bytes() + .unwrap(), + }; + CopyData::bytes(xlog.to_bytes().unwrap()) + } + fn commit_copy_data(lsn: i64) -> CopyData { + let xlog = XLogData { + starting_point: lsn, + current_end: lsn, + system_clock: 0, + bytes: Commit { + flags: 0, + commit_lsn: 0, + end_lsn: lsn, + commit_timestamp: 0, + } + .to_bytes() + .unwrap(), + }; + CopyData::bytes(xlog.to_bytes().unwrap()) + } + + // -- handle --------------------------------------------------------------- + + /// A Begin event produces `Ok(None)` — no status update to forward to the origin. + #[tokio::test] + async fn apply_begin_no_status_update() { + let cfg = config(); + let cluster = Cluster::new_test(&cfg); + cluster.launch(); + let mut stream = StreamSubscriber::new(&cluster, &[]); + stream.connect().await.unwrap(); + + let result = stream.handle(begin_copy_data(1)).await; + + assert!(result.is_ok()); + cluster.shutdown(); + } + + /// A Commit event returns `Ok(Some(su))` — the caller must forward the + /// status update to the replication origin. Distinct from a Begin, which + /// returns `Ok(None)` and produces no status update. + #[tokio::test] + async fn apply_commit_emits_status_update() { + let cfg = config(); + let cluster = Cluster::new_test(&cfg); + cluster.launch(); + let mut stream = StreamSubscriber::new(&cluster, &[]); + stream.connect().await.unwrap(); + + let result = stream.handle(commit_copy_data(1)).await; + + // Commit must succeed and produce a status update for the caller to + // send to the origin via slot.status_update(). + assert!(result.is_ok()); + assert!( + result.unwrap().is_some(), + "commit should produce a status update" + ); + cluster.shutdown(); + } } diff --git a/pgdog/src/backend/replication/logical/publisher/slot.rs b/pgdog/src/backend/replication/logical/publisher/slot.rs index 4636bf101..dbc1997c1 100644 --- a/pgdog/src/backend/replication/logical/publisher/slot.rs +++ b/pgdog/src/backend/replication/logical/publisher/slot.rs @@ -361,8 +361,10 @@ impl ReplicationSlot { self.server()?.addr() ); + let lsn = Lsn::from_i64(status_update.last_flushed); + self.lsn = lsn; if let Some(tracker) = self.tracker.as_ref() { - tracker.update_lsn(&Lsn::from_i64(status_update.last_flushed)) + tracker.update_lsn(&lsn) } self.server()? @@ -373,6 +375,14 @@ impl ReplicationSlot { Ok(()) } + /// Drop the source connection and reconnect, restarting replication from the + /// last confirmed position (`self.lsn`, kept in sync by `status_update`). + pub async fn reconnect(&mut self) -> Result<(), Error> { + self.server = None; + self.connect().await?; + self.start_replication().await + } + /// Ask remote to close stream. pub async fn stop_replication(&mut self) -> Result<(), Error> { self.server()?.send_one(&CopyDone.into()).await?; diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index f9a450c45..268e69e26 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -119,7 +119,10 @@ pub struct StreamSubscriber { // Connections to shards. connections: Vec, - // Position in the WAL we have flushed successfully. + // Last commit LSN acked to Postgres. Reported in status updates; never + // advances mid-transaction so KeepAlive replies can't skip an open transaction. + committed_lsn: i64, + // Working position in the stream (advances on Begin for deduplication). lsn: i64, lsn_changed: bool, in_transaction: bool, @@ -156,6 +159,7 @@ impl StreamSubscriber { }) .collect(), connections: vec![], + committed_lsn: 0, lsn: 0, // Unknown, bytes_sharded: 0, lsn_changed: true, @@ -824,10 +828,18 @@ impl StreamSubscriber { Ok(()) } - /// Handle one replication stream message. - /// - /// On error, drops shard connections to roll back the implicit transaction left - /// by Bind/Execute/Flush, and clears per-session state. See + /// Reset destination connections and state, rolling back any open implicit + /// transaction on each shard. Caches are repopulated from Relation messages on re-delivery. + pub async fn reconnect(&mut self) -> Result<(), Error> { + self.connections.clear(); + self.relations.clear(); + self.statements.clear(); + self.keys.clear(); + self.changed_tables.clear(); + self.in_transaction = false; + self.connect().await + } + /// `docs/REPLICATION.md` → "Error rollback". pub async fn handle(&mut self, data: CopyData) -> Result, Error> { match self.handle_inner(data).await { @@ -869,7 +881,7 @@ impl StreamSubscriber { XLogPayload::Relation(relation) => self.relation(relation).await?, XLogPayload::Begin(begin) => { self.changed_tables.clear(); - self.set_current_lsn(begin.final_transaction_lsn); + self.set_working_lsn(begin.final_transaction_lsn); self.in_transaction = true; } _ => (), @@ -881,12 +893,12 @@ impl StreamSubscriber { Ok(status_update) } - /// Get latest LSN we flushed to replicas. + /// LSN of the last transaction committed to all destination shards. pub fn status_update(&self) -> StatusUpdate { StatusUpdate { - last_applied: self.lsn, - last_flushed: self.lsn, // We use transactions which are flushed. - last_written: self.lsn, + last_applied: self.committed_lsn, + last_flushed: self.committed_lsn, + last_written: self.committed_lsn, system_clock: postgres_now(), reply: 0, } @@ -897,16 +909,20 @@ impl StreamSubscriber { self.bytes_sharded } - /// Set stream start at this LSN. - /// - /// Return true if LSN has been updated to a new value, - /// i.e., the stream is moving forward. + /// Advance both LSN fields. Call after commit and on publisher init. pub fn set_current_lsn(&mut self, lsn: i64) -> bool { self.lsn_changed = lsn != self.lsn; self.lsn = lsn; + self.committed_lsn = lsn; self.lsn_changed } + /// Advance working LSN only. Used on Begin; does not move the ack pointer. + fn set_working_lsn(&mut self, lsn: i64) { + self.lsn_changed = lsn != self.lsn; + self.lsn = lsn; + } + /// Get current LSN. pub fn lsn(&self) -> i64 { self.lsn diff --git a/pgdog/src/backend/replication/logical/subscriber/tests.rs b/pgdog/src/backend/replication/logical/subscriber/tests.rs index 8a2e86722..28d676a94 100644 --- a/pgdog/src/backend/replication/logical/subscriber/tests.rs +++ b/pgdog/src/backend/replication/logical/subscriber/tests.rs @@ -477,6 +477,34 @@ fn lsn_changed_tracking() { assert!(sub.lsn_changed()); } +/// status_update() must always reflect the last *committed* LSN, never the +/// working LSN set by Begin. A KeepAlive reply during an open transaction must +/// not advance the ack pointer to the future commit LSN — doing so would cause +/// reconnect to skip the in-flight transaction and lose data. +#[tokio::test] +async fn status_update_stays_at_committed_lsn_during_transaction() { + let mut sub = make_subscriber(); + sub.connect().await.unwrap(); + + // Nothing committed yet: ack pointer is at 0. + assert_eq!(sub.status_update().last_flushed, 0); + + // Commit a first transaction (begin LSN 50, end LSN 100). + sub.handle(begin_copy_data(50)).await.unwrap(); + sub.handle(commit_copy_data(100)).await.unwrap(); + assert_eq!(sub.status_update().last_flushed, 100); + + // Open a second transaction: Begin advances lsn to 200 (future commit LSN). + sub.handle(begin_copy_data(200)).await.unwrap(); + assert_eq!(sub.lsn(), 200, "working lsn follows Begin"); + // KeepAlive mid-transaction must still report 100, not 200. + assert_eq!( + sub.status_update().last_flushed, + 100, + "committed_lsn must not advance before commit" + ); +} + // ── Relation handling tests ───────────────────────────────────────── /// Relation inside a transaction uses Flush — stays in transaction. diff --git a/pgdog/src/net/messages/error_response.rs b/pgdog/src/net/messages/error_response.rs index f278d7e84..a8400c676 100644 --- a/pgdog/src/net/messages/error_response.rs +++ b/pgdog/src/net/messages/error_response.rs @@ -215,6 +215,26 @@ impl ErrorResponse { } } + /// Whether this Postgres error is transient and the operation can be retried. + /// Covers connection exceptions (class 08, excluding protocol violation 08P01), + /// operator-intervention shutdowns (57P01/57P02/57P03), and resource pressure + /// (53300 too_many_connections). + pub fn is_retryable(&self) -> bool { + matches!( + self.code.as_str(), + // Connection exceptions — server unreachable or dropped the connection. + // 08P01 (protocol_violation) is intentionally excluded: that signals a + // client-side bug and retrying would just repeat the same violation. + "08000" | "08001" | "08003" | "08004" | "08006" | "08007" + // Operator-intervention: admin shutdown, crash, or startup not ready. + | "57P01" | "57P02" | "57P03" + // Too many connections — transient resource limit. + | "53300" + // Lock timeout — another transaction holds the lock; retry after reconnect. + | "55P03" + ) + } + pub fn no_transaction() -> Self { Self { severity: "WARNING".into(), From c98efad984a82217c69469cd6dee79a6ac41bd12 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Sun, 10 May 2026 14:17:08 +0000 Subject: [PATCH 3/5] add retry test --- .schema/pgdog.schema.json | 2 +- integration/copy_data/retry_test/run.sh | 49 ++- .../issues/omni-table-subscriber-deadlock.md | 340 ------------------ .../logical/publisher/publisher_impl.rs | 11 +- 4 files changed, 58 insertions(+), 344 deletions(-) delete mode 100644 pgdog/docs/issues/omni-table-subscriber-deadlock.md diff --git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index ad38f09ff..6b048264e 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -1046,7 +1046,7 @@ "resharding_replication_retry_max_attempts": { "description": "Maximum number of consecutive replication-subscriber errors tolerated before\nthe source error is propagated. Each failure triggers `slot.reconnect()`,\nafter which Postgres re-streams every event since the last acked commit.\n`0` retries indefinitely.\n\n_Default:_ `5`\n\nhttps://docs.pgdog.dev/configuration/pgdog.toml/general/#resharding_replication_retry_max_attempts", "type": "integer", - "format": "uint64", + "format": "uint", "default": 5, "minimum": 0 }, diff --git a/integration/copy_data/retry_test/run.sh b/integration/copy_data/retry_test/run.sh index 42cbf4bdd..8c8a98fa1 100755 --- a/integration/copy_data/retry_test/run.sh +++ b/integration/copy_data/retry_test/run.sh @@ -41,7 +41,7 @@ shard0_count() { shard0_psql -tAc "SELECT COUNT(*) FROM $1"; } shard1_count() { shard1_psql -tAc "SELECT COUNT(*) FROM $1"; } # Pass a psql helper as $1; checks whether the canary row is present on that node. -has_canary() { local fn=$1; "${fn}" -tAc "SELECT 1 FROM copy_data.settings WHERE setting_name='${CANARY}' LIMIT 1" 2>/dev/null | grep -q 1; } +has_canary() { local fn=$1 name=${2:-${CANARY}}; "${fn}" -tAc "SELECT 1 FROM copy_data.settings WHERE setting_name='${name}' LIMIT 1" 2>/dev/null | grep -q 1; } pushd "${COMPOSE_DIR}" @@ -199,6 +199,51 @@ if [ "${CANARY_DELIVERED}" -ne 1 ]; then fi echo "[retry_test] OK: canary replicated to both shards." +# Assertion 3: replication survives a destination shard outage. +RETRY_CANARY="repl_retry_$(date +%s)_$$" +echo "[retry_test] Killing shard_0 to test replication retry..." +docker compose kill shard_0 + +echo "[retry_test] Inserting retry canary ${RETRY_CANARY} into source..." +src_psql -c "INSERT INTO copy_data.settings (setting_name, setting_value) VALUES ('${RETRY_CANARY}', 'repl_retry');" + +sleep 2 + +echo "[retry_test] Starting shard_0..." +docker compose start shard_0 + +READY_ATTEMPTS=0 +until pg_isready -h 127.0.0.1 -p 15433 -U pgdog -d pgdog1 -q; do + READY_ATTEMPTS=$((READY_ATTEMPTS + 1)) + if [ "${READY_ATTEMPTS}" -ge 120 ]; then + echo "[retry_test] FAIL: shard_0 not ready after $((READY_ATTEMPTS / 2))s" + exit 1 + fi + sleep 0.5 +done +echo "[retry_test] shard_0 is ready." + +RETRY_DELIVERED=0 +for _ in $(seq 1 "${REPLICATION_TIMEOUT}"); do + if has_canary shard0_psql "${RETRY_CANARY}" && has_canary shard1_psql "${RETRY_CANARY}"; then + RETRY_DELIVERED=1 + break + fi + if ! kill -0 "${PGDOG_PID}" 2>/dev/null; then + echo "[retry_test] FAIL: pgdog server exited while waiting for retry canary" + exit 1 + fi + sleep 1 +done + +if [ "${RETRY_DELIVERED}" -ne 1 ]; then + echo "[retry_test] FAIL: retry canary ${RETRY_CANARY} not replicated within ${REPLICATION_TIMEOUT}s" + echo "[retry_test] replication did not retry and recover after shard_0 outage" + admin_psql -c 'SHOW REPLICATION_SLOTS;' || true + exit 1 +fi +echo "[retry_test] OK: retry canary replicated to both shards after shard_0 outage." + # Verify row counts. # Sharded tables: sum across both destination shards must equal source. SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity" @@ -237,7 +282,7 @@ if [ "${FAILED}" -ne 0 ]; then exit 1 fi -echo "[retry_test] PASS: COPY_DATA survived shard outage + pool reload; replication delivered canary." +echo "[retry_test] PASS: COPY_DATA survived shard outage + pool reload; replication delivered canary; replication retried after destination shard outage." # Stop pgdog cleanly before tearing down compose. kill "${PGDOG_PID}" 2>/dev/null || true diff --git a/pgdog/docs/issues/omni-table-subscriber-deadlock.md b/pgdog/docs/issues/omni-table-subscriber-deadlock.md deleted file mode 100644 index 9a94ac3d3..000000000 --- a/pgdog/docs/issues/omni-table-subscriber-deadlock.md +++ /dev/null @@ -1,340 +0,0 @@ -# Omni-table cross-subscriber distributed deadlock - -## Root cause - -pgdog runs one logical replication subscriber per source shard. Each subscriber holds a persistent -connection to **every** destination shard inside one implicit destination transaction (open from WAL -`Begin` to WAL `Commit`). For omni (unsharded) tables, every source subscriber fans out each DML to -every destination, producing `N×M` independent lock holders for the same rows. - -`send()` in `src/backend/replication/logical/subscriber/stream.rs` uses three sequential loops: - -``` -for each destination: send Bind + Execute + Flush -for each destination: flush socket -for each destination: read response -``` - -Two subscribers × two destinations interleave: - -``` -sub-0: write → dest-0 (granted) write → dest-1 (waits on sub-1) -sub-1: write → dest-0 (waits on sub-0) write → dest-1 (granted) -``` - -Responses are read dest-0-first, so sub-0 blocks reading dest-1 and sub-1 blocks reading dest-0. -Neither reaches `Sync`; neither releases its lock. - -**PostgreSQL won't resolve it:** each server sees one waiter on one holder. The cycle exists only -across both wait graphs jointly, and no cross-server detector exists. - -**pgdog won't resolve it:** `stream.handle(data).await` has no timeout. The client query path's -`query_timeout` (default 30s) calls `backend.force_close()` on expiry; the replication path has no -equivalent. The failure surfaces only as source-slot LSNs frozen at non-advancing values. - -## Conditions - -1. Omni (unsharded) table in the publication. -2. ≥2 source shards and ≥2 destination shards. -3. Two concurrent source transactions touching the same omni row(s). - -The single-row case needs precise timing (sub-0 wins dest-0 and sub-1 wins dest-1 before either -reads back). The multi-row case is more reliable: row locks accumulate across the WAL transaction, -so subscribers hold earlier rows' locks while applying later ones. - -## Diagnosing a stuck stream - -```sh -# Cross-blocked backends on both destinations. -for port in 15434 15435; do - PGPASSWORD=pgdog psql -h 127.0.0.1 -p $port -U pgdog -d postgres -c " - SELECT pid, wait_event_type, wait_event, state, left(query, 80) - FROM pg_stat_activity - WHERE backend_type = 'client backend' AND state <> 'idle';" -done - -# Frozen source slot positions. -for port in 15432 15433; do - PGPASSWORD=pgdog psql -h 127.0.0.1 -p $port -U pgdog -d postgres -c " - SELECT slot_name, confirmed_flush_lsn, - pg_current_wal_lsn() - confirmed_flush_lsn AS lag - FROM pg_replication_slots;" -done -``` - -Two waiters cross-blocked on the two destinations, plus two source-slot LSNs frozen across -successive `[0.000 MB/sec]` log lines, confirms this deadlock. - -## Reproduction - -- **Unit test:** `cross_subscriber_omni_deadlock_two_databases` in - `src/backend/replication/logical/subscriber/tests.rs` — two subscribers racing on separate - databases behind a `tokio::sync::Barrier`. -- **Integration:** `integration/resharding/repro_deadlock.sh` — full Docker stack, two seeded rows, - concurrent UPDATEs. - -## Solutions - -### Solution 1: sequential per-destination apply in `send()` - -Collapse the three loops in `send()` (`stream.rs:238-277`) into one that completes write→read per -destination before moving on: - -```rust -for conn in &mut conns { - conn.send(&vec![bind.clone().into(), Execute::new().into(), Flush.into()].into()).await?; - conn.flush().await?; - for _ in 0..2 { - let msg = conn.read().await?; - // ... existing response handling ... - } -} -``` - -**Effect:** no subscriber holds locks on a second destination before the first one's response is -read. Breaks the single-row two-destination cycle. - -**Limit:** does not fix multi-row transactions. Locks accumulate across the WAL transaction (held -until `Sync` in `commit()`), so two omni rows still produce a cross-row, cross-destination cycle. -This is the case in `repro_deadlock.sh`. **Not sufficient alone** for any workload with multi-row -omni transactions. - ---- - -### Solution 2: `lock_timeout` on destination connections - -In `connect()` (`stream.rs:165-222`), set `lock_timeout` on each destination connection alongside -the existing `BEGIN`/`COMMIT` `Parse`. Same pattern as `statement_timeout` in -`src/backend/pool/pool_impl.rs:423-428`. - -```rust -server.send(&vec![ - Query::new("SET lock_timeout = '5s'").into(), - Sync.into(), -].into()).await?; -// drain ReadyForQuery -``` - -Make it configurable: add `lock_timeout: Option` to `Replication` in -`pgdog-config/src/replication.rs`. - -**Effect:** Postgres aborts blocked statements with `canceling statement due to lock timeout`. The -error propagates `send()` → `handle_inner()` → `handle()`, which clears `connections` and lets the -subscriber reconnect from the last flushed LSN. The failure becomes visible in logs immediately. - -**Limit:** safety net, not a structural fix. Bounds how long a deadlock persists; the conflicting -transaction still retries from scratch, so high contention can produce repeated lock-timeout -errors. Combined with Solution 1, it catches the multi-row cases sequential apply can't prevent. - ---- - -### Combined recommendation - -| | Single-row deadlock | Multi-row deadlock | Recovery / visibility | -|---|:---:|:---:|:---:| -| Sequential apply | ✓ | ✗ | ✗ | -| `lock_timeout` | — | — | ✓ | -| Both | ✓ | bounded | ✓ | - -Ship `lock_timeout` first — lower-risk, immediate protection against every deadlock shape. -Sequential apply reduces single-row contention. Both are superseded by Solution 6. -### Solution 3: per-table async writer task - -One long-lived Tokio task per replicated table. Subscribers buffer the full WAL transaction in -memory; on `Commit`, each table's slice is sent over mpsc to that table's writer. The writer owns -all destination connections for its table, fans out to every destination shard in parallel within -one transaction, and applies transactions on the same table one at a time. - -The mpsc guarantees at most one writer touches a table's rows on any destination at a given moment, -so the omni-table deadlock is **structurally eliminated**. - -**Limits:** - -1. **Cross-table atomicity is broken.** A source transaction touching an omni table and a sharded - table flows through two writers; their commits land at different times. Reads between the two - see a partially applied transaction — exactly the invariant logical replication is meant to - preserve. Affects every workload mixing omni and sharded writes. - -2. **Foreign keys become a new responsibility.** With FK from `B` to `A`: - - If `B`'s writer drains before `A`'s commits → `foreign_key_violation`. Recovery requires - global replay coordination. - - Even without violation, FK checks take a share lock on the referenced row, so `B`'s writer - can wait on `A`'s writer — a new cross-writer deadlock surface for any schema with FKs across - replicated tables. Closing it requires a dependency tracker, which is the same coordination - layer that collapses Solution 4 to globally serialized apply under contention. - -3. **Cross-table commit order is not preserved.** Per-table writers are independent; arrival order - at the destination need not match source-commit order. Cross-table joins during the apply - window can return incorrect results. - -4. **Buffering is unbounded.** Full WAL transactions sit in memory until `Commit`. Bulk loads can - OOM before any row commits downstream. - -5. **Throughput: N-way → 1-way per omni table.** N subscribers funnel into one writer per table. - Even with parallel shard fan-out inside a single transaction, transactions on the same table - apply one at a time. Every omni table receives writes from every source shard, so every - concurrent source transaction queues — even when row sets don't overlap. PostgreSQL's row locks - would have run those concurrently; the writer serializes them unconditionally. Ceiling per omni - table: `1 / (max-shard transaction apply time)`, independent of source count. More source - shards makes this worse. - -6. **Connection count is K×M.** One writer per table × M destinations. If subscribers keep their - own connections during migration, total is K×M + N×M. - -7. **Back-pressure is global.** A slow destination on one hot table fills its channel, stalling the - subscriber and WAL advance for every other table. Bounded channels stall uniformly; unbounded - ones trade stalls for memory growth. - -8. **Recovery needs a global LSN barrier.** Any writer error must roll back every writer's - in-flight transaction and replay from the last globally-flushed LSN — a sync barrier that - doesn't exist today. - -Solves the deadlock by breaking logical replication's central correctness invariant. -**Not recommended.** - ---- - -### Solution 4: per-destination-shard async writer task - -One Tokio task per destination shard, owning that shard's single connection. Subscribers enqueue -buffered transactions onto the relevant shard's channel; the shard task applies in arrival order. - -One writer per shard means no two subscribers contend for the same row on a destination. For omni -tables this is **strictly worse than the bug it fixes**, and for all tables it removes -destination-side parallelism. - -**Limits:** - -1. **Cross-shard order inversion → silent permanent corruption.** Two omni transactions on the - same row, observed by different subscribers: - - - T1 (sub-0): `UPDATE r SET x = 1` - - T2 (sub-1): `UPDATE r SET x = 2` - - Each subscriber enqueues its buffered transaction onto every shard's channel via separate - `tx.send()` calls — one per destination, **not atomic across destinations**. The four sends - from the two subscribers interleave freely: - - ``` - t0: sub-0.send(chan-0, T1) chan-0: [T1] - t1: sub-1.send(chan-0, T2) chan-0: [T1, T2] - t2: sub-1.send(chan-1, T2) chan-1: [T2] - t3: sub-0.send(chan-1, T1) chan-1: [T2, T1] - ``` - - Each shard task drains FIFO: shard-0 applies T1 → T2 and ends at `x = 2`; shard-1 applies - T2 → T1 and ends at `x = 1`. The shards permanently disagree on row `r`. Postgres raises no - error — each shard's local apply order was internally consistent — and pgdog has no - cross-shard comparator. Reads of `r` return inconsistent values indefinitely. - - The current code cannot produce this: each subscriber writes serially through one connection - per shard from a single task, so its writes to dest-0 and dest-1 always land in the same - relative order on both. The lock contention is what produces the (observable, recoverable) - deadlock instead of (silent) divergence. - - A global sequence number per source transaction would let shard tasks reorder before applying, - but then shard-1 must wait for T1 to arrive before applying T2 — both shards apply in the same - global order, eliminating per-shard parallelism for omni writes. Strictly worse than today. - -2. **Cross-shard atomicity is lost.** Multi-shard transactions commit independently per shard with - no coordinated rollback. - -3. **Memory is unbounded** — same buffering requirement as Solution 3. - -4. **Eliminates all destination parallelism for all tables.** Today N subscribers drive concurrent - connections to each shard; unrelated transactions on different tables apply in parallel. - Solution 4 serializes everything per shard FIFO — an `orders` row from sub-1 waits behind a - `config` row from sub-0 even with no shared locks. One slow apply stalls everything behind it. - - Per-shard ceiling: `1 / (single-row apply RTT)`. Today: `N / (single-row apply RTT)` across N - subscribers. The regression is total, not scoped to omni tables. - -**Discard.** Silent corruption *and* total throughput regression. Solutions 1+2 give a bounded, -recoverable deadlock — strictly better on both axes. - ---- - -### Solution 5: transaction batching with deferred `Sync` - -Sometimes proposed alongside the deadlock fix; **not a deadlock fix**. Buffer the full WAL -transaction, then send all statements per destination in one batch terminated by `Sync`. `send()` -stops emitting per-row `Flush`; responses are read once at `commit()`. - -**Why the deadlock survives.** Locks are acquired when Postgres processes `Execute`, as soon as -bytes hit the network — independent of whether the payload ends with `Flush` or `Sync`. By the -time the last row is flushed, all locks are held, and they release only at COMMIT. The -cross-subscriber interleave is equally possible. `lock_timeout` is still required. - -**The current code does not have a partial-transaction-visibility bug.** `__pgdog_repl_begin` / -`__pgdog_repl_commit` are only `Parse`-prepared in `connect()` (`stream.rs:185-204`); never bound -or executed. Each destination runs an implicit extended-protocol transaction spanning the WAL -transaction, committed by the `Sync` in `commit()` (`stream.rs:671-674`). `send()` emits only -`Bind + Execute + Flush`; `prepare_statements` (`stream.rs:443-446`) emits `Sync` only when -`in_transaction == false`. On error, `handle()` (`stream.rs:826`) clears the pool, dropping -sockets and forcing a Postgres rollback. Solution 5 adds no correctness here. - -**Unrelated benefits:** - -- Removes per-row protocol round-trips (one response read per transaction instead of per row). -- Simpler `send()` control flow. - -**Costs:** - -- Unbounded memory: full WAL transaction per destination buffered before flush. -- Apply lag increases by one source-transaction duration. - -Treat as a `send()`-loop optimization parallel to Solutions 1+2, not part of the deadlock fix. - ---- - -### Solution 6: modulo-partition subscriber ownership (implemented) - -The root cause is that every subscriber writes every omni-table row to every destination. -The fix assigns each subscriber a disjoint subset of destination shards so no two subscribers -ever hold locks on the same row at the same time. - -The partitioning rule lives in `OmniOwnership::owns()` (`omni_ownership.rs`): subscriber -`source_shard` owns destination shard `d` when `d % n_sources == source_shard`. With two -subscribers and two destinations, sub-0 owns even-indexed destinations and sub-1 owns -odd-indexed ones — they never touch the same row on the same server simultaneously. - -Enforcement happens in `send()` (`stream.rs`). The connection filter always passes each -shard through `partition.owns()`, with one exception: when there are multiple destination -connections, `Shard::Direct` and `Shard::Multi` carry an explicit shard computed from the -row's shard key and must land precisely — ownership filtering doesn't apply there. - -The edge case that required care: with a single destination shard, the query router -collapses `Shard::All` to `Shard::Direct(0)` for every table, including omni tables that -have no shard key. Without a guard, the ownership check would be bypassed and all -subscribers would write to the one destination — the original bug, just on a smaller -cluster. The `n_conns == 1` path in the filter catches this, routing single-connection -dispatches through `partition.owns()` regardless of how the shard was computed. - -**Correctness prerequisite:** every subscriber must receive every omni-table WAL event. -Logical replication publishes each WAL record to all subscribers of the same publication, -so this holds in any standard deployment. - -**Sharded tables are unaffected.** `OmniOwnership` is constructed with `n_sources = 1` -for sharded-table subscribers, causing `owns()` to return `true` unconditionally — the -behavior is identical to the pre-fix code. - ---- - -### Comparison - -| | Deadlock fixed | Cross-table/shard atomicity | Throughput impact | Memory risk | Complexity | Recommendation | -|---|:---:|:---:|:---:|:---:|:---:|---| -| Solution 1: sequential per-dest apply | single-row only | preserved | minor | none | low | Superseded | -| Solution 2: `lock_timeout` | bounded recovery | preserved | none | none | low | Superseded | -| Solution 3: per-table writer | yes | broken | severe | unbounded | high | Not recommended | -| Solution 4: per-shard writer | silent divergence† | broken | severe | unbounded | high | Discard | -| Solution 5: buffer + `Sync` | no | preserved | minor | unbounded | low | Optional optimization | -| **Modulo-partition subscriber ownership** | **yes (structural)** | **preserved** | **none** | **none** | **low** | **Implemented** | - -†Solution 4 produces persistent, undetectable row-level disagreement across shards when two -subscribers race on the same omni row. The deadlock is observable and recoverable; this divergence -is neither. - -Modulo-partition subscriber ownership is the structural fix and is now implemented in -`send()`. Solutions 1 and 2 remain valid as defence-in-depth. Solutions 3 and 4 trade the -deadlock for worse failure modes and throughput. Solution 5 is orthogonal. \ No newline at end of file diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 4a289c300..a495e3d29 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -292,7 +292,16 @@ impl Publisher { delay.as_millis() ); sleep(delay).await; - try_join!(slot.reconnect(), stream.reconnect())?; + if let Err(reconnect_err) = + try_join!(slot.reconnect(), stream.reconnect()) + { + if !reconnect_err.is_retryable() { + return Err(reconnect_err); + } + warn!( + "[replication] reconnect error ({attempt}/{max_attempts}): {reconnect_err}, will retry" + ); + } } Err(err) => return Err(err), } From 79796aa1f9b85e867f49c5527cde016dab5aa6b3 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 20 May 2026 10:14:31 +0000 Subject: [PATCH 4/5] fix test and OmniOwnershipt configuration --- .../logical/publisher/publisher_impl.rs | 4 +-- .../logical/subscriber/omni_ownership.rs | 5 +-- .../replication/logical/subscriber/stream.rs | 4 +-- .../replication/logical/subscriber/tests.rs | 34 +++++++++---------- 4 files changed, 24 insertions(+), 23 deletions(-) diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index a495e3d29..317fba869 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -727,7 +727,7 @@ mod test { let cfg = config(); let cluster = Cluster::new_test(&cfg); cluster.launch(); - let mut stream = StreamSubscriber::new(&cluster, &[]); + let mut stream = StreamSubscriber::new(&cluster, &[], OmniOwnership::test()); stream.connect().await.unwrap(); let result = stream.handle(begin_copy_data(1)).await; @@ -744,7 +744,7 @@ mod test { let cfg = config(); let cluster = Cluster::new_test(&cfg); cluster.launch(); - let mut stream = StreamSubscriber::new(&cluster, &[]); + let mut stream = StreamSubscriber::new(&cluster, &[], OmniOwnership::test()); stream.connect().await.unwrap(); let result = stream.handle(commit_copy_data(1)).await; diff --git a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs index a29b2c9d6..01cb200fa 100644 --- a/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs +++ b/pgdog/src/backend/replication/logical/subscriber/omni_ownership.rs @@ -29,8 +29,9 @@ impl OmniOwnership { } } -impl Default for OmniOwnership { - fn default() -> Self { +#[cfg(test)] +impl OmniOwnership { + pub fn test() -> Self { Self::new(0, 1) } } diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index 268e69e26..82b48b6b0 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -92,7 +92,7 @@ impl Statement { } } -#[derive(Debug, Default)] +#[derive(Debug)] pub struct StreamSubscriber { /// Destination cluster. cluster: Cluster, @@ -1023,7 +1023,7 @@ mod tests { fn make_subscriber() -> StreamSubscriber { let cluster = Cluster::new_test(&config()); - StreamSubscriber::new(&cluster, &[], OmniOwnership::default()) + StreamSubscriber::new(&cluster, &[], OmniOwnership::test()) } #[test] diff --git a/pgdog/src/backend/replication/logical/subscriber/tests.rs b/pgdog/src/backend/replication/logical/subscriber/tests.rs index 28d676a94..836c64b0b 100644 --- a/pgdog/src/backend/replication/logical/subscriber/tests.rs +++ b/pgdog/src/backend/replication/logical/subscriber/tests.rs @@ -255,12 +255,12 @@ fn x_update(u: XLogUpdate) -> CopyData { fn make_subscriber() -> StreamSubscriber { let cluster = Cluster::new_test(&config()); let tables = vec![make_sharded_table(), make_sharded_test_b_table()]; - StreamSubscriber::new(&cluster, &tables, OmniOwnership::default()) + StreamSubscriber::new(&cluster, &tables, OmniOwnership::test()) } fn make_subscriber_with_tables(tables: Vec) -> StreamSubscriber { let cluster = Cluster::new_test(&config()); - StreamSubscriber::new(&cluster, &tables, OmniOwnership::default()) + StreamSubscriber::new(&cluster, &tables, OmniOwnership::test()) } fn make_subscriber_with_tables_two_databases( @@ -274,7 +274,7 @@ fn make_subscriber_with_tables_two_databases( fn make_subscriber_single_shard() -> StreamSubscriber { let cluster = Cluster::new_test_single_shard(&config()); let tables = vec![make_sharded_table(), make_sharded_test_b_table()]; - StreamSubscriber::new(&cluster, &tables, OmniOwnership::default()) + StreamSubscriber::new(&cluster, &tables, OmniOwnership::test()) } /// Count rows matching the given `WHERE` predicate using a separate connection. @@ -614,7 +614,7 @@ async fn partition_leaves_share_destination() { leaf_b.table.parent_name = "sharded".to_string(); let cluster = Cluster::new_test_single_shard(&config()); - let mut sub = StreamSubscriber::new(&cluster, &[leaf_a, leaf_b], OmniOwnership::default()); + let mut sub = StreamSubscriber::new(&cluster, &[leaf_a, leaf_b], OmniOwnership::test()); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1558,7 +1558,7 @@ async fn full_identity_nothing_rejected() { let mut sub = StreamSubscriber::new( &cluster, &[make_replica_identity_nothing_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); sub.connect().await.unwrap(); @@ -1596,7 +1596,7 @@ async fn full_identity_omni_no_unique_index_rejected() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_omni_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); // Enforce precondition: the table must exist but have no qualifying unique index. @@ -1639,7 +1639,7 @@ async fn full_identity_insert_sharded() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_sharded_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1669,7 +1669,7 @@ async fn full_identity_update_fast_path() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_sharded_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1728,7 +1728,7 @@ async fn full_identity_update_slow_path() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_sharded_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1792,7 +1792,7 @@ async fn full_identity_update_slow_path_realistic_old_tuple() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_sharded_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1853,7 +1853,7 @@ async fn full_identity_update_all_toasted_is_noop() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_sharded_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1899,7 +1899,7 @@ async fn full_identity_delete() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_sharded_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; sub.connect().await.unwrap(); @@ -1941,7 +1941,7 @@ async fn full_identity_insert_omni_dedup() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_omni_dedup_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; @@ -2004,7 +2004,7 @@ async fn full_identity_update_duplicate_rows() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_dup_rows_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; @@ -2074,7 +2074,7 @@ async fn full_identity_delete_duplicate_rows() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_dup_rows_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; @@ -2145,7 +2145,7 @@ async fn full_identity_update_matches_null_column() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_dup_rows_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; @@ -2210,7 +2210,7 @@ async fn full_identity_delete_matches_null_column() { let mut sub = StreamSubscriber::new( &cluster, &[make_full_identity_dup_rows_table()], - OmniOwnership::default(), + OmniOwnership::test(), ); let mut verify = test_server().await; From a5b20bb5188b06957d73489ce2bed1ef33d173ac Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 20 May 2026 11:33:17 +0000 Subject: [PATCH 5/5] small fixes --- integration/copy_data/retry_test/pgdog.toml | 2 ++ .../replication/logical/publisher/publisher_impl.rs | 6 +++++- pgdog/src/backend/replication/logical/subscriber/stream.rs | 7 +++++++ pgdog/src/frontend/client/query_engine/test/set.rs | 7 ++++--- pgdog/src/net/messages/error_response.rs | 2 +- 5 files changed, 19 insertions(+), 5 deletions(-) diff --git a/integration/copy_data/retry_test/pgdog.toml b/integration/copy_data/retry_test/pgdog.toml index daa29a71f..1b453b812 100644 --- a/integration/copy_data/retry_test/pgdog.toml +++ b/integration/copy_data/retry_test/pgdog.toml @@ -2,6 +2,8 @@ resharding_copy_format = "binary" resharding_copy_retry_max_attempts = 8 resharding_copy_retry_min_delay = 500 +resharding_replication_retry_max_attempts = 8 +resharding_replication_retry_min_delay = 500 [[databases]] name = "source" diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 317fba869..56f43a6c0 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -298,6 +298,7 @@ impl Publisher { if !reconnect_err.is_retryable() { return Err(reconnect_err); } + stream.reset_connections(); warn!( "[replication] reconnect error ({attempt}/{max_attempts}): {reconnect_err}, will retry" ); @@ -732,7 +733,10 @@ mod test { let result = stream.handle(begin_copy_data(1)).await; - assert!(result.is_ok()); + assert!( + result.unwrap().is_none(), + "Begin event must not emit a status update" + ); cluster.shutdown(); } diff --git a/pgdog/src/backend/replication/logical/subscriber/stream.rs b/pgdog/src/backend/replication/logical/subscriber/stream.rs index 82b48b6b0..b8f4dd1f4 100644 --- a/pgdog/src/backend/replication/logical/subscriber/stream.rs +++ b/pgdog/src/backend/replication/logical/subscriber/stream.rs @@ -840,6 +840,13 @@ impl StreamSubscriber { self.connect().await } + /// Clear destination connections so the next `handle` call forces a fresh + /// `connect()`. Use after a failed reconnect to avoid reusing connections + /// that may have buffered stale handshake responses. + pub fn reset_connections(&mut self) { + self.connections.clear(); + } + /// `docs/REPLICATION.md` → "Error rollback". pub async fn handle(&mut self, data: CopyData) -> Result, Error> { match self.handle_inner(data).await { diff --git a/pgdog/src/frontend/client/query_engine/test/set.rs b/pgdog/src/frontend/client/query_engine/test/set.rs index 3ea4c05d4..39c2c8093 100644 --- a/pgdog/src/frontend/client/query_engine/test/set.rs +++ b/pgdog/src/frontend/client/query_engine/test/set.rs @@ -391,9 +391,10 @@ async fn test_lock_timeout() { 'I' ); - assert!( - test_client.client().params.get("lock_timeout").is_some(), - "lock_timeout should be tracked after SET" + assert_eq!( + test_client.client().params.get("lock_timeout"), + Some(&ParameterValue::String("3000".into())), + "lock_timeout should be tracked with the correct value after SET" ); // Reset clears it. diff --git a/pgdog/src/net/messages/error_response.rs b/pgdog/src/net/messages/error_response.rs index a8400c676..3ec1e8bfb 100644 --- a/pgdog/src/net/messages/error_response.rs +++ b/pgdog/src/net/messages/error_response.rs @@ -218,7 +218,7 @@ impl ErrorResponse { /// Whether this Postgres error is transient and the operation can be retried. /// Covers connection exceptions (class 08, excluding protocol violation 08P01), /// operator-intervention shutdowns (57P01/57P02/57P03), and resource pressure - /// (53300 too_many_connections). + /// (53300 too_many_connections), and lock timeout (55P03 lock_not_available). pub fn is_retryable(&self) -> bool { matches!( self.code.as_str(),