Class: Gearman::Worker
- Inherits: Object
  - Object
  - Gearman::Worker
- Defined in: lib/gearman/worker.rb
Overview
Worker
Description
A worker that can connect to a Gearman server and perform tasks.
Usage
require 'gearman'

w = Gearman::Worker.new('127.0.0.1')

# Add a handler for a "sleep" function that takes a single argument, the
# number of seconds to sleep before reporting success.
w.add_ability('sleep') do |data,job|
  seconds = data
  (1..seconds.to_i).each do |i|
    sleep 1
    # Report our progress to the job server every second.
    job.report_status(i, seconds)
  end
  # Report success.
  true
end

loop { w.work }
Instance Attribute Summary
- #bad_servers ⇒ Object
  Returns the value of attribute bad_servers.
- #client_id ⇒ Object
  Returns the value of attribute client_id.
- #network_timeout_sec ⇒ Object
  Returns the value of attribute network_timeout_sec.
- #reconnect_sec ⇒ Object
  Returns the value of attribute reconnect_sec.
- #status ⇒ Object
  Returns the value of attribute status.
- #worker_enabled ⇒ Object
  Returns the value of attribute worker_enabled.
Instance Method Summary
- #add_ability(func, timeout = nil, &f) ⇒ Object
  Add a new ability, announcing it to job servers.
- #after_ability(func, &f) ⇒ Object
  Add an after-ability hook.
- #handle_job_assign(data, sock, hostport) ⇒ Object
  Handle a job_assign packet.
- #initialize(job_servers = nil, opts = {}) ⇒ Worker (constructor)
  Create a new worker.
- #job_servers ⇒ Object
- #job_servers=(servers) ⇒ Object
  Connect to job servers to be used by this worker.
- #remove_ability(func) ⇒ Object
  Let job servers know that we’re no longer able to do something.
- #start_reconnect_thread ⇒ Object
  Start a thread to repeatedly attempt to connect to down job servers.
- #work ⇒ Object
  Do a single job and return.
Constructor Details
#initialize(job_servers = nil, opts = {}) ⇒ Worker
Create a new worker.
# File 'lib/gearman/worker.rb', line 106

def initialize(job_servers=nil, opts={})
  chars = ('a'..'z').to_a
  @client_id = Array.new(30) { chars[rand(chars.size)] }.join
  @sockets = {}  # "host:port" -> Socket
  @abilities = {}  # "funcname" -> Ability
  @after_abilities = {}  # "funcname" -> Ability
  @bad_servers = []  # "host:port"
  @servers_mutex = Mutex.new

  %w{client_id reconnect_sec network_timeout_sec}.map {|s| s.to_sym }.each do |k|
    instance_variable_set "@#{k}", opts[k]
    opts.delete k
  end
  if opts.size > 0
    raise InvalidArgsError, 'Invalid worker args: ' + opts.keys.sort.join(', ')
  end

  @reconnect_sec = 30 if not @reconnect_sec
  @network_timeout_sec = 5 if not @network_timeout_sec
  @worker_enabled = true
  @status = :preparing
  self.job_servers = job_servers if job_servers
  start_reconnect_thread
end
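The constructor accepts only three option keys (client_id, reconnect_sec, network_timeout_sec) and rejects anything else. The following is a minimal plain-Ruby sketch of that whitelist-and-default pattern. The helper name extract_worker_opts and the constant KNOWN_WORKER_OPTS are hypothetical and not part of the library, and ArgumentError stands in for the library's InvalidArgsError.

```ruby
# Hypothetical helper mirroring the constructor's option handling.
# It is an illustration only and is not part of the gearman-ruby API.
KNOWN_WORKER_OPTS = [:client_id, :reconnect_sec, :network_timeout_sec].freeze

def extract_worker_opts(opts)
  opts = opts.dup # work on a copy so the caller's hash is left untouched
  settings = {}
  # Pull out each recognized key; a key that was not supplied yields nil.
  KNOWN_WORKER_OPTS.each { |k| settings[k] = opts.delete(k) }

  # Anything still left in the hash is an unrecognized option.
  unless opts.empty?
    raise ArgumentError, 'Invalid worker args: ' + opts.keys.map(&:to_s).sort.join(', ')
  end

  # Same defaults as the constructor: 30 s between reconnects, 5 s network timeout.
  settings[:reconnect_sec] ||= 30
  settings[:network_timeout_sec] ||= 5
  settings
end

settings = extract_worker_opts(reconnect_sec: 10)
```

Here settings carries the supplied reconnect_sec together with the default network_timeout_sec, while a call such as extract_worker_opts(bogus: 1) raises.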
Instance Attribute Details
#bad_servers ⇒ Object
Returns the value of attribute bad_servers.
# File 'lib/gearman/worker.rb', line 130

def bad_servers
  @bad_servers
end
#client_id ⇒ Object
Returns the value of attribute client_id.
# File 'lib/gearman/worker.rb', line 130

def client_id
  @client_id
end
#network_timeout_sec ⇒ Object
Returns the value of attribute network_timeout_sec.
# File 'lib/gearman/worker.rb', line 130

def network_timeout_sec
  @network_timeout_sec
end
#reconnect_sec ⇒ Object
Returns the value of attribute reconnect_sec.
# File 'lib/gearman/worker.rb', line 130

def reconnect_sec
  @reconnect_sec
end
#status ⇒ Object
Returns the value of attribute status.
# File 'lib/gearman/worker.rb', line 130

def status
  @status
end
#worker_enabled ⇒ Object
Returns the value of attribute worker_enabled.
# File 'lib/gearman/worker.rb', line 130

def worker_enabled
  @worker_enabled
end
Instance Method Details
#add_ability(func, timeout = nil, &f) ⇒ Object
Add a new ability, announcing it to job servers.
The passed-in block of code will be executed for jobs of this function type. It’ll receive two arguments, the data supplied by the client and a Job object. If it returns nil or false, the server will be informed that the job has failed; otherwise the return value of the block will be passed back to the client in String form.
# File 'lib/gearman/worker.rb', line 246

def add_ability(func, timeout=nil, &f)
  @abilities[func] = Ability.new(f, timeout)
  @sockets.values.each {|s| announce_ability(s, func, timeout) }
end
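Because the block's return value decides between success and failure, Ruby's truthiness rules matter. The sketch below uses plain lambdas as stand-ins for ability blocks and a hypothetical helper, describe_outcome, that summarizes the rule stated above. No job server is involved and none of these names come from the library.

```ruby
# Hypothetical helper: summarizes how a block's return value is interpreted.
# nil or false means failure; anything else is sent back in String form.
def describe_outcome(ret)
  ret ? [:complete, ret.to_s] : [:fail, nil]
end

# Plain lambdas standing in for ability blocks (data, job).
reverse  = lambda { |data, _job| data.reverse }               # String result
count    = lambda { |data, _job| data.length }                # Integer, sent as a String
validate = lambda { |data, _job| data.empty? ? nil : true }   # nil signals failure

describe_outcome(reverse.call('abc', nil))
describe_outcome(count.call('hello', nil))
describe_outcome(validate.call('', nil))
```

One consequence worth keeping in mind: a block that wants to hand a boolean false back to the client cannot simply return false, since that reads as a failure. Returning a String such as 'false' avoids the ambiguity. An empty String is truthy in Ruby and therefore still counts as success.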
#after_ability(func, &f) ⇒ Object
Add an after-ability hook.
The passed-in block of code will be executed after the work block for jobs with the same function name. It takes two arguments: the result of the work and the original job data. This lets you run code after the work_complete packet has been sent to the server.
N.B. The after-ability hook ONLY runs if the ability was successful and no exceptions were raised.
# File 'lib/gearman/worker.rb', line 265

def after_ability(func, &f)
  @after_abilities[func] = Ability.new(f)
end
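To make the ordering concrete, here is a simplified plain-Ruby model of it. HookDemo is a hypothetical class written for this illustration: it records what would be sent instead of talking to a job server, and it treats an exception the same as a failed job.

```ruby
# Hypothetical, simplified dispatcher. It models only the ordering of
# "send result, then run the hook" and performs no network I/O.
class HookDemo
  attr_reader :log

  def initialize
    @abilities = {}
    @after_abilities = {}
    @log = []
  end

  def add_ability(func, &f)
    @abilities[func] = f
  end

  def after_ability(func, &f)
    @after_abilities[func] = f
  end

  def dispatch(func, data)
    ret = begin
      @abilities.fetch(func).call(data)
    rescue StandardError
      nil # simplification: any error is treated as a failed job
    end
    # The result (or failure) is reported first...
    @log << (ret ? "sent result for #{func}" : "sent failure for #{func}")
    # ...and only then does the hook run, and only on success.
    hook = @after_abilities[func]
    hook.call(ret, data) if ret && hook
    ret
  end
end

demo = HookDemo.new
demo.add_ability('upcase') { |data| data.empty? ? nil : data.upcase }
demo.after_ability('upcase') { |result, data| demo.log << "hook saw #{result} for #{data}" }
demo.dispatch('upcase', 'abc') # success: result logged, then hook runs
demo.dispatch('upcase', '')    # failure: hook is skipped
```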
#handle_job_assign(data, sock, hostport) ⇒ Object
Handle a job_assign packet.
# File 'lib/gearman/worker.rb', line 285

def handle_job_assign(data, sock, hostport)
  handle, func, data = data.split("\0", 3)
  if not func
    Util.logger.error "GearmanRuby: Ignoring job_assign with no function from #{hostport}"
    return false
  end

  Util.logger.error "GearmanRuby: Got job_assign with handle #{handle} and #{data.size} byte(s) " +
    "from #{hostport}"

  ability = @abilities[func]
  if not ability
    Util.logger.error "Ignoring job_assign for unsupported func #{func} " +
      "with handle #{handle} from #{hostport}"
    Util.send_request(sock, Util.pack_request(:work_fail, handle))
    return false
  end

  exception = nil
  begin
    ret = ability.run(data, Job.new(sock, handle))
  rescue Exception => e
    exception = e
    Util.logger.debug "GearmanRuby: Exception: #{e}\n#{e.backtrace.join("\n")}\n"
  end

  cmd = if ret && exception.nil?
    Util.logger.debug "GearmanRuby: Sending work_complete for #{handle} with #{ret.to_s.size} byte(s) " +
      "to #{hostport}"
    [ Util.pack_request(:work_complete, "#{handle}\0#{ret.to_s}") ]
  elsif exception.nil?
    Util.logger.debug "GearmanRuby: Sending work_fail for #{handle} to #{hostport}"
    [ Util.pack_request(:work_fail, handle) ]
  elsif exception
    Util.logger.debug "GearmanRuby: Sending work_exception for #{handle} to #{hostport}"
    [ Util.pack_request(:work_exception, "#{handle}\0#{exception.message}") ]
  end

  cmd.each {|p| Util.send_request(sock, p) }

  # There are cases where we might want to run something after the worker
  # successfully completes the ability in question and sends its results
  if ret && exception.nil?
    after_ability = @after_abilities[func]
    if after_ability
      Util.logger.debug "Running after ability for #{func}..."
      begin
        after_ability.run(ret, data)
      rescue Exception => e
        Util.logger.debug "GearmanRuby: Exception: #{e}\n#{e.backtrace.join("\n")}\n"
        nil
      end
    end
  end

  true
end
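The first line of handle_job_assign splits the packet body on NUL bytes with a limit of 3, which keeps any further NUL bytes inside the job data intact. The sketch below shows that behaviour in isolation. The helper name parse_job_assign and the sample handle and function names are made up for illustration.

```ruby
# Hypothetical helper isolating the split used by handle_job_assign.
# The limit of 3 means only the first two NUL bytes act as separators.
def parse_job_assign(payload)
  payload.split("\0", 3)
end

# A well-formed body: handle, function name, then opaque data that may
# itself contain NUL bytes.
handle, func, data = parse_job_assign("H:example:42\0resize\0width=10\0height=20")

# A truncated body with no function name: func comes back as nil, which is
# the case the method logs and rejects.
_handle, missing_func, _data = parse_job_assign("H:example:43")
```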
#job_servers ⇒ Object
# File 'lib/gearman/worker.rb', line 147

def job_servers
  servers = nil
  @servers_mutex.synchronize do
    servers = @sockets.keys + @bad_servers
  end
  servers
end
#job_servers=(servers) ⇒ Object
Connect to job servers to be used by this worker.
# File 'lib/gearman/worker.rb', line 159

def job_servers=(servers)
  @servers_mutex.synchronize do
    update_job_servers(servers)
  end
end
#remove_ability(func) ⇒ Object
Let job servers know that we’re no longer able to do something.
# File 'lib/gearman/worker.rb', line 273

def remove_ability(func)
  @abilities.delete(func)
  req = Util.pack_request(:cant_do, func)
  @sockets.values.each {|s| Util.send_request(s, req) }
end
#start_reconnect_thread ⇒ Object
Start a thread to repeatedly attempt to connect to down job servers.
# File 'lib/gearman/worker.rb', line 133

def start_reconnect_thread
  Thread.new do
    loop do
      @servers_mutex.synchronize do
        # If there are any failed servers, try to reconnect to them.
        if not @bad_servers.empty?
          update_job_servers(@sockets.keys + @bad_servers)
        end
      end
      sleep @reconnect_sec
    end
  end.run
end
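The pattern here is a background thread that periodically retries failed servers while holding the same mutex that guards the server lists. The following plain-Ruby sketch models that pattern under simplifying assumptions: Reconnector is a hypothetical class, and the block passed to start stands in for an actual connection attempt.

```ruby
# Hypothetical model of a mutex-guarded retry loop. The block passed to
# #start stands in for a real connection attempt and returns true or false.
class Reconnector
  def initialize(bad_servers, interval_sec)
    @bad = bad_servers.dup
    @good = []
    @interval_sec = interval_sec
    @mutex = Mutex.new
  end

  # Starts the background thread and returns it so the caller can stop it.
  def start(&try_connect)
    Thread.new do
      loop do
        @mutex.synchronize do
          # Servers that now accept a connection move to the good list.
          recovered, @bad = @bad.partition { |server| try_connect.call(server) }
          @good.concat(recovered)
        end
        sleep @interval_sec
      end
    end
  end

  # Reads both lists under the mutex, returning copies.
  def snapshot
    @mutex.synchronize { [@good.dup, @bad.dup] }
  end
end

reconnector = Reconnector.new(['a:4730', 'b:4730'], 0.01)
thread = reconnector.start { |server| server == 'a:4730' }
sleep 0.1
good, bad = reconnector.snapshot
thread.kill
```

Taking the lock for every read and write of the lists is what keeps a concurrent caller from observing a server in both lists or in neither.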
#work ⇒ Object
Do a single job and return.
# File 'lib/gearman/worker.rb', line 345

def work
  req = Util.pack_request(:grab_job)
  loop do
    @status = :preparing
    bad_servers = []
    # We iterate through the servers in sorted order to make testing
    # easier.
    servers = nil
    @servers_mutex.synchronize { servers = @sockets.keys.sort }
    servers.each do |hostport|
      Util.logger.debug "GearmanRuby: Sending grab_job to #{hostport}"
      sock = @sockets[hostport]
      Util.send_request(sock, req)

      # Now that we've sent grab_job, we need to keep reading packets
      # until we see a no_job or job_assign response (there may be a noop
      # waiting for us in response to a previous pre_sleep).
      loop do
        begin
          type, data = Util.read_response(sock, @network_timeout_sec)
          case type
          when :no_job
            Util.logger.debug "GearmanRuby: Got no_job from #{hostport}"
            break
          when :job_assign
            @status = :working
            return worker_enabled if handle_job_assign(data, sock, hostport)
            break
          else
            Util.logger.debug "GearmanRuby: Got #{type.to_s} from #{hostport}"
          end
        rescue Exception
          Util.logger.debug "GearmanRuby: Server #{hostport} timed out or lost connection (#{$!.inspect}); marking bad"
          bad_servers << hostport
          break
        end
      end
    end

    @servers_mutex.synchronize do
      bad_servers.each do |hostport|
        @sockets[hostport].close if @sockets[hostport]
        @bad_servers << hostport if @sockets[hostport]
        @sockets.delete(hostport)
      end
    end

    Util.logger.debug "GearmanRuby: Sending pre_sleep and going to sleep for #{@reconnect_sec} sec"
    @servers_mutex.synchronize do
      @sockets.values.each do |sock|
        Util.send_request(sock, Util.pack_request(:pre_sleep))
      end
    end

    return false unless worker_enabled
    @status = :waiting

    # FIXME: We could optimize things the next time through the 'each' by
    # sending the first grab_job to one of the servers that had a socket
    # with data in it. Not bothering with it for now.
    IO::select(@sockets.values, nil, nil, @reconnect_sec)
  end
end
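When no server has a job, the method sends pre_sleep and then blocks in IO::select until a socket becomes readable or @reconnect_sec elapses. The sketch below shows that wait in isolation, using a pipe as a stand-in for a job-server socket. The helper name readable_within? is hypothetical.

```ruby
# Hypothetical helper: true if +io+ becomes readable within +timeout+ seconds.
# IO.select returns nil on timeout and an array of ready IO lists otherwise.
def readable_within?(io, timeout)
  ready = IO.select([io], nil, nil, timeout)
  !ready.nil?
end

# A pipe stands in for a job-server socket.
reader, writer = IO.pipe

# Nothing has been written yet, so the wait runs to its timeout.
idle_result = readable_within?(reader, 0.05)

# Once bytes are pending (think of a server waking the worker), the same
# call returns without waiting out the timeout.
writer.write('noop')
woken_result = readable_within?(reader, 0.05)
```

Using the reconnect interval as the select timeout means an idle worker wakes up at least that often and repeats the grab_job round, even if no server sends anything.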