Class: Gearman::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/gearman/worker.rb

Overview

Worker

Description

A worker that can connect to a Gearman server and perform tasks.

Usage

require 'gearman'

# Create a worker for the job server at 127.0.0.1.
w = Gearman::Worker.new('127.0.0.1')

# Add a handler for a "sleep" function that takes a single argument, the
# number of seconds to sleep before reporting success.
w.add_ability('sleep') do |data,job|
  seconds = data
  (1..seconds.to_i).each do |i|
    sleep 1
    # Report our progress to the job server every second.
    job.report_status(i, seconds)
  end
  # Report success. A nil or false return value would report failure.
  true
end
# Process jobs forever; each call to work does a single job and returns.
loop { w.work }

Defined Under Namespace

Classes: Ability, Job

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_servers = nil, opts = {}) ⇒ Worker

Create a new worker.

Parameters:

  • job_servers (defaults to: nil)

    “host:port”; either a single server or an array

  • opts (defaults to: {})

    hash of additional options



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/gearman/worker.rb', line 106

# Create a new worker.
#
# Recognised keys in +opts+ (all optional):
#   :client_id            identifier for this worker; a random 30-letter
#                         lowercase string is generated when omitted
#   :reconnect_sec        seconds between reconnect attempts (default 30)
#   :network_timeout_sec  seconds to wait for a server reply (default 5)
#
# @param job_servers "host:port"; either a single server or an array
# @param opts hash of additional options (not modified)
# @raise [InvalidArgsError] if opts contains any unrecognised key
def initialize(job_servers=nil, opts={})
  # Work on a copy so the caller's hash is left untouched.
  opts = opts.dup

  @sockets = {}  # "host:port" -> Socket
  @abilities = {}  # "funcname" -> Ability
  @after_abilities = {} # "funcname" -> Ability
  @bad_servers = []  # "host:port"
  @servers_mutex = Mutex.new

  # Pull out the options we understand; a missing key yields nil so the
  # defaults below apply.
  @client_id = opts.delete(:client_id)
  @reconnect_sec = opts.delete(:reconnect_sec)
  @network_timeout_sec = opts.delete(:network_timeout_sec)

  # Anything left over is an option we do not know about. Keys are
  # stringified before sorting so mixed String/Symbol keys cannot make the
  # sort itself fail.
  unless opts.empty?
    raise InvalidArgsError,
      'Invalid worker args: ' + opts.keys.map { |k| k.to_s }.sort.join(', ')
  end

  # Only fall back to a random id when the caller did not supply one, so
  # the generated value is never clobbered by a missing option.
  unless @client_id
    chars = ('a'..'z').to_a
    @client_id = Array.new(30) { chars[rand(chars.size)] }.join
  end
  @reconnect_sec ||= 30
  @network_timeout_sec ||= 5

  @worker_enabled = true
  @status = :preparing
  self.job_servers = job_servers if job_servers
  start_reconnect_thread
end

Instance Attribute Details

#bad_servers ⇒ Object

Returns the value of attribute bad_servers.



130
131
132
# File 'lib/gearman/worker.rb', line 130

# "host:port" entries for servers currently marked bad (for example after
# a timeout or lost connection); the reconnect thread retries these.
def bad_servers
  @bad_servers
end

#client_id ⇒ Object

Returns the value of attribute client_id.



130
131
132
# File 'lib/gearman/worker.rb', line 130

# Identifier of this worker, as assigned in the constructor (which reads
# the :client_id option).
def client_id
  @client_id
end

#network_timeout_sec ⇒ Object

Returns the value of attribute network_timeout_sec.



130
131
132
# File 'lib/gearman/worker.rb', line 130

# Timeout, in seconds, handed to Util.read_response while waiting for a
# server reply; the constructor defaults it to 5.
def network_timeout_sec
  @network_timeout_sec
end

#reconnect_sec ⇒ Object

Returns the value of attribute reconnect_sec.



130
131
132
# File 'lib/gearman/worker.rb', line 130

# Seconds the reconnect thread sleeps between attempts; also used as the
# select timeout while the worker waits for jobs. Defaults to 30.
def reconnect_sec
  @reconnect_sec
end

#status ⇒ Object

Returns the value of attribute status.



130
131
132
# File 'lib/gearman/worker.rb', line 130

# Current worker state: :preparing, :working or :waiting.
def status
  @status
end

#worker_enabled ⇒ Object

Returns the value of attribute worker_enabled.



130
131
132
# File 'lib/gearman/worker.rb', line 130

# Flag set to true by the constructor. #work returns its value after
# handling a job, and returns false instead of sleeping when it is falsy.
def worker_enabled
  @worker_enabled
end

Instance Method Details

#add_ability(func, timeout = nil, &f) ⇒ Object

Add a new ability, announcing it to job servers.

The passed-in block of code will be executed for jobs of this function type. It’ll receive two arguments, the data supplied by the client and a Job object. If it returns nil or false, the server will be informed that the job has failed; otherwise the return value of the block will be passed back to the client in String form.

Parameters:

  • func

    function name (without prefix)

  • timeout (defaults to: nil)

    the server will give up on us if we don’t finish a task in this many seconds



246
247
248
249
# File 'lib/gearman/worker.rb', line 246

# Register the block +f+ as the handler for +func+ and announce the new
# ability on every currently connected socket.
#
# @param func function name (without prefix)
# @param timeout the server will give up on us if we don't finish a task
#   in this many seconds
def add_ability(func, timeout=nil, &f)
  handler = Ability.new(f, timeout)
  @abilities[func] = handler
  @sockets.values.each do |sock|
    announce_ability(sock, func, timeout)
  end
end

#after_ability(func, &f) ⇒ Object

Add an after-ability hook

The passed-in block of code will be executed after the work block for jobs with the same function name. It takes two arguments, the result of the work and the original job data. This lets you run code after the work_complete packet has been sent to the server.

N.B. The after-ability hook ONLY runs if the ability was successful and no exceptions were raised.

Parameters:

  • func

    function name (without prefix)



265
266
267
# File 'lib/gearman/worker.rb', line 265

# Register the block +f+ as the hook to run after a successful +func+ job.
#
# @param func function name (without prefix)
def after_ability(func, &f)
  @after_abilities.store(func, Ability.new(f))
end

#handle_job_assign(data, sock, hostport) ⇒ Object

Handle a job_assign packet.

Parameters:

  • data

    data in the packet

  • sock

    Socket on which the packet arrived

  • hostport

    “host:port” of the server the packet arrived from



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/gearman/worker.rb', line 285

# Handle a job_assign packet: run the matching ability and report the
# outcome to the job server.
#
# The ability's result decides the reply: a truthy return value is sent
# back as work_complete (in String form), nil/false as work_fail, and a
# raised exception as work_exception carrying the exception message. After
# a successful work_complete, the after-ability hook registered for the
# function (if any) is run with the result and the job data.
#
# @param data data in the packet ("handle\0func\0payload")
# @param sock Socket on which the packet arrived
# @param hostport "host:port" of the server the packet arrived from
# @return [Boolean] true if an ability was run (whatever its outcome),
#   false if the packet was ignored
def handle_job_assign(data, sock, hostport)
  handle, func, data = data.split("\0", 3)
  if not func
    Util.logger.error "GearmanRuby: Ignoring job_assign with no function from #{hostport}"
    return false
  end
  # A packet without a payload field leaves data nil; treat it as an empty
  # payload so the size log below and the ability both get a String.
  data ||= ''

  # Receiving a job is routine, so this is logged at debug level like the
  # other progress messages.
  Util.logger.debug "GearmanRuby: Got job_assign with handle #{handle} and #{data.size} byte(s) " +
    "from #{hostport}"

  ability = @abilities[func]
  if not ability
    Util.logger.error "Ignoring job_assign for unsupported func #{func} " +
      "with handle #{handle} from #{hostport}"
    Util.send_request(sock, Util.pack_request(:work_fail, handle))
    return false
  end

  ret = nil
  exception = nil
  begin
    ret = ability.run(data, Job.new(sock, handle))
  rescue Exception => e
    # NOTE(review): rescuing Exception also traps Interrupt/SystemExit
    # raised while the job runs; kept as in the original so every failure
    # is still reported as work_exception -- confirm this is intended.
    exception = e
    Util.logger.debug "GearmanRuby: Exception: #{e}\n#{e.backtrace.join("\n")}\n"
  end

  succeeded = ret && exception.nil?
  packet =
    if exception
      Util.logger.debug "GearmanRuby: Sending work_exception for #{handle} to #{hostport}"
      Util.pack_request(:work_exception, "#{handle}\0#{exception.message}")
    elsif ret
      result = ret.to_s
      Util.logger.debug "GearmanRuby: Sending work_complete for #{handle} with #{result.size} byte(s) " +
        "to #{hostport}"
      Util.pack_request(:work_complete, "#{handle}\0#{result}")
    else
      Util.logger.debug "GearmanRuby: Sending work_fail for #{handle} to #{hostport}"
      Util.pack_request(:work_fail, handle)
    end
  Util.send_request(sock, packet)

  # There are cases where we might want to run something after the worker
  # successfully completes the ability in question and sends its results
  if succeeded
    after_ability = @after_abilities[func]
    if after_ability
      Util.logger.debug "Running after ability for #{func}..."
      begin
        after_ability.run(ret, data)
      rescue Exception => e
        # A failing hook must not change the outcome already reported.
        Util.logger.debug "GearmanRuby: Exception: #{e}\n#{e.backtrace.join("\n")}\n"
      end
    end
  end

  true
end

#job_servers ⇒ Object



147
148
149
150
151
152
153
# File 'lib/gearman/worker.rb', line 147

# List every known server: the "host:port" keys of the connected sockets
# followed by the servers currently marked bad. The snapshot is taken
# while holding the servers mutex.
def job_servers
  @servers_mutex.synchronize { @sockets.keys + @bad_servers }
end

#job_servers=(servers) ⇒ Object

Connect to job servers to be used by this worker.

Parameters:

  • servers

    “host:port”; either a single server or an array



159
160
161
162
163
# File 'lib/gearman/worker.rb', line 159

# Connect to job servers to be used by this worker. The update runs while
# holding the servers mutex.
#
# @param servers "host:port"; either a single server or an array
def job_servers=(servers)
  @servers_mutex.synchronize { update_job_servers(servers) }
end

#remove_ability(func) ⇒ Object

Let job servers know that we’re no longer able to do something.

Parameters:

  • func

    function name



273
274
275
276
277
# File 'lib/gearman/worker.rb', line 273

# Forget the handler for +func+ and send a cant_do request for it on every
# connected socket.
#
# @param func function name
def remove_ability(func)
  @abilities.delete(func)
  cant_do = Util.pack_request(:cant_do, func)
  @sockets.values.each do |sock|
    Util.send_request(sock, cant_do)
  end
end

#start_reconnect_thread ⇒ Object

Start a thread to repeatedly attempt to connect to down job servers.



133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/gearman/worker.rb', line 133

# Start a background thread that, every @reconnect_sec seconds, retries
# the servers currently marked bad by re-running update_job_servers with
# the full server list (connected plus bad) under the servers mutex.
def start_reconnect_thread
  reconnector = Thread.new do
    loop do
      @servers_mutex.synchronize do
        # Nothing to do unless at least one server is marked bad.
        update_job_servers(@sockets.keys + @bad_servers) unless @bad_servers.empty?
      end
      sleep @reconnect_sec
    end
  end
  reconnector.run
end

#work ⇒ Object

Do a single job and return.



345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/gearman/worker.rb', line 345

# Do a single job and return.
#
# Each pass sends grab_job to every connected server in sorted order and
# reads responses until it sees no_job or job_assign. If a job is handled,
# the current value of worker_enabled is returned. Servers that fail
# during the exchange -- on the grab_job send as well as on the read --
# are closed and moved to the bad list. When no server had a job, a
# pre_sleep is sent to the remaining servers and the method waits (up to
# @reconnect_sec seconds) for socket activity before trying again.
#
# @return worker_enabled after a job was handled, or false if the worker
#   is disabled when it would otherwise go to sleep
def work
  req = Util.pack_request(:grab_job)
  loop do
    @status = :preparing
    bad_servers = []
    # We iterate through the servers in sorted order to make testing
    # easier.
    servers = nil
    @servers_mutex.synchronize { servers = @sockets.keys.sort }
    servers.each do |hostport|
      begin
        Util.logger.debug "GearmanRuby: Sending grab_job to #{hostport}"
        sock = @sockets[hostport]
        # The send lives inside the rescue so a connection found dead on
        # write is marked bad just like one that fails on read, instead of
        # aborting the whole work call.
        Util.send_request(sock, req)

        # Now that we've sent grab_job, we need to keep reading packets
        # until we see a no_job or job_assign response (there may be a noop
        # waiting for us in response to a previous pre_sleep).
        loop do
          type, data = Util.read_response(sock, @network_timeout_sec)
          case type
          when :no_job
            Util.logger.debug "GearmanRuby: Got no_job from #{hostport}"
            break
          when :job_assign
            @status = :working
            return worker_enabled if handle_job_assign(data, sock, hostport)
            break
          else
            Util.logger.debug "GearmanRuby: Got #{type.to_s} from #{hostport}"
          end
        end
      rescue Exception => e
        # NOTE(review): Exception (not StandardError) is kept from the
        # original; it also traps Interrupt raised during the exchange.
        Util.logger.debug "GearmanRuby: Server #{hostport} timed out or lost connection (#{e.inspect}); marking bad"
        bad_servers << hostport
      end
    end

    @servers_mutex.synchronize do
      bad_servers.each do |hostport|
        # Only servers still present in @sockets are closed and recorded.
        @sockets[hostport].close if @sockets[hostport]
        @bad_servers << hostport if @sockets[hostport]
        @sockets.delete(hostport)
      end
    end

    Util.logger.debug "GearmanRuby: Sending pre_sleep and going to sleep for #{@reconnect_sec} sec"
    @servers_mutex.synchronize do
      @sockets.values.each do |sock|
        Util.send_request(sock, Util.pack_request(:pre_sleep))
      end
    end

    return false unless worker_enabled
    @status = :waiting

    # FIXME: We could optimize things the next time through the 'each' by
    # sending the first grab_job to one of the servers that had a socket
    # with data in it.  Not bothering with it for now.
    IO::select(@sockets.values, nil, nil, @reconnect_sec)
  end
end