Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions integration_test/connection_pool/max_lifetime_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule MaxLifetimeTest do
use ExUnit.Case, async: true

alias TestPool, as: P
alias TestAgent, as: A

test "disconnects and reconnects when idle ping fires after max_lifetime" do
stack = [
{:ok, :state},
:ok,
{:ok, :state},
fn _, _ -> Process.sleep(:infinity) end
]

{:ok, agent} = A.start_link(stack)

opts = [
agent: agent,
parent: self(),
connection_listeners: [self()],
max_lifetime: 100..100,
idle_interval: 200,
backoff_min: 10
]

{:ok, _pool} = P.start_link(opts)

assert_receive {:connected, conn}
assert_receive {:disconnected, ^conn}
assert_receive {:connected, ^conn}
end

test "disconnects and reconnects after a long checkout exceeds max_lifetime" do
stack = [
{:ok, :state},
{:idle, :state},
{:idle, :state},
:ok,
{:ok, :state},
fn _, _ -> Process.sleep(:infinity) end
]

{:ok, agent} = A.start_link(stack)

opts = [
agent: agent,
parent: self(),
connection_listeners: [self()],
max_lifetime: 50..50
]

{:ok, pool} = P.start_link(opts)
assert_receive {:connected, conn}
assert P.run(pool, fn _conn -> Process.sleep(500) end)
assert_receive {:disconnected, ^conn}
assert_receive {:connected, ^conn}
end
end
10 changes: 10 additions & 0 deletions lib/db_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ defmodule DBConnection do
| {:configure, (keyword -> keyword) | {module, atom, [any]} | nil}
| {:idle_interval, non_neg_integer}
| {:idle_limit, non_neg_integer}
| {:max_lifetime, Range.t()}
| {:max_restarts, non_neg_integer}
| {:max_seconds, pos_integer}
| {:name, GenServer.name()}
Expand Down Expand Up @@ -437,6 +438,15 @@ defmodule DBConnection do
also terminate whenever a connection is disconnected (for instance, due to
client or server errors)

* `:max_lifetime` - The number of ms the connection is allowed to live.
It is a range so you can jitter/spread disconnections over some time period.
For example, to have a max lifetime between 8 and 9 minutes, you ca set it
to `480_000..540_000`. Because the timer is started *after* the connection
to the database is established and on checkout, the connection may live for
slightly longer. If the connection is idle, the worst case wait is of
`540_000 + idle_limit`. If the connection is in use, it may last as long as
the connection is checked out over the max period. Default is `nil`.

* `:name` - A name to register the started process (see the `:name` option
in `GenServer.start_link/3`)

Expand Down
28 changes: 22 additions & 6 deletions lib/db_connection/connection_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,21 @@ defmodule DBConnection.ConnectionPool do
DBConnection.register_as_pool(mod)

queue = :ets.new(__MODULE__.Queue, [:protected, :ordered_set, decentralized_counters: true])
ts = {System.monotonic_time(), 0}

max_lifetime =
case Keyword.fetch(opts, :max_lifetime) do
{:ok, %Range{first: first, last: last, step: 1}} ->
{System.convert_time_unit(first, :millisecond, :native), last - first}

{:ok, invalid} ->
raise ArgumentError,
"invalid value for :max_lifetime, expected a step-1 range, got: #{inspect(invalid)}"

:error ->
nil
end

ts = {System.monotonic_time(), 0, max_lifetime}
{:ok, _} = DBConnection.ConnectionPool.Pool.start_supervised(queue, mod, opts)
target = Keyword.get(opts, :queue_target, @queue_target)
interval = Keyword.get(opts, :queue_interval, @queue_interval)
Expand Down Expand Up @@ -99,8 +113,9 @@ defmodule DBConnection.ConnectionPool do
{:reply, [metrics], state}
end

def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, _ts}) do
ts = {System.monotonic_time(), interval}
def handle_call({:disconnect_all, interval}, _from, {type, queue, codel, ts}) do
{_, _, max_lifetime} = ts
ts = {System.monotonic_time(), interval, max_lifetime}
{:reply, :ok, {type, queue, codel, ts}}
end

Expand Down Expand Up @@ -150,9 +165,9 @@ defmodule DBConnection.ConnectionPool do

case :ets.info(holder, :owner) do
^owner ->
{time, interval} = ts
{time, interval, max_lifetime} = ts

if Holder.maybe_disconnect(holder, time, interval) do
if Holder.maybe_disconnect(holder, time, interval, max_lifetime) do
{:noreply, data}
else
handle_checkin(holder, extra, data)
Expand Down Expand Up @@ -224,7 +239,8 @@ defmodule DBConnection.ConnectionPool do
{queued_in_native, holder} = key when queued_in_native <= past_in_native <-
:ets.first(queue) do
:ets.delete(queue, key)
Holder.maybe_disconnect(holder, elem(ts, 0), 0) or Holder.handle_ping(holder)
{time, _interval, max_lifetime} = ts
Holder.maybe_disconnect(holder, time, 0, max_lifetime) or Holder.handle_ping(holder)
drop_idle(past_in_native, limit - 1, status, queue, codel, ts)
else
_ ->
Expand Down
55 changes: 37 additions & 18 deletions lib/db_connection/holder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -241,30 +241,46 @@ defmodule DBConnection.Holder do
handle_done(holder, &DBConnection.Connection.stop/3, err)
end

@spec maybe_disconnect(t, integer, non_neg_integer) :: boolean()
def maybe_disconnect(holder, start, interval_ms) do
@spec maybe_disconnect(t, integer, non_neg_integer, {integer, non_neg_integer} | nil) ::
boolean()
def maybe_disconnect(holder, start, interval_ms, lifetime) do
ts = :ets.lookup_element(holder, :conn, conn(:ts) + 1)

cond do
ts >= start ->
false

interval_ms == 0 ->
true

true ->
pid = :ets.lookup_element(holder, :conn, conn(:connection) + 1)
System.monotonic_time() > hash_pid(pid, interval_ms) + start
end
disconnect_all_reason(start, interval_ms, ts, holder) ||
max_lifetime_reason(lifetime, ts, holder)
rescue
_ -> false
else
true ->
opts = [message: "disconnect_all requested", severity: :debug]
nil ->
false

reason ->
opts = [message: reason, severity: :debug]
handle_disconnect(holder, DBConnection.ConnectionError.exception(opts))
end

false ->
false
defp max_lifetime_reason(nil, _ts, _holder), do: nil

defp max_lifetime_reason({start, interval_ms}, ts, holder) do
ellapsed = System.monotonic_time() - ts

# First check if passed start then check if also the interval
if ellapsed > start and ellapsed > hash_holder(holder, interval_ms) + start do
"max_lifetime exceeded"
end
end

defp disconnect_all_reason(start, interval_ms, ts, holder) do
disconnect? =
cond do
ts >= start -> false
interval_ms == 0 -> true
true -> System.monotonic_time() > hash_holder(holder, interval_ms) + start
end

if disconnect? do
"disconnect_all requested"
end
end

## Private
Expand Down Expand Up @@ -441,7 +457,10 @@ defmodule DBConnection.Holder do
:erlang.cancel_timer(deadline, async: true, info: false)
end

defp hash_pid(pid, interval_ms) do
defp hash_holder(_holder, 0), do: 0

defp hash_holder(holder, interval_ms) do
pid = :ets.lookup_element(holder, :conn, conn(:connection) + 1)
hash = :erlang.phash2(pid, interval_ms)
System.convert_time_unit(hash, :millisecond, :native)
end
Expand Down
Loading