Class: Reqless::Workers::SerialWorker

Inherits:
BaseWorker
  • Object
show all
Defined in:
lib/reqless/worker/serial.rb

Overview

A worker that keeps popping off jobs and processing them

Instance Attribute Summary

Attributes inherited from BaseWorker

#interval, #options, #output, #paused, #reserver, #sighup_handler

Instance Method Summary collapse

Methods inherited from BaseWorker

#deregister, #fail_job, #jobs, #listen_for_lost_lock, #log_level, #on_current_job_lock_lost, #pause, #perform, #procline, #register_signal_handlers, #safe_trap, #shutdown, #try_complete, #uniq_clients, #unpause

Methods included from BaseWorker::SupportsMiddlewareModules

#after_fork, #around_perform

Constructor Details

#initialize(reserver, options = {}) ⇒ SerialWorker

Returns a new instance of SerialWorker.



10
11
12
# File 'lib/reqless/worker/serial.rb', line 10

def initialize(reserver, options = {})
  super(reserver, options)
end

Instance Method Details

#runObject



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/reqless/worker/serial.rb', line 14

def run
  log(:info, "Starting #{reserver.description} in #{Process.pid}")
  procline "Starting #{reserver.description}"
  register_signal_handlers

  reserver.prep_for_work!

  procline "Running #{reserver.description}"

  jobs.each do |job|
    # Run the job we're working on
    log(:debug, "Starting job #{job.klass_name} (#{job.jid} from #{job.queue_name})")
    procline "Processing #{job.description}"
    listen_for_lost_lock(job) do
      perform(job)
    end
    log(:debug, "Finished job #{job.klass_name} (#{job.jid} from #{job.queue_name})")

    # So long as we're paused, we should wait
    while paused
      log(:debug, 'Paused...')
      sleep interval
    end
  end
end