Class: Resque::Worker

Inherits:
Object
  • Object
show all
Includes:
SemanticLogger::Loggable
Defined in:
lib/resque_manager/overrides/resque/worker.rb

Constant Summary collapse

@@local_ip =
nil

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.start(options) ⇒ Object



199
200
201
202
203
204
205
206
207
208
# File 'lib/resque_manager/overrides/resque/worker.rb', line 199

# Launches Resque worker(s) by shelling out from a background thread.
#
# In a development or test environment the worker rake task is run from the
# given application path (or the current directory when none is given). In
# any other environment a Capistrano `resque:work` task is run from
# Rails.root, passing the hosts, application path and queues along.
#
# @param options [Hash] reads the :hosts, :application_path and :queues keys
# @return [Thread] the thread that executes the shell command
def self.start(options)
  hosts    = options[:hosts]
  app_path = options[:application_path]
  queues   = options[:queues]

  if Rails.env =~ /development|test/
    Thread.new(app_path, queues) do |dir, queue_list|
      rake_bin = ResqueManager.resque_worker_rake || 'rake'
      system("cd #{dir || '.'} && bundle exec #{rake_bin} RAILS_ENV=#{Rails.env} QUEUE=#{queue_list} resque:work")
    end
  else
    Thread.new(hosts, app_path, queues) do |host_list, dir, queue_list|
      system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:work host=#{host_list} application_path=#{dir} queue=#{queue_list}")
    end
  end
end

.workingObject

The logic for mapped_mget changed: in the latest redis gem it returns keys with nil values.



181
182
183
184
185
186
187
188
# File 'lib/resque_manager/overrides/resque/worker.rb', line 181

# Returns the workers that currently have a value stored under their
# "worker:<id>" Redis key.
#
# Keys whose value comes back nil from mapped_mget are skipped, as are ids
# for which `find` returns nil.
#
# @return [Array] the found workers, or [] when `all` yields nothing
def self.working
  redis_keys = all
  return [] if redis_keys.none?

  redis_keys.map! { |worker| "worker:#{worker}" }
  busy = Resque.redis.mapped_mget(*redis_keys).reject { |_key, payload| payload.nil? }
  busy.keys.map { |key| find(key.sub("worker:", '')) }.compact
end

Instance Method Details

#all_workers_in_pid_workingObject



144
145
146
# File 'lib/resque_manager/overrides/resque/worker.rb', line 144

# Returns the workers from `workers_in_pid` whose `processing` value is
# present and non-empty.
#
# @return [Array] the subset of workers that report something in progress
def all_workers_in_pid_working
  workers_in_pid.reject do |worker|
    current_job = worker.processing
    !current_job || current_job.empty?
  end
end

#continueObject



233
234
235
236
237
238
239
# File 'lib/resque_manager/overrides/resque/worker.rb', line 233

# Resumes this worker's process.
#
# In development/test a CONT signal is sent to the pid with a local `kill`;
# otherwise the Capistrano `resque:continue_worker` task is run from
# Rails.root with this worker's pid and ip.
#
# @return whatever `system` returns for the command
def continue
  command =
    if Rails.env =~ /development|test/
      "kill -CONT  #{pid}"
    else
      "cd #{Rails.root} && bundle exec cap #{Rails.env} resque:continue_worker pid=#{pid} host=#{ip}"
    end
  system(command)
end

#ipObject



48
49
50
# File 'lib/resque_manager/overrides/resque/worker.rb', line 48

# Extracts a dotted-quad IP address from the first colon-separated segment
# of this worker's string id.
#
# @return [String, nil] the matched address, or nil when that segment holds none
def ip
  host_segment, = to_s.split(':')
  host_segment[/\b(?:\d{1,3}\.){3}\d{1,3}\b/]
end

#local_ipObject



10
11
12
# File 'lib/resque_manager/overrides/resque/worker.rb', line 10

# Returns the address that this machine's hostname resolves to.
#
# The lookup result is memoized in the @@local_ip class variable, so the
# resolution only happens while that variable is still nil/false.
#
# @return [String] the address reported by IPSocket.getaddress
def local_ip
  @@local_ip ||= begin
    machine_name = Socket.gethostname
    IPSocket.getaddress(machine_name)
  end
end

#overview_messageObject



195
196
197
# File 'lib/resque_manager/overrides/resque/worker.rb', line 195

# Reads the 'overview_message' entry from this worker's current `job` data.
#
# @return the stored message, or whatever `job` yields for a missing key
def overview_message
  current_job = job
  current_job['overview_message']
end

#overview_message=(message) ⇒ Object



190
191
192
193
# File 'lib/resque_manager/overrides/resque/worker.rb', line 190

# Stores +message+ under the 'overview_message' key of this worker's job
# data and writes the encoded result to the "worker:<id>" Redis key.
#
# @param message the value to merge into the job data
def overview_message=(message)
  updated_job = job.merge('overview_message' => message)
  payload = encode(updated_job)
  Resque.redis.set("worker:#{self}", payload)
end

#pathObject



36
37
38
# File 'lib/resque_manager/overrides/resque/worker.rb', line 36

# Returns the fourth colon-separated segment of this worker's string id.
#
# @return [String, nil] the segment, or nil when the id has fewer than four
def path
  _host, _pid, _thread, app_path = to_s.split(':')
  app_path
end

#pauseObject



225
226
227
228
229
230
231
# File 'lib/resque_manager/overrides/resque/worker.rb', line 225

# Pauses this worker's process.
#
# In development/test a USR2 signal is sent to the pid with a local `kill`;
# otherwise the Capistrano `resque:pause_worker` task is run from Rails.root
# with this worker's pid and ip.
#
# @return whatever `system` returns for the command
def pause
  command =
    if Rails.env =~ /development|test/
      "kill -USR2  #{pid}"
    else
      "cd #{Rails.root} && bundle exec cap #{Rails.env} resque:pause_worker pid=#{pid} host=#{ip}"
    end
  system(command)
end

#pause_keyObject

When the worker gets the -USR2 signal, to_s may give a different value for the thread and queue portion



23
24
25
26
# File 'lib/resque_manager/overrides/resque/worker.rb', line 23

# Builds the Redis key used to flag this worker's process as paused.
#
# Only the first two colon-separated segments of the string id are used, so
# the key does not depend on the thread or queue portions.
#
# @return [String] "worker:<first>:<second>:all_workers:paused"
def pause_key
  host_part, pid_part = to_s.split(':')
  "worker:#{host_part}:#{pid_part}:all_workers:paused"
end

#pause_processing_with_pause_keyObject

Stop processing jobs after the current one has completed (if we’re currently running one). OVERRIDE to set a redis key so UI knows it’s paused too



99
100
101
102
# File 'lib/resque_manager/overrides/resque/worker.rb', line 99

# Runs the wrapped pause_processing implementation, then records the
# current time (as a string) under this worker's pause key in Redis.
#
# @return whatever the Redis `set` call returns
def pause_processing_with_pause_key
  pause_processing_without_pause_key
  paused_at = Time.now.to_s
  Resque.redis.set(pause_key, paused_at)
end

#pausedObject



86
87
88
# File 'lib/resque_manager/overrides/resque/worker.rb', line 86

# Fetches the value stored in Redis under this worker's pause key.
#
# @return the stored value, or whatever Redis `get` yields when it is absent
def paused
  redis_key = pause_key
  Resque.redis.get(redis_key)
end

#paused?Boolean

are we paused? OVERRIDE so UI can tell if we’re paused

Returns:

  • (Boolean)


92
93
94
# File 'lib/resque_manager/overrides/resque/worker.rb', line 92

# Reports whether this worker is paused.
#
# The in-memory @paused flag wins when it is set; otherwise the answer is
# whether a value is present under the pause key in Redis (via `paused`).
#
# @return [Boolean] (or the truthy @paused value itself)
def paused?
  return @paused if @paused

  paused.present?
end

#pidObject



28
29
30
# File 'lib/resque_manager/overrides/resque/worker.rb', line 28

# Returns the second colon-separated segment of this worker's string id.
#
# @return [String, nil] the segment, or nil when the id has only one
def pid
  _host, process_id = to_s.split(':')
  process_id
end

#prune_dead_workersObject

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.



124
125
126
127
128
129
130
131
132
# File 'lib/resque_manager/overrides/resque/worker.rb', line 124

# Unregisters workers recorded for this host whose pid is no longer listed
# by `worker_pids`.
#
# A worker id starts with "<hostname>(<ip>):<pid>:..." (or plain
# "<hostname>:<pid>:..."). Only workers whose bare hostname equals this
# machine's `hostname` are considered.
def prune_dead_workers
  Worker.all.each do |worker|
    host, pid = worker.id.split(':')
    # Compare the bare hostname exactly. A substring test would treat
    # "web10(10.0.0.2)" as belonging to host "web1" and wrongly prune
    # workers that are alive on another machine.
    worker_host = host.to_s.split('(').first
    next unless worker_host == hostname
    next if worker_pids.include?(pid)

    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end

#queueObject



40
41
42
# File 'lib/resque_manager/overrides/resque/worker.rb', line 40

# Returns the fifth colon-separated segment of this worker's string id.
#
# @return [String, nil] the segment, or nil when the id has fewer than five
def queue
  _host, _pid, _thread, _path, queue_names = to_s.split(':')
  queue_names
end

#queuesObject

OVERRIDE for multithreaded workers



57
58
59
# File 'lib/resque_manager/overrides/resque/worker.rb', line 57

# Returns the queues assigned to the current thread.
#
# The assignment is read from Thread.current[:queues]: a "*" means every
# queue known to Resque (sorted); anything else is split on commas.
#
# @return [Array<String>] the queue names
def queues
  assigned = Thread.current[:queues]
  return Resque.queues.sort if assigned == "*"

  assigned.split(',')
end

#queues_in_pidObject



52
53
54
# File 'lib/resque_manager/overrides/resque/worker.rb', line 52

# Collects the `queue` value of every worker returned by `workers_in_pid`,
# dropping nils.
#
# @return [Array] the non-nil queue values
def queues_in_pid
  workers_in_pid.map(&:queue).compact
end

#quitObject



210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/resque_manager/overrides/resque/worker.rb', line 210

# Asks this worker's process to stop.
#
# Outside development/test the Capistrano `resque:quit_worker` task is run
# from Rails.root with this worker's pid, ip and path. In development/test
# a signal is sent to the pid with a local `kill`.
#
# @return whatever `system` returns for the command
def quit
  unless Rails.env =~ /development|test/
    return system("cd #{Rails.root} && bundle exec cap #{Rails.env} resque:quit_worker pid=#{pid} host=#{ip} application_path=#{path}")
  end

  # JRuby doesn't trap the -QUIT signal, so -TERM is used there instead.
  # -TERM gracefully kills the main pid and does a -9 on the child if there
  # is one; since JRuby doesn't fork a child, the main worker is gracefully
  # killed.
  signal = RUBY_PLATFORM =~ /java/ ? 'TERM' : 'QUIT'
  system("kill -#{signal}  #{pid}")
end

#restartObject



241
242
243
244
245
# File 'lib/resque_manager/overrides/resque/worker.rb', line 241

# Quits this worker's process and starts a replacement.
#
# The queue list is captured before quitting and joined with '#'; the new
# worker is started through the class-level `start` with this worker's ip
# and path.
#
# @return whatever the class-level `start` returns
def restart
  queue_list = queues_in_pid.join('#')
  quit
  self.class.start(hosts: ip, queues: queue_list, application_path: path)
end

#shutdown_with_multithreadingObject

Schedule this worker for shutdown. Will finish processing the current job. OVERRIDE for multithreaded workers



80
81
82
83
# File 'lib/resque_manager/overrides/resque/worker.rb', line 80

# Flags every thread in the process for shutdown, then delegates to the
# wrapped shutdown implementation.
#
# @return whatever shutdown_without_multithreading returns
def shutdown_with_multithreading
  Thread.list.each do |thread|
    thread[:shutdown] = true
  end
  shutdown_without_multithreading
end

#shutdown_with_multithreading?Boolean

Returns:

  • (Boolean)


175
176
177
# File 'lib/resque_manager/overrides/resque/worker.rb', line 175

# Reports whether shutdown was requested, either through the wrapped
# implementation or through the current thread's :shutdown flag.
#
# @return the first truthy answer, otherwise the thread flag's value
def shutdown_with_multithreading?
  requested = shutdown_without_multithreading?
  return requested if requested

  Thread.current[:shutdown]
end

#startupObject

Runs all the methods needed when a worker begins its lifecycle. OVERRIDE for multithreaded workers



63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/resque_manager/overrides/resque/worker.rb', line 63

# Prepares this worker to begin processing.
#
# Signal handlers are registered and dead workers pruned only when this is
# called on the process's main thread; the :before_first_fork hook and the
# worker registration run on every call.
def startup
  enable_gc_optimizations
  # Process-wide setup is restricted to the main thread.
  if Thread.current == Thread.main
    register_signal_handlers
    prune_dead_workers
  end
  run_hook :before_first_fork
  register_worker

  # Fix buffering so we can `rake resque:work > resque.log` and
  # get output from the child in there.
  $stdout.sync = true
end

#threadObject



32
33
34
# File 'lib/resque_manager/overrides/resque/worker.rb', line 32

# Returns the third colon-separated segment of this worker's string id.
#
# @return [String, nil] the segment, or nil when the id has fewer than three
def thread
  _host, _pid, thread_part = to_s.split(':')
  thread_part
end

#to_sObject Also known as: id

The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.



16
17
18
# File 'lib/resque_manager/overrides/resque/worker.rb', line 16

# Returns this worker's string id.
#
# A previously assigned @to_s is returned as-is. Otherwise the id is built
# as "<hostname>(<local_ip>):<process pid>:<thread object_id>:<path>:<queues>",
# where path and queues are read from the current thread's locals.
#
# @return [String] the worker id
def to_s
  return @to_s if @to_s

  current = Thread.current
  "#{hostname}(#{local_ip}):#{Process.pid}:#{current.object_id}:#{current[:path]}:#{current[:queues]}"
end

#unpause_processing_with_pause_keyObject

Start processing jobs again after a pause. OVERRIDE to remove the redis key so the UI knows it’s unpaused too. Would prefer to call super, but that raises a “no superclass method” error.



108
109
110
111
# File 'lib/resque_manager/overrides/resque/worker.rb', line 108

# Runs the wrapped unpause_processing implementation, then deletes this
# worker's pause key from Redis.
#
# @return whatever the Redis `del` call returns
def unpause_processing_with_pause_key
  unpause_processing_without_pause_key
  redis_key = pause_key
  Resque.redis.del(redis_key)
end

#unregister_worker_with_pause(exception = nil) ⇒ Object

Unregisters ourself as a worker. Useful when shutting down. OVERRIDE to also remove the pause key. Would prefer to call super, but that raises a “no superclass method” error.



137
138
139
140
141
# File 'lib/resque_manager/overrides/resque/worker.rb', line 137

# Runs the wrapped unregister_worker implementation (forwarding
# +exception+), then deletes this worker's pause key from Redis.
#
# @param exception passed through unchanged; defaults to nil
# @return whatever the Redis `del` call returns
def unregister_worker_with_pause(exception = nil)
  unregister_worker_without_pause(exception)
  redis_key = pause_key
  Resque.redis.del(redis_key)
end

#work_with_multithreading(interval = 5.0, &block) ⇒ Object

This is the main workhorse method. Called on a Worker instance, it begins the worker life cycle.

The following events occur during a worker’s life cycle:

  1. Startup: Signals are registered, dead workers are pruned, and this worker is registered.
    
  2. Work loop: Jobs are pulled from a queue and processed.

  3. Teardown: This worker is unregistered.

Can be passed an integer representing the polling frequency. The default is 5 seconds, but for a semi-active site you may want to use a smaller value.

Also accepts a block which will be passed the job as soon as it has completed processing. Useful for testing. OVERRIDE for multithreaded workers



165
166
167
168
169
170
171
172
# File 'lib/resque_manager/overrides/resque/worker.rb', line 165

# Runs the wrapped work loop, then keeps the process alive until no worker
# in this pid reports a job in progress.
#
# @param interval [Numeric, #to_f] polling frequency in seconds; forwarded to
#   the wrapped work loop and also used as the wait between checks
# @param block forwarded unchanged to the wrapped work loop
# @return [nil]
def work_with_multithreading(interval = 5.0, &block)
  work_without_multithreading(interval, &block)

  # Keep fractional seconds: truncating to an integer turns a sub-second
  # interval into sleep(0), which makes this wait loop spin and re-query
  # the worker state continuously.
  pause_for = interval.to_f

  # Hang onto the process until all threads are done.
  sleep pause_for until all_workers_in_pid_working.blank?
end

#workers_in_pidObject



44
45
46
# File 'lib/resque_manager/overrides/resque/worker.rb', line 44

def workers_in_pid
  Array(Resque.redis.smembers(:workers)).select { |id| id =~ /\(#{ip}\):#{pid}/ }.map { |id| Resque::Worker.find(id) }.compact
end