Class: Gouda::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/gouda/worker.rb

Overview

Worker class that supports both threaded and hybrid (threads + fibers) execution modes

Class Method Summary collapse

Class Method Details

.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue, use_fibers: false, fibers_per_thread: 1) ⇒ Object

Start looping, taking work from the queue and performing it, over multiple worker threads. Once the ‘check_shutdown` callable returns `true` the threads will cleanly terminate and the method will return (so it is blocking).

Parameters:

  • n_threads (Integer)

    how many worker threads to start. Another thread will be started for housekeeping, so ideally this should be the size of your connection pool minus 1

  • check_shutdown (#call) (defaults to: TrapShutdownCheck.new)

    A callable object (can be a Proc etc.). Once starts returning ‘true` the worker threads and the housekeeping thread will cleanly exit

  • use_fibers (Boolean) (defaults to: false)

    whether to use fibers within each thread for higher concurrency

  • fibers_per_thread (Integer) (defaults to: 1)

    how many fibers to run per thread (only used if use_fibers is true)



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/gouda/worker.rb', line 122

def self.worker_loop(n_threads:, check_shutdown: TrapShutdownCheck.new, queue_constraint: Gouda::AnyQueue, use_fibers: false, fibers_per_thread: 1)
  check_shutdown = CombinedShutdownCheck.new(*check_shutdown) if !check_shutdown.respond_to?(:call) && check_shutdown.is_a?(Array)

  log_worker_configuration(n_threads, use_fibers, fibers_per_thread)
  setup_fiber_environment if use_fibers

  worker_id = generate_worker_id
  executing_workload_ids = ThreadSafeSet.new

  raise ArgumentError, "You need at least 1 worker thread, but you requested #{n_threads}" if n_threads < 1

  worker_threads = if use_fibers
    create_hybrid_worker_threads(n_threads, fibers_per_thread, worker_id, queue_constraint, executing_workload_ids, check_shutdown)
  else
    create_threaded_worker_threads(n_threads, worker_id, queue_constraint, executing_workload_ids, check_shutdown)
  end

  run_housekeeping_loop(executing_workload_ids, check_shutdown)
ensure
  worker_threads&.map(&:join)
end