Class: Qless::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/qless/worker.rb

Overview

This is heavily inspired by Resque’s excellent worker: github.com/defunkt/resque/blob/v1.20.0/lib/resque/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_reserver, options = {}) ⇒ Worker

Returns a new instance of Worker.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/qless/worker.rb', line 13

# Builds a worker that pulls its jobs from +job_reserver+.
#
# options - Hash of optional settings:
#           :very_verbose, :verbose, :run_as_single_process - stored as given
#           :output       - IO-like log destination (default: $stdout)
#           :term_timeout - default: 4.0
def initialize(job_reserver, options = {})
  self.job_reserver = job_reserver
  @shutdown = false
  @paused = false

  self.very_verbose = options[:very_verbose]
  self.verbose = options[:verbose]
  self.run_as_single_process = options[:run_as_single_process]
  self.output = options.fetch(:output) { $stdout }
  self.term_timeout = options.fetch(:term_timeout) { 4.0 }

  # Path prefixes mapped to short placeholders: the current directory
  # always, GEM_HOME only when that variable is set.
  gem_home = ENV['GEM_HOME']
  @backtrace_replacements = { Dir.pwd => '.' }
  @backtrace_replacements[gem_home] = '<GEM_HOME>' unless gem_home.nil?

  # Visually separate this worker's log output from whatever came before.
  if verbose || very_verbose
    output.puts("\n\n\n")
  end
  log("Instantiated Worker")
end

Instance Attribute Details

#job_reserver ⇒ Object

The object responsible for reserving jobs from the Qless server, using some reasonable strategy (e.g. round robin or ordered)



46
47
48
# File 'lib/qless/worker.rb', line 46

# Returns the object held in @job_reserver, which is responsible for
# reserving jobs from the Qless server.
def job_reserver
  @job_reserver
end

#output ⇒ Object

An IO-like object that logging output is sent to. Defaults to $stdout.



42
43
44
# File 'lib/qless/worker.rb', line 42

# Returns the IO-like object held in @output that logging output is
# sent to.
def output
  @output
end

#run_as_single_process ⇒ Object

Whether the worker should run in a single process, i.e. not fork a child process to do the work. This should only be true in a dev/test environment.



38
39
40
# File 'lib/qless/worker.rb', line 38

# Returns the single-process flag held in @run_as_single_process
# (truthy means the worker should not fork a child to do the work).
def run_as_single_process
  @run_as_single_process
end

#term_timeout ⇒ Object

How long the child process is given to exit before forcibly killing it.



49
50
51
# File 'lib/qless/worker.rb', line 49

# Returns @term_timeout: how long the child process is given to exit
# before it is forcibly killed (presumably in seconds -- TODO confirm).
def term_timeout
  @term_timeout
end

#verbose ⇒ Object

Whether the worker should log basic info to STDOUT



30
31
32
# File 'lib/qless/worker.rb', line 30

# Returns the basic-logging flag held in @verbose.
def verbose
  @verbose
end

#very_verbose ⇒ Object

Whether the worker should log lots of info to STDOUT



33
34
35
# File 'lib/qless/worker.rb', line 33

# Returns the detailed-logging flag held in @very_verbose.
def very_verbose
  @very_verbose
end

Class Method Details

.start ⇒ Object

Starts a worker based on ENV vars. Supported ENV vars:

- REDIS_URL=redis://host:port/db-num (the redis gem uses this automatically)
- QUEUES=high,medium,low or QUEUE=blah
- JOB_RESERVER=Ordered or JOB_RESERVER=RoundRobin
- INTERVAL=3.2
- VERBOSE=true (to enable logging)
- VVERBOSE=true (to enable very verbose logging)
- RUN_AS_SINGLE_PROCESS=true (false will fork children to do work, true will keep it single process)

This is designed to be called from a rake task



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/qless/worker.rb', line 60

# Starts a worker configured from environment variables. Designed to be
# called from a rake task.
#
# Supported ENV vars:
#   QUEUES / QUEUE         - comma-separated queue names (QUEUES wins)
#   JOB_RESERVER           - constant name under JobReservers (default: Ordered)
#   INTERVAL               - value handed to #work, parsed with Float (default: 5.0)
#   VERBOSE, VVERBOSE,
#   RUN_AS_SINGLE_PROCESS  - boolean flags
#
# A boolean flag is off when the variable is unset, blank, or one of
# 0/false/no/off (case-insensitive); any other value turns it on.
#
# Raises RuntimeError when no non-blank queue name was supplied.
def self.start
  # A plain `!!ENV[name]` is true for every set value, so
  # RUN_AS_SINGLE_PROCESS=false would enable single-process mode.
  env_flag = lambda do |name|
    value = ENV[name].to_s.strip.downcase
    !(value.empty? || %w[0 false no off].include?(value))
  end

  client = Qless::Client.new

  # Drop blank entries ("a,,b", trailing commas, whitespace only) so they
  # never turn into a queue with an empty name.
  queue_names = (ENV['QUEUES'] || ENV['QUEUE']).to_s.split(',').map(&:strip).reject(&:empty?)
  if queue_names.empty?
    raise "No queues provided. You must pass QUEUE or QUEUES when starting a worker."
  end
  queues = queue_names.map { |name| client.queues[name] }

  reserver = JobReservers.const_get(ENV.fetch('JOB_RESERVER', 'Ordered')).new(queues)
  interval = Float(ENV.fetch('INTERVAL', 5.0))

  options = {}
  options[:verbose] = env_flag.call('VERBOSE')
  options[:very_verbose] = env_flag.call('VVERBOSE')
  options[:run_as_single_process] = env_flag.call('RUN_AS_SINGLE_PROCESS')

  new(reserver, options).work(interval)
end

Instance Method Details

#pause_processing ⇒ Object



159
160
161
162
163
# File 'lib/qless/worker.rb', line 159

# Marks the worker as paused, logs the fact, and updates the process
# line to show the paused state for the current job reserver.
def pause_processing
  log("USR2 received; pausing job processing")
  @paused = true
  procline("Paused -- #{@job_reserver.description}")
end

#paused? ⇒ Boolean

Returns:

  • (Boolean)


155
156
157
# File 'lib/qless/worker.rb', line 155

# Returns the @paused flag: false after construction, true once
# #pause_processing has run, false again after #unpause_processing.
def paused?
  @paused
end

#perform(job) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/qless/worker.rb', line 105

# Runs +job+ through +around_perform+. Any exception raised there is
# handed to +fail_job+ together with the current call stack; on success
# the job goes to +try_complete+. Errors from +try_complete+ itself are
# not rescued here.
def perform(job)
  begin
    around_perform(job)
  rescue Exception => failure
    return fail_job(job, failure, caller)
  end

  try_complete(job)
end

#perform_job_in_child_process(job) ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/qless/worker.rb', line 122

# Runs +job+ inside +with_job+: in a forked child when +fork+ hands back
# a child, otherwise inline in the current process.
def perform_job_in_child_process(job)
  with_job(job) do
    # With Kernel#fork the block runs only in the child, while the caller
    # receives the child's pid as the return value stored in @child.
    @child = fork do
      job.reconnect_to_redis
      register_child_signal_handlers
      start_child_pub_sub_listener_for(job.client)
      procline "Processing #{job.description}"
      perform(job)
      exit! # don't run at_exit hooks
    end

    if @child
      wait_for_child
    else
      # NOTE(review): reached only when +fork+ returned nil/false here;
      # presumably +fork+ is overridden elsewhere to skip forking when
      # run_as_single_process is set -- confirm.
      procline "Single processing #{job.description}"
      perform(job)
    end
  end
end

#reserve_job ⇒ Object



113
114
115
116
117
118
119
120
# File 'lib/qless/worker.rb', line 113

# Asks the job reserver for the next job.
#
# Returns whatever +@job_reserver.reserve+ returns, or nil when reserving
# raised. The error is logged and never propagated.
def reserve_job
  @job_reserver.reserve
rescue Exception => error
  # We want workers to durably stay up, so we don't want errors
  # during job reserving (e.g. network timeouts, etc) to kill
  # the worker.
  log "Got an error while reserving a job: #{error.class}: #{error.message}"

  # Return nil explicitly: the caller treats any truthy value as a job,
  # so the result must not depend on what +log+ happens to return.
  nil
end

#shutdown ⇒ Object



142
143
144
# File 'lib/qless/worker.rb', line 142

# Sets the @shutdown flag to true. The flag is what #shutdown? reports.
def shutdown
  @shutdown = true
end

#shutdown! ⇒ Object



146
147
148
149
# File 'lib/qless/worker.rb', line 146

# Requests a shutdown and, unless the worker runs as a single process,
# also calls +kill_child+.
def shutdown!
  shutdown
  return if run_as_single_process

  kill_child
end

#shutdown? ⇒ Boolean

Returns:

  • (Boolean)


151
152
153
# File 'lib/qless/worker.rb', line 151

# Returns the @shutdown flag: false after construction, true once
# #shutdown has been called.
def shutdown?
  @shutdown
end

#unpause_processing ⇒ Object



165
166
167
168
# File 'lib/qless/worker.rb', line 165

# Logs that processing is resuming and clears the paused flag.
def unpause_processing
  log("CONT received; resuming job processing")
  @paused = false
end

#work(interval = 5.0) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/qless/worker.rb', line 78

# Runs the main worker loop until a shutdown is requested.
#
# interval - how long to sleep while paused or when no job could be
#            reserved. With an interval of zero the loop exits as soon
#            as no job is available instead of sleeping.
#
# +deregister+ is always called on the way out, even if an error escapes.
def work(interval = 5.0)
  procline("Starting #{@job_reserver.description}")
  register_parent_signal_handlers
  uniq_clients.each do |client|
    start_parent_pub_sub_listener_for(client)
  end

  loop do
    break if shutdown?

    if paused?
      sleep(interval)
    elsif (job = reserve_job)
      perform_job_in_child_process(job)
    elsif interval.zero?
      break
    else
      procline("Waiting for #{@job_reserver.description}")
      log!("Sleeping for #{interval} seconds")
      sleep(interval)
    end
  end
ensure
  # make sure the worker deregisters on shutdown
  deregister
end