Class: Gearman::Worker
- Inherits:
-
Object
- Object
- Gearman::Worker
- Includes:
- Callbacks
- Defined in:
- lib/gearman/worker.rb
Overview
Worker
Description
A worker that can connect to a Gearman server and perform tasks.
Usage
require 'gearman'
w = Gearman::Worker.new('127.0.0.1')
# Add a handler for a "sleep" function that takes a single argument, the
# number of seconds to sleep before reporting success.
w.add_ability('sleep') do |data,job|
seconds = data
(1..seconds.to_i).each do |i|
sleep 1
# Report our progress to the job server every second.
job.report_status(i, seconds)
end
# Report success.
true
end
loop { w.work }
Defined Under Namespace
Modules: Callbacks Classes: Ability, Job
Instance Attribute Summary collapse
-
#bad_servers ⇒ Object
Returns the value of attribute bad_servers.
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#network_timeout_sec ⇒ Object
Returns the value of attribute network_timeout_sec.
-
#reconnect_sec ⇒ Object
Returns the value of attribute reconnect_sec.
-
#status ⇒ Object
Returns the value of attribute status.
-
#worker_enabled ⇒ Object
Returns the value of attribute worker_enabled.
Instance Method Summary collapse
-
#add_ability(func, timeout = nil, &f) ⇒ Object
Add a new ability, announcing it to job servers.
-
#after_ability(func, &f) ⇒ Object
Add an after-ability hook.
-
#handle_job_assign(data, sock, hostport) ⇒ Object
Handle a job_assign packet.
-
#initialize(job_servers = nil, opts = {}) ⇒ Worker
constructor
Create a new worker.
- #job_servers ⇒ Object
-
#job_servers=(servers) ⇒ Object
Connect to job servers to be used by this worker.
-
#remove_ability(func) ⇒ Object
Let job servers know that we’re no longer able to do something.
-
#start_reconnect_thread ⇒ Object
Start a thread to repeatedly attempt to connect to down job servers.
-
#work ⇒ Object
Do a single job and return.
Constructor Details
#initialize(job_servers = nil, opts = {}) ⇒ Worker
Create a new worker.
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/gearman/worker.rb', line 143 def initialize(job_servers=nil, opts={}) chars = ('a'..'z').to_a @client_id = Array.new(30) { chars[rand(chars.size)] }.join @sockets = {} # "host:port" -> Socket @abilities = {} # "funcname" -> Ability @after_abilities = {} # "funcname" -> Ability @bad_servers = [] # "host:port" @servers_mutex = Mutex.new %w{client_id reconnect_sec network_timeout_sec}.map {|s| s.to_sym }.each do |k| instance_variable_set "@#{k}", opts[k] opts.delete k end if opts.size > 0 raise InvalidArgsError, 'Invalid worker args: ' + opts.keys.sort.join(', ') end @reconnect_sec = 30 if not @reconnect_sec @network_timeout_sec = 5 if not @network_timeout_sec @worker_enabled = true @status = :preparing self.job_servers = job_servers if job_servers start_reconnect_thread end |
Instance Attribute Details
#bad_servers ⇒ Object
Returns the value of attribute bad_servers.
167 168 169 |
# File 'lib/gearman/worker.rb', line 167 def bad_servers @bad_servers end |
#client_id ⇒ Object
Returns the value of attribute client_id.
167 168 169 |
# File 'lib/gearman/worker.rb', line 167 def client_id @client_id end |
#network_timeout_sec ⇒ Object
Returns the value of attribute network_timeout_sec.
167 168 169 |
# File 'lib/gearman/worker.rb', line 167 def network_timeout_sec @network_timeout_sec end |
#reconnect_sec ⇒ Object
Returns the value of attribute reconnect_sec.
167 168 169 |
# File 'lib/gearman/worker.rb', line 167 def reconnect_sec @reconnect_sec end |
#status ⇒ Object
Returns the value of attribute status.
167 168 169 |
# File 'lib/gearman/worker.rb', line 167 def status @status end |
#worker_enabled ⇒ Object
Returns the value of attribute worker_enabled.
167 168 169 |
# File 'lib/gearman/worker.rb', line 167 def worker_enabled @worker_enabled end |
Instance Method Details
#add_ability(func, timeout = nil, &f) ⇒ Object
Add a new ability, announcing it to job servers.
The passed-in block of code will be executed for jobs of this function type. It’ll receive two arguments, the data supplied by the client and a Job object. If it returns nil or false, the server will be informed that the job has failed; otherwise the return value of the block will be passed back to the client in String form.
284 285 286 287 |
# File 'lib/gearman/worker.rb', line 284 def add_ability(func, timeout=nil, &f) @abilities[func] = Ability.new(f, timeout) @sockets.values.each {|s| announce_ability(s, func, timeout) } end |
#after_ability(func, &f) ⇒ Object
Add an after-ability hook
The passed-in block of code will be executed after the work block for jobs with the same function name. It takes two arguments, the result of the work and the original job data. This way, if you need to hook into after the job_complete packet is sent to the server, you can do so.
N.B The after-ability hook ONLY runs if the ability was successful and no exceptions were raised.
303 304 305 |
# File 'lib/gearman/worker.rb', line 303 def after_ability(func, &f) @after_abilities[func] = Ability.new(f) end |
#handle_job_assign(data, sock, hostport) ⇒ Object
Handle a job_assign packet.
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 |
# File 'lib/gearman/worker.rb', line 323 def handle_job_assign(data, sock, hostport) handle, func, data = data.split("\0", 3) if not func Util.logger.error "GearmanRuby: Ignoring job_assign with no function from #{hostport}" return false end Util.logger.error "GearmanRuby: Got job_assign with handle #{handle} and #{data.size} byte(s) " + "from #{hostport}" ability = @abilities[func] if not ability Util.logger.error "Ignoring job_assign for unsupported func #{func} " + "with handle #{handle} from #{hostport}" Util.send_request(sock, Util.pack_request(:work_fail, handle)) return false end exception = nil begin ret = ability.run(data, Job.new(sock, handle)) rescue Exception => e exception = e Util.logger.debug "GearmanRuby: Exception: #{e}\n#{e.backtrace.join("\n")}\n" end cmd = if ret && exception.nil? Util.logger.debug "GearmanRuby: Sending work_complete for #{handle} with #{ret.to_s.size} byte(s) " + "to #{hostport}" run_work_complete_callback [ Util.pack_request(:work_complete, "#{handle}\0#{ret.to_s}") ] elsif exception.nil? Util.logger.debug "GearmanRuby: Sending work_fail for #{handle} to #{hostport}" run_work_fail_callback [ Util.pack_request(:work_fail, handle) ] elsif exception Util.logger.debug "GearmanRuby: Sending work_exception for #{handle} to #{hostport}" run_work_exception_callback [ Util.pack_request(:work_exception, "#{handle}\0#{exception.}") ] end cmd.each {|p| Util.send_request(sock, p) } # There are cases where we might want to run something after the worker # successfully completes the ability in question and sends its results if ret && exception.nil? after_ability = @after_abilities[func] if after_ability Util.logger.debug "Running after ability for #{func}..." begin after_ability.run(ret, data) rescue Exception => e Util.logger.debug "GearmanRuby: Exception: #{e}\n#{e.backtrace.join("\n")}\n" nil end end end true end |
#job_servers ⇒ Object
184 185 186 187 188 189 190 |
# File 'lib/gearman/worker.rb', line 184 def job_servers servers = nil @servers_mutex.synchronize do servers = @sockets.keys + @bad_servers end servers end |
#job_servers=(servers) ⇒ Object
Connect to job servers to be used by this worker.
196 197 198 199 200 |
# File 'lib/gearman/worker.rb', line 196 def job_servers=(servers) @servers_mutex.synchronize do update_job_servers(servers) end end |
#remove_ability(func) ⇒ Object
Let job servers know that we’re no longer able to do something.
311 312 313 314 315 |
# File 'lib/gearman/worker.rb', line 311 def remove_ability(func) @abilities.delete(func) req = Util.pack_request(:cant_do, func) @sockets.values.each {|s| Util.send_request(s, req) } end |
#start_reconnect_thread ⇒ Object
Start a thread to repeatedly attempt to connect to down job servers.
170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/gearman/worker.rb', line 170 def start_reconnect_thread Thread.new do loop do @servers_mutex.synchronize do # If there are any failed servers, try to reconnect to them. if not @bad_servers.empty? update_job_servers(@sockets.keys + @bad_servers) end end sleep @reconnect_sec end end.run end |
#work ⇒ Object
Do a single job and return.
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 |
# File 'lib/gearman/worker.rb', line 386 def work req = Util.pack_request(:grab_job) type = nil data = nil loop do @status = :preparing bad_servers = [] # We iterate through the servers in sorted order to make testing # easier. servers = nil @servers_mutex.synchronize { servers = @sockets.keys.sort } servers.each do |hostport| Util.logger.debug "GearmanRuby: Sending grab_job to #{hostport}" run_grab_job_callback sock = @sockets[hostport] Util.send_request(sock, req) # Now that we've sent grab_job, we need to keep reading packets # until we see a no_job or job_assign response (there may be a noop # waiting for us in response to a previous pre_sleep). loop do begin type, data = Util.read_response(sock, @network_timeout_sec) case type when :no_job Util.logger.debug "GearmanRuby: Got no_job from #{hostport}" run_no_job_callback break when :job_assign @status = :working run_job_assign_callback return worker_enabled if handle_job_assign(data, sock, hostport) break else Util.logger.debug "GearmanRuby: Got #{type.to_s} from #{hostport}" end rescue Exception Util.logger.info "GearmanRuby: Server #{hostport} timed out or lost connection (#{$!.inspect}); marking bad" bad_servers << hostport break end end end @servers_mutex.synchronize do bad_servers.each do |hostport| @sockets[hostport].close if @sockets[hostport] @bad_servers << hostport if @sockets[hostport] @sockets.delete(hostport) end end Util.logger.debug "GearmanRuby: Sending pre_sleep and going to sleep for #{@reconnect_sec} sec" @servers_mutex.synchronize do @sockets.values.each do |sock| Util.send_request(sock, Util.pack_request(:pre_sleep)) end end return false unless worker_enabled @status = :waiting sleepTime = Time.now while(@status == :waiting) # FIXME: We could optimize things the next time through the 'each' by # sending the first grab_job to one of the servers that had a socket # with data in it. Not bothering with it for now. IO::select(@sockets.values, nil, nil, @reconnect_sec) # If 30 seconds have passed, then wakeup @status = :wakeup if Time.now - sleepTime > 30 if(@status == :waiting) @sockets.values.each do |sock| type, data = Util.read_response(sock, @network_timeout_sec) # there shouldn't be anything else here, if there is, we should be able to ignore it... if(type == :noop) Util.logger.debug "Received NoOp while sleeping... waking up!" @status = :wakeup end end end end end end |