Class: Rjob::WorkerProcess
- Inherits: Object
  - Object
  - Rjob::WorkerProcess
- Defined in: lib/rjob/worker_process.rb
Overview
TODO: find a mechanism to recover from jobs that went to working but never returned
Constant Summary
- ITERATION_TIMEOUT = 2
- HEARTBEAT_TIMEOUT = 15
- StopSubscription = Class.new(StandardError)
Instance Attribute Summary
- #context ⇒ Object (readonly): Returns the value of attribute context.
- #leader ⇒ Object (readonly): Returns the value of attribute leader.
- #state ⇒ Object (readonly): Returns the value of attribute state.
- #worker_name ⇒ Object (readonly): Returns the value of attribute worker_name.
Instance Method Summary
- #initialize(context) ⇒ WorkerProcess (constructor): A new instance of WorkerProcess.
- #run_forever ⇒ Object
Constructor Details
#initialize(context) ⇒ WorkerProcess
Returns a new instance of WorkerProcess.
# File 'lib/rjob/worker_process.rb', line 16
def initialize(context)
  @context = context
  @prefix = @context.prefix
  @pubsub_redis = @context.create_redis_connection

  init_worker_name

  @iteration_no = 0
  @max_queue_size = 20
  max_threads = @context.config.fetch(:max_threads, 2)

  @subscription_thread = nil
  @thread_pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: [2, max_threads].min,
    max_threads: max_threads,
    max_queue: @max_queue_size,
    fallback_policy: :abort # Concurrent::RejectedExecutionError
  )

  @processed_count = Concurrent::AtomicFixnum.new
  @failed_count = Concurrent::AtomicFixnum.new
  @returned_count = Concurrent::AtomicFixnum.new

  @leader = nil
  @state = :new
end
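The pool sizing in the constructor can be looked at in isolation. The sketch below is only an illustration: `pool_sizing` is a hypothetical helper that is not part of Rjob. It reproduces the `fetch(:max_threads, 2)` default and the `[2, max_threads].min` clamp on a plain Hash and does not create a real thread pool.

```ruby
# Hypothetical helper (not part of Rjob): mirrors the constructor's sizing logic.
def pool_sizing(config)
  # Same default as the constructor: 2 threads when :max_threads is not configured.
  max_threads = config.fetch(:max_threads, 2)

  {
    # min_threads never exceeds max_threads and is capped at 2.
    min_threads: [2, max_threads].min,
    max_threads: max_threads
  }
end

default_sizing = pool_sizing({})                 # no :max_threads configured
single_sizing  = pool_sizing({ max_threads: 1 }) # min_threads drops to 1
large_sizing   = pool_sizing({ max_threads: 8 }) # min_threads stays at 2
```

With this clamp the pool appears to keep at most two idle threads regardless of how large `max_threads` is, while a single-threaded configuration still yields a valid `min_threads <= max_threads` pair.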
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
# File 'lib/rjob/worker_process.rb', line 11
def context
  @context
end
#leader ⇒ Object (readonly)
Returns the value of attribute leader.
# File 'lib/rjob/worker_process.rb', line 14
def leader
  @leader
end
#state ⇒ Object (readonly)
Returns the value of attribute state.
# File 'lib/rjob/worker_process.rb', line 13
def state
  @state
end
#worker_name ⇒ Object (readonly)
Returns the value of attribute worker_name.
# File 'lib/rjob/worker_process.rb', line 12
def worker_name
  @worker_name
end
Instance Method Details
#run_forever ⇒ Object
# File 'lib/rjob/worker_process.rb', line 43
def run_forever
  register_worker

  Signal.trap("INT") do
    if @state == :exiting
      puts "Force exit requested. Exiting immediately"
      exit 1
    else
      @state = :exiting
      puts "Exiting..."
    end
  end

  @state = :running
  loop do
    break if @state == :exited
    run_iteration
  end
ensure
  unregister_worker
end
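The two-stage shutdown in `run_forever` can be traced with a small stand-in that needs no Redis connection and no real signal. This is a sketch under stated assumptions: `ShutdownSketch`, `interrupt`, and `log` are hypothetical names, the forced `exit 1` is replaced by a log entry, and the `:exiting` to `:exited` transition inside `run_iteration` is assumed, since that method is not shown on this page.

```ruby
# Sketch only: ShutdownSketch is a hypothetical stand-in, not part of Rjob.
class ShutdownSketch
  attr_reader :state, :iterations, :log

  def initialize
    @state = :new
    @iterations = 0
    @log = []
  end

  # Same branching as the INT trap body, with `exit 1` replaced by a log entry.
  def interrupt
    if @state == :exiting
      @log << :force_exit
    else
      @state = :exiting
      @log << :graceful
    end
  end

  def run_forever
    @log << :registered # stands in for register_worker
    @state = :running
    loop do
      break if @state == :exited
      run_iteration
    end
  ensure
    @log << :unregistered # stands in for unregister_worker
  end

  private

  # Assumption: an iteration promotes :exiting to :exited once work has drained.
  def run_iteration
    @iterations += 1
    interrupt if @iterations == 3 # simulate Ctrl-C during the third iteration
    @state = :exited if @state == :exiting
  end
end

sketch = ShutdownSketch.new
sketch.run_forever
```

After the run, `sketch.log` records registration, one graceful stop request, and the cleanup from the `ensure` clause, which runs whether the loop ends normally or an exception escapes it.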