Class: Workhorse::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/workhorse/worker.rb

Constant Summary collapse

LOG_LEVELS =
i[fatal error warn info debug].freeze
SHUTDOWN_SIGNALS =
%w[TERM INT].freeze

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, logger: nil) ⇒ Worker

Instantiates a new worker. The worker is not automatically started.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/workhorse/worker.rb', line 40

def initialize(queues: [], pool_size: nil, polling_interval: 300, auto_terminate: true, quiet: true, logger: nil)
  @queues = queues
  @pool_size = pool_size || queues.size + 1
  @polling_interval = polling_interval
  @auto_terminate = auto_terminate
  @state = :initialized
  @quiet = quiet

  @mutex = Mutex.new
  @pool = Pool.new(@pool_size)
  @poller = Workhorse::Poller.new(self)
  @logger = logger

  unless (@polling_interval / 0.1).round(2).modulo(1) == 0.0
    fail 'Polling interval must be a multiple of 0.1.'
  end

  check_rails_env if defined?(Rails)
end

Instance Attribute Details

#loggerObject (readonly)

Returns the value of attribute logger.



11
12
13
# File 'lib/workhorse/worker.rb', line 11

def logger
  @logger
end

#mutexObject (readonly)

Returns the value of attribute mutex.



10
11
12
# File 'lib/workhorse/worker.rb', line 10

def mutex
  @mutex
end

#polling_intervalObject (readonly)

Returns the value of attribute polling_interval.



9
10
11
# File 'lib/workhorse/worker.rb', line 9

def polling_interval
  @polling_interval
end

#pool_sizeObject (readonly)

Returns the value of attribute pool_size.



8
9
10
# File 'lib/workhorse/worker.rb', line 8

def pool_size
  @pool_size
end

#queuesObject (readonly)

Returns the value of attribute queues.



6
7
8
# File 'lib/workhorse/worker.rb', line 6

def queues
  @queues
end

#stateObject (readonly)

Returns the value of attribute state.



7
8
9
# File 'lib/workhorse/worker.rb', line 7

def state
  @state
end

Class Method Details

.start_and_wait(*args) ⇒ Object

Instantiates and starts a new worker with the given arguments and then waits for its completion (i.e. an interrupt).



15
16
17
18
19
# File 'lib/workhorse/worker.rb', line 15

def self.start_and_wait(*args)
  worker = new(*args)
  worker.start
  worker.wait
end

Instance Method Details

#assert_state!(state) ⇒ Object



86
87
88
# File 'lib/workhorse/worker.rb', line 86

def assert_state!(state)
  fail "Expected worker to be in state #{state} but current state is #{self.state}." unless self.state == state
end

#idObject



68
69
70
# File 'lib/workhorse/worker.rb', line 68

def id
  @id ||= "#{Socket.gethostname}.#{Process.pid}.#{SecureRandom.hex(3)}"
end

#idleObject



119
120
121
# File 'lib/workhorse/worker.rb', line 119

def idle
  @pool.idle
end

#log(text, level = :info) ⇒ Object



60
61
62
63
64
65
66
# File 'lib/workhorse/worker.rb', line 60

def log(text, level = :info)
  text = "[Job worker #{id}] #{text}"
  puts text unless @quiet
  return unless logger
  fail "Log level #{level} is not available. Available are #{LOG_LEVELS.inspect}." unless LOG_LEVELS.include?(level)
  logger.send(level, text.strip)
end

#perform(db_job) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/workhorse/worker.rb', line 123

def perform(db_job)
  mutex.synchronize do
    assert_state! :running
    log "Posting job #{db_job.id} to thread pool"

    @pool.post do
      begin
        Workhorse::Performer.new(db_job, self).perform
      rescue => e
        log %(#{e.message}\n#{e.backtrace.join("\n")}), :error
      end
    end
  end
end

#shutdownObject

Shuts down worker and DB poller. Jobs currently beeing processed are properly finished before this method returns. Subsequent calls to this method are ignored.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/workhorse/worker.rb', line 93

def shutdown
  # This is safe to be checked outside of the mutex as 'shutdown' is the
  # final state this worker can be in.
  return if @state == :shutdown

  mutex.synchronize do
    assert_state! :running

    log 'Shutting down'
    @state = :shutdown

    @poller.shutdown
    @pool.shutdown
    log 'Shut down'
  end
end

#startObject

Starts the worker. This call is not blocking - call #wait for this purpose.



74
75
76
77
78
79
80
81
82
83
84
# File 'lib/workhorse/worker.rb', line 74

def start
  mutex.synchronize do
    assert_state! :initialized
    log 'Starting up'
    @state = :running
    @poller.start
    log 'Started up'

    trap_termination if @auto_terminate
  end
end

#waitObject

Waits until the worker is shut down. This only happens if shutdown gets called - either by another thread or by enabling ‘auto_terminate` and receiving a respective signal. Use this method to let worker run undefinitely.



114
115
116
117
# File 'lib/workhorse/worker.rb', line 114

def wait
  @poller.wait
  @pool.wait
end