Class: Resque::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/resque_manager/overrides/resque/worker.rb

Constant Summary collapse

@@local_ip =
nil

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.start(options) ⇒ Object



236
237
238
239
240
241
242
243
244
245
# File 'lib/resque_manager/overrides/resque/worker.rb', line 236

# Launches a resque worker from a background thread by shelling out.
#
# In the development and test environments the worker rake task is run
# directly from the application path (or the current directory when no path
# is given). In every other environment the Capistrano `resque:work` task is
# run from Rails.root instead.
#
# @param options [Hash] reads :hosts, :application_path and :queues
# @return [Thread] the thread that runs the shell command
def self.start(options)
  hosts = options[:hosts]
  app_path = options[:application_path]
  queue_names = options[:queues]

  runs_locally = Rails.env =~ /development|test/
  if runs_locally
    Thread.new(app_path, queue_names) do |dir, queue|
      system("cd #{dir || '.'}; bundle exec #{ResqueManager.resque_worker_rake || 'rake'} RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work")
    end
  else
    Thread.new(hosts, app_path, queue_names) do |host_list, dir, queue|
      system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:work host=#{host_list} application_path=#{dir} queue=#{queue}")
    end
  end
end

.workingObject

The logic for mapped_mget changed in the latest redis gem: it now returns keys with nil values.



218
219
220
221
222
223
224
225
# File 'lib/resque_manager/overrides/resque/worker.rb', line 218

# Returns the workers that currently have a value stored under their
# "worker:<name>" Redis key.
#
# mapped_mget may return keys whose value is nil, so those entries are
# skipped, as are names for which `find` returns nil.
#
# @return [Array] the workers found, or an empty array when `all` has no
#   truthy entries
def self.working
  worker_keys = all
  return [] if worker_keys.none?

  worker_keys.map! { |name| "worker:#{name}" }
  stored = Resque.redis.mapped_mget(*worker_keys)

  stored.each_with_object([]) do |(key, value), busy|
    next if value.nil?

    worker = find(key.sub("worker:", ''))
    busy << worker unless worker.nil?
  end
end

Instance Method Details

#all_workers_in_pid_workingObject



142
143
144
# File 'lib/resque_manager/overrides/resque/worker.rb', line 142

# Returns the workers from `workers_in_pid` whose `processing` value is
# present and non-empty.
#
# @return [Array] the workers that are currently processing something
def all_workers_in_pid_working
  workers_in_pid.reject do |worker|
    payload = worker.processing
    !payload || payload.empty?
  end
end

#continueObject



270
271
272
273
274
275
276
# File 'lib/resque_manager/overrides/resque/worker.rb', line 270

# Resumes this worker.
#
# In the development and test environments a CONT signal is sent to the
# worker's pid with `kill`; in every other environment the Capistrano
# `resque:continue_worker` task is run from Rails.root.
#
# @return the result of Kernel#system for the command that was run
def continue
  command =
    if Rails.env =~ /development|test/
      "kill -CONT  #{pid}"
    else
      "cd #{Rails.root}; bundle exec cap #{Rails.env} resque:continue_worker pid=#{pid} host=#{ip}"
    end
  system(command)
end

#ipObject



45
46
47
# File 'lib/resque_manager/overrides/resque/worker.rb', line 45

# Extracts the dotted-quad IP address from the first colon-separated segment
# of the worker id.
#
# @return [String, nil] the address, or nil when that segment contains none
def ip
  host_segment = to_s.split(':')[0]
  host_segment[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
end

#local_ipObject



7
8
9
# File 'lib/resque_manager/overrides/resque/worker.rb', line 7

# Returns the IP address that this machine's hostname resolves to.
#
# The address is looked up with IPSocket.getaddress(Socket.gethostname) the
# first time it is needed and memoized in the @@local_ip class variable, so
# later calls reuse the stored value.
def local_ip
  @@local_ip ||= IPSocket.getaddress(Socket.gethostname)
end

#overview_messageObject



232
233
234
# File 'lib/resque_manager/overrides/resque/worker.rb', line 232

# Reads the 'overview_message' entry from this worker's current job data.
#
# @return the stored message, or nil when the job data has no such entry
def overview_message
  current_job = job
  current_job['overview_message']
end

#overview_message=(message) ⇒ Object



227
228
229
230
# File 'lib/resque_manager/overrides/resque/worker.rb', line 227

# Stores an overview message on this worker's current job data.
#
# The job data is merged with the new 'overview_message' entry, encoded, and
# written back to Redis under the "worker:<id>" key.
#
# @param message the message to record
def overview_message=(message)
  payload = job.merge('overview_message' => message)
  serialized = encode(payload)
  Resque.redis.set("worker:#{self}", serialized)
end

#pathObject



33
34
35
# File 'lib/resque_manager/overrides/resque/worker.rb', line 33

# Returns the fourth colon-separated segment of the worker id, which `to_s`
# fills with the thread's :path value.
#
# @return [String, nil] the path segment, or nil when the id is shorter
def path
  _host, _pid, _thread, app_path = to_s.split(':')
  app_path
end

#pauseObject



262
263
264
265
266
267
268
# File 'lib/resque_manager/overrides/resque/worker.rb', line 262

# Pauses this worker.
#
# In the development and test environments a USR2 signal is sent to the
# worker's pid with `kill`; in every other environment the Capistrano
# `resque:pause_worker` task is run from Rails.root.
#
# @return the result of Kernel#system for the command that was run
def pause
  command =
    if Rails.env =~ /development|test/
      "kill -USR2  #{pid}"
    else
      "cd #{Rails.root}; bundle exec cap #{Rails.env} resque:pause_worker pid=#{pid} host=#{ip}"
    end
  system(command)
end

#pause_keyObject

When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion



20
21
22
23
# File 'lib/resque_manager/overrides/resque/worker.rb', line 20

# Builds the Redis key that marks the workers of this process as paused.
#
# Only the first two segments of the worker id (host and pid) are used, so
# the key does not depend on the thread or queue portion of the id.
#
# @return [String] "worker:<host>:<pid>:all_workers:paused"
def pause_key
  host, process_id = to_s.split(':')
  "worker:#{host}:#{process_id}:all_workers:paused"
end

#pause_processingObject

Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a Redis key so the UI knows it’s paused too. Would prefer to call super, but that raises a “no superclass method” error.



97
98
99
100
101
# File 'lib/resque_manager/overrides/resque/worker.rb', line 97

# Marks this worker as paused, both in memory and in Redis.
#
# Logs the USR2 notice, sets @paused, and stores the current time (as a
# string) under `pause_key`.
#
# @return the result of the Redis SET call
def pause_processing
  log 'USR2 received; pausing job processing'
  @paused = true

  paused_at = Time.now.to_s
  Resque.redis.set(pause_key, paused_at)
end

#pausedObject



83
84
85
# File 'lib/resque_manager/overrides/resque/worker.rb', line 83

# Reads the value stored in Redis under `pause_key`.
#
# @return the stored value, or whatever the Redis client returns for a
#   missing key
def paused
  redis = Resque.redis
  redis.get(pause_key)
end

#paused?Boolean

are we paused? OVERRIDE so UI can tell if we’re paused

Returns:

  • (Boolean)


89
90
91
# File 'lib/resque_manager/overrides/resque/worker.rb', line 89

# Reports whether this worker is paused.
#
# The in-memory @paused flag wins when it is set; otherwise the answer comes
# from whether the Redis pause entry read by `paused` is present.
#
# @return [Boolean]
def paused?
  return @paused if @paused

  paused.present?
end

#pidObject



25
26
27
# File 'lib/resque_manager/overrides/resque/worker.rb', line 25

# Returns the second colon-separated segment of the worker id, which `to_s`
# fills with the process id.
#
# @return [String, nil] the pid segment, or nil when the id has no such part
def pid
  _host, process_id = to_s.split(':')
  process_id
end

#prune_dead_workersObject

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and will therefore leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.



122
123
124
125
126
127
128
129
130
# File 'lib/resque_manager/overrides/resque/worker.rb', line 122

# Unregisters workers recorded for this host whose pid is no longer among
# `worker_pids`.
#
# Each worker id is split on ':'; the first segment names the host and the
# second the pid. Workers belonging to other hosts are skipped, as are
# workers whose pid is still listed.
def prune_dead_workers
  Worker.all.each do |worker|
    host, pid = worker.id.split(':')

    # The host segment looks like "name(ip)" (or just "name"). Compare the
    # bare name exactly: a substring test would make host "web1" also claim,
    # and prune, the workers registered by "web10".
    next unless host.to_s[/\A[^(]*/] == hostname
    next if worker_pids.include?(pid)

    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end

#queueObject



37
38
39
# File 'lib/resque_manager/overrides/resque/worker.rb', line 37

# Returns the fifth colon-separated segment of the worker id, which `to_s`
# fills with the thread's :queues value.
#
# @return [String, nil] the queue segment, or nil when the id is shorter
def queue
  segments = to_s.split(':')
  segments[4]
end

#queuesObject

OVERRIDE for multithreaded workers



54
55
56
# File 'lib/resque_manager/overrides/resque/worker.rb', line 54

# Returns the queues for the current thread.
#
# The thread-local :queues value is a comma-separated list; the special
# value "*" stands for every queue known to Resque, sorted.
#
# @return [Array<String>] queue names
def queues
  configured = Thread.current[:queues]
  return Resque.queues.sort if configured == "*"

  configured.split(',')
end

#queues_in_pidObject



49
50
51
# File 'lib/resque_manager/overrides/resque/worker.rb', line 49

# Collects the `queue` value of every worker in `workers_in_pid`, dropping
# nil entries.
#
# @return [Array] the non-nil queue values
def queues_in_pid
  workers_in_pid.map(&:queue).compact
end

#quitObject



247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/resque_manager/overrides/resque/worker.rb', line 247

# Asks this worker to stop.
#
# Outside the development and test environments the Capistrano
# `resque:quit_worker` task is run from Rails.root. In development and test
# the worker's pid is signalled directly with `kill`.
#
# @return the result of Kernel#system for the command that was run
def quit
  unless Rails.env =~ /development|test/
    return system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:quit_worker pid=#{pid} host=#{ip} application_path=#{path}")
  end

  # jruby doesn't trap the -QUIT signal.
  # -TERM gracefully kills the main pid and does a -9 on the child if there
  # is one. Since jruby doesn't fork a child, the main worker is gracefully
  # killed.
  signal = RUBY_PLATFORM =~ /java/ ? 'TERM' : 'QUIT'
  system("kill -#{signal}  #{pid}")
end

#restartObject



278
279
280
281
282
# File 'lib/resque_manager/overrides/resque/worker.rb', line 278

# Restarts the worker process: quits it, then starts a new one on the same
# host with the same application path.
#
# The queues of every worker in this pid are read before `quit` is issued
# and handed to `start` joined with '#'.
#
# @return whatever the class-level `start` returns
def restart
  queue_spec = queues_in_pid.join('#')
  quit
  self.class.start(hosts: ip, queues: queue_spec, application_path: path)
end

#shutdownObject

Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers



77
78
79
80
81
# File 'lib/resque_manager/overrides/resque/worker.rb', line 77

# Schedules this worker for shutdown.
#
# Besides setting @shutdown, the :shutdown thread-local is set on every
# thread in Thread.list so that each of them can see the request.
#
# @return [true]
def shutdown
  log 'Exiting...'
  Thread.list.each do |thread|
    thread[:shutdown] = true
  end
  @shutdown = true
end

#startupObject

Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/resque_manager/overrides/resque/worker.rb', line 60

# Runs the steps a worker needs at the start of its life cycle.
#
# Signal handlers are registered and dead workers pruned only when running
# on the main thread; the before_first_fork hook and worker registration run
# on every thread.
#
# @return [true] the value assigned to $stdout.sync
def startup
  enable_gc_optimizations

  on_main_thread = Thread.current == Thread.main
  register_signal_handlers if on_main_thread
  prune_dead_workers if on_main_thread

  run_hook :before_first_fork
  register_worker

  # Unbuffered stdout lets `rake resque:work > resque.log` capture the
  # child's output as well.
  $stdout.sync = true
end

#threadObject



29
30
31
# File 'lib/resque_manager/overrides/resque/worker.rb', line 29

# Returns the third colon-separated segment of the worker id, which `to_s`
# fills with the object id of the worker's thread.
#
# @return [String, nil] the thread segment, or nil when the id is shorter
def thread
  _host, _pid, thread_id = to_s.split(':')
  thread_id
end

#to_sObject Also known as: id

The string representation is the same as the id for this worker instance. Can be used with ‘Worker.find`.



13
14
15
# File 'lib/resque_manager/overrides/resque/worker.rb', line 13

# The string representation, which doubles as this worker's id.
#
# Returns @to_s when it is set. Otherwise the id is built as
# "<hostname>(<local ip>):<process pid>:<thread object id>:<path>:<queues>",
# with path and queues taken from the current thread's locals.
#
# @return [String]
def to_s
  return @to_s if @to_s

  current = Thread.current
  "#{hostname}(#{local_ip}):#{Process.pid}:#{current.object_id}:#{current[:path]}:#{current[:queues]}"
end

#unpause_processingObject

Start processing jobs again after a pause. OVERRIDE to remove the Redis key so the UI knows it’s unpaused too. Would prefer to call super, but that raises a “no superclass method” error.



106
107
108
109
110
# File 'lib/resque_manager/overrides/resque/worker.rb', line 106

# Clears the paused state, both in memory and in Redis.
#
# Logs the CONT notice, resets @paused, and deletes the entry stored under
# `pause_key`.
#
# @return the result of the Redis DEL call
def unpause_processing
  log 'CONT received; resuming job processing'
  @paused = false

  key = pause_key
  Resque.redis.del(key)
end

#unregister_worker_with_pause(exception = nil) ⇒ Object

Unregisters ourself as a worker. Useful when shutting down. OVERRIDE to also remove the pause key. Would prefer to call super, but that raises a “no superclass method” error.



135
136
137
138
139
# File 'lib/resque_manager/overrides/resque/worker.rb', line 135

# Unregisters this worker and then removes its pause entry from Redis.
#
# @param exception [Exception, nil] passed through unchanged to
#   `unregister_worker_without_pause`
# @return the result of the Redis DEL call
def unregister_worker_with_pause(exception = nil)
  unregister_worker_without_pause(exception)

  redis = Resque.redis
  redis.del(pause_key)
end

#work(interval = 5.0, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. Startup: Signals are registered, dead workers are pruned, and this worker is registered.
    
  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed an integer representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/resque_manager/overrides/resque/worker.rb', line 163

# Runs the worker life cycle: startup, the work loop, then teardown.
#
# @param interval [Numeric, String] seconds to sleep when there is nothing to
#   do; converted with Float(). A value of zero makes the loop exit instead
#   of sleeping.
# @param block passed on to `perform` together with each job
def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = 'resque: Starting'
  startup

  loop do
    # Stop on either the worker-level shutdown flag or the per-thread flag
    # stored in the :shutdown thread-local.
    break if shutdown? || Thread.current[:shutdown]
    if not paused? and job = reserve
      log "got: #{job.inspect}"
      job.worker = self
      working_on job

      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class}]"
      # NOTE(review): `fork(job)` is defined outside this view. Presumably it
      # returns the child pid to the parent after running the block in the
      # child, and a falsy value when no child was created - confirm.
      if @child = fork(job) do
        unregister_signal_handlers if term_child
        reconnect
        perform(job, &block)
        exit! unless run_at_exit_hooks
      end

        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        begin
          Process.waitpid(@child)
        rescue SystemCallError
          # A failed wait is deliberately ignored.
          nil
        end
        # A child that was terminated by a signal counts as a failed job.
        job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
      else
        # No child process: perform the job in this process.
        reconnect
        perform(job, &block)
      end
      done_working
      @child = nil
    else
      # Paused, or nothing was reserved: leave the loop when the interval is
      # zero, otherwise sleep before polling again.
      break if interval.zero?
      log! "Sleeping for #{interval} seconds"
      procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval
    end
  end

  unregister_worker
  loop do
    #hang onto the process until all threads are done
    break if all_workers_in_pid_working.blank?
    # NOTE(review): to_i truncates, so an interval below one second sleeps
    # for 0 seconds here - confirm that is intended.
    sleep interval.to_i
  end
# NOTE(review): rescuing Exception also catches SystemExit and signal
# exceptions; the worker is unregistered with the exception in every case.
rescue Exception => exception
  log "Failed to start worker : #{exception.inspect}"

  unregister_worker(exception)
end

#workers_in_pidObject



41
42
43
# File 'lib/resque_manager/overrides/resque/worker.rb', line 41

# Finds every registered worker that belongs to this worker's process.
#
# Ids in the Redis :workers set are matched on their "(<ip>):<pid>" part and
# then resolved with Resque::Worker.find; ids that no longer resolve are
# dropped.
#
# @return [Array] the workers found for this ip and pid
def workers_in_pid
  # Escape both parts so the dots of the ip match literally, and require the
  # pid to be followed by ':' or the end of the id. Without that terminator
  # pid 123 would also match the workers of pid 1234.
  own_process = /\(#{Regexp.escape(ip.to_s)}\):#{Regexp.escape(pid.to_s)}(?::|\z)/

  Array(Resque.redis.smembers(:workers))
    .select { |id| id =~ own_process }
    .map { |id| Resque::Worker.find(id) }
    .compact
end