From ad93806e6e9ac40f4075c5a56566733f45217d01 Mon Sep 17 00:00:00 2001 From: Joshua LeBlanc Date: Sun, 28 Sep 2025 16:20:25 -0300 Subject: [PATCH 1/3] Re-introduce async mode Which was removed in https://github.com/rails/solid_queue/pull/308 refactor: remove unused supervisor require statements fix: use 6075 instead of $ to get correct process ID in solid_queue plugin fix: don't modify array during iteration chore: remove duplicate code chore: logging refactor: remove async thread creation option from runnable process --- lib/puma/plugin/solid_queue.rb | 66 ++++++++++----- lib/solid_queue/async_supervisor.rb | 98 ++++++++++++++++++++++ lib/solid_queue/cli.rb | 2 + lib/solid_queue/configuration.rb | 11 ++- lib/solid_queue/fork_supervisor.rb | 113 ++++++++++++++++++++++++++ lib/solid_queue/processes/runnable.rb | 6 +- lib/solid_queue/supervisor.rb | 110 ++----------------------- 7 files changed, 278 insertions(+), 128 deletions(-) create mode 100644 lib/solid_queue/async_supervisor.rb create mode 100644 lib/solid_queue/fork_supervisor.rb diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index 434b8f65..f67d5709 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -1,5 +1,13 @@ require "puma/plugin" +module Puma + class DSL + def solid_queue_mode(mode = :fork) + @options[:solid_queue_mode] = mode.to_sym + end + end +end + Puma::Plugin.create do attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor @@ -7,34 +15,54 @@ def start(launcher) @log_writer = launcher.log_writer @puma_pid = $$ - in_background do - monitor_solid_queue + if launcher.options[:solid_queue_mode] == :async + start_async(launcher) + else + start_forked(launcher) end + end - if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") - launcher.events.on_booted do - @solid_queue_pid = fork do - Thread.new { monitor_puma } - SolidQueue::Supervisor.start - end + private + def start_forked(launcher) + in_background do + monitor_solid_queue end - launcher.events.on_stopped { stop_solid_queue } - launcher.events.on_restart { stop_solid_queue } - else - launcher.events.after_booted do - @solid_queue_pid = fork do - Thread.new { monitor_puma } - SolidQueue::Supervisor.start + if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") + launcher.events.on_booted do + @solid_queue_pid = fork do + Thread.new { monitor_puma } + SolidQueue::Supervisor.start(mode: :fork) + end + end + + launcher.events.on_stopped { stop_solid_queue } + launcher.events.on_restart { stop_solid_queue } + else + launcher.events.after_booted do + @solid_queue_pid = fork do + Thread.new { monitor_puma } + SolidQueue::Supervisor.start(mode: :fork) + end end + + launcher.events.after_stopped { stop_solid_queue } + launcher.events.before_restart { stop_solid_queue } end + end - launcher.events.after_stopped { stop_solid_queue } - launcher.events.before_restart { stop_solid_queue } + def start_async(launcher) + if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") + launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + launcher.events.on_stopped { solid_queue_supervisor&.stop } + launcher.events.on_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + else + launcher.events.after_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + launcher.events.after_stopped { solid_queue_supervisor&.stop } + launcher.events.before_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + end end - end - private def stop_solid_queue Process.waitpid(solid_queue_pid, Process::WNOHANG) log "Stopping Solid Queue..." diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb new file mode 100644 index 00000000..329c1c07 --- /dev/null +++ b/lib/solid_queue/async_supervisor.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +module SolidQueue + class AsyncSupervisor < Supervisor + private + attr_reader :threads + + def start_processes + @threads = {} + + configuration.configured_processes.each { |configured_process| start_process(configured_process) } + end + + def start_process(configured_process) + process_instance = configured_process.instantiate.tap do |instance| + instance.supervised_by process + instance.mode = :async + end + + thread = Thread.new do + begin + process_instance.start + rescue Exception => e + puts "Error in thread: #{e.message}" + puts e.backtrace + end + end + threads[thread] = [ process_instance, configured_process ] + end + + def terminate_gracefully + SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| + processes.each(&:stop) + + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do + # No-op, we just wait + end + + unless all_threads_terminated? + payload[:shutdown_timeout_exceeded] = true + terminate_immediately + end + end + end + + def terminate_immediately + SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do + threads.keys.each(&:kill) + end + end + + def supervised_processes + processes.map(&:to_s) + end + + def reap_and_replace_terminated_forks + # No-op in async mode, we'll check for dead threads in the supervise loop + end + + def all_threads_terminated? + threads.keys.all? { |thread| !thread.alive? } + end + + def supervise + loop do + break if stopped? + + set_procline + process_signal_queue + + unless stopped? + check_and_replace_terminated_threads + interruptible_sleep(1.second) + end + end + ensure + shutdown + end + + def check_and_replace_terminated_threads + terminated_threads = {} + threads.each do |thread, (process, configured_process)| + unless thread.alive? + terminated_threads[thread] = configured_process + end + end + + terminated_threads.each do |thread, configured_process| + threads.delete(thread) + start_process(configured_process) + end + end + + def processes + threads.values.map(&:first) + end + end +end \ No newline at end of file diff --git a/lib/solid_queue/cli.rb b/lib/solid_queue/cli.rb index 7bfe555b..930ddaab 100644 --- a/lib/solid_queue/cli.rb +++ b/lib/solid_queue/cli.rb @@ -8,6 +8,8 @@ class Cli < Thor desc: "Path to config file (default: #{Configuration::DEFAULT_CONFIG_FILE_PATH}).", banner: "SOLID_QUEUE_CONFIG" + class_option :mode, type: :string, default: "fork", enum: %w[ fork async ], desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async)" + class_option :recurring_schedule_file, type: :string, desc: "Path to recurring schedule definition (default: #{Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH}).", banner: "SOLID_QUEUE_RECURRING_SCHEDULE" diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index b0083a17..c942b6a5 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -31,8 +31,11 @@ def instantiate DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" + attr_reader :mode + def initialize(**options) @options = options.with_defaults(default_options) + @mode = @options[:mode].to_s.inquiry end def configured_processes @@ -84,6 +87,7 @@ def ensure_correctly_sized_thread_pool def default_options { + mode: :fork, config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH), recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH), only_work: false, @@ -110,7 +114,12 @@ def skip_recurring_tasks? def workers workers_options.flat_map do |worker_options| - processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) + processes = if mode.fork? + worker_options.fetch(:processes, WORKER_DEFAULTS[:processes]) + else + 1 + end + processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) } end end diff --git a/lib/solid_queue/fork_supervisor.rb b/lib/solid_queue/fork_supervisor.rb new file mode 100644 index 00000000..9e2ecea5 --- /dev/null +++ b/lib/solid_queue/fork_supervisor.rb @@ -0,0 +1,113 @@ +# frozen_string_literal: true + +module SolidQueue + class ForkSupervisor < Supervisor + private + attr_reader :forks, :configured_processes + + def start_processes + @forks = {} + @configured_processes = {} + + configuration.configured_processes.each { |configured_process| start_process(configured_process) } + end + + def start_process(configured_process) + process_instance = configured_process.instantiate.tap do |instance| + instance.supervised_by process + instance.mode = :fork + end + + pid = fork do + process_instance.start + end + + configured_processes[pid] = configured_process + forks[pid] = process_instance + end + + def terminate_gracefully + SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| + term_forks + + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do + reap_terminated_forks + end + + unless all_forks_terminated? + payload[:shutdown_timeout_exceeded] = true + terminate_immediately + end + end + end + + def terminate_immediately + SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do + quit_forks + end + end + + def supervised_processes + forks.keys + end + + def term_forks + signal_processes(forks.keys, :TERM) + end + + def quit_forks + signal_processes(forks.keys, :QUIT) + end + + def reap_and_replace_terminated_forks + loop do + pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + break unless pid + + replace_fork(pid, status) + end + end + + def reap_terminated_forks + loop do + pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + break unless pid + + if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) + handle_claimed_jobs_by(terminated_fork, status) + end + + configured_processes.delete(pid) + end + rescue SystemCallError + # All children already reaped + end + + def replace_fork(pid, status) + SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| + if terminated_fork = forks.delete(pid) + payload[:fork] = terminated_fork + handle_claimed_jobs_by(terminated_fork, status) + + start_process(configured_processes.delete(pid)) + end + end + end + + # When a supervised fork crashes or exits we need to mark all the + # executions it had claimed as failed so that they can be retried + # by some other worker. + def handle_claimed_jobs_by(terminated_fork, status) + wrap_in_app_executor do + if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) + error = Processes::ProcessExitError.new(status) + registered_process.fail_all_claimed_executions_with(error) + end + end + end + + def all_forks_terminated? + forks.empty? + end + end +end diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index 33b441f6..aa5bb8f6 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -9,11 +9,7 @@ module Runnable def start boot - if running_async? - @thread = create_thread { run } - else - run - end + run end def stop diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index ef9c79d6..5c999a84 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -13,7 +13,8 @@ def start(**options) configuration = Configuration.new(**options) if configuration.valid? - new(configuration).tap(&:start) + klass = configuration.mode.fork? ? ForkSupervisor : AsyncSupervisor + klass.new(configuration).tap(&:start) else abort configuration.errors.full_messages.join("\n") + "\nExiting..." end @@ -22,9 +23,6 @@ def start(**options) def initialize(configuration) @configuration = configuration - @forks = {} - @configured_processes = {} - super end @@ -44,7 +42,7 @@ def stop end private - attr_reader :configuration, :forks, :configured_processes + attr_reader :configuration def boot SolidQueue.instrument(:start_process, process: self) do @@ -54,10 +52,6 @@ def boot end end - def start_processes - configuration.configured_processes.each { |configured_process| start_process(configured_process) } - end - def supervise loop do break if stopped? @@ -74,45 +68,14 @@ def supervise shutdown end - def start_process(configured_process) - process_instance = configured_process.instantiate.tap do |instance| - instance.supervised_by process - instance.mode = :fork - end - - pid = fork do - process_instance.start - end - - configured_processes[pid] = configured_process - forks[pid] = process_instance + def start_processes + raise NotImplementedError end def set_procline procline "supervising #{supervised_processes.join(", ")}" end - def terminate_gracefully - SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| - term_forks - - Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do - reap_terminated_forks - end - - unless all_forks_terminated? - payload[:shutdown_timeout_exceeded] = true - terminate_immediately - end - end - end - - def terminate_immediately - SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do - quit_forks - end - end - def shutdown SolidQueue.instrument(:shutdown_process, process: self) do run_callbacks(:shutdown) do @@ -125,67 +88,8 @@ def sync_std_streams STDOUT.sync = STDERR.sync = true end - def supervised_processes - forks.keys - end - - def term_forks - signal_processes(forks.keys, :TERM) - end - - def quit_forks - signal_processes(forks.keys, :QUIT) - end - def reap_and_replace_terminated_forks - loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) - break unless pid - - replace_fork(pid, status) - end - end - - def reap_terminated_forks - loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) - break unless pid - - if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) - handle_claimed_jobs_by(terminated_fork, status) - end - - configured_processes.delete(pid) - end - rescue SystemCallError - # All children already reaped - end - - def replace_fork(pid, status) - SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| - if terminated_fork = forks.delete(pid) - payload[:fork] = terminated_fork - handle_claimed_jobs_by(terminated_fork, status) - - start_process(configured_processes.delete(pid)) - end - end - end - - # When a supervised fork crashes or exits we need to mark all the - # executions it had claimed as failed so that they can be retried - # by some other worker. - def handle_claimed_jobs_by(terminated_fork, status) - wrap_in_app_executor do - if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) - error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error) - end - end - end - - def all_forks_terminated? - forks.empty? + # No-op by default, implemented in ForkSupervisor end end -end +end \ No newline at end of file From dce990df1f11bd7d1b7e5bb32a10c6dc43d883dd Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 30 Dec 2025 20:04:29 +0100 Subject: [PATCH 2/3] Refactor async mode and restore tests for it The tests were removed in 4b3483a9f85c408bba509b179c6ebccd5978e4c9 This also restores support for "standalone" async mode, which means the Solid Queue's supervisor running on its own process, via `bin/jobs`, but having all other processes (workers, dispatchers, schedulers) run as threads instead of processes. When Solid Queue is run via the Puma plugin, it won't be considered standalone, and it won't need to handle signals and act on them. --- lib/puma/plugin/solid_queue.rb | 40 +++- lib/solid_queue/async_supervisor.rb | 99 ++------ lib/solid_queue/configuration.rb | 12 +- lib/solid_queue/fork_supervisor.rb | 135 ++++------- lib/solid_queue/processes/runnable.rb | 32 ++- .../processes/thread_terminated_error.rb | 11 + lib/solid_queue/supervisor.rb | 74 +++++- lib/solid_queue/supervisor/signals.rb | 4 +- lib/solid_queue/timer.rb | 6 +- test/dummy/config/puma.rb | 45 +--- test/dummy/config/puma_async.rb | 46 ++++ test/dummy/config/puma_fork.rb | 46 ++++ .../async_processes_lifecycle_test.rb | 222 ++++++++++++++++++ ....rb => forked_processes_lifecycle_test.rb} | 4 +- test/integration/instrumentation_test.rb | 3 +- test/integration/lifecycle_hooks_test.rb | 2 +- test/integration/puma/plugin_async_test.rb | 13 + test/integration/puma/plugin_fork_test.rb | 30 +++ test/integration/puma/plugin_test.rb | 63 ----- test/integration/puma/plugin_testing.rb | 61 +++++ test/models/solid_queue/process_test.rb | 2 +- test/test_helper.rb | 2 + test/unit/async_supervisor_test.rb | 115 +++++++++ test/unit/dispatcher_test.rb | 74 +++--- ...rvisor_test.rb => fork_supervisor_test.rb} | 6 +- test/unit/process_recovery_test.rb | 2 +- test/unit/worker_test.rb | 1 + 27 files changed, 797 insertions(+), 353 deletions(-) create mode 100644 lib/solid_queue/processes/thread_terminated_error.rb mode change 100644 => 120000 test/dummy/config/puma.rb create mode 100644 test/dummy/config/puma_async.rb create mode 100644 test/dummy/config/puma_fork.rb create mode 100644 test/integration/async_processes_lifecycle_test.rb rename test/integration/{processes_lifecycle_test.rb => forked_processes_lifecycle_test.rb} (98%) create mode 100644 test/integration/puma/plugin_async_test.rb create mode 100644 test/integration/puma/plugin_fork_test.rb delete mode 100644 test/integration/puma/plugin_test.rb create mode 100644 test/integration/puma/plugin_testing.rb create mode 100644 test/unit/async_supervisor_test.rb rename test/unit/{supervisor_test.rb => fork_supervisor_test.rb} (97%) diff --git a/lib/puma/plugin/solid_queue.rb b/lib/puma/plugin/solid_queue.rb index f67d5709..70acfb9a 100644 --- a/lib/puma/plugin/solid_queue.rb +++ b/lib/puma/plugin/solid_queue.rb @@ -36,8 +36,8 @@ def start_forked(launcher) end end - launcher.events.on_stopped { stop_solid_queue } - launcher.events.on_restart { stop_solid_queue } + launcher.events.on_stopped { stop_solid_queue_fork } + launcher.events.on_restart { stop_solid_queue_fork } else launcher.events.after_booted do @solid_queue_pid = fork do @@ -46,24 +46,38 @@ def start_forked(launcher) end end - launcher.events.after_stopped { stop_solid_queue } - launcher.events.before_restart { stop_solid_queue } + launcher.events.after_stopped { stop_solid_queue_fork } + launcher.events.before_restart { stop_solid_queue_fork } end end def start_async(launcher) if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7") - launcher.events.on_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } - launcher.events.on_stopped { solid_queue_supervisor&.stop } - launcher.events.on_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + launcher.events.on_booted do + @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false) + end + + launcher.events.on_stopped { @solid_queue_supervisor&.stop } + + launcher.events.on_restart do + solid_queue_supervisor&.stop + @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false) + end else - launcher.events.after_booted { @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } - launcher.events.after_stopped { solid_queue_supervisor&.stop } - launcher.events.before_restart { solid_queue_supervisor&.stop; @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async) } + launcher.events.after_booted do + @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false) + end + + launcher.events.after_stopped { @solid_queue_supervisor&.stop } + + launcher.events.before_restart do + solid_queue_supervisor&.stop + @solid_queue_supervisor = SolidQueue::Supervisor.start(mode: :async, standalone: false) + end end end - def stop_solid_queue + def stop_solid_queue_fork Process.waitpid(solid_queue_pid, Process::WNOHANG) log "Stopping Solid Queue..." Process.kill(:INT, solid_queue_pid) if solid_queue_pid @@ -76,7 +90,7 @@ def monitor_puma end def monitor_solid_queue - monitor(:solid_queue_dead?, "Detected Solid Queue has gone away, stopping Puma...") + monitor(:solid_queue_fork_dead?, "Detected Solid Queue has gone away, stopping Puma...") end def monitor(process_dead, message) @@ -90,7 +104,7 @@ def monitor(process_dead, message) end end - def solid_queue_dead? + def solid_queue_fork_dead? if solid_queue_started? Process.waitpid(solid_queue_pid, Process::WNOHANG) end diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb index 329c1c07..f1a8aed2 100644 --- a/lib/solid_queue/async_supervisor.rb +++ b/lib/solid_queue/async_supervisor.rb @@ -3,96 +3,43 @@ module SolidQueue class AsyncSupervisor < Supervisor private - attr_reader :threads - - def start_processes - @threads = {} - - configuration.configured_processes.each { |configured_process| start_process(configured_process) } + def check_and_replace_terminated_processes + terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? } + terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) } end - def start_process(configured_process) - process_instance = configured_process.instantiate.tap do |instance| - instance.supervised_by process - instance.mode = :async - end + def replace_thread(thread_id, instance) + SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload| + payload[:thread] = instance + handle_claimed_jobs_by(terminated_instance, thread) - thread = Thread.new do - begin - process_instance.start - rescue Exception => e - puts "Error in thread: #{e.message}" - puts e.backtrace - end + start_process(configured_processes.delete(thread_id)) end - threads[thread] = [ process_instance, configured_process ] end - def terminate_gracefully - SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| - processes.each(&:stop) - - Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_threads_terminated? }) do - # No-op, we just wait - end - - unless all_threads_terminated? - payload[:shutdown_timeout_exceeded] = true - terminate_immediately - end - end - end + def perform_graceful_termination + process_instances.values.each(&:stop) - def terminate_immediately - SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do - threads.keys.each(&:kill) - end + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) end - def supervised_processes - processes.map(&:to_s) + def perform_immediate_termination + exit! end - def reap_and_replace_terminated_forks - # No-op in async mode, we'll check for dead threads in the supervise loop + def all_processes_terminated? + process_instances.values.none?(&:alive?) end - def all_threads_terminated? - threads.keys.all? { |thread| !thread.alive? } - end - - def supervise - loop do - break if stopped? - - set_procline - process_signal_queue - - unless stopped? - check_and_replace_terminated_threads - interruptible_sleep(1.second) + # When a supervised thread terminates unexpectedly, mark all executions + # it had claimed as failed so they can be retried by another worker. + def handle_claimed_jobs_by(terminated_instance, thread) + wrap_in_app_executor do + if registered_process = SolidQueue::Process.find_by(name: terminated_instance.name) + error = Processes::ThreadTerminatedError.new(terminated_instance.name) + registered_process.fail_all_claimed_executions_with(error) end end - ensure - shutdown - end - - def check_and_replace_terminated_threads - terminated_threads = {} - threads.each do |thread, (process, configured_process)| - unless thread.alive? - terminated_threads[thread] = configured_process - end - end - - terminated_threads.each do |thread, configured_process| - threads.delete(thread) - start_process(configured_process) - end - end - - def processes - threads.values.map(&:first) end end -end \ No newline at end of file +end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index c942b6a5..c62d6141 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -31,11 +31,8 @@ def instantiate DEFAULT_CONFIG_FILE_PATH = "config/queue.yml" DEFAULT_RECURRING_SCHEDULE_FILE_PATH = "config/recurring.yml" - attr_reader :mode - def initialize(**options) @options = options.with_defaults(default_options) - @mode = @options[:mode].to_s.inquiry end def configured_processes @@ -59,6 +56,14 @@ def error_messages end end + def mode + @options[:mode].to_s.inquiry + end + + def standalone? + mode.fork? || @options[:standalone] + end + private attr_reader :options @@ -88,6 +93,7 @@ def ensure_correctly_sized_thread_pool def default_options { mode: :fork, + standalone: true, config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH), recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH), only_work: false, diff --git a/lib/solid_queue/fork_supervisor.rb b/lib/solid_queue/fork_supervisor.rb index 9e2ecea5..126b2171 100644 --- a/lib/solid_queue/fork_supervisor.rb +++ b/lib/solid_queue/fork_supervisor.rb @@ -3,111 +3,76 @@ module SolidQueue class ForkSupervisor < Supervisor private - attr_reader :forks, :configured_processes - def start_processes - @forks = {} - @configured_processes = {} + def perform_graceful_termination + term_forks - configuration.configured_processes.each { |configured_process| start_process(configured_process) } + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) do + reap_terminated_forks end + end - def start_process(configured_process) - process_instance = configured_process.instantiate.tap do |instance| - instance.supervised_by process - instance.mode = :fork - end - - pid = fork do - process_instance.start - end - - configured_processes[pid] = configured_process - forks[pid] = process_instance - end - - def terminate_gracefully - SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do |payload| - term_forks - - Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_forks_terminated? }) do - reap_terminated_forks - end - - unless all_forks_terminated? - payload[:shutdown_timeout_exceeded] = true - terminate_immediately - end - end - end + def perform_immediate_termination + quit_forks + end - def terminate_immediately - SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: supervised_processes) do - quit_forks - end - end + def term_forks + signal_processes(process_instances.keys, :TERM) + end - def supervised_processes - forks.keys - end + def quit_forks + signal_processes(process_instances.keys, :QUIT) + end - def term_forks - signal_processes(forks.keys, :TERM) - end + def check_and_replace_terminated_processes + loop do + pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + break unless pid - def quit_forks - signal_processes(forks.keys, :QUIT) + replace_fork(pid, status) end + end - def reap_and_replace_terminated_forks - loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) - break unless pid + def reap_terminated_forks + loop do + pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) + break unless pid - replace_fork(pid, status) + if (terminated_fork = process_instances.delete(pid)) && !status.exited? || status.exitstatus > 0 + handle_claimed_jobs_by(terminated_fork, status) end - end - def reap_terminated_forks - loop do - pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG) - break unless pid - - if (terminated_fork = forks.delete(pid)) && (!status.exited? || status.exitstatus > 0) - handle_claimed_jobs_by(terminated_fork, status) - end - - configured_processes.delete(pid) - end - rescue SystemCallError - # All children already reaped + configured_processes.delete(pid) end + rescue SystemCallError + # All children already reaped + end - def replace_fork(pid, status) - SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| - if terminated_fork = forks.delete(pid) - payload[:fork] = terminated_fork - handle_claimed_jobs_by(terminated_fork, status) + def replace_fork(pid, status) + SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| + if terminated_fork = process_instances.delete(pid) + payload[:fork] = terminated_fork + handle_claimed_jobs_by(terminated_fork, status) - start_process(configured_processes.delete(pid)) - end + start_process(configured_processes.delete(pid)) end end - - # When a supervised fork crashes or exits we need to mark all the - # executions it had claimed as failed so that they can be retried - # by some other worker. - def handle_claimed_jobs_by(terminated_fork, status) - wrap_in_app_executor do - if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) - error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error) - end + end + + # When a supervised fork crashes or exits we need to mark all the + # executions it had claimed as failed so that they can be retried + # by some other worker. + def handle_claimed_jobs_by(terminated_fork, status) + wrap_in_app_executor do + if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) + error = Processes::ProcessExitError.new(status) + registered_process.fail_all_claimed_executions_with(error) end end + end - def all_forks_terminated? - forks.empty? - end + def all_processes_terminated? + process_instances.empty? + end end end diff --git a/lib/solid_queue/processes/runnable.rb b/lib/solid_queue/processes/runnable.rb index aa5bb8f6..a3aa4fab 100644 --- a/lib/solid_queue/processes/runnable.rb +++ b/lib/solid_queue/processes/runnable.rb @@ -7,16 +7,26 @@ module Runnable attr_writer :mode def start - boot - - run + run_in_mode do + boot + run + end end def stop super - wake_up - @thread&.join + + # When not supervised, block until the thread terminates for backward + # compatibility with code that expects stop to be synchronous. + # When supervised, the supervisor controls the shutdown timeout. + unless supervised? + @thread&.join + end + end + + def alive? + !running_async? || @thread&.alive? end private @@ -26,6 +36,18 @@ def mode (@mode || DEFAULT_MODE).to_s.inquiry end + def run_in_mode(&block) + case + when running_as_fork? + fork(&block) + when running_async? + @thread = create_thread(&block) + @thread.object_id + else + block.call + end + end + def boot SolidQueue.instrument(:start_process, process: self) do run_callbacks(:boot) do diff --git a/lib/solid_queue/processes/thread_terminated_error.rb b/lib/solid_queue/processes/thread_terminated_error.rb new file mode 100644 index 00000000..7f1d6f7a --- /dev/null +++ b/lib/solid_queue/processes/thread_terminated_error.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module SolidQueue + module Processes + class ThreadTerminatedError < RuntimeError + def initialize(name) + super("Thread #{name} terminated unexpectedly") + end + end + end +end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 5c999a84..89696eba 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -21,8 +21,14 @@ def start(**options) end end + delegate :mode, :standalone?, to: :configuration + def initialize(configuration) @configuration = configuration + + @configured_processes = {} + @process_instances = {} + super end @@ -41,8 +47,12 @@ def stop run_stop_hooks end + def kind + "Supervisor(#{mode})" + end + private - attr_reader :configuration + attr_reader :configuration, :configured_processes, :process_instances def boot SolidQueue.instrument(:start_process, process: self) do @@ -52,15 +62,21 @@ def boot end end + def start_processes + configuration.configured_processes.each { |configured_process| start_process(configured_process) } + end + def supervise loop do break if stopped? - set_procline - process_signal_queue + if standalone? + set_procline + process_signal_queue + end unless stopped? - reap_and_replace_terminated_forks + check_and_replace_terminated_processes interruptible_sleep(1.second) end end @@ -68,12 +84,48 @@ def supervise shutdown end - def start_processes + def start_process(configured_process) + process_instance = configured_process.instantiate.tap do |instance| + instance.supervised_by process + instance.mode = mode + end + + process_id = process_instance.start + + configured_processes[process_id] = configured_process + process_instances[process_id] = process_instance + end + + def check_and_replace_terminated_processes + end + + def terminate_gracefully + SolidQueue.instrument(:graceful_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: configured_processes.keys) do |payload| + perform_graceful_termination + + unless all_processes_terminated? + payload[:shutdown_timeout_exceeded] = true + terminate_immediately + end + end + end + + def terminate_immediately + SolidQueue.instrument(:immediate_termination, process_id: process_id, supervisor_pid: ::Process.pid, supervised_processes: configured_processes.keys) do + perform_immediate_termination + end + end + + def perform_graceful_termination raise NotImplementedError end - def set_procline - procline "supervising #{supervised_processes.join(", ")}" + def perform_immediate_termination + raise NotImplementedError + end + + def all_processes_terminated? + raise NotImplementedError end def shutdown @@ -84,12 +136,12 @@ def shutdown end end - def sync_std_streams - STDOUT.sync = STDERR.sync = true + def set_procline + procline "supervising #{configured_processes.keys.join(", ")}" end - def reap_and_replace_terminated_forks - # No-op by default, implemented in ForkSupervisor + def sync_std_streams + STDOUT.sync = STDERR.sync = true end end end \ No newline at end of file diff --git a/lib/solid_queue/supervisor/signals.rb b/lib/solid_queue/supervisor/signals.rb index fe0960d5..7bee107d 100644 --- a/lib/solid_queue/supervisor/signals.rb +++ b/lib/solid_queue/supervisor/signals.rb @@ -6,8 +6,8 @@ module Signals extend ActiveSupport::Concern included do - before_boot :register_signal_handlers - after_shutdown :restore_default_signal_handlers + before_boot :register_signal_handlers, if: :standalone? + after_shutdown :restore_default_signal_handlers, if: :standalone? end private diff --git a/lib/solid_queue/timer.rb b/lib/solid_queue/timer.rb index ca16466d..19e691f5 100644 --- a/lib/solid_queue/timer.rb +++ b/lib/solid_queue/timer.rb @@ -4,18 +4,18 @@ module SolidQueue module Timer extend self - def wait_until(timeout, condition, &block) + def wait_until(timeout, condition) if timeout > 0 deadline = monotonic_time_now + timeout while monotonic_time_now < deadline && !condition.call sleep 0.1 - block.call + yield if block_given? end else while !condition.call sleep 0.5 - block.call + yield if block_given? end end end diff --git a/test/dummy/config/puma.rb b/test/dummy/config/puma.rb deleted file mode 100644 index d4f1ae10..00000000 --- a/test/dummy/config/puma.rb +++ /dev/null @@ -1,44 +0,0 @@ -# Puma can serve each request in a thread from an internal thread pool. -# The `threads` method setting takes two numbers: a minimum and maximum. -# Any libraries that use thread pools should be configured to match -# the maximum value specified for Puma. Default is set to 5 threads for minimum -# and maximum; this matches the default thread size of Active Record. -# -max_threads_count = ENV.fetch("RAILS_MAX_THREADS") { 5 } -min_threads_count = ENV.fetch("RAILS_MIN_THREADS") { max_threads_count } -threads min_threads_count, max_threads_count - -# Specifies the `worker_timeout` threshold that Puma will use to wait before -# terminating a worker in development environments. -# -worker_timeout 3600 if ENV.fetch("RAILS_ENV", "development") == "development" - -# Specifies the `port` that Puma will listen on to receive requests; default is 3000. -# -port ENV.fetch("PORT") { 3000 } - -# Specifies the `environment` that Puma will run in. -# -environment ENV.fetch("RAILS_ENV") { "development" } - -# Specifies the `pidfile` that Puma will use. -pidfile ENV.fetch("PIDFILE") { "tmp/pids/server.pid" } - -# Specifies the number of `workers` to boot in clustered mode. -# Workers are forked web server processes. If using threads and workers together -# the concurrency of the application would be max `threads` * `workers`. -# Workers do not work on JRuby or Windows (both of which do not support -# processes). -# -# workers ENV.fetch("WEB_CONCURRENCY") { 2 } - -# Use the `preload_app!` method when specifying a `workers` number. -# This directive tells Puma to first boot the application and load code -# before forking the application. This takes advantage of Copy On Write -# process behavior so workers use less memory. -# -# preload_app! - -# Allow puma to be restarted by `bin/rails restart` command. -plugin :tmp_restart -plugin :solid_queue diff --git a/test/dummy/config/puma.rb b/test/dummy/config/puma.rb new file mode 120000 index 00000000..f923a826 --- /dev/null +++ b/test/dummy/config/puma.rb @@ -0,0 +1 @@ +puma_fork.rb \ No newline at end of file diff --git a/test/dummy/config/puma_async.rb b/test/dummy/config/puma_async.rb new file mode 100644 index 00000000..beb65259 --- /dev/null +++ b/test/dummy/config/puma_async.rb @@ -0,0 +1,46 @@ +# Puma can serve each request in a thread from an internal thread pool. +# The `threads` method setting takes two numbers: a minimum and maximum. +# Any libraries that use thread pools should be configured to match +# the maximum value specified for Puma. Default is set to 5 threads for minimum +# and maximum; this matches the default thread size of Active Record. +# +max_threads_count = ENV.fetch("RAILS_MAX_THREADS") { 5 } +min_threads_count = ENV.fetch("RAILS_MIN_THREADS") { max_threads_count } +threads min_threads_count, max_threads_count + +# Specifies the `worker_timeout` threshold that Puma will use to wait before +# terminating a worker in development environments. +# +worker_timeout 3600 if ENV.fetch("RAILS_ENV", "development") == "development" + +# Specifies the `port` that Puma will listen on to receive requests; default is 3000. +# +port ENV.fetch("PORT") { 3000 } + +# Specifies the `environment` that Puma will run in. +# +environment ENV.fetch("RAILS_ENV") { "development" } + +# Specifies the `pidfile` that Puma will use. +pidfile ENV.fetch("PIDFILE") { "tmp/pids/server.pid" } + +# Specifies the number of `workers` to boot in clustered mode. +# Workers are forked web server processes. If using threads and workers together +# the concurrency of the application would be max `threads` * `workers`. +# Workers do not work on JRuby or Windows (both of which do not support +# processes). +# +# workers ENV.fetch("WEB_CONCURRENCY") { 2 } + +# Use the `preload_app!` method when specifying a `workers` number. +# This directive tells Puma to first boot the application and load code +# before forking the application. This takes advantage of Copy On Write +# process behavior so workers use less memory. +# +# preload_app! + +# Allow puma to be restarted by `bin/rails restart` command. +plugin :tmp_restart +plugin :solid_queue + +solid_queue_mode :async diff --git a/test/dummy/config/puma_fork.rb b/test/dummy/config/puma_fork.rb new file mode 100644 index 00000000..4cdbbfd1 --- /dev/null +++ b/test/dummy/config/puma_fork.rb @@ -0,0 +1,46 @@ +# Puma can serve each request in a thread from an internal thread pool. +# The `threads` method setting takes two numbers: a minimum and maximum. +# Any libraries that use thread pools should be configured to match +# the maximum value specified for Puma. Default is set to 5 threads for minimum +# and maximum; this matches the default thread size of Active Record. +# +max_threads_count = ENV.fetch("RAILS_MAX_THREADS") { 5 } +min_threads_count = ENV.fetch("RAILS_MIN_THREADS") { max_threads_count } +threads min_threads_count, max_threads_count + +# Specifies the `worker_timeout` threshold that Puma will use to wait before +# terminating a worker in development environments. +# +worker_timeout 3600 if ENV.fetch("RAILS_ENV", "development") == "development" + +# Specifies the `port` that Puma will listen on to receive requests; default is 3000. +# +port ENV.fetch("PORT") { 3000 } + +# Specifies the `environment` that Puma will run in. +# +environment ENV.fetch("RAILS_ENV") { "development" } + +# Specifies the `pidfile` that Puma will use. +pidfile ENV.fetch("PIDFILE") { "tmp/pids/server.pid" } + +# Specifies the number of `workers` to boot in clustered mode. +# Workers are forked web server processes. If using threads and workers together +# the concurrency of the application would be max `threads` * `workers`. +# Workers do not work on JRuby or Windows (both of which do not support +# processes). +# +# workers ENV.fetch("WEB_CONCURRENCY") { 2 } + +# Use the `preload_app!` method when specifying a `workers` number. +# This directive tells Puma to first boot the application and load code +# before forking the application. This takes advantage of Copy On Write +# process behavior so workers use less memory. +# +# preload_app! + +# Allow puma to be restarted by `bin/rails restart` command. +plugin :tmp_restart +plugin :solid_queue + +solid_queue_mode :fork diff --git a/test/integration/async_processes_lifecycle_test.rb b/test/integration/async_processes_lifecycle_test.rb new file mode 100644 index 00000000..1d22a2c9 --- /dev/null +++ b/test/integration/async_processes_lifecycle_test.rb @@ -0,0 +1,222 @@ +# frozen_string_literal: true + +require "test_helper" + +class AsyncProcessesLifecycleTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + setup do + @pid = run_supervisor_as_fork(mode: :async, workers: [ { queues: :background }, { queues: :default, threads: 5 } ]) + + wait_for_registered_processes(3, timeout: 3.second) + assert_registered_workers_for(:background, :default, supervisor_pid: @pid) + end + + teardown do + terminate_process(@pid) if process_exists?(@pid) + end + + test "enqueue jobs in multiple queues" do + 6.times { |i| enqueue_store_result_job("job_#{i}") } + 6.times { |i| enqueue_store_result_job("job_#{i}", :default) } + + wait_for_jobs_to_finish_for(2.seconds) + + assert_equal 12, JobResult.count + 6.times { |i| assert_completed_job_results("job_#{i}", :background) } + 6.times { |i| assert_completed_job_results("job_#{i}", :default) } + + terminate_process(@pid) + assert_clean_termination + end + + test "kill supervisor while there are jobs in-flight" do + no_pause = enqueue_store_result_job("no pause") + pause = enqueue_store_result_job("pause", pause: 1.second) + + signal_process(@pid, :KILL, wait: 0.2.seconds) + wait_for_jobs_to_finish_for(2.seconds) + wait_for_registered_processes(1, timeout: 3.second) + + assert_not process_exists?(@pid) + + assert_completed_job_results("no pause") + assert_job_status(no_pause, :finished) + + # In async mode, killing the supervisor kills all threads too, + # so we can't complete in-flight jobs + assert_registered_supervisor + assert_registered_workers_for(:background, :default, supervisor_pid: @pid) + assert_started_job_result("pause") + assert_claimed_jobs + end + + test "term supervisor multiple times" do + 5.times do + signal_process(@pid, :TERM, wait: 0.1.second) + end + + sleep(1.second) + assert_clean_termination + end + + test "quit supervisor while there are jobs in-flight" do + no_pause = enqueue_store_result_job("no pause") + pause = enqueue_store_result_job("pause", pause: 1.second) + + wait_while_with_timeout(1.second) { SolidQueue::ReadyExecution.count > 0 } + + signal_process(@pid, :QUIT, wait: 0.4.second) + wait_for_jobs_to_finish_for(2.seconds, except: pause) + + wait_while_with_timeout(2.seconds) { process_exists?(@pid) } + assert_not process_exists?(@pid) + + # In async mode, QUIT calls exit! which terminates immediately without cleanup. + # The in-flight job remains claimed and the process/workers remain registered. + # A future supervisor will need to prune and fail these orphaned executions. + assert_completed_job_results("no pause") + assert_job_status(no_pause, :finished) + assert_started_job_result("pause") + assert_job_status(pause, :claimed) + + assert_registered_supervisor + assert_registered_workers_for(:background, :default, supervisor_pid: @pid) + assert_claimed_jobs + end + + test "term supervisor while there are jobs in-flight" do + no_pause = enqueue_store_result_job("no pause") + pause = enqueue_store_result_job("pause", pause: 0.2.seconds) + + signal_process(@pid, :TERM, wait: 0.3.second) + wait_for_jobs_to_finish_for(3.seconds) + + assert_completed_job_results("no pause") + assert_completed_job_results("pause") + + assert_job_status(no_pause, :finished) + assert_job_status(pause, :finished) + + wait_for_process_termination_with_timeout(@pid, timeout: 1.second) + assert_clean_termination + end + + test "int supervisor while there are jobs in-flight" do + no_pause = enqueue_store_result_job("no pause") + pause = enqueue_store_result_job("pause", pause: 0.2.seconds) + + signal_process(@pid, :INT, wait: 0.3.second) + wait_for_jobs_to_finish_for(2.second) + + assert_completed_job_results("no pause") + assert_completed_job_results("pause") + + assert_job_status(no_pause, :finished) + assert_job_status(pause, :finished) + + wait_for_process_termination_with_timeout(@pid, timeout: 1.second) + assert_clean_termination + end + + test "term supervisor exceeding timeout while there are jobs in-flight" do + no_pause = enqueue_store_result_job("no pause") + pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second) + + wait_while_with_timeout(1.second) { SolidQueue::ReadyExecution.count > 1 } + + signal_process(@pid, :TERM, wait: 0.5.second) + wait_for_jobs_to_finish_for(2.seconds, except: pause) + + # exit! exits with status 1 by default + wait_for_process_termination_with_timeout(@pid, timeout: SolidQueue.shutdown_timeout + 5.seconds, exitstatus: 1) + assert_not process_exists?(@pid) + + assert_completed_job_results("no pause") + assert_job_status(no_pause, :finished) + + # When timeout is exceeded, exit! is called without cleanup. + # The in-flight job stays claimed and processes stay registered. + # A future supervisor will need to prune and fail these orphaned executions. + assert_started_job_result("pause") + assert_job_status(pause, :claimed) + + assert_registered_supervisor + assert find_processes_registered_as("Worker").any? { |w| w.metadata["queues"].include?("background") } + assert_claimed_jobs + end + + test "process some jobs that raise errors" do + 2.times { enqueue_store_result_job("no error", :background) } + 2.times { enqueue_store_result_job("no error", :default) } + error1 = enqueue_store_result_job("error", :background, exception: ExpectedTestError) + enqueue_store_result_job("no error", :background, pause: 0.03) + error2 = enqueue_store_result_job("error", :background, exception: ExpectedTestError, pause: 0.05) + 2.times { enqueue_store_result_job("no error", :default, pause: 0.01) } + error3 = enqueue_store_result_job("error", :default, exception: ExpectedTestError) + + wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ]) + + assert_completed_job_results("no error", :background, 3) + assert_completed_job_results("no error", :default, 4) + + wait_while_with_timeout(1.second) { SolidQueue::FailedExecution.count < 3 } + [ error1, error2, error3 ].each do |job| + assert_job_status(job, :failed) + end + + terminate_process(@pid) + assert_clean_termination + end + + + private + def assert_clean_termination + wait_for_registered_processes 0, timeout: 0.2.second + assert_no_registered_processes + assert_no_claimed_jobs + assert_not process_exists?(@pid) + end + + def assert_registered_workers_for(*queues, supervisor_pid: nil) + workers = find_processes_registered_as("Worker") + registered_queues = workers.map { |process| process.metadata["queues"] }.compact + assert_equal queues.map(&:to_s).sort, registered_queues.sort + if supervisor_pid + assert_equal [ supervisor_pid ], workers.map { |process| process.supervisor.pid }.uniq + end + end + + def assert_registered_supervisor + processes = find_processes_registered_as("Supervisor(async)") + assert_equal 1, processes.count + assert_equal @pid, processes.first.pid + end + + def assert_no_registered_workers + assert_empty find_processes_registered_as("Worker").to_a + end + + def enqueue_store_result_job(value, queue_name = :background, **options) + StoreResultJob.set(queue: queue_name).perform_later(value, **options) + end + + def assert_completed_job_results(value, queue_name = :background, count = 1) + skip_active_record_query_cache do + assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count + end + end + + def assert_started_job_result(value, queue_name = :background, count = 1) + skip_active_record_query_cache do + assert_equal count, JobResult.where(queue_name: queue_name, status: "started", value: value).count + end + end + + def assert_job_status(active_job, status) + skip_active_record_query_cache do + job = SolidQueue::Job.find_by(active_job_id: active_job.job_id) + assert job.public_send("#{status}?") + end + end +end diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/forked_processes_lifecycle_test.rb similarity index 98% rename from test/integration/processes_lifecycle_test.rb rename to test/integration/forked_processes_lifecycle_test.rb index 47d56b4d..561166c5 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/forked_processes_lifecycle_test.rb @@ -2,7 +2,7 @@ require "test_helper" -class ProcessesLifecycleTest < ActiveSupport::TestCase +class ForkedProcessesLifecycleTest < ActiveSupport::TestCase self.use_transactional_tests = false setup do @@ -283,7 +283,7 @@ def assert_registered_workers_for(*queues, supervisor_pid: nil) end def assert_registered_supervisor_with(pid) - processes = find_processes_registered_as("Supervisor") + processes = find_processes_registered_as("Supervisor(fork)") assert_equal 1, processes.count assert_equal pid, processes.first.pid end diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 4440fe61..7e2a51ca 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -166,12 +166,11 @@ class InstrumentationTest < ActiveSupport::TestCase SolidQueue::Process.any_instance.expects(:destroy!).raises(error).at_least_once events = subscribed("deregister_process.solid_queue") do - assert_raises RuntimeError do + assert_raises ExpectedTestError do worker = SolidQueue::Worker.new.tap(&:start) wait_for_registered_processes(1, timeout: 1.second) worker.stop - wait_for_registered_processes(0, timeout: 1.second) end end diff --git a/test/integration/lifecycle_hooks_test.rb b/test/integration/lifecycle_hooks_test.rb index da7feedc..7cd04a82 100644 --- a/test/integration/lifecycle_hooks_test.rb +++ b/test/integration/lifecycle_hooks_test.rb @@ -87,7 +87,7 @@ class LifecycleHooksTest < ActiveSupport::TestCase end assert_equal %w[ - supervisor_start supervisor_stop supervisor_exit + forksupervisor_start forksupervisor_stop forksupervisor_exit worker_first_queue_start worker_first_queue_stop worker_first_queue_exit worker_second_queue_start worker_second_queue_stop worker_second_queue_exit dispatcher_100_start dispatcher_100_stop dispatcher_100_exit diff --git a/test/integration/puma/plugin_async_test.rb b/test/integration/puma/plugin_async_test.rb new file mode 100644 index 00000000..551ebd63 --- /dev/null +++ b/test/integration/puma/plugin_async_test.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +require "test_helper" +require_relative "plugin_testing" + +class PluginAsyncTest < ActiveSupport::TestCase + include PluginTesting + + private + def solid_queue_mode + :async + end +end diff --git a/test/integration/puma/plugin_fork_test.rb b/test/integration/puma/plugin_fork_test.rb new file mode 100644 index 00000000..40f1fd18 --- /dev/null +++ b/test/integration/puma/plugin_fork_test.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +require "test_helper" +require_relative "plugin_testing" + +class PluginForkTest < ActiveSupport::TestCase + include PluginTesting + + test "stop puma when solid queue's supervisor dies" do + supervisor = find_processes_registered_as("Supervisor(fork)").first + + signal_process(supervisor.pid, :KILL) + wait_for_process_termination_with_timeout(@pid) + + assert_not process_exists?(@pid) + + # When the supervisor is KILLed, the forked processes become orphans. + # Clean them up manually. + SolidQueue::Process.all.each do |process| + signal_process(process.pid, :KILL) if process_exists?(process.pid) + end + + wait_for_registered_processes 0, timeout: 3.second + end + + private + def solid_queue_mode + :fork + end +end diff --git a/test/integration/puma/plugin_test.rb b/test/integration/puma/plugin_test.rb deleted file mode 100644 index bac98a2b..00000000 --- a/test/integration/puma/plugin_test.rb +++ /dev/null @@ -1,63 +0,0 @@ -# frozen_string_literal: true - -require "test_helper" - -class PluginTest < ActiveSupport::TestCase - self.use_transactional_tests = false - - setup do - FileUtils.mkdir_p Rails.root.join("tmp", "pids") - Dir.chdir("test/dummy") do - cmd = %W[ - bundle exec puma - -b tcp://127.0.0.1:9222 - -C config/puma.rb - -s - config.ru - ] - @pid = fork do - exec(*cmd) - end - end - wait_for_registered_processes 5, timeout: 3.second - end - - teardown do - terminate_process(@pid, signal: :INT) if process_exists?(@pid) - wait_for_registered_processes 0, timeout: 2.seconds - end - - test "perform jobs inside puma's process" do - StoreResultJob.perform_later(:puma_plugin) - - wait_for_jobs_to_finish_for(2.seconds) - assert_equal 1, JobResult.where(queue_name: :background, status: "completed", value: :puma_plugin).count - end - - test "stop the queue on puma's restart" do - signal_process(@pid, :SIGUSR2) - # Ensure the restart finishes before we try to continue with the test - wait_for_registered_processes(0, timeout: 3.second) - wait_for_registered_processes(5, timeout: 3.second) - - StoreResultJob.perform_later(:puma_plugin) - wait_for_jobs_to_finish_for(2.seconds) - assert_equal 1, JobResult.where(queue_name: :background, status: "completed", value: :puma_plugin).count - end - - test "stop puma when solid queue's supervisor dies" do - supervisor = find_processes_registered_as("Supervisor").first - - signal_process(supervisor.pid, :KILL) - wait_for_process_termination_with_timeout(@pid) - - assert_not process_exists?(@pid) - - # Make sure all supervised processes are also terminated - SolidQueue::Process.all.each do |process| - signal_process(process.pid, :KILL) if process_exists?(process.pid) - end - - wait_for_registered_processes 0, timeout: 3.second - end -end diff --git a/test/integration/puma/plugin_testing.rb b/test/integration/puma/plugin_testing.rb new file mode 100644 index 00000000..ec2198f8 --- /dev/null +++ b/test/integration/puma/plugin_testing.rb @@ -0,0 +1,61 @@ +# frozen_string_literal: true + +require "test_helper" + +module PluginTesting + extend ActiveSupport::Concern + extend ActiveSupport::Testing::Declarative + + included do + self.use_transactional_tests = false + + setup do + FileUtils.mkdir_p Rails.root.join("tmp", "pids") + + Dir.chdir("test/dummy") do + cmd = %W[ + bundle exec puma + -b tcp://127.0.0.1:9222 + -C config/puma_#{solid_queue_mode}.rb + -s + config.ru + ] + + @pid = fork do + exec(*cmd) + end + end + + wait_for_registered_processes(5, timeout: 3.second) + end + + teardown do + terminate_process(@pid, signal: :INT) if process_exists?(@pid) + + wait_for_registered_processes 0, timeout: 2.seconds + end + end + + test "perform jobs inside puma's process" do + StoreResultJob.perform_later(:puma_plugin) + + wait_for_jobs_to_finish_for(2.seconds) + assert_equal 1, JobResult.where(queue_name: :background, status: "completed", value: :puma_plugin).count + end + + test "stop the queue on puma's restart" do + signal_process(@pid, :SIGUSR2) + # Ensure the restart finishes before we try to continue with the test + wait_for_registered_processes(0, timeout: 3.second) + wait_for_registered_processes(5, timeout: 3.second) + + StoreResultJob.perform_later(:puma_plugin) + wait_for_jobs_to_finish_for(2.seconds) + assert_equal 1, JobResult.where(queue_name: :background, status: "completed", value: :puma_plugin).count + end + + private + def solid_queue_mode + raise NotImplementedError + end +end diff --git a/test/models/solid_queue/process_test.rb b/test/models/solid_queue/process_test.rb index 489b2aca..cadd7ee0 100644 --- a/test/models/solid_queue/process_test.rb +++ b/test/models/solid_queue/process_test.rb @@ -35,7 +35,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase end test "prune processes including their supervisor with expired heartbeats and fail claimed executions" do - supervisor = SolidQueue::Process.register(kind: "Supervisor", pid: 42, name: "supervisor-42") + supervisor = SolidQueue::Process.register(kind: "Supervisor(fork)", pid: 42, name: "supervisor-42") process = SolidQueue::Process.register(kind: "Worker", pid: 43, name: "worker-43", supervisor_id: supervisor.id) 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) diff --git a/test/test_helper.rb b/test/test_helper.rb index 89e8df9f..15032700 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -35,6 +35,8 @@ def destroy_records SolidQueue::Process.destroy_all SolidQueue::Semaphore.delete_all SolidQueue::RecurringTask.delete_all + SolidQueue::ScheduledExecution.delete_all + SolidQueue::ReadyExecution.delete_all JobResult.delete_all end diff --git a/test/unit/async_supervisor_test.rb b/test/unit/async_supervisor_test.rb new file mode 100644 index 00000000..65350c82 --- /dev/null +++ b/test/unit/async_supervisor_test.rb @@ -0,0 +1,115 @@ +require "test_helper" + +class AsyncSupervisorTest < ActiveSupport::TestCase + self.use_transactional_tests = false + + test "start as non-standalone" do + supervisor, thread = run_supervisor_as_thread + wait_for_registered_processes(4) + + assert_registered_processes(kind: "Supervisor(async)") + assert_registered_processes(kind: "Worker", supervisor_id: supervisor.process_id, count: 2) + assert_registered_processes(kind: "Dispatcher", supervisor_id: supervisor.process_id) + + supervisor.stop + thread.join + + assert_no_registered_processes + end + + test "start standalone" do + pid = run_supervisor_as_fork(mode: :async) + wait_for_registered_processes(4) + + assert_registered_processes(kind: "Supervisor(async)") + assert_registered_processes(kind: "Worker", supervisor_pid: pid, count: 2) + assert_registered_processes(kind: "Dispatcher", supervisor_pid: pid) + + terminate_process(pid) + assert_no_registered_processes + end + + test "start as non-standalone with provided configuration" do + supervisor, thread = run_supervisor_as_thread(workers: [], dispatchers: [ { batch_size: 100 } ]) + wait_for_registered_processes(2) # supervisor + dispatcher + + assert_registered_processes(kind: "Supervisor(async)") + assert_registered_processes(kind: "Worker", count: 0) + assert_registered_processes(kind: "Dispatcher", supervisor_id: supervisor.process_id) + + supervisor.stop + thread.join + + assert_no_registered_processes + end + + test "failed orphaned executions as non-standalone" do + simulate_orphaned_executions 3 + + config = { + workers: [ { queues: "background", polling_interval: 10 } ], + dispatchers: [] + } + + supervisor, thread = run_supervisor_as_thread(**config) + wait_for_registered_processes(2) # supervisor + 1 worker + assert_registered_processes(kind: "Supervisor(async)") + + wait_while_with_timeout(1.second) { SolidQueue::ClaimedExecution.count > 0 } + + supervisor.stop + thread.join + + skip_active_record_query_cache do + assert_equal 0, SolidQueue::ClaimedExecution.count + assert_equal 3, SolidQueue::FailedExecution.count + end + end + + test "failed orphaned executions as standalone" do + simulate_orphaned_executions 3 + + config = { + workers: [ { queues: "background", polling_interval: 10 } ], + dispatchers: [] + } + + pid = run_supervisor_as_fork(mode: :async, **config) + wait_for_registered_processes(2) # supervisor + 1 worker + assert_registered_processes(kind: "Supervisor(async)") + + wait_while_with_timeout(1.second) { SolidQueue::ClaimedExecution.count > 0 } + + terminate_process(pid) + + skip_active_record_query_cache do + assert_equal 0, SolidQueue::ClaimedExecution.count + assert_equal 3, SolidQueue::FailedExecution.count + end + end + + private + def run_supervisor_as_thread(**kwargs) + configuration = SolidQueue::Configuration.new(mode: :async, standalone: false, **kwargs) + supervisor = SolidQueue::AsyncSupervisor.new(configuration) + + thread = Thread.new { supervisor.start } + + [ supervisor, thread ] + end + + def simulate_orphaned_executions(count) + count.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } + process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123") + + SolidQueue::ReadyExecution.claim("*", count + 1, process.id) + + assert_equal count, SolidQueue::ClaimedExecution.count + assert_equal 0, SolidQueue::ReadyExecution.count + + assert_equal [ process.id ], SolidQueue::ClaimedExecution.last(3).pluck(:process_id).uniq + + # Simulate orphaned executions by just wiping the claiming process + process.delete + end +end diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index 359bb504..531e2d72 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -12,6 +12,7 @@ class DispatcherTest < ActiveSupport::TestCase teardown do @dispatcher.stop + wait_for_registered_processes(0, timeout: 2.seconds) end test "dispatcher is registered as process" do @@ -75,55 +76,52 @@ class DispatcherTest < ActiveSupport::TestCase assert_no_registered_processes end - test "run more than one instance of the dispatcher" do - 15.times do - AddToBufferJob.set(wait: 0.5.second).perform_later("I'm scheduled") - end - sleep 0.5.second - assert_equal 15, SolidQueue::ScheduledExecution.count - - another_dispatcher = SolidQueue::Dispatcher.new(polling_interval: 0.1, batch_size: 10) + test "dispatch scheduled executions" do + skip_active_record_query_cache do + 15.times do + AddToBufferJob.set(wait: 0.5.second).perform_later("I'm scheduled") + end + sleep 0.5.second + assert_equal 15, SolidQueue::ScheduledExecution.count - @dispatcher.start - another_dispatcher.start + @dispatcher.start + wait_for_registered_processes(1, timeout: 2.seconds) - wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? } + wait_while_with_timeout(3.seconds) { SolidQueue::ScheduledExecution.any? } - assert_equal 0, SolidQueue::ScheduledExecution.count - assert_equal 15, SolidQueue::ReadyExecution.count - ensure - another_dispatcher&.stop + assert_equal 0, SolidQueue::ScheduledExecution.count + assert_equal 15, SolidQueue::ReadyExecution.count + end end test "sleeps `0.seconds` between polls if there are ready to dispatch jobs" do - dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) - dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3) - dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once - dispatcher.expects(:handle_thread_error).never - - 3.times { AddToBufferJob.set(wait: 0.5.second).perform_later("I'm scheduled") } - sleep 0.5.second - assert_equal 3, SolidQueue::ScheduledExecution.count - - dispatcher.start - wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? } - - assert_equal 0, SolidQueue::ScheduledExecution.count - assert_equal 3, SolidQueue::ReadyExecution.count - ensure - dispatcher.stop + skip_active_record_query_cache do + @dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) + @dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3) + @dispatcher.expects(:interruptible_sleep).with(@dispatcher.polling_interval).at_least_once + @dispatcher.expects(:handle_thread_error).never + + 3.times { AddToBufferJob.set(wait: 0.5.second).perform_later("I'm scheduled") } + sleep 0.5.second + assert_equal 3, SolidQueue::ScheduledExecution.count + + @dispatcher.start + wait_for_registered_processes(1, timeout: 2.seconds) + wait_while_with_timeout(3.seconds) { SolidQueue::ScheduledExecution.any? } + + assert_equal 0, SolidQueue::ScheduledExecution.count + assert_equal 3, SolidQueue::ReadyExecution.count + end end test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do - dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) - dispatcher.expects(:interruptible_sleep).with(0.seconds).never - dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once - dispatcher.expects(:handle_thread_error).never + @dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) + @dispatcher.expects(:interruptible_sleep).with(0.seconds).never + @dispatcher.expects(:interruptible_sleep).with(@dispatcher.polling_interval).at_least_once + @dispatcher.expects(:handle_thread_error).never - dispatcher.start + @dispatcher.start wait_while_with_timeout(1.second) { !SolidQueue::ScheduledExecution.exists? } - ensure - dispatcher.stop end private diff --git a/test/unit/supervisor_test.rb b/test/unit/fork_supervisor_test.rb similarity index 97% rename from test/unit/supervisor_test.rb rename to test/unit/fork_supervisor_test.rb index 7a531ad2..2759260c 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/fork_supervisor_test.rb @@ -1,6 +1,6 @@ require "test_helper" -class SupervisorTest < ActiveSupport::TestCase +class ForkSupervisorTest < ActiveSupport::TestCase self.use_transactional_tests = false setup do @@ -203,7 +203,7 @@ def termsig = nil end status = DummyStatus.new(worker_process.pid, 1) - supervisor = SolidQueue::Supervisor.allocate + supervisor = SolidQueue::ForkSupervisor.allocate supervisor.send(:handle_claimed_jobs_by, terminated_fork, status) @@ -223,7 +223,7 @@ def assert_registered_dispatcher(supervisor_pid: nil) def assert_registered_supervisor(pid) skip_active_record_query_cache do - processes = find_processes_registered_as("Supervisor") + processes = find_processes_registered_as("Supervisor(fork)") assert_equal 1, processes.count assert_nil processes.first.supervisor assert_equal pid, processes.first.pid diff --git a/test/unit/process_recovery_test.rb b/test/unit/process_recovery_test.rb index 620fbd51..e3eccdcf 100644 --- a/test/unit/process_recovery_test.rb +++ b/test/unit/process_recovery_test.rb @@ -20,7 +20,7 @@ class ProcessRecoveryTest < ActiveSupport::TestCase @pid = run_supervisor_as_fork(workers: [ { queues: "*", polling_interval: 0.1, processes: 1 } ]) wait_for_registered_processes(2, timeout: 1.second) # Supervisor + 1 worker - supervisor_process = SolidQueue::Process.find_by(kind: "Supervisor", pid: @pid) + supervisor_process = SolidQueue::Process.find_by(kind: "Supervisor(fork)", pid: @pid) assert supervisor_process worker_process = SolidQueue::Process.find_by(kind: "Worker") diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 8db67912..3d692404 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -35,6 +35,7 @@ class WorkerTest < ActiveSupport::TestCase worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.2).tap(&:start) sleep(1) + # stop calls join internally when not supervised, which re-raises the error assert_raises ExpectedTestError do worker.stop end From f5881054ddc7096b18d086d0ec01620d4e1024d4 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 30 Dec 2025 21:08:09 +0100 Subject: [PATCH 3/3] DRY a bit releasing claimed executions by both supervisors --- lib/solid_queue/async_supervisor.rb | 52 ++++++++++------------- lib/solid_queue/fork_supervisor.rb | 18 ++------ lib/solid_queue/supervisor.rb | 2 +- lib/solid_queue/supervisor/maintenance.rb | 11 +++++ test/unit/fork_supervisor_test.rb | 16 +++---- 5 files changed, 43 insertions(+), 56 deletions(-) diff --git a/lib/solid_queue/async_supervisor.rb b/lib/solid_queue/async_supervisor.rb index f1a8aed2..8a7a7f6b 100644 --- a/lib/solid_queue/async_supervisor.rb +++ b/lib/solid_queue/async_supervisor.rb @@ -3,43 +3,35 @@ module SolidQueue class AsyncSupervisor < Supervisor private - def check_and_replace_terminated_processes - terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? } - terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) } - end - def replace_thread(thread_id, instance) - SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload| - payload[:thread] = instance - handle_claimed_jobs_by(terminated_instance, thread) + def check_and_replace_terminated_processes + terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? } + terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) } + end - start_process(configured_processes.delete(thread_id)) - end - end + def replace_thread(thread_id, instance) + SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload| + payload[:thread] = instance - def perform_graceful_termination - process_instances.values.each(&:stop) + error = Processes::ThreadTerminatedError.new(terminated_instance.name) + release_claimed_jobs_by(terminated_instance, with_error: error) - Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) + start_process(configured_processes.delete(thread_id)) end + end - def perform_immediate_termination - exit! - end + def perform_graceful_termination + process_instances.values.each(&:stop) - def all_processes_terminated? - process_instances.values.none?(&:alive?) - end + Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) + end - # When a supervised thread terminates unexpectedly, mark all executions - # it had claimed as failed so they can be retried by another worker. - def handle_claimed_jobs_by(terminated_instance, thread) - wrap_in_app_executor do - if registered_process = SolidQueue::Process.find_by(name: terminated_instance.name) - error = Processes::ThreadTerminatedError.new(terminated_instance.name) - registered_process.fail_all_claimed_executions_with(error) - end - end - end + def perform_immediate_termination + exit! + end + + def all_processes_terminated? + process_instances.values.none?(&:alive?) + end end end diff --git a/lib/solid_queue/fork_supervisor.rb b/lib/solid_queue/fork_supervisor.rb index 126b2171..c3c87dbe 100644 --- a/lib/solid_queue/fork_supervisor.rb +++ b/lib/solid_queue/fork_supervisor.rb @@ -39,7 +39,8 @@ def reap_terminated_forks break unless pid if (terminated_fork = process_instances.delete(pid)) && !status.exited? || status.exitstatus > 0 - handle_claimed_jobs_by(terminated_fork, status) + error = Processes::ProcessExitError.new(status) + release_claimed_jobs_by(terminated_fork, with_error: error) end configured_processes.delete(pid) @@ -52,25 +53,14 @@ def replace_fork(pid, status) SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload| if terminated_fork = process_instances.delete(pid) payload[:fork] = terminated_fork - handle_claimed_jobs_by(terminated_fork, status) + error = Processes::ProcessExitError.new(status) + release_claimed_jobs_by(terminated_fork, with_error: error) start_process(configured_processes.delete(pid)) end end end - # When a supervised fork crashes or exits we need to mark all the - # executions it had claimed as failed so that they can be retried - # by some other worker. - def handle_claimed_jobs_by(terminated_fork, status) - wrap_in_app_executor do - if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) - error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error) - end - end - end - def all_processes_terminated? process_instances.empty? end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 89696eba..ae17ec95 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -144,4 +144,4 @@ def sync_std_streams STDOUT.sync = STDERR.sync = true end end -end \ No newline at end of file +end diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 1b6b5204..d92569d5 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -32,5 +32,16 @@ def fail_orphaned_executions ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new) end end + + # When a supervised process crashes or exits we need to mark all the + # executions it had claimed as failed so that they can be retried + # by some other worker. + def release_claimed_jobs_by(terminated_process, with_error:) + wrap_in_app_executor do + if registered_process = SolidQueue::Process.find_by(name: terminated_process.name) + registered_process.fail_all_claimed_executions_with(with_error) + end + end + end end end diff --git a/test/unit/fork_supervisor_test.rb b/test/unit/fork_supervisor_test.rb index 2759260c..9ec81b51 100644 --- a/test/unit/fork_supervisor_test.rb +++ b/test/unit/fork_supervisor_test.rb @@ -186,8 +186,8 @@ class ForkSupervisorTest < ActiveSupport::TestCase end # Regression test for supervisor failing to handle claimed jobs when its own - # process record has been pruned (NoMethodError in #handle_claimed_jobs_by). - test "handle_claimed_jobs_by fails claimed executions even if supervisor record is missing" do + # process record has been pruned (NoMethodError in #release_claimed_jobs_by). + test "release_claimed_jobs_by fails claimed executions even if supervisor record is missing" do worker_name = "worker-test-#{SecureRandom.hex(4)}" worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name) @@ -196,20 +196,14 @@ class ForkSupervisorTest < ActiveSupport::TestCase claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first terminated_fork = Struct.new(:name).new(worker_name) - - DummyStatus = Struct.new(:pid, :exitstatus) do - def signaled? = false - def termsig = nil - end - status = DummyStatus.new(worker_process.pid, 1) - supervisor = SolidQueue::ForkSupervisor.allocate + error = RuntimeError.new - supervisor.send(:handle_claimed_jobs_by, terminated_fork, status) + supervisor.send(:release_claimed_jobs_by, terminated_fork, with_error: error) failed = SolidQueue::FailedExecution.find_by(job_id: claimed_execution.job_id) assert failed.present? - assert_equal "SolidQueue::Processes::ProcessExitError", failed.exception_class + assert_equal "RuntimeError", failed.exception_class end private