Class: Qless::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/qless/worker.rb

Overview

This is heavily inspired by Resque’s excellent worker: github.com/defunkt/resque/blob/v1.20.0/lib/resque/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, job_reserver, options = {}) ⇒ Worker

Returns a new instance of Worker.



10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/qless/worker.rb', line 10

def initialize(client, job_reserver, options = {})
  @client, @job_reserver = client, job_reserver
  @shutdown = @paused = false

  self.very_verbose = options[:very_verbose]
  self.verbose = options[:verbose]
  self.run_as_single_process = options[:run_as_single_process]
  self.output = options.fetch(:output, $stdout)

  output.puts "\n\n\n" if verbose || very_verbose
  log "Instantiated Worker"
end

Instance Attribute Details

#output ⇒ Object

An IO-like object that logging output is sent to. Defaults to $stdout.



36
37
38
# File 'lib/qless/worker.rb', line 36

# Reader for the logging destination: the IO-like object that log output is
# written to. The constructor sets it from options[:output], falling back to
# $stdout.
#
# @return [Object] the current value of @output
def output
  @output
end

#run_as_single_process ⇒ Object

Whether the worker should run in a single process, i.e. not fork a child process to do the work. This should only be true in a dev/test environment.



32
33
34
# File 'lib/qless/worker.rb', line 32

# Reader for the single-process flag. When truthy, #work performs each job
# in the worker's own process instead of forking a child for it.
#
# @return [Object] the current value of @run_as_single_process
def run_as_single_process
  @run_as_single_process
end

#verbose ⇒ Object

Whether the worker should log basic info to STDOUT



24
25
26
# File 'lib/qless/worker.rb', line 24

# Reader for the basic-logging flag. The constructor sets it from
# options[:verbose] and prints a blank separator to the output when it (or
# very_verbose) is truthy.
#
# @return [Object] the current value of @verbose
def verbose
  @verbose
end

#very_verbose ⇒ Object

Whether the worker should log lots of info to STDOUT



27
28
29
# File 'lib/qless/worker.rb', line 27

# Reader for the detailed-logging flag. The constructor sets it from
# options[:very_verbose].
#
# @return [Object] the current value of @very_verbose
def very_verbose
  @very_verbose
end

Class Method Details

.start ⇒ Object

Starts a worker based on ENV vars. Supported ENV vars:

- REDIS_URL=redis://host:port/db-num (the redis gem uses this automatically)
- QUEUES=high,medium,low or QUEUE=blah
- JOB_RESERVER=Ordered or JOB_RESERVER=RoundRobin
- INTERVAL=3.2
- VERBOSE=true (to enable logging)
- VVERBOSE=true (to enable very verbose logging)
- RUN_AS_SINGLE_PROCESS=true (false will fork children to do work, true will keep it single process)

This is designed to be called from a rake task



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/qless/worker.rb', line 47

# Builds a worker from environment variables and runs its work loop.
#
# Environment variables read:
# - QUEUES / QUEUE: comma-separated queue names (QUEUES wins when both are
#   set). Names are stripped and blank entries are ignored.
# - JOB_RESERVER: name of a class under JobReservers (default "Ordered").
# - INTERVAL: polling interval in seconds, parsed with Float (default 5.0).
# - VERBOSE, VVERBOSE, RUN_AS_SINGLE_PROCESS: boolean flags. A flag is off
#   when the variable is unset, blank, "false" or "0" (case-insensitive);
#   any other value turns it on. This makes the documented
#   RUN_AS_SINGLE_PROCESS=false actually fork children.
#
# Raises RuntimeError when no usable queue name is given.
def self.start
  client = Qless::Client.new

  queue_names = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',').map(&:strip).reject(&:empty?)
  queues = queue_names.map { |name| client.queues[name] }
  if queues.none?
    raise "No queues provided. You must pass QUEUE or QUEUES when starting a worker."
  end

  reserver = JobReservers.const_get(ENV.fetch('JOB_RESERVER', 'Ordered')).new(queues)
  interval = Float(ENV.fetch('INTERVAL', 5.0))

  # `!!ENV[...]` would treat the string "false" as enabled, because every
  # String is truthy in Ruby; interpret the text instead.
  env_flag = lambda do |name|
    value = ENV[name].to_s.strip.downcase
    !(value.empty? || value == 'false' || value == '0')
  end

  options = {
    :verbose => env_flag.call('VERBOSE'),
    :very_verbose => env_flag.call('VVERBOSE'),
    :run_as_single_process => env_flag.call('RUN_AS_SINGLE_PROCESS')
  }

  new(client, reserver, options).work(interval)
end

Instance Method Details

#pause_processing ⇒ Object



125
126
127
128
129
# File 'lib/qless/worker.rb', line 125

# Marks the worker as paused, so that #paused? returns true, and updates the
# process title to say so. The log message indicates this is meant to run in
# response to a USR2 signal.
def pause_processing
  log("USR2 received; pausing job processing")
  @paused = true

  status = "Paused -- #{@job_reserver.description}"
  procline(status)
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


121
122
123
# File 'lib/qless/worker.rb', line 121

# Reports whether job processing is currently paused. The flag starts out
# false, is set by #pause_processing and cleared by #unpause_processing;
# #work consults it on every loop iteration.
#
# @return [Boolean] the current value of @paused
def paused?
  @paused
end

#perform(job) ⇒ Object



100
101
102
103
104
105
106
# File 'lib/qless/worker.rb', line 100

# Runs a single job through around_perform and records the outcome.
#
# Any exception raised while running the job, including non-StandardError
# ones, is handed to fail_job and not re-raised. When the job runs cleanly
# it is completed, unless its state was already changed during the run.
#
# Returns the result of fail_job on failure, otherwise the result of
# job.complete (or nil when completion is skipped).
def perform(job)
  begin
    around_perform(job)
  rescue Exception => failure
    return fail_job(job, failure)
  end

  # Errors raised while completing are deliberately outside the rescue above.
  job.complete unless job.state_changed?
end

#shutdown ⇒ Object



108
109
110
# File 'lib/qless/worker.rb', line 108

# Requests that the worker stop: sets the flag reported by #shutdown?, which
# #work checks at the top of each loop iteration before reserving another
# job. Does not touch any child process (see #shutdown! for that).
def shutdown
  @shutdown = true
end

#shutdown! ⇒ Object



112
113
114
115
# File 'lib/qless/worker.rb', line 112

# Requests shutdown and, when the worker forks children to do the work,
# also kills the current child via kill_child. In single-process mode
# there is no child, so only the shutdown flag is set.
def shutdown!
  shutdown
  return if run_as_single_process

  kill_child
end

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/qless/worker.rb', line 117

# Reports whether shutdown has been requested. The flag starts out false and
# is set by #shutdown; #work leaves its loop once this returns true.
#
# @return [Boolean] the current value of @shutdown
def shutdown?
  @shutdown
end

#unpause_processing ⇒ Object



131
132
133
134
# File 'lib/qless/worker.rb', line 131

# Clears the paused flag so that #paused? returns false again. The log
# message indicates this is meant to run in response to a CONT signal.
def unpause_processing
  log("CONT received; resuming job processing")
  @paused = false
end

#work(interval = 5.0) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/qless/worker.rb', line 65

# Runs the worker's main loop: reserve a job, perform it, repeat.
#
# The loop ends when #shutdown? becomes true or, when +interval+ is zero,
# as soon as the reserver has no job to hand out.
#
# While #paused? is true no job is reserved; the loop sleeps +interval+
# seconds between checks rather than re-checking in a tight loop.
#
# Unless run_as_single_process is set, each job is performed in a forked
# child process while the parent blocks until that child exits.
#
# @param interval [Numeric] seconds to sleep when idle or paused; zero means
#   "return once no job is available" (and no back-off while paused)
def work(interval = 5.0)
  procline "Starting #{@job_reserver.description}"
  register_signal_handlers

  loop do
    break if shutdown?

    if paused?
      # A bare `next` here would spin at full CPU for as long as the worker
      # stays paused; back off for the polling interval instead.
      sleep interval
      next
    end

    job = @job_reserver.reserve
    unless job
      break if interval.zero?
      procline "Waiting for #{@job_reserver.description}"
      log! "Sleeping for #{interval} seconds"
      sleep interval
      next
    end

    log "got: #{job.inspect}"

    if run_as_single_process
      # We're staying in the same process
      procline "Single processing #{job.description}"
      perform(job)
    elsif @child = fork
      # We're in the parent process: block until the child has finished.
      procline "Forked #{@child} for #{job.description}"
      Process.wait(@child)
    else
      # We're in the child process; exit! leaves immediately, skipping
      # at_exit handlers.
      procline "Processing #{job.description}"
      perform(job)
      exit!
    end
  end
end