diff --git a/lib/flame/pool.ex b/lib/flame/pool.ex index 9b341b0..a27eaf4 100644 --- a/lib/flame/pool.ex +++ b/lib/flame/pool.ex @@ -58,7 +58,8 @@ defmodule FLAME.Pool do on_grow_start: nil, on_grow_end: nil, on_shrink: nil, - async_boot_timer: nil + async_boot_timer: nil, + hotstart_threshold: 1 def child_spec(opts) do %{ @@ -128,6 +129,10 @@ defmodule FLAME.Pool do * `:name` - The name of the pool * `:count` - The number of runners the pool is attempting to shrink to + + * `:hotstart_threshold` - The percentage of pool utilisation before a new node is preemptively + added to the pool. + Should be a number 0 < n <= 1 """ def start_link(opts) do Keyword.validate!(opts, [ @@ -149,7 +154,8 @@ defmodule FLAME.Pool do :shutdown_timeout, :on_grow_start, :on_grow_end, - :on_shrink + :on_shrink, + :hotstart_threshold ]) GenServer.start_link(__MODULE__, opts, name: Keyword.fetch!(opts, :name)) @@ -330,7 +336,8 @@ defmodule FLAME.Pool do on_grow_start: opts[:on_grow_start], on_grow_end: opts[:on_grow_end], on_shrink: opts[:on_shrink], - runner_opts: runner_opts + runner_opts: runner_opts, + hotstart_threshold: Keyword.get(opts, :hotstart_threshold, 1) } {:ok, boot_runners(state)} @@ -452,10 +459,13 @@ defmodule FLAME.Pool do defp checkout_runner(%Pool{} = state, deadline, from, monitor_ref \\ nil) do min_runner = min_runner(state) runner_count = runner_count(state) + threshold = state.max_concurrency * state.hotstart_threshold cond do runner_count == 0 || !min_runner || - (min_runner.count == state.max_concurrency && runner_count < state.max) -> + (min_runner.count == state.max_concurrency && runner_count < state.max) || + (threshold > 1 && min_runner.count > threshold) -> + # if the threshold is less than 1 you would get a recursive creation of new nodes if map_size(state.pending_runners) > 0 || state.async_boot_timer do waiting_in(state, deadline, from) else diff --git a/test/flame_test.exs b/test/flame_test.exs index 118ddc7..ddad3c2 100644 --- a/test/flame_test.exs +++ b/test/flame_test.exs @@ -58,6 +58,32 @@ defmodule FLAME.FLAMETest do assert new_pool == Supervisor.which_children(runner_sup) end + @tag runner: [min: 1, max: 2, max_concurrency: 8, hotstart_threshold: 0.5] + test "init boots min runners and grows on demand", + %{runner_sup: runner_sup} = config do + min_pool = Supervisor.which_children(runner_sup) + assert [{:undefined, _pid, :worker, [FLAME.Runner]}] = min_pool + # execute against single runner + assert FLAME.call(config.test, fn -> :works end) == :works + + # dynamically grows to max + _task1 = sim_long_running(config.test) + assert FLAME.call(config.test, fn -> :works end) == :works + assert Supervisor.which_children(runner_sup) == min_pool + _task2 = sim_long_running(config.test) + assert FLAME.call(config.test, fn -> :works end) == :works + _task3 = sim_long_running(config.test) + _task4 = sim_long_running(config.test) + _task5 = sim_long_running(config.test) + assert FLAME.call(config.test, fn -> :works end) == :works + # concurrency above hotstart threshold boots new runner + new_pool = Supervisor.which_children(runner_sup) + refute new_pool == min_pool + assert length(new_pool) == 2 + # caller is now queued while waiting for available runner + assert new_pool == Supervisor.which_children(runner_sup) + end + @tag runner: [min: 0, max: 1, max_concurrency: 2] test "concurrent calls on fully pending runners", %{runner_sup: runner_sup} = config do