diff --git a/lib/db_connection/watcher.ex b/lib/db_connection/watcher.ex index ba782fb..e2ee324 100644 --- a/lib/db_connection/watcher.ex +++ b/lib/db_connection/watcher.ex @@ -37,16 +37,28 @@ defmodule DBConnection.Watcher do @impl true def handle_info({:DOWN, ref, _, _, _}, {caller_refs, started_refs}) do case caller_refs do - %{^ref => {supervisor, started_pid, started_ref}} -> - Process.demonitor(started_ref, [:flush]) - DynamicSupervisor.terminate_child(supervisor, started_pid) - {:noreply, {Map.delete(caller_refs, ref), Map.delete(started_refs, started_ref)}} + %{^ref => {_supervisor, started_pid, started_ref}} -> + try do + :sys.terminate(started_pid, :shutdown, :infinity) + catch + :exit, {:noproc, {:sys, :terminate, _}} -> :ok + :exit, {:shutdown, {:sys, :terminate, _}} -> :ok + end + + caller_refs = Map.delete(caller_refs, ref) + started_refs = Map.put(started_refs, started_ref, {nil, nil}) + {:noreply, {caller_refs, started_refs}} %{} -> - %{^ref => {caller_pid, caller_ref}} = started_refs - Process.demonitor(caller_ref, [:flush]) - Process.exit(caller_pid, :kill) - {:noreply, {Map.delete(caller_refs, caller_ref), Map.delete(started_refs, ref)}} + case started_refs do + %{^ref => {nil, nil}} -> + {:noreply, {caller_refs, Map.delete(started_refs, ref)}} + + %{^ref => {caller_pid, caller_ref}} -> + Process.demonitor(caller_ref, [:flush]) + Process.exit(caller_pid, :kill) + {:noreply, {Map.delete(caller_refs, caller_ref), Map.delete(started_refs, ref)}} + end end end @@ -56,7 +68,7 @@ defmodule DBConnection.Watcher do @impl true def terminate(_, {_, started_refs}) do - for {_, {caller_pid, _}} <- started_refs do + for {_, {caller_pid, _}} when caller_pid != nil <- started_refs do Process.exit(caller_pid, :kill) end diff --git a/test/db_connection/watcher_test.exs b/test/db_connection/watcher_test.exs new file mode 100644 index 0000000..e5987fc --- /dev/null +++ b/test/db_connection/watcher_test.exs @@ -0,0 +1,63 @@ +defmodule DBConnection.WatcherTest do + use ExUnit.Case, async: true + + alias TestConnection, as: C + alias TestAgent, as: A + + test "starting a new pool is not blocked by a slow-to-terminate pool" do + {:ok, agent1} = + A.start_link([ + {:ok, :state}, + fn _err, _state -> + Process.sleep(10_000) + :ok + end + ]) + + pool1 = + start_supervised!(%{ + id: :pool1, + start: + {DBConnection, :start_link, + [ + TestConnection, + [agent: agent1, pool_size: 1, idle_interval: 100_000, shutdown: 2_500] + ]}, + restart: :temporary + }) + + # Find the pool sup for pool1 by matching the owner pid in child spec ids. + pool_sup_ref = + DBConnection.ConnectionPool.Supervisor + |> DynamicSupervisor.which_children() + |> Enum.find_value(fn {_, sup_pid, _, _} -> + try do + has_pool1? = + sup_pid + |> Supervisor.which_children() + |> Enum.any?(fn {{_mod, owner, _id}, _, _, _} -> owner == pool1 end) + + if has_pool1?, do: sup_pid + catch + :exit, _ -> nil + end + end) + |> Process.monitor() + + # Trigger pool termination + stop_supervised!(:pool1) + + {:ok, agent2} = A.start_link([{:ok, :state}]) + + task = + Task.async(fn -> + C.start_link(agent: agent2, pool_size: 1, idle_interval: 100_000) + end) + + # If the DynamicSupervisor or the watcher is blocked, this hangs + assert {:ok, _pool2} = Task.await(task, 1_000) + + # Ensure the pool supervisor actually terminates + assert_receive {:DOWN, ^pool_sup_ref, _, _, _}, 5_000 + end +end