Class: Workhorse::Poller
- Inherits:
-
Object
- Object
- Workhorse::Poller
- Defined in:
- lib/workhorse/poller.rb
Overview
Database poller that discovers and locks jobs for execution. Handles job querying, global locking, and job distribution to workers. Supports both MySQL and Oracle databases with database-specific optimizations.
Constant Summary collapse
- MIN_LOCK_TIMEOUT =
In seconds
0.1- MAX_LOCK_TIMEOUT =
In seconds
1.0- ORACLE_LOCK_MODE =
X_MODE (exclusive)
6- ORACLE_LOCK_HANDLE =
Randomly chosen number
478_564_848
Instance Attribute Summary collapse
-
#table ⇒ Arel::Table
readonly
The jobs table for query building.
-
#worker ⇒ Workhorse::Worker
readonly
The worker this poller serves.
Instance Method Summary collapse
-
#initialize(worker, before_poll = proc { true }) ⇒ Poller
constructor
Creates a new poller for the given worker.
-
#instant_repoll! ⇒ void
Interrupts current sleep and performs the next poll immediately.
-
#running? ⇒ Boolean
Checks if the poller is currently running.
-
#shutdown ⇒ void
Shuts down the poller and waits for completion.
-
#start ⇒ void
Starts the poller in a background thread.
-
#wait ⇒ void
Waits for the poller thread to complete.
Constructor Details
#initialize(worker, before_poll = proc { true }) ⇒ Poller
Creates a new poller for the given worker.
26 27 28 29 30 31 32 33 34 35 |
# File 'lib/workhorse/poller.rb', line 26 def initialize(worker, before_poll = proc { true }) @worker = worker @running = false @table = Workhorse::DbJob.arel_table @is_oracle = ActiveRecord::Base.connection.adapter_name == 'OracleEnhanced' @instant_repoll = Concurrent::AtomicBoolean.new(false) @global_lock_fails = 0 @max_global_lock_fails_reached = false @before_poll = before_poll end |
Instance Attribute Details
#table ⇒ Arel::Table (readonly)
Returns The jobs table for query building.
20 21 22 |
# File 'lib/workhorse/poller.rb', line 20 def table @table end |
#worker ⇒ Workhorse::Worker (readonly)
Returns The worker this poller serves.
17 18 19 |
# File 'lib/workhorse/poller.rb', line 17 def worker @worker end |
Instance Method Details
#instant_repoll! ⇒ void
This method returns an undefined value.
Interrupts current sleep and performs the next poll immediately. After the poll, resumes normal polling interval.
100 101 102 103 |
# File 'lib/workhorse/poller.rb', line 100 def instant_repoll! worker.log 'Aborting next sleep to perform instant repoll', :debug @instant_repoll.make_true end |
#running? ⇒ Boolean
Checks if the poller is currently running.
40 41 42 |
# File 'lib/workhorse/poller.rb', line 40 def running? @running end |
#shutdown ⇒ void
This method returns an undefined value.
Shuts down the poller and waits for completion.
83 84 85 86 87 |
# File 'lib/workhorse/poller.rb', line 83 def shutdown fail 'Poller is not running.' unless running? @running = false wait end |
#start ⇒ void
This method returns an undefined value.
Starts the poller in a background thread.
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/workhorse/poller.rb', line 48 def start fail 'Poller is already running.' if running? @running = true clean_stuck_jobs! if Workhorse.clean_stuck_jobs @thread = Thread.new do loop do break unless running? begin unless @before_poll.call Thread.new { worker.shutdown } sleep next end poll sleep rescue Exception => e worker.log %(Poll encountered exception:\n#{e.}\n#{e.backtrace.join("\n")}) worker.log 'Worker shutting down...' Workhorse.on_exception.call(e) unless Workhorse.silence_poller_exceptions @running = false worker.instance_variable_get(:@pool).shutdown break end end end end |
#wait ⇒ void
This method returns an undefined value.
Waits for the poller thread to complete.
92 93 94 |
# File 'lib/workhorse/poller.rb', line 92 def wait @thread.join end |