diff --git a/lib/firenest/pub_sub.ex b/lib/firenest/pub_sub.ex index 9b1fbc8..d5992e7 100644 --- a/lib/firenest/pub_sub.ex +++ b/lib/firenest/pub_sub.ex @@ -41,7 +41,7 @@ defmodule Firenest.PubSub do end @doc """ - Returns a child specifiction for pubsub with the given `options`. + Returns a child specification for pubsub with the given `options`. The `:name` and `:topology` keys are required as part of `options`. `:name` refers to the name of the pubsub to be started and `:topology` @@ -119,7 +119,7 @@ defmodule Firenest.PubSub do Broadcasts the given `message` on `topic` in `pubsub`. Returns `:ok` or `{:error, reason}` in case of failures in - the distributed brodcast. + the distributed broadcast. """ @spec broadcast(t, topic | [topic], term) :: :ok | {:error, term} def broadcast(pubsub, topic, message) when is_atom(pubsub) do @@ -135,7 +135,7 @@ defmodule Firenest.PubSub do Broadcasts the given `message` on `topic` in `pubsub`. Returns `:ok` or raises `Firenest.PubSub.BroadcastError` in case of - failures in the distributed brodcast. + failures in the distributed broadcast. """ @spec broadcast!(t, topic | [topic], term) :: :ok | no_return def broadcast!(pubsub, topic, message) do @@ -153,7 +153,7 @@ defmodule Firenest.PubSub do are not delivered to the broadcasting process. Returns `:ok` or `{:error, reason}` in case of failures in - the distributed brodcast. + the distributed broadcast. """ @spec broadcast_from(t, pid, topic | [topic], term) :: :ok | {:error, term()} def broadcast_from(pubsub, pid, topic, message) when is_atom(pubsub) and is_pid(pid) do @@ -173,7 +173,7 @@ defmodule Firenest.PubSub do are not delivered to the broadcasting process. Returns `:ok` or raises `Firenest.PubSub.BroadcastError` in case of - failures in the distributed brodcast. + failures in the distributed broadcast. """ @spec broadcast_from!(t, pid, topic | [topic], term) :: :ok | no_return def broadcast_from!(pubsub, pid, topic, message) do @@ -269,8 +269,7 @@ defmodule Firenest.PubSub.Supervisor do end def init({pubsub, topology, options}) do - partitions = - options[:partitions] || System.schedulers_online() |> Kernel./(4) |> Float.ceil() |> trunc() + partitions = options[:partitions] || ceil(System.schedulers_online() / 4) {module, function} = options[:dispatcher] || {Firenest.PubSub.Dispatcher, :dispatch} diff --git a/lib/firenest/replicated_state.ex b/lib/firenest/replicated_state.ex new file mode 100644 index 0000000..f35e107 --- /dev/null +++ b/lib/firenest/replicated_state.ex @@ -0,0 +1,361 @@ +defmodule Firenest.ReplicatedState do + @moduledoc """ + Distributed key-value store for ephemeral data. + + The key-value pairs are always attached to a lifetime of a + process and the state is replicated to all nodes across the + topology. When the attached process dies the state will be + removed on all nodes and in case the all the state from + disconnected nodes will be (temporarily) removed. The state + can only be attached to local processes. + + The state is managed through callbacks that are invoked on the + node where the process lives and remotely on other nodes when local + changes are propagated. + + The state is replicated incrementally through building local "deltas" + (changes to the state) and periodically replicating them remotely. Only + some amount of recent deltas are retained for "catching up" remote nodes. + If remote node is too far behind the current state and a delta-based + catch-up can't be performed, the server falls back to transferring the entire + local state. If keeping track of incremental changes is not convenient + for a particular state type, the value of a delta can be set to be equal + to the current state - this will always cause full state transfers. + """ + alias Firenest.SyncedServer + + @type server() :: atom() + @type key() :: term() + + @type local_delta() :: term() + @type remote_delta() :: term() + @type state() :: term() + @type callback_config() :: term() + + @type extra_action :: :delete | {:update_after, update :: term(), time :: pos_integer()} + + @type server_opt :: {:remote_changes, :ignore | :observe_collapsed | :observe_full} + + @doc """ + Called when a partition starts up. + + It returns: + + * an `initial_delta` that will be passed to `c:local_put/3` + callback on new entries and to `c:local_update/4` after remote + broadcast resets the local delta. + * an immutable `callback_config` value that will be passed to all + callbacks. + * a list of server_opt` settings configuring the behaviour of the server + + * `:remote_changes` - specifies behaviour of the `c:observe_remote_changes/2` + callback and can be `:observe_full | :observe_collapsed | :ignore`, + defaults to `:ignore`; + + """ + @callback init(opts :: keyword()) :: + {initial_delta :: local_delta(), callback_config(), [server_opt]} + + @doc """ + Called whenever the `put/4` function is called to create a new state. + + The `arg` is received from the corresponding `put/4` call. For the + explanation of the `extra_action` return values see the + `c:local_update/4` callback. + + The value of `local_delta` argument is always the initial delta as + returned by the `init/1` callback. + + This is a good place for broadcasting local state changes. + """ + @callback local_put(arg :: term(), local_delta(), callback_config()) :: + {local_delta(), initial_state} | {local_delta(), initial_state, extra_action()} + when initial_state: state() + + @doc """ + Called whenever the `update/4` function is called to update a state. + + This is a good place for broadcasting local state changes. + + The `local_delta` argument is either the accumulated delta from + `c:local_put/3` and `c:local_update/4` calls or the initial delta + from `c:init/1` after remote broadcast resets the local delta. + + ## Delayed update and delete + + If the function returns a third element in the tuple consisting of + `{:update_after, update, time, update}`, a timer is started by the + server and the `c:local_update/4` callback will be called with the + `update` value after `time` milliseconds. + + If the third element is `:delete`, the state will be immediately + deleted and the `c:local_delete/2` callback triggered. + """ + @callback local_update(update :: term(), local_delta(), state(), callback_config()) :: + {local_delta(), state()} | {local_delta(), state(), extra_action()} + + @doc """ + Called whenever attached process dies or the `delete/3` or `delete/2` + function is called and state is about to be deleted. + + The return value is ignored. + + This is a good place for broadcasting local state changes. + """ + @callback local_delete(state(), callback_config()) :: term() + + @doc """ + Called whenever the server is about to incrementally replicate local + state to a remote node. + + It takes in the local delta value constructed in the `c:local_update/4` + calls and returns a remote delta value that will be replicated to other + servers. The value of the local delta is reset to the initial delta value + returned from the `c:init/1` callback. + + In case the callback is not provided it defaults to just returning local delta. + """ + @callback prepare_remote_delta(local_delta(), callback_config()) :: remote_delta() + + @doc """ + Called whenever a remote delta is received from another node. + + The `remote_delta` value is the return value of the `c:prepare_remote_delta/2` + callback. The result of applying the remote delta to state must be + exactly the same as the result of applying local updates to the + state in the `c:local_update/3` callback. + """ + @callback handle_remote_delta(remote_delta(), state(), callback_config()) :: state() + + @doc """ + Called when remote changes are received by the local server. + + It receives a list of observed remote changes for all the tracked keys. + A `process_state_change` specifies how the state for a single process changed. + In particular the change can be: + + * `{initial_state, [:put, ...]}` in case the process just joined + * `{last_known_state, [..., :delete]}` in case the process just left + * `{last_known_state, [{:replace, state}, ...]}` in case it was not possible + to replicate incremental changes to remote state through deltas and a full + state transfer was performed. + * `{last_known_state, [..., {:delta, remote_delta}, ...]}` in case + incremental changes to the remote state were communicated. + + This callback is optional and its behaviour depends on the value + of the `:remote_changes` option returned from the `c:init/2` callback. + + * `:ignore` - the callback is not invoked and the server skips + all operations required for tracking the changes. This is the + default and should be chosen if precise state change tracking + is not required. + + * `:observe_full` - calls the callback with as precise information + about the remote state changes as possible. This means that, for + example, for a very short-lived process a `[:put, :delete]` sequence + of changes is possible. + + * `:observe_collapsed` - collapses the information about remote state + changes. For example a `[:put, {:delta, ...}]` sequence is collapsed + into just `[:put]` with all the state changes already applied and a + `[{:delta, ...}, :delete]` sequence is collapsed into just `[:delete]`. + This also means that for short-lived processes, no information + may be transferred if the changes would contain both `:put` and `:delete`. + + This is a good place for broadcasting remote state changes. + + The return value is ignored. + """ + @callback observe_remote_changes(observed_remote_changes, callback_config()) :: term() + when observed_remote_changes: [{key(), [process_state_change]}], + process_state_change: {current_state :: state(), [change]}, + change: + :put | {:replace, new_state :: state()} | {:delta, remote_delta()} | :delete + + @optional_callbacks [observe_remote_changes: 2, prepare_remote_delta: 2] + + @doc """ + Returns a child spec for the replicated state server. + + Once started, each partition will call the `c:init/1` callback + passing all the provided options. + + ## Options + + * `:name` - name for the process, required; + * `:topology` - name of the supporting topology, required; + * `:partitions` - number of partitions, defaults to 1; + * `:broadcast_timeout` - delay (in milliseconds) of broadcasting local + events to other nodes, defaults to 50 ms; + * `:max_remote_deltas` - the number of last broadcast deltas to keep for + catching up nodes that fell behind, defaults to 5. + + """ + defdelegate start_link(opts), to: Firenest.ReplicatedState.Supervisor + + defdelegate child_spec(opts), to: Firenest.ReplicatedState.Supervisor + + @doc """ + Puts new state under `key` attached to lifetime of `pid`. + + A special value `:partition` can be used for `pid` to indicate that the + lifetime of the value will be attached to the partition itself and + should be managed manually through `delete/3`, `delete/2` and `update/4` + calls with `:delete` returns from the `c:local_update/4` callback. + + This calls the `c:local_put/3` callback with `arg` inside the server. + """ + @spec put(server(), key(), pid() | :partition, term()) :: :ok | {:error, :already_joined} + def put(server, key, pid, arg) when pid == :parittion or node(pid) == node() do + partition = partition_info!(server, key) + SyncedServer.call(partition, {:put, key, pid, arg}) + end + + @doc """ + Deletes state under `key` attached to lifetime of `pid`. + + For the explanation of `:partition` value for `pid` see `put/4`. + + This calls the `c:local_delete/2` callback inside the server. + """ + @spec delete(server(), key(), pid() | :partition) :: :ok | {:error, :not_member} + def delete(server, key, pid) when pid == :partition or node(pid) == node() do + partition = partition_info!(server, key) + SyncedServer.call(partition, {:delete, key, pid}) + end + + @doc """ + Deletes state under all keys attached to lifetime of `pid`. + + This calls the `c:local_delete/2` callback inside the server for + each key the process is leaving. + """ + @spec delete(server(), pid()) :: :ok | {:error, :not_member} + def delete(server, pid) when node(pid) == node() do + partitions = partition_infos!(server) + replies = multicall(partitions, {:delete, pid}, 5_000) + + if :ok in replies do + :ok + else + {:error, :not_member} + end + end + + @doc """ + Updates state under `key` attached to lifetime of `pid`. + + For the explanation of `:partition` value for `pid` see `put/4`. + + This calls the `c:local_update/4` callback inside the server passing + the value of `update`. + """ + @spec update(server(), key(), pid() | :partition, term()) :: :ok | {:error, :not_member} + def update(server, key, pid, update) when pid == :partition or node(pid) == node() do + partition = partition_info!(server, key) + SyncedServer.call(partition, {:update, key, pid, update}) + end + + @doc """ + Lists all states present for a given `key`. + """ + @spec list(server(), key()) :: [state()] + def list(server, key) do + partition = partition_info!(server, key) + {m, f, args} = SyncedServer.call(partition, {:list, key}) + apply(m, f, args) + end + + # TODO + # def dirty_list(server, group) + + defp multicall(servers, request, timeout) do + timer_ref = :erlang.start_timer(timeout, self(), :timeout) + + request_refs = + Enum.map(servers, fn server -> + pid = Process.whereis(server) + ref = Process.monitor(pid) + send(pid, {:"$gen_call", {self(), ref}, request}) + ref + end) + + collect_replies(request_refs, timer_ref) + end + + defp collect_replies([], timer_ref) do + cancel_flush_timer(timer_ref) + [] + end + + defp collect_replies([ref | request_refs], timer_ref) do + receive do + {^ref, reply} -> + Process.demonitor(ref, [:flush]) + [reply | collect_replies(request_refs, timer_ref)] + + {:DOWN, ^ref, _, _, reason} -> + cancel_flush_timer(timer_ref) + Enum.each(request_refs, &Process.demonitor(&1, [:flush])) + exit(reason) + + {:timeout, ^timer_ref, _} -> + Enum.each([ref | request_refs], &Process.demonitor(&1, [:flush])) + exit(:timeout) + end + end + + defp cancel_flush_timer(timer_ref) do + :erlang.cancel_timer(timer_ref) + + receive do + {:timeout, ^timer_ref, _} -> :ok + after + 0 -> :ok + end + end + + defp partition_info!(server, key) do + hash = :erlang.phash2(key) + extract = {:element, {:+, {:rem, {:const, hash}, :"$1"}, 1}, :"$2"} + ms = [{{:partitions, :"$1", :"$2"}, [], [extract]}] + [info] = :ets.select(server, ms) + info + end + + defp partition_infos!(server) do + Tuple.to_list(:ets.lookup_element(server, :partitions, 3)) + end +end + +defmodule Firenest.ReplicatedState.Supervisor do + @moduledoc false + use Supervisor + + def start_link(opts) do + name = Keyword.fetch!(opts, :name) + supervisor = Module.concat(name, "Supervisor") + Supervisor.start_link(__MODULE__, {name, opts}, name: supervisor) + end + + def init({name, opts}) do + partitions = Keyword.get(opts, :partitions, 1) + topology = Keyword.fetch!(opts, :topology) + handler = Keyword.fetch!(opts, :handler) + + names = + for partition <- 0..(partitions - 1), + do: Module.concat(name, "Partition" <> Integer.to_string(partition)) + + children = + for name <- names do + spec = {Firenest.ReplicatedState.Server, {name, topology, handler, opts}} + Supervisor.child_spec(spec, id: name) + end + + :ets.new(name, [:named_table, :set, read_concurrency: true]) + :ets.insert(name, {:partitions, partitions, List.to_tuple(names)}) + + Supervisor.init(children, strategy: :one_for_one) + end +end diff --git a/lib/firenest/replicated_state/handler.ex b/lib/firenest/replicated_state/handler.ex new file mode 100644 index 0000000..7c3798c --- /dev/null +++ b/lib/firenest/replicated_state/handler.ex @@ -0,0 +1,66 @@ +defmodule Firenest.ReplicatedState.Handler do + @moduledoc false + + defstruct mod: nil, init_delta: nil, config: nil, delayed_fun: nil + + def new(mod, mod_opts, delayed_fun) do + {delta, config, opts} = mod.init(mod_opts) + state = %__MODULE__{mod: mod, init_delta: delta, config: config, delayed_fun: delayed_fun} + {state, opts} + end + + def local_put(%__MODULE__{} = state, arg, key, pid) do + %{mod: mod, config: config, init_delta: delta, delayed_fun: delayed} = state + + case mod.local_put(arg, delta, config) do + {delta, value} -> + {:put, value, delta, state} + + {delta, value, :delete} -> + {:delete, value, delta, state} + + {delta, value, {:update_after, update, time}} -> + delayed.(key, pid, update, time) + {:put, value, delta, state} + end + end + + def local_update(%__MODULE__{} = state, arg, key, pid, local_delta, value) do + %{mod: mod, config: config, delayed_fun: delayed} = state + + case mod.local_update(arg, local_delta, value, config) do + {delta, value} -> + {:put, value, delta, state} + + {delta, value, :delete} -> + {:delete, value, delta, state} + + {delta, value, {:update_after, update, time}} -> + delayed.(key, pid, update, time) + {:put, value, delta, state} + end + end + + def local_delete(%__MODULE__{} = state, deletes) do + %{mod: mod, config: config} = state + + Enum.each(deletes, &mod.local_delete(&1, config)) + state + end + + def handle_remote_delta(%__MODULE__{} = state, delta, value) do + %{mod: mod, config: config} = state + + mod.handle_remote_delta(delta, value, config) + end + + def prepare_remote_delta_fun(%__MODULE__{} = state) do + %{mod: mod, config: config} = state + + if function_exported?(mod, :prepare_remote_delta, 2) do + &mod.prepare_remote_delta(&1, config) + else + & &1 + end + end +end diff --git a/lib/firenest/replicated_state/remote.ex b/lib/firenest/replicated_state/remote.ex new file mode 100644 index 0000000..264bb2f --- /dev/null +++ b/lib/firenest/replicated_state/remote.ex @@ -0,0 +1,221 @@ +defmodule Firenest.ReplicatedState.Remote do + @moduledoc false + + defstruct pending: %{}, clocks: %{}, clock: 0, tag: nil, deltas: nil, broadcast: nil + + # TODO: some protocol for requesting more of a state, even from other nodes + # on first up, so we don't need to go to each node separately. + + import Record + defrecord :deltas, max: nil, lowest: 0, store: %{} + + def new(:ignore, broadcast, max_deltas) + when is_function(broadcast, 0) and is_integer(max_deltas) do + %__MODULE__{tag: :ignore, broadcast: broadcast, deltas: deltas(max: max_deltas)} + end + + def clock(%__MODULE__{clock: clock}), do: clock + + def clock_for(%__MODULE__{clocks: clocks}, ref), do: Map.fetch!(clocks, ref) + + # Reconnections are dead until we have permdown + def up(%__MODULE__{clocks: clocks} = state, ref, clock) do + case clocks do + # Reconnection, try to catch up + %{^ref => old_clock} when clock > old_clock -> + {:catch_up, old_clock, state} + + # Reconnection, no remote changes + %{^ref => old_clock} -> + # Assert for sanity + true = old_clock == clock + {:ok, state} + + # New node, no state + %{} when clock == 0 -> + {:ok, %{state | clocks: Map.put(clocks, ref, clock)}} + + # New node, catch up + %{} -> + {:catch_up, 0, state} + end + end + + # TODO: Right now down means permdown + def down(state, ref) do + permdown(state, ref) + end + + def permdown(%__MODULE__{clocks: clocks} = state, ref) do + %{^ref => _} = clocks + clocks = Map.delete(clocks, ref) + {:delete, %{state | clocks: clocks}} + end + + def catch_up(%__MODULE__{clock: current} = state, old_clock, state_getter) + when old_clock < current do + %{deltas: deltas(store: deltas, lowest: lowest), tag: tag} = state + + if old_clock >= lowest do + {:deltas, tag, current, Enum.map((old_clock + 1)..current, &Map.fetch!(deltas, &1))} + else + {:state_transfer, tag, current, state_getter.()} + end + end + + def broadcast(%__MODULE__{} = state, prepare_delta) do + %{pending: pending, clock: clock, tag: tag, broadcast: broadcast, deltas: deltas} = state + {:scheduled, broadcast} = broadcast + + new_deltas = prepare_deltas(tag, pending, prepare_delta) + + new_clock = clock + 1 + deltas = store_deltas(deltas, new_clock, new_deltas) + new_state = %{state | pending: %{}, clock: new_clock, broadcast: broadcast, deltas: deltas} + + {{tag, new_clock, new_deltas}, new_state} + end + + def handle_catch_up(%__MODULE__{tag: tag} = state, ref, {:deltas, tag, clock, deltas}) do + %{clocks: clocks} = state + + state = %{state | clocks: Map.put(clocks, ref, clock)} + {puts, updates, deletes} = handle_deltas(tag, deltas) + {:diff, puts, updates, deletes, state} + end + + def handle_catch_up(%__MODULE__{tag: tag} = state, ref, {:state_transfer, tag, clock, data}) do + %{clocks: clocks} = state + + case tag do + :ignore -> {:replace, data, %{state | clocks: Map.put(clocks, ref, clock)}} + end + end + + # TODO: should we store somewhere we're catching up with the server? + # if so, then we should accumulate the broadcasts we can't handle, until we can. + def handle_broadcast(%__MODULE__{clocks: clocks, tag: tag} = state, ref, {tag, clock, delta}) do + case clocks do + %{^ref => old_clock} when old_clock + 1 == clock -> + state = %{state | clocks: %{clocks | ref => clock}} + {puts, updates, deletes} = handle_deltas(tag, [delta]) + {:diff, puts, updates, deletes, state} + + # We missed some broadcast, catch up with the node + %{^ref => old_clock} when clock > old_clock -> + {:catch_up, old_clock, state} + + # We were caught up with a newer clock than the current, ignore + # TODO: is that even possible? + %{^ref => old_clock} when clock < old_clock -> + {:ok, state} + end + end + + def handle_broadcast(%__MODULE__{tag: local_tag}, ref, {remote_tag, _, _}) do + {:error, {:different_tag, ref, local_tag, remote_tag}} + end + + defp handle_deltas(:ignore, deltas) do + handler = &handle_ignore_delta/4 + handle_deltas(deltas, [], [], [], handler) + end + + defp handle_deltas([delta], inserts, updates, deletes, handler) do + handler.(delta, inserts, updates, deletes) + end + + defp handle_deltas([delta | rest], inserts, updates, deletes, handler) do + {inserts, updates, deletes} = handler.(delta, inserts, updates, deletes) + handle_deltas(rest, inserts, updates, deletes, handler) + end + + def local_put(state, key, pid, value) do + event(state, key, pid, {:put, value}) + end + + def local_delete(state, key, pid) do + event(state, key, pid, :delete) + end + + def local_update(state, key, pid, value, delta) do + event(state, key, pid, {:update, value, delta}) + end + + defp event(%__MODULE__{tag: tag} = state, key, pid, event) do + %{pending: pending, broadcast: broadcast} = state + + pending = + case tag do + :ignore -> event_ignore(pending, key, pid, event) + end + + %{state | pending: pending, broadcast: broadcast(broadcast)} + end + + defp event_ignore(pending, key, pid, {:put, value}) do + Map.put(pending, {key, pid}, {:put, value}) + end + + defp event_ignore(pending, key, pid, :delete) do + pending_key = {key, pid} + + case pending do + %{^pending_key => {:put, _}} -> Map.delete(pending, pending_key) + %{} -> Map.put(pending, pending_key, :delete) + end + end + + defp event_ignore(pending, key, pid, {:update, value, delta}) do + pending_key = {key, pid} + + case pending do + %{^pending_key => {:put, _}} -> %{pending | pending_key => {:put, value}} + %{} -> Map.put(pending, pending_key, {:update, delta}) + end + end + + defp prepare_deltas(:ignore, pending, prepare) do + prepare_ignore_deltas(Map.to_list(pending), [], [], [], prepare) + end + + defp prepare_ignore_deltas([], puts, updates, deletes, _prepare) do + {puts, updates, deletes} + end + + defp prepare_ignore_deltas([{key, {:put, value}} | rest], puts, updates, deletes, prepare) do + prepare_ignore_deltas(rest, [{key, value} | puts], updates, deletes, prepare) + end + + defp prepare_ignore_deltas([{key, {:update, delta}} | rest], puts, updates, deletes, prepare) do + delta = prepare.(delta) + prepare_ignore_deltas(rest, puts, [{key, delta} | updates], deletes, prepare) + end + + defp prepare_ignore_deltas([{key, :delete} | rest], puts, updates, deletes, prepare) do + prepare_ignore_deltas(rest, puts, updates, [key | deletes], prepare) + end + + defp handle_ignore_delta({delta_puts, delta_updates, delta_deletes}, puts, updates, deletes) do + {delta_puts ++ puts, delta_updates ++ updates, delta_deletes ++ deletes} + end + + defp broadcast({:scheduled, fun}) do + {:scheduled, fun} + end + + defp broadcast(fun) do + fun.() + {:scheduled, fun} + end + + defp store_deltas(deltas(max: max, lowest: lowest, store: store), clock, new_delta) do + store = Map.put(store, clock, new_delta) + + if map_size(store) > max do + deltas(max: max, lowest: lowest + 1, store: Map.delete(store, lowest)) + else + deltas(max: max, lowest: lowest, store: store) + end + end +end diff --git a/lib/firenest/replicated_state/server.ex b/lib/firenest/replicated_state/server.ex new file mode 100644 index 0000000..4b52758 --- /dev/null +++ b/lib/firenest/replicated_state/server.ex @@ -0,0 +1,292 @@ +defmodule Firenest.ReplicatedState.Server do + @moduledoc false + use Firenest.SyncedServer + + alias Firenest.SyncedServer + alias Firenest.ReplicatedState.{Store, Remote, Handler} + + def start_link({name, topology, handler, opts}) do + server_opts = [name: name, topology: topology] + + SyncedServer.start_link(__MODULE__, {name, handler, opts}, server_opts) + end + + defstruct [:store, :handler, :remote] + + @impl true + def init({name, handler, opts}) do + Process.flag(:trap_exit, true) + + store = Store.new(name) + + delayed_fun = &Process.send_after(self(), {:update, &1, &2, &3}, &4) + {handler, server_opts} = Handler.new(handler, opts, delayed_fun) + + broadcast_timeout = Keyword.get(opts, :broadcast_timeout, 50) + broadcast_fun = fn -> Process.send_after(self(), :broadcast_timeout, broadcast_timeout) end + + remote_changes = Keyword.get(server_opts, :remote_changes, :ignore) + max_deltas = Keyword.get(opts, :max_remote_deltas, 5) + remote = Remote.new(remote_changes, broadcast_fun, max_deltas) + + {:ok, + %__MODULE__{ + store: store, + handler: handler, + remote: remote + }} + end + + @impl true + def handshake_data(%{remote: remote}), do: Remote.clock(remote) + + @impl true + def handle_call({:put, key, pid, arg}, _from, state) do + %__MODULE__{store: store, handler: handler, remote: remote} = state + + link(pid) + pid = resolve_pid(pid) + + if Store.present?(store, key, pid) do + {:reply, {:error, :already_present}, state} + else + case Handler.local_put(handler, arg, key, pid) do + {:put, value, delta, handler} -> + remote = Remote.local_put(remote, key, pid, value) + store = Store.local_put(store, key, pid, value, delta) + {:reply, :ok, %{state | store: store, handler: handler, remote: remote}} + + {:delete, value, _delta, handler} -> + remote = Remote.local_put(remote, key, pid, value) + remote = Remote.local_delete(remote, key, pid) + handler = Handler.local_delete(handler, [value]) + {:reply, :ok, %{state | handler: handler, remote: remote}} + end + end + end + + def handle_call({:update, key, pid, arg}, _from, state) do + %__MODULE__{store: store, handler: handler, remote: remote} = state + pid = resolve_pid(pid) + + case Store.fetch(store, key, pid) do + {:ok, value, delta} -> + case Handler.local_update(handler, arg, key, pid, delta, value) do + {:put, value, delta, handler} -> + remote = Remote.local_update(remote, key, pid, value, delta) + store = Store.local_update(store, key, pid, value, delta) + {:reply, :ok, %{state | store: store, handler: handler, remote: remote}} + + {:delete, value, delta, handler} -> + remote = Remote.local_update(remote, key, pid, value, delta) + + case Store.local_delete(store, key, pid) do + # The value returned from update is fresher + {:ok, _value, store} -> + remote = Remote.local_delete(remote, key, pid) + handler = Handler.local_delete(handler, [value]) + {:reply, :ok, %{state | store: store, handler: handler, remote: remote}} + + {:last_member, _value, store} -> + unlink_flush(pid) + remote = Remote.local_delete(remote, key, pid) + handler = Handler.local_delete(handler, [value]) + {:reply, :ok, %{state | store: store, handler: handler, remote: remote}} + end + end + + :error -> + {:reply, {:error, :not_present}, state} + end + end + + def handle_call({:delete, key, pid}, _from, state) do + %__MODULE__{store: store, handler: handler, remote: remote} = state + pid = resolve_pid(pid) + + case Store.local_delete(store, key, pid) do + {:ok, value, store} -> + remote = Remote.local_delete(remote, key, pid) + handler = Handler.local_delete(handler, [value]) + {:reply, :ok, %{state | store: store, handler: handler, remote: remote}} + + {:last_member, value, store} -> + unlink_flush(pid) + remote = Remote.local_delete(remote, key, pid) + handler = Handler.local_delete(handler, [value]) + {:reply, :ok, %{state | store: store, handler: handler, remote: remote}} + + :error -> + {:reply, {:error, :not_present}, state} + end + end + + def handle_call({:delete, pid}, _from, state) do + %__MODULE__{store: store, handler: handler} = state + pid = resolve_pid(pid) + + case Store.local_delete(store, pid) do + {:ok, deletes, store} -> + unlink_flush(pid) + handler = Handler.local_delete(handler, deletes) + # TODO: how do we handle remote in here? + {:reply, :ok, %{state | store: store, handler: handler}} + + :error -> + {:reply, {:error, :not_member}, state} + end + end + + def handle_call({:list, key}, _from, state) do + %__MODULE__{store: store} = state + + {:reply, {Store, :list, [store, key]}, state} + end + + @impl true + def handle_info({:EXIT, pid, reason}, state) do + %__MODULE__{store: store, handler: handler} = state + + case Store.local_delete(store, pid) do + {:ok, leaves, store} -> + handler = Handler.local_delete(handler, leaves) + # TODO: how do we handle remote in here? + {:noreply, %{state | store: store, handler: handler}} + + :error -> + {:stop, reason, state} + end + end + + def handle_info(:broadcast_timeout, state) do + %__MODULE__{remote: remote, handler: handler} = state + + prepare_delta = Handler.prepare_remote_delta_fun(handler) + {data, remote} = Remote.broadcast(remote, prepare_delta) + SyncedServer.remote_broadcast({:broadcast, data}) + {:noreply, %{state | remote: remote}} + end + + def handle_info({:update, key, pid, arg}, state) do + %__MODULE__{store: store, handler: handler} = state + + case Store.fetch(store, key, pid) do + {:ok, value, delta} -> + case Handler.local_update(handler, arg, key, pid, delta, value) do + {:put, value, delta, handler} -> + store = Store.local_update(store, key, pid, value, delta) + {:noreply, %{state | store: store, handler: handler}} + + {:delete, value, _delta, handler} -> + # TODO: this delta has to propagate remotely before delete + case Store.local_delete(store, key, pid) do + # The value returned from update is fresher + {:ok, _value, store} -> + # state = schedule_broadcast_events(state, [{:leave, key, pid}]) + handler = Handler.local_delete(handler, [value]) + {:noreply, %{state | store: store, handler: handler}} + + {:last_member, _value, store} -> + unlink_flush(pid) + handler = Handler.local_delete(handler, [value]) + # state = schedule_broadcast_events(state, [{:leave, key, pid}]) + {:noreply, %{state | store: store, handler: handler}} + end + end + + :error -> + # Must have been already deleted, ignore + {:noreply, state} + end + end + + @impl true + def handle_remote({:catch_up_req, data}, from, state) do + %__MODULE__{remote: remote, store: store} = state + + get_all_local = fn -> Store.list_local(store) end + reply = Remote.catch_up(remote, data, get_all_local) + SyncedServer.remote_send(from, {:catch_up_rep, reply}) + {:noreply, state} + end + + def handle_remote({:catch_up_rep, data}, from, state) do + %__MODULE__{remote: remote, store: store, handler: handler} = state + + case Remote.handle_catch_up(remote, from, data) do + {:replace, data, remote} -> + store = Store.remote_update(store, from, data) + {:noreply, %{state | remote: remote, store: store}} + + {:diff, puts, updates, deletes, remote} -> + update_handler = &Handler.handle_remote_delta(handler, &1, &2) + store = Store.remote_diff(store, puts, updates, deletes, update_handler) + {:noreply, %{state | remote: remote, store: store}} + + {:ok, remote} -> + {:noreply, %{state | remote: remote}} + end + end + + def handle_remote({:broadcast, data}, from, state) do + %__MODULE__{remote: remote, store: store, handler: handler} = state + + case Remote.handle_broadcast(remote, from, data) do + {:diff, puts, updates, deletes, remote} -> + update_handler = &Handler.handle_remote_delta(handler, &1, &2) + store = Store.remote_diff(store, puts, updates, deletes, update_handler) + {:noreply, %{state | remote: remote, store: store}} + + {:catch_up, data, remote} -> + SyncedServer.remote_send(from, {:catch_up_req, data}) + {:noreply, %{state | remote: remote}} + + {:ok, remote} -> + {:noreply, %{state | remote: remote}} + end + end + + @impl true + def handle_replica(change, remote_ref, state) do + %__MODULE__{remote: remote, store: store} = state + + case remote_replica(change, remote_ref, remote) do + {:ok, remote} -> + {:noreply, %{state | remote: remote}} + + {:delete, remote} -> + store = Store.remote_delete(store, remote_ref) + {:noreply, %{state | remote: remote, store: store}} + + {:catch_up, data, remote} -> + SyncedServer.remote_send(remote_ref, {:catch_up_req, data}) + {:noreply, %{state | remote: remote}} + end + end + + defp remote_replica({:up, clock}, ref, remote) do + Remote.up(remote, ref, clock) + end + + defp remote_replica(:down, ref, remote) do + Remote.down(remote, ref) + end + + defp link(:partition), do: true + defp link(pid), do: Process.link(pid) + + defp resolve_pid(:partition), do: self() + defp resolve_pid(pid), do: pid + + defp unlink_flush(:partition), do: true + + defp unlink_flush(pid) do + Process.unlink(pid) + + receive do + {:EXIT, ^pid, _} -> true + after + 0 -> true + end + end +end diff --git a/lib/firenest/replicated_state/store.ex b/lib/firenest/replicated_state/store.ex new file mode 100644 index 0000000..ed417dd --- /dev/null +++ b/lib/firenest/replicated_state/store.ex @@ -0,0 +1,141 @@ +defmodule Firenest.ReplicatedState.Store do + @moduledoc false + + defstruct [:values, :pids] + + # Local data is stored in the values table in the following format: + # + # {{key, pid}, value, delta} + # + # Remote data is stored as: + # + # {{key, pid}, value} + # + # To recognise the node that remote data is coming from we can use + # the `node/1` function in the match spec. This saves on space of + # explicitly storing node and whole-node operations should be rare + # in practice. We can ignore the version part of node_ref, since + # we're guaranteed to get a nodedown from the old version of the node + # before it comes back up with a new version - we can ignore the + # version when storing the data. + # + # The pids table stores data as {key, pid} with key position of 2. + # This allows having the same data format in both tables and save + # on some data shuffling. + + def new(name) do + values = :ets.new(name, [:named_table, :protected, :ordered_set, read_concurrency: true]) + pids = :ets.new(__MODULE__.Pids, [:private, :duplicate_bag, keypos: 2]) + + %__MODULE__{values: ets_whereis(values), pids: pids} + end + + def list(%__MODULE__{values: values}, key) do + local = {{{key, :_}, :"$1", :_}, [], [:"$1"]} + remote = {{{key, :_}, :"$1"}, [], [:"$1"]} + :ets.select(values, [local, remote]) + end + + def list_local(%__MODULE__{values: values}) do + ms = [{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}] + :ets.select(values, ms) + end + + def present?(%__MODULE__{values: values}, key, pid) do + ets_key = {key, pid} + :ets.member(values, ets_key) + end + + def fetch(%__MODULE__{values: values}, key, pid) do + ets_key = {key, pid} + + case :ets.match(values, {ets_key, :"$1", :"$2"}) do + [[value, local_delta]] -> {:ok, value, local_delta} + [] -> :error + end + end + + def local_put(%__MODULE__{values: values, pids: pids} = state, key, pid, value, local_delta) do + ets_key = {key, pid} + + true = :ets.insert_new(values, {ets_key, value, local_delta}) + :ets.insert(pids, ets_key) + state + end + + def local_delete(%__MODULE__{values: values, pids: pids} = state, key, pid) do + ets_key = {key, pid} + ms = [{ets_key, [], [true]}] + + case :ets.select_delete(pids, ms) do + 0 -> + :error + + 1 -> + [{_, value, _}] = :ets.take(values, ets_key) + + if :ets.member(pids, pid) do + {:ok, value, state} + else + {:last_member, value, state} + end + end + end + + def local_delete(%__MODULE__{values: values, pids: pids} = state, pid) do + case :ets.take(pids, pid) do + [] -> + :error + + list -> + delete_ms = for ets_key <- list, do: {{ets_key, :_, :_}, [], [true]} + select_ms = for ets_key <- list, do: {{ets_key, :"$1", :_}, [], [:"$1"]} + data = :ets.select(values, select_ms) + :ets.select_delete(values, delete_ms) + {:ok, data, state} + end + end + + def local_update(%__MODULE__{values: values} = state, key, pid, value, local_delta) do + ets_key = {key, pid} + + :ets.insert(values, {ets_key, value, local_delta}) + state + end + + def remote_delete(%__MODULE__{values: values} = state, {node, _}) when is_atom(node) do + remote_delete_values(values, node) + state + end + + def remote_update(%__MODULE__{values: values} = state, {node, _}, data) when is_atom(node) do + remote_delete_values(values, node) + :ets.insert(values, data) + state + end + + def remote_diff(%__MODULE__{values: values} = state, puts, updates, deletes, update_handler) do + delete_ms = for key <- deletes, do: {{key, :_}, [], [true]} + puts = Enum.reduce(updates, puts, &[prepare_update(values, &1, update_handler) | &2]) + :ets.insert(values, puts) + :ets.select_delete(values, delete_ms) + state + end + + defp remote_delete_values(values, node) do + ms = [{{{:_, :"$1"}, :_}, [{:"=:=", {:node, :"$1"}, {:const, node}}], [true]}] + :ets.select_delete(values, ms) + end + + defp prepare_update(values, {key, delta}, update_handler) do + value = :ets.lookup_element(values, key, 2) + new_value = update_handler.(delta, value) + {key, new_value} + end + + if function_exported?(:ets, :whereis, 1) do + defp ets_whereis(table), do: :ets.whereis(table) + else + defp ets_whereis(table), do: table + end +end diff --git a/lib/firenest/synced_server.ex b/lib/firenest/synced_server.ex index 7f3e5a8..87737d4 100644 --- a/lib/firenest/synced_server.ex +++ b/lib/firenest/synced_server.ex @@ -65,6 +65,14 @@ defmodule Firenest.SyncedServer do | {:stop, reason :: term(), new_state} when new_state: state() + @doc """ + Invoked before connecting to a remote synced server. + + The returned value will be passed to the remote server in the + `c:handle_replica/3` callback. + """ + @callback handshake_data(state()) :: term() + @doc """ Invoked when a status of a remote synced server changes. @@ -76,7 +84,7 @@ defmodule Firenest.SyncedServer do See `c:handle_info/2` for explanation of return values. """ - @callback handle_replica(:up | :down, Topology.node_ref(), state()) :: + @callback handle_replica({:up, term()} | :down, Topology.node_ref(), state()) :: {:noreply, new_state} | {:noreply, new_state, timeout() | :hibernate} | {:stop, reason :: term(), new_state} @@ -221,16 +229,17 @@ defmodule Firenest.SyncedServer do def init({mod, arg, topology, name}) do state = %{ name: name, + node_ref: nil, topology: topology, mod: mod, int: nil, awaiting_hello: [], - awaiting_up: [], + awaiting_up: %{}, replicas: [] } with {:ok, state} <- init_mod(mod, arg, state), - {:ok, state, node_ref} <- sync_named(topology, name, state) do + {:ok, %{node_ref: node_ref} = state} <- sync_named(topology, state) do Process.put(__MODULE__, {topology, name, node_ref}) {:ok, state} end @@ -252,25 +261,28 @@ defmodule Firenest.SyncedServer do end @impl true - def handle_info({:named_up, node_ref, name}, %{name: name} = state) do + def handle_info({:named_up, remote_ref, name}, %{name: name} = state) do %{awaiting_up: awaiting, replicas: replicas} = state + init_replicas([remote_ref], state) - case delete_element(awaiting, node_ref) do - {:ok, awaiting} -> - result = apply_callback(state, :handle_replica, [:up, node_ref]) - handle_common(result, %{state | awaiting_up: awaiting, replicas: [node_ref | replicas]}) + case awaiting do + %{^remote_ref => data} -> + result = apply_callback(state, :handle_replica, [{:up, data}, remote_ref]) + awaiting = Map.delete(awaiting, remote_ref) + handle_common(result, %{state | awaiting_up: awaiting, replicas: [remote_ref | replicas]}) - :error -> - {:noreply, %{state | awaiting_hello: [node_ref | awaiting]}} + %{} -> + %{awaiting_hello: awaiting} = state + {:noreply, %{state | awaiting_hello: [remote_ref | awaiting]}} end end - def handle_info({:named_down, node_ref, name}, %{name: name} = state) do - %{awaiting_hello: awaiting, replicas: replicas} = state + def handle_info({:named_down, remote_ref, name}, %{name: name} = state) do + %{awaiting_hello: awaiting, node_ref: node_ref, replicas: replicas} = state - case delete_element(replicas, node_ref) do + case delete_element(replicas, remote_ref) do {:ok, replicas} -> - result = apply_callback(state, :handle_replica, [:down, node_ref]) + result = apply_callback(state, :handle_replica, [:down, remote_ref]) handle_common(result, %{state | replicas: replicas}) :error -> @@ -278,17 +290,19 @@ defmodule Firenest.SyncedServer do end end - def handle_info({__MODULE__, :hello, node_ref}, state) do + def handle_info({__MODULE__, :hello, data, remote_ref}, state) do %{awaiting_hello: awaiting, replicas: replicas} = state - case delete_element(awaiting, node_ref) do + case delete_element(awaiting, remote_ref) do {:ok, awaiting} -> - result = apply_callback(state, :handle_replica, [:up, node_ref]) + result = apply_callback(state, :handle_replica, [{:up, data}, remote_ref]) - handle_common(result, %{state | awaiting_hello: awaiting, replicas: [node_ref | replicas]}) + state = %{state | awaiting_hello: awaiting, replicas: [remote_ref | replicas]} + handle_common(result, state) :error -> - {:noreply, %{state | awaiting_up: [node_ref | awaiting]}} + %{awaiting_up: awaiting} = state + {:noreply, %{state | awaiting_up: Map.put(awaiting, remote_ref, data)}} end end @@ -381,21 +395,23 @@ defmodule Firenest.SyncedServer do end end - defp sync_named(topology, name, state) do + defp sync_named(topology, state) do node_ref = Topology.node(topology) case Topology.sync_named(topology, self()) do {:ok, replicas} -> - init_replicas(topology, name, replicas, node_ref) - {:ok, %{state | awaiting_hello: replicas}, node_ref} + state = %{state | awaiting_hello: replicas, node_ref: node_ref} + init_replicas(replicas, state) + {:ok, state} {:error, error} -> {:stop, {:sync_named, error}} end end - defp init_replicas(topology, name, replicas, node_ref) do - Enum.each(replicas, &Topology.send(topology, &1, name, {__MODULE__, :hello, node_ref})) + defp init_replicas(replicas, %{topology: topology, name: name, node_ref: node_ref} = state) do + data = apply_callback(state, :handshake_data, []) + Enum.each(replicas, &Topology.send(topology, &1, name, {__MODULE__, :hello, data, node_ref})) end defp apply_callback(%{mod: mod, int: int}, fun, args) do @@ -410,7 +426,7 @@ defmodule Firenest.SyncedServer do :erlang.raise(:error, :undef, System.stacktrace()) else {:registered_name, name} = Process.info(self(), :registered_name) - pattern = 'Undefined handle_info/2 in ~ts, process ~ts received unexpected message: ~p~n' + pattern = 'Undefined handle_info/2 in ~ts, process ~ts received message: ~p~n' :error_logger.warning_msg(pattern, [inspect(mod), inspect(name), hd(args)]) {:noreply, int} end diff --git a/test/firenest/pub_sub_test.exs b/test/firenest/pub_sub_test.exs index da9ff8c..6aab7ea 100644 --- a/test/firenest/pub_sub_test.exs +++ b/test/firenest/pub_sub_test.exs @@ -7,7 +7,7 @@ defmodule Firenest.PubSubTest do import Firenest.TestHelpers setup_all do - wait_until(fn -> Process.whereis(:firenest_topology_setup) == nil end) + wait_until(fn -> Process.whereis(:firenest_topology_setup) == nil end, 5000) nodes = [:"first@127.0.0.1", :"second@127.0.0.1"] pubsub = Firenest.Test.PubSub topology = Firenest.Test diff --git a/test/firenest/replicated_state/distributed_test.exs b/test/firenest/replicated_state/distributed_test.exs new file mode 100644 index 0000000..a2c03cd --- /dev/null +++ b/test/firenest/replicated_state/distributed_test.exs @@ -0,0 +1,94 @@ +defmodule Firenest.ReplicatedState.DistributedTest do + use ExUnit.Case + + alias Firenest.Topology, as: T + alias Firenest.ReplicatedState, as: R + + import Firenest.TestHelpers + + setup_all do + wait_until(fn -> Process.whereis(:firenest_topology_setup) == nil end, 5000) + nodes = [:"first@127.0.0.1", :"second@127.0.0.1", :"third@127.0.0.1"] + topology = Firenest.Test + server = Firenest.Test.ReplicatedState + + %{start: start} = + R.child_spec(name: server, topology: topology, handler: Firenest.Test.EvalState) + + Firenest.Test.start_link(nodes, start) + nodes = for {name, _} = ref <- T.nodes(topology), name in nodes, do: ref + + {:ok, topology: topology, evaluator: Firenest.Test.Evaluator, nodes: nodes, server: server} + end + + test "remote join is propagated", config do + %{server: server, test: test, nodes: [second | _]} = config + + quote do + spawn(fn -> + :ok = R.put(unquote(server), unquote(test), self(), :baz) + Process.sleep(:infinity) + end) + end + |> eval_on_node(second, config) + + wait_until(fn -> R.list(server, test) == [:baz] end) + end + + test "propages changes when nodes were disconnected", config do + %{topology: topology, server: server, test: test, nodes: [second, third]} = config + Process.register(self(), test) + + quote do + spawn(fn -> + Process.register(self(), unquote(test)) + :ok = R.put(unquote(server), unquote(test), self(), :baz) + Process.sleep(:infinity) + end) + end + |> eval_on_node(second, config) + + quote(do: R.list(unquote(server), unquote(test)) == [:baz]) + |> await_on_node(third, config) + + quote do + T.disconnect(unquote(topology), elem(unquote(third), 0)) + pid = Process.whereis(unquote(test)) + :ok = R.delete(unquote(server), unquote(test), pid) + + spawn(fn -> + :ok = R.put(unquote(server), unquote(test), self(), :bar) + Process.sleep(:infinity) + end) + end + |> eval_on_node(second, config) + + quote(do: R.list(unquote(server), unquote(test)) == []) + |> await_on_node(third, config) + + quote(do: T.connect(unquote(topology), elem(unquote(third), 0))) + |> eval_on_node(second, config) + + quote(do: R.list(unquote(server), unquote(test)) == [:bar]) + |> await_on_node(third, config) + end + + defp eval_on_node(quoted, node, config) do + %{topology: topology, evaluator: evaluator} = config + + T.send(topology, node, evaluator, {:eval_quoted, quoted}) + end + + defp await_on_node(quoted, node, config) do + %{topology: topology} = config + {:registered_name, name} = Process.info(self(), :registered_name) + + quote do + wait_until(fn -> unquote(quoted) end) + T.broadcast(unquote(topology), unquote(name), :continue) + end + |> eval_on_node(node, config) + + assert_receive :continue + end +end diff --git a/test/firenest/replicated_state/remote_test.exs b/test/firenest/replicated_state/remote_test.exs new file mode 100644 index 0000000..9ee61af --- /dev/null +++ b/test/firenest/replicated_state/remote_test.exs @@ -0,0 +1,133 @@ +defmodule Firenest.ReplicatedState.RemoteTest do + use ExUnit.Case, async: true + + alias Firenest.ReplicatedState.Remote + + @moduletag remote_changes: :ignore + + defmacrop assert_received_times(times, pattern) do + receives = List.duplicate(quote(do: assert_received(unquote(pattern))), times) + receives ++ [quote(do: refute_received(unquote(pattern)))] + end + + setup %{remote_changes: changes} do + parent = self() + %Remote{} = remote = Remote.new(changes, fn -> send(parent, :broadcast) end, 1) + [remote: remote] + end + + test "up and down", %{remote: remote} do + assert {:ok, remote} = Remote.up(remote, :node1, 0) + assert {:catch_up, 0, remote} = Remote.up(remote, :node2, 5) + assert {:delete, _remote} = Remote.down(remote, :node1) + end + + describe "remote_changes: :ignore" do + @describetag remote_changes: :ignore + + test "local_put/4", %{remote: remote} do + remote = + remote + |> Remote.local_put(:a, self(), 1) + |> Remote.local_put(:b, self(), 2) + + assert_received_times(1, :broadcast) + + {data, _remote} = Remote.broadcast(remote, &identity/1) + assert {:ignore, 1, {puts, _updates = [], _deletes = []}} = data + assert [{{:b, self()}, 2}, {{:a, self()}, 1}] == puts + end + + test "local_update/5", %{remote: remote} do + remote = + remote + |> Remote.local_update(:a, self(), 1, 2) + |> Remote.local_put(:b, self(), 3) + |> Remote.local_update(:b, self(), 4, 5) + + {data, _remote} = Remote.broadcast(remote, &identity/1) + assert {:ignore, 1, {puts, updates, _deletes = []}} = data + assert [{{:b, self()}, 4}] == puts + assert [{{:a, self()}, 2}] == updates + end + + test "local_delete/3", %{remote: remote} do + remote = + remote + |> Remote.local_update(:a, self(), 1, 2) + |> Remote.local_put(:b, self(), 3) + |> Remote.local_delete(:a, self()) + |> Remote.local_delete(:b, self()) + |> Remote.local_delete(:c, self()) + + {data, _remote} = Remote.broadcast(remote, &identity/1) + assert {:ignore, 1, {_puts = [], _updates = [], deletes}} = data + assert [{:c, self()}, {:a, self()}] == deletes + end + + test "(handle_)catch_up/3 with deltas", %{remote: remote} do + other = Remote.new(:ignore, fn -> nil end, 1) + + {_, remote} = + remote + |> Remote.local_put(:a, self(), 1) + |> Remote.broadcast(&identity/1) + + assert {:catch_up, request, other} = connect(remote, :remote, other) + reply = Remote.catch_up(remote, request, fn -> flunk("should not get here") end) + + assert {:diff, puts, updates, deletes, other} = + Remote.handle_catch_up(other, :remote, reply) + + assert [{{:a, self()}, 1}] == puts + assert [] == updates + assert [] == deletes + + assert Remote.clock_for(other, :remote) == Remote.clock(remote) + end + + test "(handle_)catch_up/3 with state transfer", %{} do + remote = Remote.new(:ignore, fn -> nil end, 0) + other = Remote.new(:ignore, fn -> nil end, 1) + + {_, remote} = + remote + |> Remote.local_put(:a, self(), 1) + |> Remote.broadcast(&identity/1) + + assert {:catch_up, request, other} = connect(remote, :remote, other) + data = [{{:a, self()}, 1}] + reply = Remote.catch_up(remote, request, fn -> data end) + + assert {:replace, ^data, other} = Remote.handle_catch_up(other, :remote, reply) + assert Remote.clock_for(other, :remote) == Remote.clock(remote) + end + + test "handle_broadcast/3", %{remote: remote} do + other = Remote.new(:ignore, fn -> nil end, 1) + + assert {:ok, other} = connect(remote, :remote, other) + + {broadcast, remote} = + remote + |> Remote.local_put(:a, self(), 1) + |> Remote.broadcast(&identity/1) + + assert {:diff, puts, updates, deletes, other} = + Remote.handle_broadcast(other, :remote, broadcast) + + assert [{{:a, self()}, 1}] == puts + assert [] == updates + assert [] == deletes + + assert Remote.clock_for(other, :remote) == Remote.clock(remote) + end + end + + defp identity(x), do: x + + defp connect(remote, id, other) do + clock = Remote.clock(remote) + Remote.up(other, id, clock) + end +end diff --git a/test/firenest/replicated_state/store_test.exs b/test/firenest/replicated_state/store_test.exs new file mode 100644 index 0000000..8d6bdc6 --- /dev/null +++ b/test/firenest/replicated_state/store_test.exs @@ -0,0 +1,153 @@ +defmodule Firenest.ReplicatedState.StoreTest do + use ExUnit.Case, async: true + + alias Firenest.ReplicatedState.Store + + setup %{test: test} do + %Store{} = store = Store.new(test) + other = spawn_link(fn -> Process.sleep(:infinity) end) + [store: store, other: other] + end + + test "empty store", %{store: store} do + assert Store.list(store, :a) == [] + assert Store.list_local(store) == [] + refute Store.present?(store, :a, self()) + end + + test "list with entries", %{store: store, other: other} do + store = + store + |> Store.local_put(:a, self(), 1, 1) + |> Store.local_put(:b, other, 2, 2) + + assert Store.list(store, :a) == [1] + assert Store.list(store, :b) == [2] + assert Store.list(store, :c) == [] + assert Store.list_local(store) == [{{:a, self()}, 1}, {{:b, other}, 2}] + end + + test "present?/3", %{store: store, other: other} do + store = + store + |> Store.local_put(:a, self(), 1, 1) + |> Store.local_put(:b, other, 2, 2) + + assert Store.present?(store, :a, self()) + assert Store.present?(store, :b, other) + refute Store.present?(store, :c, self()) + refute Store.present?(store, :a, other) + end + + test "fetch/3", %{store: store, other: other} do + store = + store + |> Store.local_put(:a, self(), 1, 1) + |> Store.local_put(:b, other, 2, 2) + + assert Store.fetch(store, :a, self()) == {:ok, 1, 1} + assert Store.fetch(store, :b, other) == {:ok, 2, 2} + assert Store.fetch(store, :c, self()) == :error + assert Store.fetch(store, :a, other) == :error + end + + test "local_update/5", %{store: store, other: other} do + store = + store + |> Store.local_put(:a, self(), 1, 1) + |> Store.local_put(:b, other, 2, 2) + |> Store.local_update(:a, self(), 3, 3) + |> Store.local_update(:b, other, 4, 4) + + assert Store.fetch(store, :a, self()) == {:ok, 3, 3} + assert Store.fetch(store, :b, other) == {:ok, 4, 4} + end + + test "local_delete/2", %{store: store, other: other} do + store = + store + |> Store.local_put(:a, self(), 1, 1) + |> Store.local_put(:b, other, 2, 2) + + another = spawn_link(fn -> Process.sleep(:infinity) end) + + assert Store.local_delete(store, another) == :error + + assert {:ok, [1], store} = Store.local_delete(store, self()) + refute Store.present?(store, :a, self()) + assert Store.present?(store, :b, other) + end + + test "local_delete/3", %{store: store, other: other} do + store = + store + |> Store.local_put(:a, self(), 1, 1) + |> Store.local_put(:b, other, 2, 2) + |> Store.local_put(:c, self(), 3, 3) + + another = spawn_link(fn -> Process.sleep(:infinity) end) + + assert Store.local_delete(store, :a, another) == :error + + assert {:ok, 1, store} = Store.local_delete(store, :a, self()) + assert {:last_member, 3, store} = Store.local_delete(store, :c, self()) + refute Store.present?(store, :a, self()) + refute Store.present?(store, :c, self()) + assert Store.present?(store, :b, other) + end + + test "remote_update/3", %{store: store} do + node_ref = {:node1, 1} + data = [{{:a, remote_pid(:node1, 1)}, 1}] + store = Store.remote_update(store, node_ref, data) + + assert Store.list(store, :a) == [1] + + store = Store.local_put(store, :a, self(), 2, 2) + + assert Store.list(store, :a) == [1, 2] + + new_data = [{{:b, remote_pid(:node1, 1)}, 3}] + store = Store.remote_update(store, node_ref, new_data) + + assert Store.list(store, :a) == [2] + end + + test "remote_delete/2", %{store: store} do + data1 = [{{:a, remote_pid(:node1, 1)}, 1}] + data2 = [{{:a, remote_pid(:node2, 1)}, 2}] + + store = + store + |> Store.remote_update({:node1, 1}, data1) + |> Store.remote_update({:node2, 1}, data2) + |> Store.remote_delete({:node1, 1}) + + assert Store.list(store, :a) == [2] + end + + test "remote_diff/5", %{store: store} do + parent = self() + + update_handler = fn delta, value -> + send(parent, {:update, delta, value}) + delta + value + end + + data = [{{:a, remote_pid(:node1, 1)}, 1}, {{:a, remote_pid(:node1, 2)}, 2}] + store = Store.remote_update(store, {:node1, 1}, data) + + puts = [{{:a, remote_pid(:node1, 3)}, 3}] + updates = [{{:a, remote_pid(:node1, 2)}, 4}] + deletes = [{:a, remote_pid(:node1, 1)}] + store = Store.remote_diff(store, puts, updates, deletes, update_handler) + + assert Store.list(store, :a) == [6, 3] + assert_received {:update, 4, 2} + end + + defp remote_pid(node, num) do + <<131, 100, node::binary>> = :erlang.term_to_binary(node) + :erlang.binary_to_term(<<131, 103, 100, node::binary, num::8*4, 0::8*4, 0::8*1>>) + end +end diff --git a/test/firenest/replicated_state_test.exs b/test/firenest/replicated_state_test.exs new file mode 100644 index 0000000..0715b9c --- /dev/null +++ b/test/firenest/replicated_state_test.exs @@ -0,0 +1,364 @@ +defmodule Firenest.ReplicatedStateTest do + use ExUnit.Case, async: true + + alias Firenest.ReplicatedState, as: R + + setup_all do + {:ok, topology: Firenest.Test, evaluator: Firenest.Test.Evaluator} + end + + setup %{test: test, topology: topology} do + opts = [name: test, topology: topology, handler: Firenest.Test.EvalState] + assert {:ok, _} = start_supervised({R, opts}) + {:ok, server: test} + end + + describe "put/4" do + test "adds process", %{server: server} do + parent = self() + + fun = fn delta, config -> + send(parent, {:local_put, delta, config}) + {delta + 1, 1} + end + + assert R.put(server, :foo, self(), fun) == :ok + assert_received {:local_put, 0, _} + + assert [1] == R.list(server, :foo) + end + + test "rejects double puts", %{server: server} do + assert R.put(server, :foo, self(), 1) == :ok + assert R.put(server, :foo, self(), 2) == {:error, :already_present} + end + + test "cleans up entries after process dies", %{server: server} do + {pid, ref} = spawn_monitor(Process, :sleep, [:infinity]) + R.put(server, :foo, pid, 1) + assert [_] = R.list(server, :foo) + Process.exit(pid, :kill) + assert_receive {:DOWN, ^ref, _, _, _} + assert [] = R.list(server, :foo) + end + + test "server dies if other linked process dies", %{server: server} do + parent = self() + [{_, pid, _, _}] = Supervisor.which_children(Module.concat(server, "Supervisor")) + ref = Process.monitor(pid) + + temp = + spawn(fn -> + Process.link(pid) + send(parent, :continue) + Process.sleep(:infinity) + end) + + assert_receive :continue + + Process.exit(temp, :shutdown) + assert_receive {:DOWN, ^ref, _, _, _} + end + + test "immediately deletes with :delete return", %{server: server} do + parent = self() + + fun = fn delta, config -> + send(parent, {:local_put, delta, config}) + + delete = fn config -> + send(parent, {:local_delete, config}) + end + + {delta + 1, delete, :delete} + end + + assert R.put(server, :foo, self(), fun) == :ok + assert_received {:local_put, 0, _} + assert_received {:local_delete, _} + + assert [] == R.list(server, :foo) + end + + test "immediately deletes with :delete return and other keys", %{server: server} do + parent = self() + + fun = fn delta, config -> + send(parent, {:local_put, delta, config}) + + delete = fn config -> + send(parent, {:local_delete, config}) + end + + {delta + 1, delete, :delete} + end + + assert R.put(server, :bar, self(), 1) == :ok + assert R.put(server, :foo, self(), fun) == :ok + assert_received {:local_put, 0, _} + assert_received {:local_delete, _} + + assert [] == R.list(server, :foo) + end + + test "with :update_after return", %{server: server} do + parent = self() + + fun = fn delta, config -> + send(parent, {:local_put, delta, config}) + + update = fn delta, state, config -> + send(parent, {:local_update, delta, state, config}) + {delta + 1, state + 1} + end + + {delta + 1, 1, {:update_after, update, 50}} + end + + assert R.put(server, :foo, self(), fun) == :ok + assert_received {:local_put, 0, _} + refute_received {:local_update, _, _, _} + assert [1] == R.list(server, :foo) + + assert_receive {:local_update, 1, 1, _} + assert [2] == R.list(server, :foo) + end + + test "with :update_after return that deletes immediately", %{server: server} do + parent = self() + + fun = fn delta, config -> + send(parent, {:local_put, delta, config}) + + update = fn delta, state, config -> + send(parent, {:local_update, delta, state, config}) + + delete = fn config -> + send(parent, {:local_delete, config}) + end + + {delta + 1, delete, :delete} + end + + {delta + 1, 1, {:update_after, update, 50}} + end + + assert R.put(server, :foo, self(), fun) == :ok + assert_received {:local_put, 0, _} + refute_received {:local_update, _, _, _} + assert [1] == R.list(server, :foo) + + assert_receive {:local_update, 1, 1, _} + assert_received {:local_delete, _} + assert [] == R.list(server, :foo) + end + + test "with :update_after return that deletes immediately with other keys", %{server: server} do + parent = self() + + fun = fn delta, config -> + send(parent, {:local_put, delta, config}) + + update = fn delta, state, config -> + send(parent, {:local_update, delta, state, config}) + + delete = fn config -> + send(parent, {:local_delete, config}) + end + + {delta + 1, delete, :delete} + end + + {delta + 1, 1, {:update_after, update, 50}} + end + + assert R.put(server, :bar, self(), 1) == :ok + assert R.put(server, :foo, self(), fun) == :ok + assert_received {:local_put, 0, _} + refute_received {:local_update, _, _, _} + assert [1] == R.list(server, :foo) + + assert_receive {:local_update, 1, 1, _} + assert_received {:local_delete, _} + assert [] == R.list(server, :foo) + assert [_] = R.list(server, :bar) + end + end + + describe "delete/2" do + test "removes entry", %{server: server} do + parent = self() + + fun = fn config -> + send(parent, {:local_delete, config}) + end + + R.put(server, :foo, self(), fn delta, _ -> {delta, fun} end) + + assert [_] = R.list(server, :foo) + assert R.delete(server, self()) == :ok + assert_received {:local_delete, _} + + assert [] == R.list(server, :foo) + end + + test "does not remove non members", %{server: server} do + [{_, pid, _, _}] = Supervisor.which_children(Module.concat(server, "Supervisor")) + Process.link(pid) + + assert R.delete(server, self()) == {:error, :not_member} + {:links, links} = Process.info(self(), :links) + assert pid in links + end + end + + describe "delete/3" do + test "removes single entry", %{server: server} do + parent = self() + + fun = fn config -> + send(parent, {:local_delete, config}) + end + + R.put(server, :foo, self(), fn delta, _ -> {delta, fun} end) + assert [_] = R.list(server, :foo) + + assert R.delete(server, :foo, self()) == :ok + assert_received {:local_delete, _} + assert [] == R.list(server, :foo) + end + + test "leaves other entries intact", %{server: server} do + pid = spawn_link(fn -> Process.sleep(:infinity) end) + R.put(server, :foo, self(), 1) + R.put(server, :foo, pid, 2) + assert [_, _] = R.list(server, :foo) + + assert R.delete(server, :foo, self()) == :ok + assert [2] == R.list(server, :foo) + end + + test "leaves other keys for same process intact", %{server: server} do + R.put(server, :foo, self(), 1) + R.put(server, :bar, self(), 2) + assert [_] = R.list(server, :foo) + assert [_] = R.list(server, :bar) + + assert R.delete(server, :foo, self()) == :ok + assert [] = R.list(server, :foo) + assert [_] = R.list(server, :bar) + end + + test "does not remove non members", %{server: server} do + [{_, pid, _, _}] = Supervisor.which_children(Module.concat(server, "Supervisor")) + Process.link(pid) + + assert R.delete(server, :foo, self()) == {:error, :not_present} + {:links, links} = Process.info(self(), :links) + assert pid in links + end + end + + describe "update/4" do + test "executes the update if entry is present", %{server: server} do + parent = self() + + R.put(server, :foo, self(), 1) + assert [1] = R.list(server, :foo) + + fun = fn delta, state, config -> + send(parent, {:local_update, delta, state, config}) + {delta + 1, state + 1} + end + + assert R.update(server, :foo, self(), fun) == :ok + assert_received {:local_update, 0, 1, _} + assert [2] = R.list(server, :foo) + end + + test "does not execute update if entry is absent", %{server: server} do + parent = self() + + fun = fn delta, state, config -> + Process.exit(parent, {:unexpected_update, delta, state, config}) + {delta, state} + end + + assert R.update(server, :foo, self(), fun) == {:error, :not_present} + end + + test "immediately deletes with :delete return", %{server: server} do + parent = self() + + R.put(server, :foo, self(), 1) + assert [1] = R.list(server, :foo) + + fun = fn delta, state, config -> + send(parent, {:local_update, delta, state, config}) + + delete = fn config -> + send(parent, {:local_delete, config}) + end + + {delta + 1, delete, :delete} + end + + assert R.update(server, :foo, self(), fun) == :ok + assert_received {:local_update, 0, 1, _} + assert_received {:local_delete, _} + + assert [] == R.list(server, :foo) + end + + test "immediately deletes with :delete return and other keys", %{server: server} do + parent = self() + + R.put(server, :bar, self(), 1) + R.put(server, :foo, self(), 1) + assert [1] = R.list(server, :foo) + + fun = fn delta, state, config -> + send(parent, {:local_update, delta, state, config}) + + delete = fn config -> + send(parent, {:local_delete, config}) + end + + {delta + 1, delete, :delete} + end + + assert R.update(server, :foo, self(), fun) == :ok + assert_received {:local_update, 0, 1, _} + assert_received {:local_delete, _} + + assert [] = R.list(server, :foo) + assert [_] = R.list(server, :bar) + end + + test "with :update_after return", %{server: server} do + parent = self() + + R.put(server, :foo, self(), 1) + assert [1] = R.list(server, :foo) + + fun = fn delta, state, config -> + send(parent, {:local_update, delta, state, config}) + + update = fn delta, state, config -> + send(parent, {:local_update, delta, state, config}) + {delta + 1, state + 1} + end + + {delta + 1, state + 1, {:update_after, update, 50}} + end + + assert R.update(server, :foo, self(), fun) == :ok + assert_received {:local_update, 0, 1, _} + refute_received {:local_update, _, _, _} + assert [2] == R.list(server, :foo) + + assert_receive {:local_update, 1, 2, _} + assert [3] == R.list(server, :foo) + end + end +end diff --git a/test/firenest/synced_server/distributed_test.exs b/test/firenest/synced_server/distributed_test.exs new file mode 100644 index 0000000..84e9152 --- /dev/null +++ b/test/firenest/synced_server/distributed_test.exs @@ -0,0 +1,284 @@ +defmodule Firenest.SyncedServer.DistributedTest do + use ExUnit.Case + + alias Firenest.SyncedServer, as: S + alias Firenest.Topology, as: T + alias Firenest.Test.EvalServer + + import Firenest.TestHelpers + + setup_all do + wait_until(fn -> Process.whereis(:firenest_topology_setup) == nil end, 5000) + nodes = [:"first@127.0.0.1", :"second@127.0.0.1"] + topology = Firenest.Test + nodes = for {name, _} = ref <- T.nodes(topology), name in nodes, do: ref + node = T.node(topology) + {:ok, topology: topology, evaluator: Firenest.Test.Evaluator, nodes: nodes, node: node} + end + + setup %{test: test, topology: topology} do + {:ok, pid} = S.start_link(EvalServer, 1, name: test, topology: topology) + mfa = &{S, :start_link, [EvalServer, &1, [name: test, topology: topology]]} + {:ok, mfa: mfa, pid: pid} + end + + describe "handle_replica/3" do + test "both ends receive message", %{pid: pid, node: node} = config do + parent = self() + + fun = fn status, replica -> + send(parent, {:replica, status, replica, 1}) + {:noreply, 1} + end + + remote_fun = + quote do + {:ok, + fn status, replica -> + send(unquote(parent), {:replica, status, replica, 2}) + {:noreply, 2} + end} + end + + send(pid, {:state, fun}) + second = start_another(config, remote_fun) + + assert_receive {:replica, {:up, _}, ^second, 1} + assert_receive {:replica, {:up, _}, ^node, 2} + end + + test "{:noreply, state}", %{pid: pid} = config do + parent = self() + + fun = fn status, replica -> + send(parent, {:replica, status, replica, 1}) + {:noreply, 1} + end + + send(pid, {:state, fun}) + second = start_another(config) + + assert_receive {:replica, {:up, _}, ^second, 1} + assert S.call(pid, :state) == 1 + end + + test "{:noreply, state, timeout}", %{pid: pid} = config do + parent = self() + + fun = fn status, replica -> + timeout = fn -> + send(parent, {:timeout, 2}) + {:noreply, 2} + end + + send(parent, {:replica, status, replica, 1}) + + {:noreply, timeout, 0} + end + + send(pid, {:state, fun}) + second = start_another(config) + + assert_receive {:replica, {:up, _}, ^second, 1} + assert_receive {:timeout, 2} + assert S.call(pid, :state) == 2 + end + + test "{:noreply, state, :hibernate}", %{pid: pid} = config do + parent = self() + + fun = fn status, replica -> + send(parent, {:replica, status, replica, 1}) + {:noreply, 1, :hibernate} + end + + send(pid, {:state, fun}) + second = start_another(config) + + assert_hibernate pid + assert_receive {:replica, {:up, _}, ^second, 1} + assert S.call(pid, :state) == 1 + end + + test "{:stop, reason, state}", %{pid: pid} = config do + parent = self() + Process.flag(:trap_exit, true) + + fun = fn status, replica -> + terminate = fn m -> + send(parent, {:terminate, m}) + end + + send(parent, {:replica, status, replica, 1}) + + {:stop, {:shutdown, terminate}, 1} + end + + send(pid, {:state, fun}) + second = start_another(config) + + assert_receive {:replica, {:up, _}, ^second, 1} + assert_receive {:terminate, 1} + assert_receive {:EXIT, ^pid, {:shutdown, _}} + end + + test "down", %{test: test} = config do + parent = self() + + fun = fn status, replica -> + handle_remote = fn status, replica -> + send(parent, {:replica, status, replica, 2}) + {:noreply, 2} + end + + send(parent, {:replica, status, replica, 1}) + {:noreply, handle_remote} + end + + send(test, {:state, fun}) + second = start_another(config) + + cmd = + quote do + pid = Process.whereis(unquote(test)) + Process.exit(pid, :kill) + end + + assert send_eval(config, second, cmd) == :ok + assert_receive {:replica, {:up, _}, ^second, 1} + assert_receive {:replica, :down, ^second, 2} + assert S.call(test, :state) == 2 + end + end + + describe "handle_remote/3" do + setup config do + {:ok, second: start_another(config)} + end + + test "{:noreply, state}", config do + %{second: second, node: node, test: test, pid: pid} = config + parent = self() + + cmd = + quote do + fun = fn :info, state -> + handle_remote = fn from, n -> + send(unquote(parent), {:remote, from, n}) + {:noreply, n + 1} + end + + S.remote_send(unquote(node), handle_remote) + {:noreply, state} + end + + send(unquote(test), fun) + end + + assert send_eval(config, second, cmd) == :ok + assert_receive {:remote, ^second, 2} + assert S.call(pid, :state) == 3 + end + + test "{:noreply, state, timeout}", config do + %{second: second, test: test, pid: pid} = config + parent = self() + + cmd = + quote do + fun = fn :info, state -> + handle_remote = fn from, n -> + timeout = fn -> + send(unquote(parent), {:timeout, n + 1}) + {:noreply, n + 1} + end + + send(unquote(parent), {:remote, from, n}) + {:noreply, timeout, 0} + end + + S.remote_broadcast(handle_remote) + {:noreply, state} + end + + send(unquote(test), fun) + end + + assert send_eval(config, second, cmd) == :ok + assert_receive {:remote, ^second, 2} + assert_receive {:timeout, 3} + assert S.call(pid, :state) == 3 + end + + test "{:noreply, state, :hibernate}", config do + %{second: second, test: test, pid: pid} = config + parent = self() + + cmd = + quote do + fun = fn :info, state -> + handle_remote = fn from, n -> + send(unquote(parent), {:remote, from, n}) + {:noreply, n + 1, :hibernate} + end + + S.remote_broadcast(handle_remote) + {:noreply, state} + end + + send(unquote(test), fun) + end + + assert send_eval(config, second, cmd) == :ok + assert_receive {:remote, ^second, 2} + assert_hibernate pid + assert S.call(pid, :state) == 3 + end + + test "{:stop, reason, state}", config do + %{second: second, node: node, test: test, pid: pid} = config + parent = self() + Process.flag(:trap_exit, true) + + cmd = + quote do + fun = fn :info, state -> + handle_remote = fn from, n -> + terminate = fn m -> + send(unquote(parent), {:terminate, m}) + end + + send(unquote(parent), {:remote, from, n}) + {:stop, {:shutdown, terminate}, n + 1} + end + + S.remote_send(unquote(node), handle_remote) + {:noreply, state} + end + + send(unquote(test), fun) + end + + assert send_eval(config, second, cmd) == :ok + assert_receive {:remote, ^second, 2} + assert_receive {:terminate, 3} + assert_receive {:EXIT, ^pid, {:shutdown, _}} + end + end + + defp start_another(config) do + start_another(config, quote(do: {:ok, fn _, _ -> {:noreply, 1} end})) + end + + defp start_another(config, initial_state) do + %{mfa: mfa, nodes: [second | _]} = config + + Firenest.Test.start_link([elem(second, 0)], mfa.({:eval, initial_state})) + second + end + + defp send_eval(config, to, cmd) do + %{evaluator: evaluator, topology: topology} = config + T.send(topology, to, evaluator, {:eval_quoted, cmd}) + end +end diff --git a/test/firenest/synced_server_test.exs b/test/firenest/synced_server_test.exs index dbfd182..90a94f8 100644 --- a/test/firenest/synced_server_test.exs +++ b/test/firenest/synced_server_test.exs @@ -4,7 +4,6 @@ defmodule Firenest.SyncedServerTest do import ExUnit.CaptureIO alias Firenest.SyncedServer, as: S - alias Firenest.Topology, as: T alias Firenest.Test.EvalServer import Firenest.TestHelpers @@ -318,262 +317,4 @@ defmodule Firenest.SyncedServerTest do end) =~ ~r"error.*GenServer.*\(stop\) shutdown: :terminate.*State: 3"sm end end - - defmodule Distributed do - use ExUnit.Case, async: true - - setup_all do - wait_until(fn -> Process.whereis(:firenest_topology_setup) == nil end) - nodes = [:"first@127.0.0.1", :"second@127.0.0.1"] - topology = Firenest.Test - nodes = for {name, _} = ref <- T.nodes(topology), name in nodes, do: ref - node = T.node(topology) - {:ok, topology: topology, evaluator: Firenest.Test.Evaluator, nodes: nodes, node: node} - end - - setup %{test: test, topology: topology} do - {:ok, pid} = S.start_link(EvalServer, 1, name: test, topology: topology) - mfa = {S, :start_link, [EvalServer, 1, [name: test, topology: topology]]} - {:ok, mfa: mfa, pid: pid} - end - - describe "handle_replica/3" do - test "{:noreply, state}", %{pid: pid} = config do - parent = self() - - fun = fn status, replica -> - send(parent, {:replica, status, replica, 1}) - {:noreply, 1} - end - - send(pid, {:state, fun}) - second = start_another(config) - - assert_receive {:replica, :up, ^second, 1} - assert S.call(pid, :state) == 1 - end - - test "{:noreply, state, timeout}", %{pid: pid} = config do - parent = self() - - fun = fn status, replica -> - timeout = fn -> - send(parent, {:timeout, 2}) - {:noreply, 2} - end - - send(parent, {:replica, status, replica, 1}) - - {:noreply, timeout, 0} - end - - send(pid, {:state, fun}) - second = start_another(config) - - assert_receive {:replica, :up, ^second, 1} - assert_receive {:timeout, 2} - assert S.call(pid, :state) == 2 - end - - test "{:noreply, state, :hibernate}", %{pid: pid} = config do - parent = self() - - fun = fn status, replica -> - send(parent, {:replica, status, replica, 1}) - {:noreply, 1, :hibernate} - end - - send(pid, {:state, fun}) - second = start_another(config) - - assert_hibernate pid - assert_receive {:replica, :up, ^second, 1} - assert S.call(pid, :state) == 1 - end - - test "{:stop, reason, state}", %{pid: pid} = config do - parent = self() - Process.flag(:trap_exit, true) - - fun = fn status, replica -> - terminate = fn m -> - send(parent, {:terminate, m}) - end - - send(parent, {:replica, status, replica, 1}) - - {:stop, {:shutdown, terminate}, 1} - end - - send(pid, {:state, fun}) - second = start_another(config) - - assert_receive {:replica, :up, ^second, 1} - assert_receive {:terminate, 1} - assert_receive {:EXIT, ^pid, {:shutdown, _}} - end - - test "down", %{test: test} = config do - parent = self() - - fun = fn status, replica -> - handle_remote = fn status, replica -> - send(parent, {:replica, status, replica, 2}) - {:noreply, 2} - end - - send(parent, {:replica, status, replica, 1}) - {:noreply, handle_remote} - end - - send(test, {:state, fun}) - second = start_another(config) - - cmd = - quote do - pid = Process.whereis(unquote(test)) - Process.exit(pid, :kill) - end - - assert send_eval(config, second, cmd) == :ok - assert_receive {:replica, :up, ^second, 1} - assert_receive {:replica, :down, ^second, 2} - assert S.call(test, :state) == 2 - end - end - - describe "handle_remote/3" do - setup config do - {:ok, second: start_another(config)} - end - - test "{:noreply, state}", config do - %{second: second, node: node, test: test, pid: pid} = config - parent = self() - - cmd = - quote do - fun = fn :info, state -> - handle_remote = fn from, n -> - send(unquote(parent), {:remote, from, n}) - {:noreply, n + 1} - end - - S.remote_send(unquote(node), handle_remote) - {:noreply, state} - end - - send(unquote(test), fun) - end - - assert send_eval(config, second, cmd) == :ok - assert_receive {:remote, ^second, 2} - assert S.call(pid, :state) == 3 - end - - test "{:noreply, state, timeout}", config do - %{second: second, test: test, pid: pid} = config - parent = self() - - cmd = - quote do - fun = fn :info, state -> - handle_remote = fn from, n -> - timeout = fn -> - send(unquote(parent), {:timeout, n + 1}) - {:noreply, n + 1} - end - - send(unquote(parent), {:remote, from, n}) - {:noreply, timeout, 0} - end - - S.remote_broadcast(handle_remote) - {:noreply, state} - end - - send(unquote(test), fun) - end - - assert send_eval(config, second, cmd) == :ok - assert_receive {:remote, ^second, 2} - assert_receive {:timeout, 3} - assert S.call(pid, :state) == 3 - end - - test "{:noreply, state, :hibernate}", config do - %{second: second, test: test, pid: pid} = config - parent = self() - - cmd = - quote do - fun = fn :info, state -> - handle_remote = fn from, n -> - send(unquote(parent), {:remote, from, n}) - {:noreply, n + 1, :hibernate} - end - - S.remote_broadcast(handle_remote) - {:noreply, state} - end - - send(unquote(test), fun) - end - - assert send_eval(config, second, cmd) == :ok - assert_receive {:remote, ^second, 2} - assert_hibernate pid - assert S.call(pid, :state) == 3 - end - - test "{:stop, reason, state}", config do - %{second: second, node: node, test: test, pid: pid} = config - parent = self() - Process.flag(:trap_exit, true) - - cmd = - quote do - fun = fn :info, state -> - handle_remote = fn from, n -> - terminate = fn m -> - send(unquote(parent), {:terminate, m}) - end - - send(unquote(parent), {:remote, from, n}) - {:stop, {:shutdown, terminate}, n + 1} - end - - S.remote_send(unquote(node), handle_remote) - {:noreply, state} - end - - send(unquote(test), fun) - end - - assert send_eval(config, second, cmd) == :ok - assert_receive {:remote, ^second, 2} - assert_receive {:terminate, 3} - assert_receive {:EXIT, ^pid, {:shutdown, _}} - end - end - - defp start_another(config) do - %{test: test, mfa: mfa, nodes: [second | _]} = config - - cmd = - quote do - fun = fn _, _ -> {:noreply, 1} end - send(unquote(test), {:state, fun}) - end - - Firenest.Test.start_link([elem(second, 0)], mfa) - assert send_eval(config, second, cmd) == :ok - second - end - - defp send_eval(config, to, cmd) do - %{evaluator: evaluator, topology: topology} = config - T.send(topology, to, evaluator, {:eval_quoted, cmd}) - end - end end diff --git a/test/shared/test.ex b/test/shared/test.ex index b53aa2b..e37f181 100644 --- a/test/shared/test.ex +++ b/test/shared/test.ex @@ -25,6 +25,11 @@ defmodule Firenest.Test do def handle_info({:eval_quoted, quoted}, state) do Code.eval_quoted(quoted) {:noreply, state} + catch + kind, reason -> + exception = Exception.format(kind, reason, System.stacktrace()) + Logger.error("Eval failed on node #{inspect(node())}\n#{exception}") + {:noreply, state} end def handle_info(_, state) do @@ -84,21 +89,13 @@ defmodule Firenest.Test do spawn_link(fn -> Process.register(self(), __MODULE__.Reporter) - forward(parent) + :slave.relay(parent) end) multirpc(nodes, :slave, :pseudo, [node(), [__MODULE__.Reporter]]) :ok end - defp forward(parent) do - receive do - msg -> send(parent, msg) - end - - forward(parent) - end - @doc """ Sends a report back to the reporter process configured with `start_reporter/1`. """ diff --git a/test/shared/topology_test.exs b/test/shared/topology_test.exs index 41a9540..9d92394 100644 --- a/test/shared/topology_test.exs +++ b/test/shared/topology_test.exs @@ -10,7 +10,7 @@ defmodule Firenest.TopologyTest do import Firenest.TestHelpers setup_all do - wait_until(fn -> Process.whereis(:firenest_topology_setup) == nil end) + wait_until(fn -> Process.whereis(:firenest_topology_setup) == nil end, 5000) topology = Firenest.Test {:ok, diff --git a/test/support/eval_server.ex b/test/support/eval_server.ex index 0b38bd8..372e2f5 100644 --- a/test/support/eval_server.ex +++ b/test/support/eval_server.ex @@ -2,8 +2,12 @@ defmodule Firenest.Test.EvalServer do use Firenest.SyncedServer def init(fun) when is_function(fun, 0), do: fun.() + def init({:eval, cmd}), do: elem(Code.eval_quoted(cmd), 0) def init(state), do: {:ok, state} + def handshake_data(fun) when is_function(fun, 0), do: fun.() + def handshake_data(state), do: state + def handle_call(:state, _, state), do: {:reply, state, state} def handle_call(fun, from, state), do: fun.(from, state) diff --git a/test/support/eval_state.ex b/test/support/eval_state.ex new file mode 100644 index 0000000..cf11d07 --- /dev/null +++ b/test/support/eval_state.ex @@ -0,0 +1,20 @@ +defmodule Firenest.Test.EvalState do + @behaviour Firenest.ReplicatedState + + @impl true + def init(opts) do + {0, opts, opts} + end + + @impl true + def local_put(fun, delta, config) when is_function(fun), do: fun.(delta, config) + def local_put(data, delta, _config), do: {delta, data} + + @impl true + def local_delete(fun, config) when is_function(fun), do: fun.(config) + def local_delete(_data, _config), do: :ok + + @impl true + def local_update(fun, delta, state, config) when is_function(fun), + do: fun.(delta, state, config) +end diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex index e691177..9488abe 100644 --- a/test/support/test_helpers.ex +++ b/test/support/test_helpers.ex @@ -11,7 +11,7 @@ defmodule Firenest.TestHelpers do @doc """ Waits until fun is true `count * 10` milliseconds. """ - def wait_until(fun, count \\ 1000) do + def wait_until(fun, count \\ div(Application.fetch_env!(:ex_unit, :assert_receive_timeout), 10)) do cond do count == 0 -> raise "waited until fun returned true but it never did" diff --git a/test/test_helper.exs b/test/test_helper.exs index abd54de..ebf8e40 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -4,15 +4,16 @@ nodes = [:"first@127.0.0.1", :"second@127.0.0.1", :"third@127.0.0.1"] Firenest.Test.start_boot_server(hd(nodes)) Firenest.Test.start_firenest([hd(nodes)], adapter: Firenest.Topology.Erlang) +parent = self() + # Start other nodes async, so we can start running tests that don't need them right away -pid = - spawn_link(fn -> - receive do: (:continue -> :ok) - Firenest.Test.spawn_nodes(tl(nodes)) - Firenest.Test.start_firenest(tl(nodes), adapter: Firenest.Topology.Erlang) - Process.unregister(:firenest_topology_setup) - Process.sleep(:infinity) - end) +spawn_link(fn -> + Process.register(self(), :firenest_topology_setup) + send(parent, :continue) + Firenest.Test.spawn_nodes(tl(nodes)) + Firenest.Test.start_firenest(tl(nodes), adapter: Firenest.Topology.Erlang) + Process.unregister(:firenest_topology_setup) + Process.sleep(:infinity) +end) -Process.register(pid, :firenest_topology_setup) -send(pid, :continue) +receive do: (:continue -> :ok)