Class: Qless::Workers::BaseWorker

Inherits:
Object
  • Object
show all
Includes:
SupportsMiddlewareModules
Defined in:
lib/qless/worker/base.rb

Direct Known Subclasses

ForkingWorker, SerialWorker

Defined Under Namespace

Modules: SupportsMiddlewareModules

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from SupportsMiddlewareModules

#after_fork, #around_perform

Constructor Details

#initialize(reserver, options = {}) ⇒ BaseWorker

Returns a new instance of BaseWorker.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/qless/worker/base.rb', line 20

# Build a worker around the given job reserver.
#
# reserver - the object jobs are reserved from
# options  - optional settings, each read with Hash#fetch:
#   :sighup_handler - callable stored for SIGHUP (default: a no-op lambda)
#   :logger         - ready-made logger; when given, the :output,
#                     :log_level and :log_formatter options are not read
#   :output         - IO the default logger writes to (default: $stdout)
#   :log_level      - level of the default logger (default: Logger::WARN)
#   :log_formatter  - formatter of the default logger
#                     (default: "<datetime>: <msg>\n")
#   :interval       - interval for checking for new jobs (default: 5.0)
def initialize(reserver, options = {})
  @reserver = reserver
  @options = options

  # Stored for SIGHUP; does nothing unless the caller supplies a handler.
  @sighup_handler = options.fetch(:sighup_handler) { -> {} }

  # A logger is only built (and @output only assigned) when the caller did
  # not pass one in.
  @log = options.fetch(:logger) do
    @output = options.fetch(:output, $stdout)

    default_logger = Logger.new(output)
    default_logger.level = options.fetch(:log_level, Logger::WARN)
    default_logger.formatter = options.fetch(:log_formatter) do
      proc { |_severity, datetime, _progname, msg| "#{datetime}: #{msg}\n" }
    end
    default_logger
  end

  # The interval for checking for new jobs
  @interval = options.fetch(:interval, 5.0)

  # No job is being worked on yet.
  @current_job_mutex = Mutex.new
  @current_job = nil

  # Default behavior when a lock is lost: stop after the current job.
  on_current_job_lock_lost { shutdown(false) }
end

Instance Attribute Details

#intervalObject

Returns the value of attribute interval.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Interval for checking for new jobs. The constructor reads it from
# options[:interval] and falls back to 5.0 (the unit is not stated in this
# file; presumably seconds -- confirm where the interval is consumed).
def interval
  @interval
end

#optionsObject

Returns the value of attribute options.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# The options hash handed to the constructor, stored as given (not copied).
def options
  @options
end

#outputObject

Returns the value of attribute output.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Destination the default logger writes to. The constructor assigns @output
# (options[:output], default $stdout) only while building its own logger, so
# this reads as nil when a :logger option was supplied -- unless something
# outside this view assigns it.
def output
  @output
end

#pausedObject

Returns the value of attribute paused.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Pause flag: pause sets it to true and unpause sets it to false. The
# constructor shown in this file does not assign @paused, so it reads as nil
# until one of those has been called.
def paused
  @paused
end

#reserverObject

Returns the value of attribute reserver.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# The job reserver handed to the constructor. Jobs are reserved through it,
# and its queues supply the clients returned by uniq_clients.
def reserver
  @reserver
end

#sighup_handlerObject

Returns the value of attribute sighup_handler.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Callable invoked by the HUP handler that register_signal_handlers
# installs. Taken from options[:sighup_handler]; defaults to a lambda that
# does nothing.
def sighup_handler
  @sighup_handler
end

Instance Method Details

#deregisterObject



184
185
186
187
188
# File 'lib/qless/worker/base.rb', line 184

# Deregister this worker from every distinct client behind the reserver's
# queues, using the worker name each client reports for itself.
def deregister
  uniq_clients.each { |c| c.deregister_workers(c.worker_name) }
end

#fail_job(job, error, worker_backtrace) ⇒ Object



174
175
176
177
178
179
180
181
182
# File 'lib/qless/worker/base.rb', line 174

# Record a job failure.
#
# The error is run through Qless.failure_formatter, the formatted failure
# is logged at :error level, and the job is failed with the formatted
# values. If the job refuses to be failed (Job::CantFailError) that is
# logged as well and otherwise ignored.
#
# job              - the job that raised
# error            - the exception raised while performing it
# worker_backtrace - backtrace of the worker, passed on to the formatter
def fail_job(job, error, worker_backtrace)
  formatted = Qless.failure_formatter.format(job, error, worker_backtrace)

  summary = "Got #{formatted.group} failure from #{job.inspect}\n#{formatted.message}"
  log(:error, summary)

  job.fail(*formatted)
rescue Job::CantFailError => cant_fail
  # There's not much we can do here. Another worker may have cancelled it,
  # or we might not own the job, etc. Logging is the best we can do.
  log(:error, "Failed to fail #{job.inspect}: #{cant_fail.message}")
end

#jobsObject

Return an enumerator to each of the jobs provided by the reserver



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/qless/worker/base.rb', line 84

# Return an enumerator over the jobs provided by the reserver.
#
# Each pass of the underlying loop asks the reserver for a job. A reserved
# job is recorded as the current job, yielded to the consumer, and cleared
# again once the consumer is done with it; when nothing was reserved,
# no_job_available is called instead. The loop ends after the first pass
# that finishes with @shutdown set.
def jobs
  Enumerator.new do |enum|
    loop do
      begin
        job = reserver.reserve
      rescue StandardError => error
        # We want workers to durably stay up, so errors during job
        # reserving (e.g. network timeouts, etc) are logged instead of
        # killing the worker. Only StandardError is rescued: Interrupt,
        # SystemExit and other non-StandardError exceptions must still be
        # able to stop the process.
        log(:error,
          "Error reserving job: #{error.class}: #{error.message}")
      end

      # job is nil both when nothing was reserved and when reserving raised.
      if job.nil?
        no_job_available
      else
        self.current_job = job
        enum.yield(job)
        self.current_job = nil
      end

      break if @shutdown
    end
  end
end

#listen_for_lost_lockObject



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/qless/worker/base.rb', line 198

# Subscribe to lock-lost notifications for the duration of the given block.
#
# A subscriber is started for each client in uniq_clients on that client's
# "ql:w:<worker_name>" channel. When a 'lock_lost' event arrives whose jid
# matches the current job, the callback registered through
# on_current_job_lock_lost is called with that job. Every subscriber that
# was started is stopped once the block finishes, whether it returned or
# raised.
#
# Returns whatever the block returns.
def listen_for_lost_lock
  # Collected one at a time so that, if starting a later subscriber raises,
  # the ensure clause can still stop the ones already running and the
  # original error is not masked by a NoMethodError on nil.
  subscribers = []

  uniq_clients.each do |client|
    subscriber = Subscriber.start(client, "ql:w:#{client.worker_name}", log: @log) do |_, message|
      if message['event'] == 'lock_lost'
        with_current_job do |job|
          if job && message['jid'] == job.jid
            @on_current_job_lock_lost.call(job)
          end
        end
      end
    end
    subscribers << subscriber
  end

  yield
ensure
  subscribers.each(&:stop)
end

#log_levelObject



48
49
50
# File 'lib/qless/worker/base.rb', line 48

# Current level of the worker's logger (@log).
def log_level
  @log.level
end

#on_current_job_lock_lost(&block) ⇒ Object



194
195
196
# File 'lib/qless/worker/base.rb', line 194

# Register the block to call when the lock on the current job is lost. It
# replaces whatever block was registered before; the constructor installs a
# default that calls shutdown(false). listen_for_lost_lock calls the block
# with the affected job.
def on_current_job_lock_lost(&block)
  @on_current_job_lock_lost = block
end

#pause(in_signal_handler = true) ⇒ Object

Pause the worker – take no more new jobs



143
144
145
146
# File 'lib/qless/worker/base.rb', line 143

# Pause the worker -- take no more new jobs.
#
# Sets the pause flag and updates the procline to "Paused -- <reserver
# description>". in_signal_handler is handed on to procline unchanged.
def pause(in_signal_handler=true)
  @paused = true
  title = "Paused -- #{reserver.description}"
  procline(title, in_signal_handler)
end

#perform(job) ⇒ Object

Actually perform the job



112
113
114
115
116
117
118
119
120
# File 'lib/qless/worker/base.rb', line 112

# Actually perform the job.
#
# Runs the job through around_perform and then settles its outcome:
#   - JobLockLost: a warning naming the job's jid is logged and nothing
#     else is done.
#   - any other exception: the job is failed via fail_job, which also
#     receives this method's caller backtrace.
#   - no exception: the job is completed via try_complete.
def perform(job)
  begin
    around_perform(job)
  rescue JobLockLost
    return log(:warn, "Lost lock for job #{job.jid}")
  rescue Exception => error
    # Every exception class is caught here so it is recorded as a failure
    # of this job.
    return fail_job(job, error, caller)
  end

  # Only reached when around_perform did not raise; errors from
  # try_complete itself are not rescued here.
  try_complete(job)
end

#procline(value, in_signal_handler = true) ⇒ Object

Set the procline. Not supported on all systems.



154
155
156
157
# File 'lib/qless/worker/base.rb', line 154

# Set the procline. Not supported on all systems.
#
# The program name becomes "Qless-<version>: <value> at <ISO 8601 time>".
# The new name is also logged at :debug level, except when
# in_signal_handler is true, in which case nothing is logged and nil is
# returned.
def procline(value, in_signal_handler=true)
  $PROGRAM_NAME = "Qless-#{Qless::VERSION}: #{value} at #{Time.now.iso8601}"
  return if in_signal_handler

  log(:debug, $0)
end

#register_signal_handlersObject

The meaning of these signals is meant to closely mirror resque

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing. USR1: Kill the forked children immediately, continue processing jobs. USR2: Pause after this job. CONT: Start processing jobs again after a USR2.

HUP: Print current stack to log and continue


69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/qless/worker/base.rb', line 69

# Install the worker's signal handlers.
#
#   TERM, INT - exit! right away
#   HUP       - call the configured sighup_handler
#   QUIT      - shutdown(true)
#   CONT      - unpause(true)
#   USR2      - pause(true)
#
# HUP and QUIT are installed through safe_trap. CONT and USR2 share one
# rescue: if trapping either raises ArgumentError a single warning is
# printed, and USR2 is not attempted when CONT fails.
def register_signal_handlers
  %w[TERM INT].each { |signal| trap(signal) { exit! } }

  safe_trap('HUP') { sighup_handler.call }
  safe_trap('QUIT') { shutdown(true) }

  begin
    trap('CONT') { unpause(true) }
    trap('USR2') { pause(true) }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end

#safe_trap(signal_name, &cblock) ⇒ Object



52
53
54
55
56
57
58
# File 'lib/qless/worker/base.rb', line 52

# Trap a signal, tolerating signals this platform does not know.
#
# signal_name - name of the signal handed to trap
# cblock      - block installed as the handler
#
# When trap raises ArgumentError a warning naming the signal is printed
# instead of letting the error propagate.
def safe_trap(signal_name, &cblock)
  trap(signal_name, cblock)
rescue ArgumentError
  warn "Signal #{signal_name} not supported."
end

#shutdown(in_signal_handler = true) ⇒ Object Also known as: stop!

Stop processing after this job



137
138
139
# File 'lib/qless/worker/base.rb', line 137

# Stop processing after this job: sets @shutdown, which the jobs enumerator
# checks at the end of each pass through its loop. in_signal_handler is
# accepted but not used by this method.
def shutdown(in_signal_handler=true)
  @shutdown = true
end

#try_complete(job) ⇒ Object

Complete the job unless the worker has already put it into another state by completing / failing / etc. the job



161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/qless/worker/base.rb', line 161

# Complete the job unless the worker has already put it into another state
# (by completing it, failing it, etc.) while performing it.
#
# Returns nil without touching the job when its state has already changed.
# A Job::CantCompleteError is logged at :error level instead of being
# raised.
def try_complete(job)
  return if job.state_changed?

  job.complete
rescue Job::CantCompleteError => cant_complete
  # There's not much we can do here. Complete fails in a few cases:
  #   - The job is already failed (i.e. by another worker)
  #   - The job is being worked on by another worker
  #   - The job has been cancelled
  #
  # We don't want to (and are not able to) fail the job with this error in
  # any of these cases, so the best we can do is log the failure.
  log(:error, "Failed to complete #{job.inspect}: #{cant_complete.message}")
end

#uniq_clientsObject



190
191
192
# File 'lib/qless/worker/base.rb', line 190

# The distinct clients behind the reserver's queues, in first-seen order.
# Computed on first use and memoized in @uniq_clients afterwards.
def uniq_clients
  @uniq_clients ||= begin
    clients = reserver.queues.map { |queue| queue.client }
    clients.uniq
  end
end

#unpause(in_signal_handler = true) ⇒ Object

Continue taking new jobs



149
150
151
# File 'lib/qless/worker/base.rb', line 149

# Continue taking new jobs: sets the pause flag back to false.
# in_signal_handler is accepted but not used by this method.
def unpause(in_signal_handler=true)
  @paused = false
end