From 0a7c73ef761d7791e0878a5e58eb5f3d29be9a4e Mon Sep 17 00:00:00 2001 From: felipe stival Date: Mon, 20 Apr 2026 08:08:05 -0300 Subject: [PATCH 1/7] Add failing test for Watcher contention during pool termination The Watcher calls DynamicSupervisor.terminate_child/2 synchronously, blocking all other watch/terminate operations until the pool finishes shutting down. This test reproduces the issue with a slow disconnect. --- test/db_connection/watcher_test.exs | 45 +++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 test/db_connection/watcher_test.exs diff --git a/test/db_connection/watcher_test.exs b/test/db_connection/watcher_test.exs new file mode 100644 index 0000000..18fdee1 --- /dev/null +++ b/test/db_connection/watcher_test.exs @@ -0,0 +1,45 @@ +defmodule DBConnection.WatcherTest do + use ExUnit.Case, async: true + + alias TestConnection, as: C + alias TestAgent, as: A + + test "starting a new pool is not blocked by a slow-to-terminate pool" do + # Start a pool whose connection takes a long time to disconnect. + # The disconnect callback sleeps for 10 seconds, simulating a slow shutdown. + {:ok, agent1} = + A.start_link([ + {:ok, :state}, + fn _err, _state -> + Process.sleep(10_000) + :ok + end + ]) + + start_supervised!( + %{ + id: :pool1, + start: {DBConnection, :start_link, [TestConnection, [agent: agent1, pool_size: 1]]}, + restart: :temporary + } + ) + + # Trigger termination of pool1 by stopping it (the caller). + # This sends a :DOWN to the Watcher, which will try to terminate the pool. + stop_supervised!(:pool1) + + # Give the Watcher a moment to receive the :DOWN and start terminating pool1 + Process.sleep(100) + + # Now try to start a second pool. If the Watcher is blocked by the + # synchronous DynamicSupervisor.terminate_child call, this will hang. + {:ok, agent2} = A.start_link([{:ok, :state}]) + + task = + Task.async(fn -> + C.start_link(agent: agent2, pool_size: 1) + end) + + assert {:ok, _pool2} = Task.await(task, 3_000) + end +end From 867dadef4337f2eef4da75b04e3f5e22aa62f4a2 Mon Sep 17 00:00:00 2001 From: felipe stival Date: Mon, 20 Apr 2026 08:59:52 -0300 Subject: [PATCH 2/7] Make watcher shut down the pool supervisor async --- lib/db_connection/watcher.ex | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/lib/db_connection/watcher.ex b/lib/db_connection/watcher.ex index ba782fb..958341a 100644 --- a/lib/db_connection/watcher.ex +++ b/lib/db_connection/watcher.ex @@ -37,16 +37,28 @@ defmodule DBConnection.Watcher do @impl true def handle_info({:DOWN, ref, _, _, _}, {caller_refs, started_refs}) do case caller_refs do - %{^ref => {supervisor, started_pid, started_ref}} -> - Process.demonitor(started_ref, [:flush]) - DynamicSupervisor.terminate_child(supervisor, started_pid) - {:noreply, {Map.delete(caller_refs, ref), Map.delete(started_refs, started_ref)}} + %{^ref => {_supervisor, started_pid, started_ref}} -> + Task.Supervisor.start_child(DBConnection.Task, fn -> + try do + GenServer.stop(started_pid, :shutdown, :infinity) + catch + :exit, _ -> :ok + end + end) + caller_refs = Map.delete(caller_refs, ref) + started_refs = Map.put(started_refs, started_ref, {nil, nil}) + {:noreply, {caller_refs, started_refs}} %{} -> - %{^ref => {caller_pid, caller_ref}} = started_refs - Process.demonitor(caller_ref, [:flush]) - Process.exit(caller_pid, :kill) - {:noreply, {Map.delete(caller_refs, caller_ref), Map.delete(started_refs, ref)}} + case started_refs do + %{^ref => {nil, nil}} -> + {:noreply, {caller_refs, Map.delete(started_refs, ref)}} + + %{^ref => {caller_pid, caller_ref}} -> + Process.demonitor(caller_ref, [:flush]) + Process.exit(caller_pid, :kill) + {:noreply, {Map.delete(caller_refs, caller_ref), Map.delete(started_refs, ref)}} + end end end From de36728bf2fa6fb3d6adb58c3fe7059217df9fe5 Mon Sep 17 00:00:00 2001 From: felipe stival Date: Mon, 20 Apr 2026 09:13:15 -0300 Subject: [PATCH 3/7] Make test concurrency safe --- test/db_connection/watcher_test.exs | 42 +++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/test/db_connection/watcher_test.exs b/test/db_connection/watcher_test.exs index 18fdee1..2f0ad85 100644 --- a/test/db_connection/watcher_test.exs +++ b/test/db_connection/watcher_test.exs @@ -5,8 +5,6 @@ defmodule DBConnection.WatcherTest do alias TestAgent, as: A test "starting a new pool is not blocked by a slow-to-terminate pool" do - # Start a pool whose connection takes a long time to disconnect. - # The disconnect callback sleeps for 10 seconds, simulating a slow shutdown. {:ok, agent1} = A.start_link([ {:ok, :state}, @@ -16,30 +14,52 @@ defmodule DBConnection.WatcherTest do end ]) - start_supervised!( - %{ + pool1 = + start_supervised!(%{ id: :pool1, - start: {DBConnection, :start_link, [TestConnection, [agent: agent1, pool_size: 1]]}, + start: + {DBConnection, :start_link, + [ + TestConnection, + [agent: agent1, pool_size: 1, idle_interval: 100_000, shutdown: 2_500] + ]}, restart: :temporary - } - ) + }) + + # Find the pool sup for pool1 by matching the owner pid in child spec ids. + pool_sup_ref = + DBConnection.ConnectionPool.Supervisor + |> DynamicSupervisor.which_children() + |> Enum.find_value(fn {_, sup_pid, _, _} -> + try do + has_pool1? = + sup_pid + |> Supervisor.which_children() + |> Enum.any?(fn {{_mod, owner, _id}, _, _, _} -> owner == pool1 end) + + if has_pool1?, do: sup_pid + catch + :exit, _ -> nil + end + end) + |> Process.monitor() # Trigger termination of pool1 by stopping it (the caller). # This sends a :DOWN to the Watcher, which will try to terminate the pool. stop_supervised!(:pool1) - # Give the Watcher a moment to receive the :DOWN and start terminating pool1 - Process.sleep(100) - # Now try to start a second pool. If the Watcher is blocked by the # synchronous DynamicSupervisor.terminate_child call, this will hang. {:ok, agent2} = A.start_link([{:ok, :state}]) task = Task.async(fn -> - C.start_link(agent: agent2, pool_size: 1) + C.start_link(agent: agent2, pool_size: 1, idle_interval: 100_000) end) assert {:ok, _pool2} = Task.await(task, 3_000) + + # Ensure the pool supervisor actually terminates + assert_receive {:DOWN, ^pool_sup_ref, _, _, _}, 10_000 end end From bbc5943615a906c5ad452941184f39d3dcf64881 Mon Sep 17 00:00:00 2001 From: felipe stival Date: Mon, 20 Apr 2026 09:16:16 -0300 Subject: [PATCH 4/7] fmt --- lib/db_connection/watcher.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/db_connection/watcher.ex b/lib/db_connection/watcher.ex index 958341a..aea6aeb 100644 --- a/lib/db_connection/watcher.ex +++ b/lib/db_connection/watcher.ex @@ -45,6 +45,7 @@ defmodule DBConnection.Watcher do :exit, _ -> :ok end end) + caller_refs = Map.delete(caller_refs, ref) started_refs = Map.put(started_refs, started_ref, {nil, nil}) {:noreply, {caller_refs, started_refs}} From e6bb7452cf0b8a581e3a82c4f281e6bd8ea8a9fb Mon Sep 17 00:00:00 2001 From: felipe stival Date: Mon, 20 Apr 2026 09:24:13 -0300 Subject: [PATCH 5/7] Improve test --- test/db_connection/watcher_test.exs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/test/db_connection/watcher_test.exs b/test/db_connection/watcher_test.exs index 2f0ad85..e5987fc 100644 --- a/test/db_connection/watcher_test.exs +++ b/test/db_connection/watcher_test.exs @@ -44,12 +44,9 @@ defmodule DBConnection.WatcherTest do end) |> Process.monitor() - # Trigger termination of pool1 by stopping it (the caller). - # This sends a :DOWN to the Watcher, which will try to terminate the pool. + # Trigger pool termination stop_supervised!(:pool1) - # Now try to start a second pool. If the Watcher is blocked by the - # synchronous DynamicSupervisor.terminate_child call, this will hang. {:ok, agent2} = A.start_link([{:ok, :state}]) task = @@ -57,9 +54,10 @@ defmodule DBConnection.WatcherTest do C.start_link(agent: agent2, pool_size: 1, idle_interval: 100_000) end) - assert {:ok, _pool2} = Task.await(task, 3_000) + # If the DynamicSupervisor or the watcher is blocked, this hangs + assert {:ok, _pool2} = Task.await(task, 1_000) # Ensure the pool supervisor actually terminates - assert_receive {:DOWN, ^pool_sup_ref, _, _, _}, 10_000 + assert_receive {:DOWN, ^pool_sup_ref, _, _, _}, 5_000 end end From bb7e167b99572825ebef4ae79aa0ed3c06b11e39 Mon Sep 17 00:00:00 2001 From: felipe stival Date: Mon, 20 Apr 2026 09:32:26 -0300 Subject: [PATCH 6/7] Handle nil caller pid on terminate --- lib/db_connection/watcher.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/db_connection/watcher.ex b/lib/db_connection/watcher.ex index aea6aeb..057b5e9 100644 --- a/lib/db_connection/watcher.ex +++ b/lib/db_connection/watcher.ex @@ -69,7 +69,7 @@ defmodule DBConnection.Watcher do @impl true def terminate(_, {_, started_refs}) do - for {_, {caller_pid, _}} <- started_refs do + for {_, {caller_pid, _}} when caller_pid != nil <- started_refs do Process.exit(caller_pid, :kill) end From 7aa509a0f8a8a9dd6bbe798e5fdb9d798edd67c1 Mon Sep 17 00:00:00 2001 From: felipe stival Date: Mon, 20 Apr 2026 11:07:31 -0300 Subject: [PATCH 7/7] Use sys:terminate to avoid spawning a task --- lib/db_connection/watcher.ex | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/db_connection/watcher.ex b/lib/db_connection/watcher.ex index 057b5e9..e2ee324 100644 --- a/lib/db_connection/watcher.ex +++ b/lib/db_connection/watcher.ex @@ -38,13 +38,12 @@ defmodule DBConnection.Watcher do def handle_info({:DOWN, ref, _, _, _}, {caller_refs, started_refs}) do case caller_refs do %{^ref => {_supervisor, started_pid, started_ref}} -> - Task.Supervisor.start_child(DBConnection.Task, fn -> - try do - GenServer.stop(started_pid, :shutdown, :infinity) - catch - :exit, _ -> :ok - end - end) + try do + :sys.terminate(started_pid, :shutdown, :infinity) + catch + :exit, {:noproc, {:sys, :terminate, _}} -> :ok + :exit, {:shutdown, {:sys, :terminate, _}} -> :ok + end caller_refs = Map.delete(caller_refs, ref) started_refs = Map.put(started_refs, started_ref, {nil, nil})