Class: Reqless::Workers::BaseWorker

Inherits:
Object
  • Object
show all
Includes:
SupportsMiddlewareModules
Defined in:
lib/reqless/worker/base.rb

Direct Known Subclasses

ForkingWorker, SerialWorker

Defined Under Namespace

Modules: SupportsMiddlewareModules

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from SupportsMiddlewareModules

#after_fork, #around_perform

Constructor Details

#initialize(reserver, options = {}) ⇒ BaseWorker

Returns a new instance of BaseWorker.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/reqless/worker/base.rb', line 19

def initialize(reserver, options = {})
  # Our job reserver and options
  @reserver = reserver
  @options = options

  # SIGHUP handler
  @sighup_handler = options.fetch(:sighup_handler) { lambda { } }

  # Our logger
  @log = options.fetch(:logger) do
    @output = options.fetch(:output, $stdout)
    Logger.new(output).tap do |logger|
      logger.level = options.fetch(:log_level, Logger::WARN)
      logger.formatter = options.fetch(:log_formatter) do
        Proc.new { |severity, datetime, progname, msg| "#{datetime}: #{msg}\n" }
      end
    end
  end

  # The interval for checking for new jobs
  @interval = options.fetch(:interval, 5.0)
  @current_job_mutex = Mutex.new
  @current_job = nil

  # Default behavior when a lock is lost: stop after the current job.
  on_current_job_lock_lost { shutdown }
end

Instance Attribute Details

#intervalObject

Returns the value of attribute interval.



16
17
18
# File 'lib/reqless/worker/base.rb', line 16

def interval
  @interval
end

#optionsObject

Returns the value of attribute options.



16
17
18
# File 'lib/reqless/worker/base.rb', line 16

def options
  @options
end

#outputObject

Returns the value of attribute output.



16
17
18
# File 'lib/reqless/worker/base.rb', line 16

def output
  @output
end

#pausedObject

Returns the value of attribute paused.



16
17
18
# File 'lib/reqless/worker/base.rb', line 16

def paused
  @paused
end

#reserverObject

Returns the value of attribute reserver.



16
17
18
# File 'lib/reqless/worker/base.rb', line 16

def reserver
  @reserver
end

#sighup_handlerObject

Returns the value of attribute sighup_handler.



16
17
18
# File 'lib/reqless/worker/base.rb', line 16

def sighup_handler
  @sighup_handler
end

Instance Method Details

#deregisterObject



183
184
185
186
187
# File 'lib/reqless/worker/base.rb', line 183

def deregister
  uniq_clients.each do |client|
    client.deregister_workers(client.worker_name)
  end
end

#fail_job(job, error, worker_backtrace) ⇒ Object



173
174
175
176
177
178
179
180
181
# File 'lib/reqless/worker/base.rb', line 173

def fail_job(job, error, worker_backtrace)
  failure = Reqless.failure_formatter.format(job, error, worker_backtrace)
  log(:error, "Got #{failure.group} failure from #{job.inspect}\n#{failure.message}" )
  job.fail(*failure)
rescue Job::CantFailError => e
  # There's not much we can do here. Another worker may have cancelled it,
  # or we might not own the job, etc. Logging is the best we can do.
  log(:error, "Failed to fail #{job.inspect}: #{e.message}")
end

#jobsObject

Return an enumerator to each of the jobs provided by the reserver



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/reqless/worker/base.rb', line 83

def jobs
  return Enumerator.new do |enum|
    loop do
      begin
        job = reserver.reserve
      rescue Exception => error
        # We want workers to durably stay up, so we don't want errors
        # during job reserving (e.g. network timeouts, etc) to kill the
        # worker.
        log(:error,
          "Error reserving job: #{error.class}: #{error.message}")
      end

      # If we ended up getting a job, yield it. Otherwise, we wait
      if job.nil?
        no_job_available
      else
        self.current_job = job
        enum.yield(job)
        self.current_job = nil
      end

      break if @shutdown
    end
  end
end

#listen_for_lost_lock(job) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/reqless/worker/base.rb', line 197

def listen_for_lost_lock(job)
  # Ensure subscribers always has a value
  subscriber = Subscriber.start(job.client, "ql:w:#{job.client.worker_name}", log: @log) do |_, message|
    if message['event'] == 'lock_lost' && message['jid'] == job.jid
      @on_current_job_lock_lost.call(job)
    end
  end

  yield
ensure
  subscriber && subscriber.stop
end

#log_levelObject



47
48
49
# File 'lib/reqless/worker/base.rb', line 47

def log_level
  @log.level
end

#on_current_job_lock_lost(&block) ⇒ Object



193
194
195
# File 'lib/reqless/worker/base.rb', line 193

def on_current_job_lock_lost(&block)
  @on_current_job_lock_lost = block
end

#pauseObject

Pause the worker – take no more new jobs



142
143
144
145
# File 'lib/reqless/worker/base.rb', line 142

def pause
  @paused = true
  procline "Paused -- #{reserver.description}"
end

#perform(job) ⇒ Object

Actually perform the job



111
112
113
114
115
116
117
118
119
# File 'lib/reqless/worker/base.rb', line 111

def perform(job)
  around_perform(job)
rescue JobLockLost
  log(:warn, "Lost lock for job #{job.jid}")
rescue Exception => error
  fail_job(job, error, caller)
else
  try_complete(job)
end

#procline(value) ⇒ Object

Set the proceline. Not supported on all systems



153
154
155
156
# File 'lib/reqless/worker/base.rb', line 153

def procline(value)
  $0 = "reQless-#{Reqless::VERSION}: #{value} at #{Time.now.iso8601}"
  log(:debug, $PROGRAM_NAME)
end

#register_signal_handlersObject

The meaning of these signals is meant to closely mirror resque

TERM: Shutdown immediately, stop processing jobs.

INT: Shutdown immediately, stop processing jobs.

QUIT: Shutdown after the current job has finished processing. USR1: Kill the forked children immediately, continue processing jobs. USR2: Pause after this job CONT: Start processing jobs again after a USR2

HUP: Print current stack to log and continue


68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/reqless/worker/base.rb', line 68

def register_signal_handlers
  # Otherwise, we want to take the appropriate action
  trap('TERM') { exit! }
  trap('INT')  { exit! }
  safe_trap('HUP') { sighup_handler.call }
  safe_trap('QUIT') { shutdown }
  begin
    trap('CONT') { unpause }
    trap('USR2') { pause }
  rescue ArgumentError
    warn 'Signals USR2, and/or CONT not supported.'
  end
end

#safe_trap(signal_name, &cblock) ⇒ Object



51
52
53
54
55
56
57
# File 'lib/reqless/worker/base.rb', line 51

def safe_trap(signal_name, &cblock)
  begin
    trap(signal_name, cblock)
  rescue ArgumentError
    warn "Signal #{signal_name} not supported."
  end
end

#shutdownObject Also known as: stop!

Stop processing after this job



136
137
138
# File 'lib/reqless/worker/base.rb', line 136

def shutdown
  @shutdown = true
end

#try_complete(job) ⇒ Object

Complete the job unless the worker has already put it into another state by completing / failing / etc. the job



160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/reqless/worker/base.rb', line 160

def try_complete(job)
  job.complete unless job.state_changed?
rescue Job::CantCompleteError => e
  # There's not much we can do here. Complete fails in a few cases:
  #   - The job is already failed (i.e. by another worker)
  #   - The job is being worked on by another worker
  #   - The job has been cancelled
  #
  # We don't want to (or are able to) fail the job with this error in
  # any of these cases, so the best we can do is log the failure.
  log(:error, "Failed to complete #{job.inspect}: #{e.message}")
end

#uniq_clientsObject



189
190
191
# File 'lib/reqless/worker/base.rb', line 189

def uniq_clients
  @uniq_clients ||= reserver.queues.map(&:client).uniq
end

#unpauseObject

Continue taking new jobs



148
149
150
# File 'lib/reqless/worker/base.rb', line 148

def unpause
  @paused = false
end