Class: Qless::Workers::BaseWorker

Inherits:
Object
  • Object
show all
Includes:
SupportsMiddlewareModules
Defined in:
lib/qless/worker/base.rb

Direct Known Subclasses

ForkingWorker, SerialWorker

Defined Under Namespace

Modules: SupportsMiddlewareModules

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from SupportsMiddlewareModules

#after_fork, #around_perform

Constructor Details

#initialize(reserver, options = {}) ⇒ BaseWorker

Returns a new instance of BaseWorker.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/qless/worker/base.rb', line 20

# Build a worker around a job reserver.
#
# @param reserver [Object] source of jobs; stored as-is and exposed via #reserver
# @param options [Hash] optional settings read by this constructor:
#   :sighup_handler - callable stored for SIGHUP (default: a no-op lambda)
#   :logger         - logger used verbatim; when present, :output, :log_level
#                     and :log_formatter are never consulted and @output is
#                     left unset
#   :output         - IO handed to the default Logger (default: $stdout)
#   :log_level      - level of the default Logger (default: Logger::WARN)
#   :log_formatter  - formatter of the default Logger
#                     (default renders "<datetime>: <msg>\n")
#   :interval       - interval for checking for new jobs (default: 5.0)
def initialize(reserver, options = {})
  @reserver = reserver
  @options = options

  # Callable kept for SIGHUP; does nothing unless the caller supplies one.
  @sighup_handler = options.fetch(:sighup_handler) { -> {} }

  # Only build a Logger (and remember its output) when none was injected.
  @log = options.fetch(:logger) do
    @output = options.fetch(:output, $stdout)
    default_logger = Logger.new(output)
    default_logger.level = options.fetch(:log_level, Logger::WARN)
    default_logger.formatter = options.fetch(:log_formatter) do
      proc { |_severity, datetime, _progname, msg| "#{datetime}: #{msg}\n" }
    end
    default_logger
  end

  @interval = options.fetch(:interval, 5.0)

  # State describing the job currently being worked on.
  @current_job_mutex = Mutex.new
  @current_job = nil

  # Default reaction to a lost lock: stop once the current job is done.
  on_current_job_lock_lost { shutdown(false) }
end

Instance Attribute Details

#interval ⇒ Object

Returns the value of attribute interval.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Reader for @interval. The constructor shown on this page sets it from
# options[:interval], defaulting to 5.0, and describes it as the interval for
# checking for new jobs.
def interval
  @interval
end

#options ⇒ Object

Returns the value of attribute options.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Reader for @options, which the constructor sets to the options hash it was
# given.
def options
  @options
end

#output ⇒ Object

Returns the value of attribute output.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Reader for @output. The constructor only assigns it (from options[:output],
# default $stdout) while building its own Logger; when a :logger option is
# supplied the constructor leaves it unset.
def output
  @output
end

#paused ⇒ Object

Returns the value of attribute paused.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Reader for @paused. #pause sets it to true and #unpause sets it to false;
# the constructor does not assign it.
def paused
  @paused
end

#reserver ⇒ Object

Returns the value of attribute reserver.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Reader for @reserver, the job reserver passed to the constructor. Other
# methods on this page call #reserve, #queues and #description on it.
def reserver
  @reserver
end

#sighup_handler ⇒ Object

Returns the value of attribute sighup_handler.



17
18
19
# File 'lib/qless/worker/base.rb', line 17

# Reader for @sighup_handler, the callable that #register_signal_handlers
# invokes on HUP. The constructor takes it from options[:sighup_handler] and
# falls back to a no-op lambda.
def sighup_handler
  @sighup_handler
end

Instance Method Details

#deregister ⇒ Object



188
189
190
191
192
# File 'lib/qless/worker/base.rb', line 188

# Ask every distinct client (see #uniq_clients) to deregister the worker
# registered under that client's own worker name.
#
# @return [Object] the collection returned by #uniq_clients
def deregister
  uniq_clients.each do |qless_client|
    name = qless_client.worker_name
    qless_client.deregister_workers(name)
  end
end

#fail_job(job, error, worker_backtrace) ⇒ Object



178
179
180
181
182
183
184
185
186
# File 'lib/qless/worker/base.rb', line 178

# Mark a job as failed, logging the formatted failure first.
#
# @param job [Object] the job to fail; must respond to #fail and #inspect
# @param error [Exception] the error that caused the failure
# @param worker_backtrace [Array] backtrace passed through to the failure formatter
# @return [Object] the result of job.fail, or of the log call when the job
#   could not be failed
def fail_job(job, error, worker_backtrace)
  formatted = Qless.failure_formatter.format(job, error, worker_backtrace)
  summary = "Got #{formatted.group} failure from #{job.inspect}\n#{formatted.message}"
  log(:error, summary)
  job.fail(*formatted)
rescue Job::CantFailError => cant_fail
  # Nothing more can be done: the job may have been cancelled by another
  # worker, or may no longer belong to us. Record it and move on.
  log(:error, "Failed to fail #{job.inspect}: #{cant_fail.message}")
end

#jobs ⇒ Object

Return an enumerator to each of the jobs provided by the reserver



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/qless/worker/base.rb', line 84

# Build an enumerator over the jobs handed out by the reserver.
#
# Each pass reserves one job. A reserved job is published as the current job
# while it is being yielded; when nothing was reserved (or reserving raised)
# #no_job_available is called instead. The loop ends after the first pass
# that finishes with @shutdown set.
#
# @return [Enumerator] yields each reserved job in turn
def jobs
  Enumerator.new do |yielder|
    loop do
      begin
        reserved = reserver.reserve
      rescue Exception => reserve_error
        # Deliberately broad: a failure while reserving (network timeout and
        # the like) must not bring the worker down. `reserved` stays nil.
        log(:error,
          "Error reserving job: #{reserve_error.class}: #{reserve_error.message}")
      end

      if reserved.nil?
        # Nothing to work on this time round.
        no_job_available
      else
        self.current_job = reserved
        yielder.yield(reserved)
        self.current_job = nil
      end

      break if @shutdown
    end
  end
end

#listen_for_lost_lock ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/qless/worker/base.rb', line 202

# Run the given block while subscribed to each distinct client's worker
# channel ("ql:w:<worker_name>"). When a 'lock_lost' event arrives whose jid
# matches the job currently being worked on, the callback registered through
# #on_current_job_lock_lost is invoked with that job.
#
# Subscribers are collected one at a time into an array that exists before
# the first subscription is attempted. If #uniq_clients or a Subscriber.start
# call raises part-way through, the subscribers already started are still
# stopped and the original error propagates, instead of being masked by a
# NoMethodError from calling #each on nil in the ensure clause.
#
# @yield runs while the subscriptions are active
# @return [Object] the value of the block
def listen_for_lost_lock
  subscribers = []
  uniq_clients.each do |client|
    channel = "ql:w:#{client.worker_name}"
    subscriber = Subscriber.start(client, channel, log: @log) do |_, message|
      next unless message['event'] == 'lock_lost'

      with_current_job do |job|
        @on_current_job_lock_lost.call(job) if job && message['jid'] == job.jid
      end
    end
    subscribers << subscriber
  end

  yield
ensure
  subscribers.each(&:stop)
end

#log_level ⇒ Object



48
49
50
# File 'lib/qless/worker/base.rb', line 48

# Report the level of the worker's logger (@log), which is either the logger
# supplied to the constructor or the one the constructor built.
def log_level
  @log.level
end

#on_current_job_lock_lost(&block) ⇒ Object



198
199
200
# File 'lib/qless/worker/base.rb', line 198

# Register the block to run when the current job's lock is lost, replacing
# any previously registered one. #listen_for_lost_lock calls it with the job.
def on_current_job_lock_lost(&block)
  @on_current_job_lock_lost = block
end

#pause(in_signal_handler = true) ⇒ Object

Pause the worker – take no more new jobs



147
148
149
150
# File 'lib/qless/worker/base.rb', line 147

# Pause the worker so that it takes no new jobs, and update the procline to
# say so.
#
# @param in_signal_handler [Boolean] forwarded unchanged to #procline
# @return [Object] whatever #procline returns
def pause(in_signal_handler=true)
  @paused = true
  title = "Paused -- #{reserver.description}"
  procline(title, in_signal_handler)
end

#perform(job) ⇒ Object

Actually perform the job



112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/qless/worker/base.rb', line 112

# Run a single job and settle its outcome.
#
# - JobLockLost while performing: a warning is logged and nothing else is done.
# - Any other exception: the job is handed to #fail_job together with the
#   caller's backtrace.
# - No exception: #try_complete is called.
# In every case the elapsed time is logged afterwards at :info.
#
# @param job [Object] the job to run; must respond to #jid and #description
# @return [Object] the value of whichever outcome branch ran
def perform(job)
  begin
    started_at = Time.now.to_f
    around_perform(job)
  rescue JobLockLost
    log(:warn, "Lost lock for job #{job.jid}")
  rescue Exception => failure
    # Deliberately broad: whatever the job raised is recorded as a failure.
    fail_job(job, failure, caller)
  else
    try_complete(job)
  ensure
    duration = Time.now.to_f - started_at
    log(:info, "Job #{job.description} took #{duration} seconds")
  end
end

#procline(value, in_signal_handler = true) ⇒ Object

Set the procline. Not supported on all systems



158
159
160
161
# File 'lib/qless/worker/base.rb', line 158

# Set the process title to "Qless-<version>: <value> at <ISO 8601 time>".
# Not supported on all systems.
#
# @param value [String] text describing what the worker is doing
# @param in_signal_handler [Boolean] when true the new title is not logged
#   (presumably because logging is unsafe inside a trap handler; confirm)
# @return [Object, nil] the result of the debug log call, or nil when it is
#   skipped
def procline(value, in_signal_handler=true)
  title = "Qless-#{Qless::VERSION}: #{value} at #{Time.now.iso8601}"
  $PROGRAM_NAME = title
  return if in_signal_handler

  log(:debug, $PROGRAM_NAME)
end

#register_signal_handlers ⇒ Object

The meaning of these signals is meant to closely mirror resque

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing.

USR1: Kill the forked children immediately, continue processing jobs.

USR2: Pause after this job.

CONT: Start processing jobs again after a USR2.

HUP: Print current stack to log and continue


69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/qless/worker/base.rb', line 69

# Install this worker's signal handlers:
#
#   TERM, INT - exit! immediately
#   HUP       - call the configured sighup handler
#   QUIT      - request shutdown
#   CONT      - unpause
#   USR2      - pause
#
# HUP and QUIT go through #safe_trap. CONT and USR2 share one rescue, so an
# ArgumentError from trapping CONT also skips USR2 and emits a single
# combined warning.
def register_signal_handlers
  %w[TERM INT].each do |fatal_signal|
    trap(fatal_signal) { exit! }
  end

  safe_trap('HUP') { sighup_handler.call }
  safe_trap('QUIT') { shutdown(true) }

  begin
    trap('CONT') { unpause(true) }
    trap('USR2') { pause(true) }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end

#safe_trap(signal_name, &cblock) ⇒ Object



52
53
54
55
56
57
58
# File 'lib/qless/worker/base.rb', line 52

# Install a handler for a signal, downgrading an ArgumentError from trap to a
# warning.
#
# @param signal_name [String] name of the signal to trap
# @param cblock [Proc] handler to run when the signal arrives
# @return [Object, nil] trap's return value, or nil after warning
def safe_trap(signal_name, &cblock)
  trap(signal_name, cblock)
rescue ArgumentError
  warn "Signal #{signal_name} not supported."
end

#shutdown(in_signal_handler = true) ⇒ Object Also known as: stop!

Stop processing after this job



141
142
143
# File 'lib/qless/worker/base.rb', line 141

# Request that the worker stop: sets @shutdown, which the loop in #jobs checks
# at the end of each pass. The in_signal_handler argument is accepted but not
# used by this implementation.
def shutdown(in_signal_handler=true)
  @shutdown = true
end

#try_complete(job) ⇒ Object

Complete the job unless the worker has already put it into another state by completing / failing / etc. the job



165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/qless/worker/base.rb', line 165

# Complete the job, unless the job reports that its state has already been
# changed (for example because the job code completed or failed it itself).
#
# @param job [Object] must respond to #state_changed?, #complete and #inspect
# @return [Object, nil] the result of job.complete, nil when skipped, or the
#   result of the log call when completion was refused
def try_complete(job)
  return if job.state_changed?

  job.complete
rescue Job::CantCompleteError => cant_complete
  # Completion is refused when, for instance:
  #   - another worker has already failed the job
  #   - another worker is now working on the job
  #   - the job has been cancelled
  #
  # Failing the job with this error is neither wanted nor possible in those
  # situations, so recording the refusal is all that is left to do.
  log(:warn, "Failed to complete #{job.inspect}: #{cant_complete.message}")
end

#uniq_clients ⇒ Object



194
195
196
# File 'lib/qless/worker/base.rb', line 194

# The distinct clients behind the reserver's queues. Computed on first use
# and memoized in @uniq_clients.
#
# @return [Array] each queue's client, with duplicates removed
def uniq_clients
  @uniq_clients ||= begin
    clients = reserver.queues.map { |queue| queue.client }
    clients.uniq
  end
end

#unpause(in_signal_handler = true) ⇒ Object

Continue taking new jobs



153
154
155
# File 'lib/qless/worker/base.rb', line 153

# Resume taking new jobs by clearing @paused. The in_signal_handler argument
# is accepted but not used by this implementation.
def unpause(in_signal_handler=true)
  @paused = false
end