Module: Gearman::Evented::WorkerReactor

Includes:
Reactor
Defined in:
lib/gearman/evented/worker.rb

Instance Method Summary collapse

Methods included from Reactor

connect, #connected?, #disconnect, included, #log, #reconnect, #send, #server, #to_s, #unbind

Instance Method Details

#announce_ability(name, timeout) ⇒ Object



19
20
21
22
23
24
# File 'lib/gearman/evented/worker.rb', line 19

def announce_ability(name, timeout)
  cmd = timeout ? :can_do_timeout : :can_do
  arg = timeout ? "#{name}\0#{timeout.to_s}" : name
  log "announce_ability #{name} #{timeout}"
  send cmd, arg
end

#announce_disability(name) ⇒ Object



26
27
28
# File 'lib/gearman/evented/worker.rb', line 26

def announce_disability(name)
  send :cant_do, name
end

#client_idObject



111
112
113
# File 'lib/gearman/evented/worker.rb', line 111

def client_id
  @client_id ||= `uuidgen`.strip
end

#connection_completedObject



7
8
9
10
11
12
13
14
15
16
17
# File 'lib/gearman/evented/worker.rb', line 7

def connection_completed
  send :set_client_id, client_id
  super

  @abilities ||= @opts.delete(:abilities) || []
  @abilities.each do |ability, args|
    announce_ability(ability, args[:timeout])
  end

  grab_job
end

#dispatch_packet(type, handle, *data) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/gearman/evented/worker.rb', line 53

def dispatch_packet(type, handle, *data)
  success = true
  timer   = 0
  case type
  when :no_job
    send :pre_sleep
    timer = @opts[:reconnect_sec] || 30
  when :job_assign, :job_assign_uniq
    log "job assign #{handle}, #{data.inspect}"
    handle_job_assign(handle, data[0], data[1])
  when :noop
    log "NOOP"
  when :error
    log "[ERROR]: error from server #{server}: #{data}"
  else
    log "Got unknown #{type}, #{data} from #{server}"
  end

  EM.add_timer(timer) { grab_job }
  succeed [handle, data]
end

#grab_jobObject



30
31
32
33
# File 'lib/gearman/evented/worker.rb', line 30

def grab_job
  log "Grab Job"
  send :grab_job_uniq
end

#handle_job_assign(handle, func, args = '') ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/gearman/evented/worker.rb', line 75

def handle_job_assign(handle, func, args = '')
  return unless handle
  unless func
    log "ERROR: Ignoring job_assign with no function"
    return
  end

  log "Got job_assign '#{func}' with handle #{handle} and #{args.size rescue 0} byte(s)"

  unless @abilities.has_key?(func)
    log "Ignoring job_assign for unsupported func #{func} with handle #{handle}"
    work_fail handle
    return
  end

  exception = nil
  begin
    ret = @abilities[func][:callback].call(args, Gearman::Job.new(self, handle))
  rescue Exception => e
    exception = e
  end

  if ret && exception.nil?
    ret = ret.to_s
    log "Sending work_complete for #{handle} with #{ret.size} byte(s)"
    work_complete handle, ret
  elsif exception.nil?
    log "Sending work_fail for #{handle} to #{server}"
    work_fail handle
  elsif exception
    log "exception #{exception.message}, sending work_warning, work_fail for #{handle}"
    work_warning handle, exception.message
    work_fail handle
  end
end

#receive_data(data) ⇒ Object



47
48
49
50
51
# File 'lib/gearman/evented/worker.rb', line 47

def receive_data(data)
  Gearman::Protocol.decode_response(data).each do |type, handle, *data|
    dispatch_packet(type, handle, *data)
  end
end

#work_complete(handle, data) ⇒ Object



39
40
41
# File 'lib/gearman/evented/worker.rb', line 39

def work_complete(handle, data)
  send :work_complete, "#{handle}\0#{data}"
end

#work_fail(handle) ⇒ Object



35
36
37
# File 'lib/gearman/evented/worker.rb', line 35

def work_fail(handle)
  send :work_fail, handle
end

#work_warning(handle, message) ⇒ Object



43
44
45
# File 'lib/gearman/evented/worker.rb', line 43

def work_warning(handle, message)
  send :work_warning, "#{handle}\0#{message}"
end