Class: Resque::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/resque_manager/overrides/resque/worker.rb

Constant Summary collapse

@@local_ip =
nil

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.start(options) ⇒ Object



242
243
244
245
246
247
248
249
250
251
# File 'lib/resque_manager/overrides/resque/worker.rb', line 242

# Launches worker(s) from a background thread and returns that Thread.
#
# In development/test the worker rake task is run directly from the
# application path; in every other environment a capistrano task is invoked
# from Rails.root.
#
# options - Hash read with the keys :hosts, :application_path and :queues.
def self.start(options)
  hosts = options[:hosts]
  app_dir = options[:application_path]
  queue_names = options[:queues]

  if Rails.env =~ /development|test/
    Thread.new(app_dir, queue_names) do |dir, queue|
      system("cd #{dir || '.'}; bundle exec #{ResqueManager.resque_worker_rake || 'rake'} RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work")
    end
  else
    Thread.new(hosts, app_dir, queue_names) do |ip_list, dir, queue|
      system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:work host=#{ip_list} application_path=#{dir} queue=#{queue}")
    end
  end
end

.workingObject

The logic of mapped_mget changed: in the latest redis gem it returns keys with nil values.



224
225
226
227
228
229
230
231
# File 'lib/resque_manager/overrides/resque/worker.rb', line 224

# Returns the workers that currently have a non-nil "worker:<name>" value in
# redis. Entries whose value is nil, or for which `find` returns nil, are
# left out.
def self.working
  workers = all
  return [] unless workers.any?

  redis_keys = workers.map! { |worker| "worker:#{worker}" }
  found = []
  Resque.redis.mapped_mget(*redis_keys).each do |redis_key, payload|
    # mapped_mget may hand back keys whose value is nil; skip those.
    next if payload.nil?

    found << find(redis_key.sub("worker:", ''))
  end
  found.compact
end

Instance Method Details

#all_workers_in_pid_workingObject



147
148
149
# File 'lib/resque_manager/overrides/resque/worker.rb', line 147

# Returns the workers from `workers_in_pid` whose `processing` value is
# present and not empty.
def all_workers_in_pid_working
  workers_in_pid.select do |worker|
    current_job = worker.processing
    current_job && !current_job.empty?
  end
end

#continueObject



276
277
278
279
280
281
282
# File 'lib/resque_manager/overrides/resque/worker.rb', line 276

# Sends the resume request for this worker's pid and returns the result of
# `system`. Development/test signals the pid directly with CONT; other
# environments go through the capistrano task.
def continue
  command =
    if Rails.env =~ /development|test/
      "kill -CONT  #{pid}"
    else
      "cd #{Rails.root}; bundle exec cap #{Rails.env} resque:continue_worker pid=#{pid} host=#{ip}"
    end
  system(command)
end

#ipObject



50
51
52
# File 'lib/resque_manager/overrides/resque/worker.rb', line 50

# Extracts the dotted-quad address from the first ':'-separated segment of
# the worker id; nil when that segment contains no such address.
def ip
  host_segment = to_s.split(':').first
  host_segment[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
end

#local_ipObject



7
8
9
10
11
12
13
14
# File 'lib/resque_manager/overrides/resque/worker.rb', line 7

# Memoizes, in @@local_ip, the last element of the local address of a UDP
# socket that has been connected towards google.com.
def local_ip
  @@local_ip ||= UDPSocket.open do |socket|
    socket.connect 'google.com', 1
    socket.addr.last
  end
end

#overview_messageObject



238
239
240
# File 'lib/resque_manager/overrides/resque/worker.rb', line 238

# Reads the 'overview_message' entry of this worker's current job data.
def overview_message
  current_job = job
  current_job['overview_message']
end

#overview_message=(message) ⇒ Object



233
234
235
236
# File 'lib/resque_manager/overrides/resque/worker.rb', line 233

# Merges the message into the current job data under 'overview_message' and
# writes the encoded result to this worker's "worker:<id>" redis key.
def overview_message=(message)
  updated_job = job.merge('overview_message' => message)
  Resque.redis.set("worker:#{self}", encode(updated_job))
end

#pathObject



38
39
40
# File 'lib/resque_manager/overrides/resque/worker.rb', line 38

# Fourth ':'-separated segment of the worker id (nil when there is none).
def path
  _host, _pid, _thread, app_path = to_s.split(':')
  app_path
end

#pauseObject



268
269
270
271
272
273
274
# File 'lib/resque_manager/overrides/resque/worker.rb', line 268

# Sends the pause request for this worker's pid and returns the result of
# `system`. Development/test signals the pid directly with USR2; other
# environments go through the capistrano task.
def pause
  command =
    if Rails.env =~ /development|test/
      "kill -USR2  #{pid}"
    else
      "cd #{Rails.root}; bundle exec cap #{Rails.env} resque:pause_worker pid=#{pid} host=#{ip}"
    end
  system(command)
end

#pause_keyObject

When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion



25
26
27
28
# File 'lib/resque_manager/overrides/resque/worker.rb', line 25

# Redis key used to flag a pause. Built only from the first two segments of
# the worker id, so every worker sharing those two segments shares the key.
def pause_key
  host, process_id = to_s.split(':')
  "worker:#{host}:#{process_id}:all_workers:paused"
end

#pause_processingObject

Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a redis key so the UI knows it’s paused too. Would prefer to call super, but that raises a "no superclass method" error.



102
103
104
105
106
# File 'lib/resque_manager/overrides/resque/worker.rb', line 102

# Marks this worker as paused: logs the event, sets the in-memory flag and
# stores the current time (as a string) under the pause key in redis.
def pause_processing
  log "USR2 received; pausing job processing"
  @paused = true
  store = Resque.redis
  store.set(pause_key, Time.now.to_s)
end

#pausedObject



88
89
90
# File 'lib/resque_manager/overrides/resque/worker.rb', line 88

# Value stored in redis under the pause key (nil when nothing is stored).
def paused
  redis_key = pause_key
  Resque.redis.get(redis_key)
end

#paused?Boolean

are we paused? OVERRIDE so UI can tell if we’re paused

Returns:

  • (Boolean)


94
95
96
# File 'lib/resque_manager/overrides/resque/worker.rb', line 94

# True-ish when either the in-memory flag is set or a pause value is present
# in redis, so a worker paused elsewhere is still reported as paused.
def paused?
  return @paused if @paused

  paused.present?
end

#pidObject



30
31
32
# File 'lib/resque_manager/overrides/resque/worker.rb', line 30

# Second ':'-separated segment of the worker id, as a String (nil when the
# id has fewer than two segments).
def pid
  segments = to_s.split(':')
  segments[1]
end

#prune_dead_workersObject

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and will therefore leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.



127
128
129
130
131
132
133
134
135
# File 'lib/resque_manager/overrides/resque/worker.rb', line 127

# Unregisters workers recorded for this host whose pid is not among
# `worker_pids`.
#
# A worker counts as local only when the host segment of its id is exactly
# `hostname` or starts with "<hostname>(" (the "host(ip)" form). A plain
# substring test would also match hosts that merely contain this hostname
# (e.g. "web1" inside "web10") and prune live workers on other machines.
def prune_dead_workers
  live_pids = nil
  Worker.all.each do |worker|
    host, pid = worker.id.split(':')
    host = host.to_s
    next unless host == hostname || host.start_with?("#{hostname}(")

    # Fetched at most once, and only if some worker belongs to this host.
    live_pids ||= worker_pids
    next if live_pids.include?(pid)

    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end

#queueObject



42
43
44
# File 'lib/resque_manager/overrides/resque/worker.rb', line 42

# Last ':'-separated segment of the worker id.
def queue
  segments = to_s.split(':')
  segments[-1]
end

#queuesObject

OVERRIDE for multithreaded workers



59
60
61
# File 'lib/resque_manager/overrides/resque/worker.rb', line 59

# Queue names for the current thread, read from Thread.current[:queues]:
# "*" expands to every queue known to Resque (sorted); anything else is
# split on commas.
def queues
  configured = Thread.current[:queues]
  return Resque.queues.sort if configured == "*"

  configured.split(',')
end

#queues_in_pidObject



54
55
56
# File 'lib/resque_manager/overrides/resque/worker.rb', line 54

# The `queue` value of every worker returned by `workers_in_pid`.
def queues_in_pid
  workers_in_pid.map(&:queue)
end

#quitObject



253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/resque_manager/overrides/resque/worker.rb', line 253

# Asks this worker to stop and returns the result of `system`.
# Development/test signals the pid directly; other environments go through
# the capistrano task.
def quit
  unless Rails.env =~ /development|test/
    return system("cd #{Rails.root}; bundle exec cap #{Rails.env} resque:quit_worker pid=#{pid} host=#{ip} application_path=#{path}")
  end

  command =
    if RUBY_PLATFORM =~ /java/
      #jruby doesn't trap the -QUIT signal
      #-TERM gracefully kills the main pid and does a -9 on the child if there is one.
      #Since jruby doesn't fork a child, the main worker is gracefully killed.
      "kill -TERM  #{pid}"
    else
      "kill -QUIT  #{pid}"
    end
  system(command)
end

#restartObject



284
285
286
287
288
# File 'lib/resque_manager/overrides/resque/worker.rb', line 284

# Quits this worker, then starts a replacement via the class-level `start`
# using this worker's ip and path and the '#'-joined queues of its pid.
def restart
  # Captured before quitting.
  joined_queues = queues_in_pid.join('#')
  quit
  self.class.start(hosts: ip, queues: joined_queues, application_path: path)
end

#shutdownObject

Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers



82
83
84
85
86
# File 'lib/resque_manager/overrides/resque/worker.rb', line 82

# Flags every thread in Thread.list, and this worker itself, for shutdown.
def shutdown
  log 'Exiting...'
  Thread.list.each do |running_thread|
    running_thread[:shutdown] = true
  end
  @shutdown = true
end

#startupObject

Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/resque_manager/overrides/resque/worker.rb', line 65

# Runs the start-of-life steps for a worker. Signal handlers are registered
# and dead workers pruned only when called on the main thread.
def startup
  enable_gc_optimizations

  on_main_thread = Thread.current == Thread.main
  register_signal_handlers if on_main_thread
  prune_dead_workers if on_main_thread

  run_hook :before_first_fork
  register_worker

  # Unbuffered stdout, so `rake resque:work > resque.log` also captures
  # output written by the child.
  $stdout.sync = true
end

#threadObject



34
35
36
# File 'lib/resque_manager/overrides/resque/worker.rb', line 34

# Third ':'-separated segment of the worker id (nil when there is none).
def thread
  segments = to_s.split(':')
  segments[2]
end

#to_sObject Also known as: id

The string representation is the same as the id for this worker instance. Can be used with ‘Worker.find`.



18
19
20
# File 'lib/resque_manager/overrides/resque/worker.rb', line 18

# Worker id string. Returns @to_s when it is set; otherwise builds
# "hostname(local_ip):pid:thread_object_id:path:queues" from the current
# process and the current thread's :path and :queues values.
def to_s
  return @to_s if @to_s

  current = Thread.current
  "#{hostname}(#{local_ip}):#{Process.pid}:#{current.object_id}:#{current[:path]}:#{current[:queues]}"
end

#unpause_processingObject

Start processing jobs again after a pause. OVERRIDE to remove the redis key so the UI knows it’s unpaused too. Would prefer to call super, but that raises a "no superclass method" error.



111
112
113
114
115
# File 'lib/resque_manager/overrides/resque/worker.rb', line 111

# Clears the paused state: logs the event, resets the in-memory flag and
# deletes the pause key from redis.
def unpause_processing
  log "CONT received; resuming job processing"
  @paused = false
  store = Resque.redis
  store.del(pause_key)
end

#unregister_worker_with_pause(exception = nil) ⇒ Object

Unregisters this worker. Useful when shutting down. OVERRIDE to also remove the pause key. Would prefer to call super, but that raises a "no superclass method" error.



140
141
142
143
144
# File 'lib/resque_manager/overrides/resque/worker.rb', line 140

# Runs the wrapped unregister step, then deletes this worker's pause key
# from redis.
#
# exception - passed through unchanged to unregister_worker_without_pause.
def unregister_worker_with_pause(exception = nil)
  unregister_worker_without_pause(exception)

  store = Resque.redis
  store.del(pause_key)
end

#work(interval = 5.0, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. Startup: Signals are registered, dead workers are pruned,

    and this worker is registered.
    
  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed an integer representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/resque_manager/overrides/resque/worker.rb', line 168

# Main worker loop: runs `startup`, then reserves and performs jobs until a
# shutdown flag is seen, then unregisters and waits for the other workers in
# this pid to finish.
#
# interval - polling delay in seconds, coerced with Float(); when it is zero
#            the loop exits as soon as no job is available instead of sleeping.
# block    - forwarded to `perform` for each job.
def work(interval = 5.0, &block)
  interval = Float(interval)
  $0 = "resque: Starting"
  startup

  loop do
    # Stop on either the worker-level flag or the per-thread flag.
    break if shutdown? || Thread.current[:shutdown]

    # `reserve` is only attempted while not paused.
    if not paused? and job = reserve
      log "got: #{job.inspect}"
      job.worker = self
      working_on job

      procline "Processing #{job.queue} since #{Time.now.to_i} [#{job.payload_class}]"
      # The do...end block below is the block argument of `fork(job)`; the
      # value returned by `fork` is kept in @child and decides which branch runs.
      # NOTE(review): `fork` is defined outside this view — presumably the block
      # runs in the child process and a pid is returned to the parent; confirm.
      if @child = fork(job) do
        unregister_signal_handlers if term_child
        reconnect
        perform(job, &block)
        exit! unless run_at_exit_hooks
      end

        # Truthy @child: wait for it to finish.
        srand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        begin
          Process.waitpid(@child)
        rescue SystemCallError
          nil
        end
        # NOTE(review): if waitpid raised above, $? may not describe this
        # child (it could be stale or nil) — confirm this is safe.
        job.fail(DirtyExit.new($?.to_s)) if $?.signaled?
      else
        # Falsy @child: perform the job in this process.
        reconnect
        perform(job, &block)
      end
      done_working
      @child = nil
    else
      # Paused, or nothing was reserved.
      break if interval.zero?
      log! "Sleeping for #{interval} seconds"
      # NOTE(review): reads @queues directly rather than calling `queues`;
      # confirm @queues is set for multithreaded workers.
      procline paused? ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval
    end
  end

  unregister_worker
  loop do
    #hang onto the process until all threads are done
    break if all_workers_in_pid_working.blank?
    sleep interval.to_i
  end
# Rescues every exception class (this includes SystemExit and signal
# exceptions) raised anywhere in the method body, not only during startup.
rescue Exception => exception
  log "Failed to start worker : #{exception.inspect}"

  unregister_worker(exception)
end

#workers_in_pidObject



46
47
48
# File 'lib/resque_manager/overrides/resque/worker.rb', line 46

# Returns the registered workers that share this worker's ip and pid.
#
# Ids are matched on the literal "(ip):pid:" fragment. Matching literally
# keeps the dots of the ip from acting as regex wildcards, and the trailing
# ':' keeps pid 12 from also matching pid 123. Ids for which
# `Resque::Worker.find` returns nil are dropped.
def workers_in_pid
  marker = "(#{ip}):#{pid}:"
  Array(Resque.redis.smembers(:workers))
    .select { |id| id.include?(marker) }
    .map { |id| Resque::Worker.find(id) }
    .compact
end