Class: Workhorse::Poller

Inherits:
Object
Defined in:
lib/workhorse/poller.rb

Overview

Database poller that discovers and locks jobs for execution. Handles job querying, global locking, and job distribution to workers. Supports both MySQL and Oracle databases with database-specific optimizations.

Examples:

Basic usage (typically used internally)

poller = Workhorse::Poller.new(worker, proc { true })
poller.start

Constant Summary

  • MIN_LOCK_TIMEOUT = 0.1

    In seconds

  • MAX_LOCK_TIMEOUT = 1.0

    In seconds

  • ORACLE_LOCK_MODE = 6

    X_MODE (exclusive)

  • ORACLE_LOCK_HANDLE = 478_564_848

    Randomly chosen number
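
The two timeout constants read like a lower and an upper bound. The following is a minimal sketch of how such bounds could clamp a lock timeout. The helper name `lock_timeout_for` and the idea of deriving the timeout from a polling interval are assumptions made for illustration; this page does not document how the poller actually uses the constants.

```ruby
# Values copied from the constant summary above.
MIN_LOCK_TIMEOUT = 0.1 # seconds
MAX_LOCK_TIMEOUT = 1.0 # seconds

# Hypothetical helper (not part of Workhorse): keep a requested timeout
# inside the documented bounds.
def lock_timeout_for(polling_interval)
  polling_interval.clamp(MIN_LOCK_TIMEOUT, MAX_LOCK_TIMEOUT)
end

lock_timeout_for(0.01) # below the minimum, clamped up to 0.1
lock_timeout_for(0.5)  # inside the bounds, returned unchanged
lock_timeout_for(30)   # above the maximum, clamped down to 1.0
```

Clamping keeps a very short polling interval from producing a lock timeout that is too small to be useful, and a very long one from blocking on the lock for the whole interval.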

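Instance Attribute Summary

  • #table ⇒ Arel::Table (readonly)

    The jobs table for query building

  • #worker ⇒ Workhorse::Worker (readonly)

    The worker this poller serves

Instance Method Summary

  • #initialize(worker, before_poll = proc { true }) ⇒ Poller

    Creates a new poller for the given worker

  • #instant_repoll! ⇒ void

    Interrupts current sleep and performs the next poll immediately

  • #running? ⇒ Boolean

    Checks if the poller is currently running

  • #shutdown ⇒ void

    Shuts down the poller and waits for completion

  • #start ⇒ void

    Starts the poller in a background thread

  • #wait ⇒ void

    Waits for the poller thread to complete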

Constructor Details

#initialize(worker, before_poll = proc { true }) ⇒ Poller

Creates a new poller for the given worker.

Parameters:

  • worker (Workhorse::Worker)

    The worker to serve

  • before_poll (Proc) (defaults to: proc { true })

    Callback executed before each poll. Return a truthy value to let the poll proceed; a falsy value makes the poller shut the worker down instead (see #start).



# File 'lib/workhorse/poller.rb', line 26

def initialize(worker, before_poll = proc { true })
  @worker = worker
  @running = false
  @table = Workhorse::DbJob.arel_table
  @is_oracle = ActiveRecord::Base.connection.adapter_name == 'OracleEnhanced'
  @instant_repoll = Concurrent::AtomicBoolean.new(false)
  @global_lock_fails = 0
  @max_global_lock_fails_reached = false
  @before_poll = before_poll
end
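
The `before_poll` callback decides, before every poll, whether polling may continue. The sketch below illustrates that contract with a stop flag. The `run_polls` method is a hypothetical stand-in for the polling loop, not Workhorse code, and `stop_requested` is an illustrative name.

```ruby
# Hypothetical callback: allow polling until a stop flag is raised.
stop_requested = false
before_poll = proc { !stop_requested }

# Stand-in for a polling loop (an assumption, not the library's loop):
# poll while the callback is truthy, stop at the first falsy result.
def run_polls(before_poll, max_iterations:)
  polls = 0
  max_iterations.times do
    break unless before_poll.call
    polls += 1
    yield polls if block_given?
  end
  polls
end

completed = run_polls(before_poll, max_iterations: 5) do |n|
  stop_requested = true if n == 3 # simulate an external stop request
end
puts completed # prints 3
```

With the real class, the same proc would be passed as the second constructor argument, as in `Workhorse::Poller.new(worker, before_poll)`.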

Instance Attribute Details

#table ⇒ Arel::Table (readonly)

Returns the jobs table for query building.

Returns:

  • (Arel::Table)

    The jobs table for query building



# File 'lib/workhorse/poller.rb', line 20

def table
  @table
end

#worker ⇒ Workhorse::Worker (readonly)

Returns the worker this poller serves.

Returns:

  • (Workhorse::Worker)

    The worker this poller serves



# File 'lib/workhorse/poller.rb', line 17

def worker
  @worker
end

Instance Method Details

#instant_repoll! ⇒ void

This method returns an undefined value.

Interrupts current sleep and performs the next poll immediately. After the poll, resumes normal polling interval.



# File 'lib/workhorse/poller.rb', line 100

def instant_repoll!
  worker.log 'Aborting next sleep to perform instant repoll', :debug
  @instant_repoll.make_true
end
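
Interrupting a sleep implies that the poller waits in a way that a flag can cut short. The sketch below shows one common way to do that: sleep in short slices and check the flag between slices. `InterruptibleSleeper`, `wake!`, `sleep_for`, and the slice length are hypothetical names and values. A boolean guarded by a `Mutex` stands in for `Concurrent::AtomicBoolean` so the sketch runs without the concurrent-ruby gem.

```ruby
# Hypothetical stand-in illustrating a sleep that a flag can cut short.
class InterruptibleSleeper
  SLICE = 0.05 # seconds between flag checks (assumed value)

  def initialize
    @mutex = Mutex.new
    @wake = false
  end

  # Request that the current (or next) sleep ends early.
  def wake!
    @mutex.synchronize { @wake = true }
  end

  # Sleep up to `interval` seconds. Returns true if wake! cut the sleep
  # short, false if the full interval elapsed.
  def sleep_for(interval)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + interval
    while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
      # Read and reset the flag in one step so the next cycle starts clean.
      woken = @mutex.synchronize do
        was_set = @wake
        @wake = false
        was_set
      end
      return true if woken
      sleep SLICE
    end
    false
  end
end

sleeper = InterruptibleSleeper.new
Thread.new { sleep 0.1; sleeper.wake! }
interrupted = sleeper.sleep_for(5) # returns long before the 5 seconds are up
```

Resetting the flag when it is consumed matches the documented behaviour that normal polling resumes after the instant repoll.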

#running? ⇒ Boolean

Checks if the poller is currently running.

Returns:

  • (Boolean)

    True if poller is running



# File 'lib/workhorse/poller.rb', line 40

def running?
  @running
end

#shutdown ⇒ void

This method returns an undefined value.

Stops the polling loop and blocks until the poller thread has finished.

Raises:

  • (RuntimeError)

    If poller is not running



# File 'lib/workhorse/poller.rb', line 83

def shutdown
  fail 'Poller is not running.' unless running?
  @running = false
  wait
end
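
The shutdown sequence pairs a flag flip with a thread join. The sketch below mirrors that pattern and the two guard clauses in a stand-in class. `MiniPoller` is hypothetical and does no polling; it only demonstrates the lifecycle and the `RuntimeError` raised on misuse.

```ruby
# Hypothetical stand-in that mirrors the start/shutdown guards shown above.
class MiniPoller
  def initialize
    @running = false
  end

  def running?
    @running
  end

  def start
    fail 'Poller is already running.' if running?
    @running = true
    # The loop re-checks the flag, so shutdown only has to flip it.
    @thread = Thread.new { sleep 0.01 while running? }
  end

  def shutdown
    fail 'Poller is not running.' unless running?
    @running = false
    @thread.join # block until the loop has observed the flag
  end
end

poller = MiniPoller.new
poller.start
poller.shutdown

begin
  poller.shutdown # second call: the poller is no longer running
rescue RuntimeError => e
  puts e.message # prints "Poller is not running."
end
```

Because the loop only checks the flag between iterations, shutdown is cooperative: it returns once the current iteration has finished, not immediately.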

#start ⇒ void

This method returns an undefined value.

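Starts the poller in a background thread. If Workhorse.clean_stuck_jobs is enabled, stuck jobs are cleaned first. Each loop iteration calls the before_poll callback, polls, and then sleeps; if the callback returns a falsy value, the worker is shut down from a separate thread. An exception raised during a poll is logged, reported through Workhorse.on_exception unless Workhorse.silence_poller_exceptions is set, and stops both the poller and the worker's pool.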

Raises:

  • (RuntimeError)

    If poller is already running



# File 'lib/workhorse/poller.rb', line 48

def start
  fail 'Poller is already running.' if running?
  @running = true

  clean_stuck_jobs! if Workhorse.clean_stuck_jobs

  @thread = Thread.new do
    loop do
      break unless running?

      begin
        unless @before_poll.call
          Thread.new { worker.shutdown }
          sleep
          next
        end

        poll
        sleep
      rescue Exception => e
        worker.log %(Poll encountered exception:\n#{e.message}\n#{e.backtrace.join("\n")})
        worker.log 'Worker shutting down...'
        Workhorse.on_exception.call(e) unless Workhorse.silence_poller_exceptions
        @running = false
        worker.instance_variable_get(:@pool).shutdown
        break
      end
    end
  end
end
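
The error path of the loop above reports the exception once and then stops instead of retrying. The sketch below isolates that pattern. `run_loop` and its keyword arguments are hypothetical stand-ins. Unlike the source above, which rescues `Exception`, the sketch rescues only `StandardError`.

```ruby
# Hypothetical stand-in for the error path of a polling loop.
def run_loop(on_exception:, silence: false)
  running = true
  iterations = 0
  while running
    begin
      iterations += 1
      yield iterations
    rescue StandardError => e
      # Report once (unless silenced), then stop looping instead of retrying.
      on_exception.call(e) unless silence
      running = false
    end
  end
  iterations
end

reported = []
count = run_loop(on_exception: ->(e) { reported << e.message }) do |i|
  raise 'database unavailable' if i == 3
end
# count is 3 and reported holds the single error message
```

Stopping on the first failure trades availability for safety: a poller that cannot reach the database does not keep retrying in a tight loop.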

#wait ⇒ void

This method returns an undefined value.

Blocks until the poller thread has finished. Call it only after #start; before that, no poller thread exists to join.



# File 'lib/workhorse/poller.rb', line 92

def wait
  @thread.join
end