Class: Gearman::Worker

Inherits:
Object
  • Object
show all
Includes:
Logging, Callbacks
Defined in:
lib/gearman/worker.rb,
lib/gearman/worker/job.rb,
lib/gearman/worker/ability.rb,
lib/gearman/worker/callbacks.rb

Defined Under Namespace

Modules: Callbacks Classes: Ability, Job

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

included, #logger

Constructor Details

#initialize(job_servers = nil, opts = {}) ⇒ Worker

Create a new worker.

Parameters:

  • job_servers;eitherasingleserveroranarray ("host:port")

    ob_servers “host:port”; either a single server or an array

  • opts (defaults to: {})

    hash of additional options



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/gearman/worker.rb', line 20

def initialize(job_servers=nil, opts={})
  @abilities            = {}
  @client_id            = opts[:client_id] || generate_id
  @connection_pool      = ConnectionPool.new(job_servers)
  @network_timeout_sec  = opts[:network_timeout_sec] || 10
  @reconnect_sec        = opts[:reconnect_sec] || 30
  @status               = :preparing
  @worker_enabled       = true

  # Add callback for when connections occur -- register abilities and send client id
  @connection_pool.on_connection do |connection|
    connection.send_update(Packet.pack_request(:set_client_id, @client_id))
    @abilities.each do |func_name, ability|
      announce_ability(func_name, ability.timeout, connection)
    end
  end
end

Instance Attribute Details

#client_idObject

Returns the value of attribute client_id.



38
39
40
# File 'lib/gearman/worker.rb', line 38

def client_id
  @client_id
end

#network_timeout_secObject

Returns the value of attribute network_timeout_sec.



38
39
40
# File 'lib/gearman/worker.rb', line 38

def network_timeout_sec
  @network_timeout_sec
end

#reconnect_secObject

Returns the value of attribute reconnect_sec.



38
39
40
# File 'lib/gearman/worker.rb', line 38

def reconnect_sec
  @reconnect_sec
end

#statusObject

Returns the value of attribute status.



38
39
40
# File 'lib/gearman/worker.rb', line 38

def status
  @status
end

#worker_enabledObject

Returns the value of attribute worker_enabled.



38
39
40
# File 'lib/gearman/worker.rb', line 38

def worker_enabled
  @worker_enabled
end

Instance Method Details

#add_ability(func_name, timeout = nil, &block) ⇒ Object

Add a new ability, announcing it to job servers.

The passed-in block of code will be executed for jobs of this function type. It’ll receive two arguments, the data supplied by the client and a Job object. If it returns nil or false, the server will be informed that the job has failed; otherwise the return value of the block will be passed back to the client in String form.

Parameters:

  • func_name

    function name (without prefix)

  • timeout (defaults to: nil)

    the server will give up on us if we don’t finish a task in this many seconds

  • block

    Block to associate with the function



69
70
71
72
73
74
# File 'lib/gearman/worker.rb', line 69

def add_ability(func_name, timeout=nil, &block)
  @abilities[func_name] = Ability.new(func_name, block, timeout)
  @connection_pool.with_all_connections do |connection|
    announce_ability(func_name, timeout, connection)
  end
end

#after_ability(func, &block) ⇒ Object

Callback for after an ability runs



78
79
80
# File 'lib/gearman/worker.rb', line 78

def after_ability(func, &block)
  abilities[func].after_complete(block)
end

#announce_ability(func_name, timeout, connection) ⇒ Object

Generate CAN_DO (or CAN_DO_TIMEOUT) packet and submit it



49
50
51
52
53
54
# File 'lib/gearman/worker.rb', line 49

def announce_ability(func_name, timeout, connection)
  cmd = timeout ? :can_do_timeout : :can_do
  arg = timeout ? "#{func_name}\0#{timeout.to_s}" : func_name
  connection.send_update(Packet.pack_request(cmd, arg))
  logger.debug "Announced ability #{func_name}"
end

#generate_idObject

Returns A random string of 30 characters from a-z.

Returns:

  • A random string of 30 characters from a-z



42
43
44
45
# File 'lib/gearman/worker.rb', line 42

def generate_id
  chars = ('a'..'z').to_a
  Array.new(30) { chars[rand(chars.size)] }.join
end

#handle_job_assign(data, connection) ⇒ Object

Handle a job_assign packet.

Parameters:

  • data

    data in the packet

  • connection

    Connection where the data originated



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/gearman/worker.rb', line 99

def handle_job_assign(data, connection)
  handle, func, data = data.split("\0", 3)

  if not func
    logger.error "Ignoring JOB_ASSIGN with no function from #{connection}"
    return false
  end

  if not handle
    logger.error "Ignoring JOB_ASSIGN with no job handle from #{connection}"
    return false
  end

  logger.info "Got JOB_ASSIGN with handle #{handle} and #{data.size} byte(s) from #{connection}"

  ability = @abilities[func]

  if ability == nil
    logger.error "Ignoring JOB_ASSIGN for unsupported function #{func} with handle #{handle} from #{connection}"
    connection.send_update(Packet.pack_request(:work_fail, handle))
    return false
  end

  exception = nil
  begin
    ret = ability.run(data, Job.new(connection, handle))
  rescue Exception => e
    exception = e
    logger.debug "Exception: #{e}\n#{e.backtrace.join("\n")}\n"
  end

  packets = if ret && exception.nil?
          logger.debug "Sending WORK_COMPLETE for #{handle} with #{ret.to_s.size} byte(s) to #{connection}"
          run_work_complete_callback
          [Packet.pack_request(:work_complete, "#{handle}\0#{ret.to_s}")]
        elsif exception.nil?
          logger.debug "Sending WORK_FAIL for #{handle} to #{connection}"
          run_work_fail_callback
          [Packet.pack_request(:work_fail, handle)]
        elsif exception
          logger.debug "Sending WORK_EXCEPTION for #{handle} to #{connection}"
          run_work_exception_callback
          [Packet.pack_request(:work_exception, "#{handle}\0#{exception.message}")]
        end

  packets.each do |packet|
    connection.send_update(packet)
  end

  true
end

#handle_work_message(type, data, connection) ⇒ Object

Handle a message for the worker

Parameters:

  • type

    Packet type (NO_JOB, JOB_ASSIGN, NO_OP)

  • data

    Opaque data being passed with the message

  • connection

    The Connection object where the message originates

Returns:



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/gearman/worker.rb', line 159

def handle_work_message(type, data, connection)
  case type
    when :no_job
      logger.info "Got NO_JOB from #{connection}"
      run_no_job_callback
    when :job_assign
      @status = :working
      run_job_assign_callback
      return worker_enabled if handle_job_assign(data, connection)
    when :noop
      # We'll have to read again
      logger.debug "Received NOOP while polling. Ignoring NOOP"
    else
      logger.error "Got unexpected #{type.to_s} from #{connection}"
  end
end

#remove_ability(func) ⇒ Object

Let job servers know that we’re no longer able to do something via CANT_DO

Parameters:

  • func

    function name



86
87
88
89
90
91
92
# File 'lib/gearman/worker.rb', line 86

def remove_ability(func)
  @abilities.delete(func)
  req = Packet.pack_request(:cant_do, func)
  @connection_pool.with_all_connections do  |connection|
    connection.send_update(req)
  end
end

#sleep(time_fell_asleep) ⇒ Object

Sleep and poll until timeout occurs or a NO_OP packet is received

Parameters:

  • time_fell_asleep

    The time that we fell asleep (Time object)



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/gearman/worker.rb', line 215

def sleep(time_fell_asleep)
  max_timeout = 30 - (Time.now - time_fell_asleep).to_i

  if max_timeout > 0
    # Use IO::select to wait for available connection data
    @connection_pool.poll_connections(max_timeout)
  end

  # If 30 seconds have passed, then wakeup
  time_asleep = (Time.now - time_fell_asleep).to_f
  @status = :wakeup if time_asleep >= 30
  
  # We didn't sleep for >= 30s, so we need to check for a NOOP
  if (@status == :waiting)
    @connection_pool.with_all_connections do |connection|
      begin
        type, data = connection.read_response(@network_timeout_sec)

        # Wake up if we receive a NOOP packet
        if (type == :noop)
          logger.debug "Received NOOP while sleeping... waking up!"
          @status = :wakeup
        else
          logger.warn "Received something other than a NOOP packet while sleeping: #{type.to_s}"
        end
      rescue SocketTimeoutError
        # This is okay here.
      end
    end
  end
end

#workObject

Do a single job and return.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/gearman/worker.rb', line 178

def work
  grab_job_req = Packet.pack_request(:grab_job)
  type, data = nil

  loop do
    @status = :preparing
  
    @connection_pool.with_all_connections do |connection|
      begin
        logger.debug "Sending GRAB_JOB to #{connection}"
        run_grab_job_callback
        type, data = connection.send_request(grab_job_req, @network_timeout_sec)
        handle_work_message(type, data, connection)
      end while type == :job_assign
    end
  

    logger.info "Sending PRE_SLEEP and going to sleep for #{@reconnect_sec} second(s)"
    @connection_pool.with_all_connections do |connection|
        connection.send_update(Packet.pack_request(:pre_sleep))
    end

    return false unless worker_enabled
    @status = :waiting

    time_asleep = Time.now

    while (@status == :waiting)
      sleep(time_asleep)
    end

  end
end