Class: Gearman::Worker

Inherits:
Object
  • Object
show all
Includes:
Callbacks
Defined in:
lib/gearman/worker.rb

Overview

Worker

Description

A worker that can connect to a Gearman server and perform tasks.

Usage

require 'gearman'

w = Gearman::Worker.new('127.0.0.1')

# Add a handler for a "sleep" function that takes a single argument, the
# number of seconds to sleep before reporting success.
w.add_ability('sleep') do |data,job|
  seconds = data
  (1..seconds.to_i).each do |i|
    sleep 1
    # Report our progress to the job server every second.
    job.report_status(i, seconds)
  end
  # Report success.
  true
end
loop { w.work }

Defined Under Namespace

Modules: Callbacks Classes: Ability, Job

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_servers = nil, opts = {}) ⇒ Worker

Create a new worker.

Parameters:

  • job_servers;eitherasingleserveroranarray ("host:port")

    ob_servers “host:port”; either a single server or an array

  • opts (defaults to: {})

    hash of additional options



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/gearman/worker.rb', line 143

def initialize(job_servers=nil, opts={})
  chars = ('a'..'z').to_a
  @client_id = Array.new(30) { chars[rand(chars.size)] }.join
  @sockets = {}  # "host:port" -> Socket
  @abilities = {}  # "funcname" -> Ability
  @after_abilities = {} # "funcname" -> Ability
  @bad_servers = []  # "host:port"
  @servers_mutex = Mutex.new
  %w{client_id reconnect_sec
     network_timeout_sec}.map {|s| s.to_sym }.each do |k|
    instance_variable_set "@#{k}", opts[k]
    opts.delete k
  end
  if opts.size > 0
    raise InvalidArgsError,
      'Invalid worker args: ' + opts.keys.sort.join(', ')
  end
  @reconnect_sec = 30 if not @reconnect_sec
  @network_timeout_sec = 5 if not @network_timeout_sec
  @worker_enabled = true
  @status = :preparing
  self.job_servers = job_servers if job_servers
  start_reconnect_thread
end

Instance Attribute Details

#bad_serversObject

Returns the value of attribute bad_servers.



167
168
169
# File 'lib/gearman/worker.rb', line 167

def bad_servers
  @bad_servers
end

#client_idObject

Returns the value of attribute client_id.



167
168
169
# File 'lib/gearman/worker.rb', line 167

def client_id
  @client_id
end

#network_timeout_secObject

Returns the value of attribute network_timeout_sec.



167
168
169
# File 'lib/gearman/worker.rb', line 167

def network_timeout_sec
  @network_timeout_sec
end

#reconnect_secObject

Returns the value of attribute reconnect_sec.



167
168
169
# File 'lib/gearman/worker.rb', line 167

def reconnect_sec
  @reconnect_sec
end

#statusObject

Returns the value of attribute status.



167
168
169
# File 'lib/gearman/worker.rb', line 167

def status
  @status
end

#worker_enabledObject

Returns the value of attribute worker_enabled.



167
168
169
# File 'lib/gearman/worker.rb', line 167

def worker_enabled
  @worker_enabled
end

Instance Method Details

#add_ability(func, timeout = nil, &f) ⇒ Object

Add a new ability, announcing it to job servers.

The passed-in block of code will be executed for jobs of this function type. It’ll receive two arguments, the data supplied by the client and a Job object. If it returns nil or false, the server will be informed that the job has failed; otherwise the return value of the block will be passed back to the client in String form.

Parameters:

  • func

    function name (without prefix)

  • timeout (defaults to: nil)

    the server will give up on us if we don’t finish a task in this many seconds



284
285
286
287
# File 'lib/gearman/worker.rb', line 284

def add_ability(func, timeout=nil, &f)
  @abilities[func] = Ability.new(f, timeout)
  @sockets.values.each {|s| announce_ability(s, func, timeout) }
end

#after_ability(func, &f) ⇒ Object

Add an after-ability hook

The passed-in block of code will be executed after the work block for jobs with the same function name. It takes two arguments, the result of the work and the original job data. This way, if you need to hook into after the job_complete packet is sent to the server, you can do so.

N.B The after-ability hook ONLY runs if the ability was successful and no exceptions were raised.

Parameters:

  • func

    function name (without prefix)



303
304
305
# File 'lib/gearman/worker.rb', line 303

def after_ability(func, &f)
  @after_abilities[func] = Ability.new(f)
end

#handle_job_assign(data, sock, hostport) ⇒ Object

Handle a job_assign packet.

Parameters:

  • data

    data in the packet

  • sock

    Socket on which the packet arrived

  • hostport ("host:port")

    ostport “host:port”



323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# File 'lib/gearman/worker.rb', line 323

def handle_job_assign(data, sock, hostport)
  handle, func, data = data.split("\0", 3)
  if not func
    Util.logger.error "GearmanRuby: Ignoring job_assign with no function from #{hostport}"
    return false
  end

  Util.logger.error "GearmanRuby: Got job_assign with handle #{handle} and #{data.size} byte(s) " +
    "from #{hostport}"

  ability = @abilities[func]
  if not ability
    Util.logger.error "Ignoring job_assign for unsupported func #{func} " +
      "with handle #{handle} from #{hostport}"
    Util.send_request(sock, Util.pack_request(:work_fail, handle))
    return false
  end

  exception = nil
  begin
    ret = ability.run(data, Job.new(sock, handle))
  rescue Exception => e
    exception = e
    Util.logger.debug "GearmanRuby: Exception: #{e}\n#{e.backtrace.join("\n")}\n"
  end

  cmd = if ret && exception.nil?
    Util.logger.debug "GearmanRuby: Sending work_complete for #{handle} with #{ret.to_s.size} byte(s) " +
      "to #{hostport}"
    run_work_complete_callback
    [ Util.pack_request(:work_complete, "#{handle}\0#{ret.to_s}") ]
  elsif exception.nil?
    Util.logger.debug "GearmanRuby: Sending work_fail for #{handle} to #{hostport}"
    run_work_fail_callback
    [ Util.pack_request(:work_fail, handle) ]
  elsif exception
    Util.logger.debug "GearmanRuby: Sending work_exception for #{handle} to #{hostport}"
    run_work_exception_callback
    [ Util.pack_request(:work_exception, "#{handle}\0#{exception.message}") ]
  end

  cmd.each {|p| Util.send_request(sock, p) }
  
  # There are cases where we might want to run something after the worker
  # successfully completes the ability in question and sends its results
  if ret && exception.nil?
    after_ability = @after_abilities[func]
    if after_ability
      Util.logger.debug "Running after ability for #{func}..."
      begin
        after_ability.run(ret, data)
      rescue Exception => e
        Util.logger.debug "GearmanRuby: Exception: #{e}\n#{e.backtrace.join("\n")}\n"
        nil
      end
    end
  end 
  
  true
end

#job_serversObject



184
185
186
187
188
189
190
# File 'lib/gearman/worker.rb', line 184

def job_servers
  servers = nil
  @servers_mutex.synchronize do
    servers = @sockets.keys + @bad_servers
  end
  servers
end

#job_servers=(servers) ⇒ Object

Connect to job servers to be used by this worker.

Parameters:

  • servers;eitherasingleserveroranarray ("host:port")

    ervers “host:port”; either a single server or an array



196
197
198
199
200
# File 'lib/gearman/worker.rb', line 196

def job_servers=(servers)
  @servers_mutex.synchronize do
    update_job_servers(servers)
  end
end

#remove_ability(func) ⇒ Object

Let job servers know that we’re no longer able to do something.

Parameters:

  • func

    function name



311
312
313
314
315
# File 'lib/gearman/worker.rb', line 311

def remove_ability(func)
  @abilities.delete(func)
  req = Util.pack_request(:cant_do, func)
  @sockets.values.each {|s| Util.send_request(s, req) }
end

#start_reconnect_threadObject

Start a thread to repeatedly attempt to connect to down job servers.



170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/gearman/worker.rb', line 170

def start_reconnect_thread
  Thread.new do
    loop do
      @servers_mutex.synchronize do
        # If there are any failed servers, try to reconnect to them.
        if not @bad_servers.empty?
          update_job_servers(@sockets.keys + @bad_servers)
        end
      end
      sleep @reconnect_sec
    end
  end.run
end

#workObject

Do a single job and return.



386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
# File 'lib/gearman/worker.rb', line 386

def work
  req = Util.pack_request(:grab_job)
  type = nil
  data = nil
  loop do
    @status = :preparing
    bad_servers = []
    # We iterate through the servers in sorted order to make testing
    # easier.
    servers = nil
    @servers_mutex.synchronize { servers = @sockets.keys.sort }
    servers.each do |hostport|
      Util.logger.debug "GearmanRuby: Sending grab_job to #{hostport}"
      run_grab_job_callback
      sock = @sockets[hostport]
      Util.send_request(sock, req)

      # Now that we've sent grab_job, we need to keep reading packets
      # until we see a no_job or job_assign response (there may be a noop
      # waiting for us in response to a previous pre_sleep).
      loop do
        begin
          type, data = Util.read_response(sock, @network_timeout_sec)
          case type
          when :no_job
            Util.logger.debug "GearmanRuby: Got no_job from #{hostport}"
            run_no_job_callback
            break
          when :job_assign
            @status = :working
            run_job_assign_callback
            return worker_enabled if handle_job_assign(data, sock, hostport)
            break
          else
            Util.logger.debug "GearmanRuby: Got #{type.to_s} from #{hostport}"
          end
        rescue Exception
          Util.logger.info "GearmanRuby: Server #{hostport} timed out or lost connection (#{$!.inspect}); marking bad"
          bad_servers << hostport
          break
        end
      end
    end

    @servers_mutex.synchronize do
      bad_servers.each do |hostport|
        @sockets[hostport].close if @sockets[hostport]
        @bad_servers << hostport if @sockets[hostport]
        @sockets.delete(hostport)
      end
    end

    Util.logger.debug "GearmanRuby: Sending pre_sleep and going to sleep for #{@reconnect_sec} sec"
    @servers_mutex.synchronize do
      @sockets.values.each do |sock|
        Util.send_request(sock, Util.pack_request(:pre_sleep))
      end
    end

    return false unless worker_enabled
    @status = :waiting

    sleepTime = Time.now
    while(@status == :waiting)
      # FIXME: We could optimize things the next time through the 'each' by
      # sending the first grab_job to one of the servers that had a socket
      # with data in it.  Not bothering with it for now.
      IO::select(@sockets.values, nil, nil, @reconnect_sec)
      
      # If 30 seconds have passed, then wakeup
      @status = :wakeup if Time.now - sleepTime > 30 

      if(@status == :waiting)
        @sockets.values.each do |sock|
          type, data = Util.read_response(sock, @network_timeout_sec)
 
          # there shouldn't be anything else here, if there is, we should be able to ignore it...
          if(type == :noop)
            Util.logger.debug "Received NoOp while sleeping... waking up!"
            @status = :wakeup
          end
        end
      end
    end
    
  end
end