Class: Gearman::Worker
- Inherits:
-
Object
- Object
- Gearman::Worker
- Defined in:
- lib/gearman/worker.rb,
lib/gearman/worker/job.rb,
lib/gearman/worker/ability.rb,
lib/gearman/worker/callbacks.rb
Defined Under Namespace
Modules: Callbacks Classes: Ability, Job
Instance Attribute Summary collapse
-
#client_id ⇒ Object
Returns the value of attribute client_id.
-
#network_timeout_sec ⇒ Object
Returns the value of attribute network_timeout_sec.
-
#reconnect_sec ⇒ Object
Returns the value of attribute reconnect_sec.
-
#status ⇒ Object
Returns the value of attribute status.
-
#worker_enabled ⇒ Object
Returns the value of attribute worker_enabled.
Instance Method Summary collapse
-
#add_ability(func_name, timeout = nil, &block) ⇒ Object
Add a new ability, announcing it to job servers.
-
#after_ability(func, &block) ⇒ Object
Callback for after an ability runs.
-
#announce_ability(func_name, timeout, connection) ⇒ Object
Generate CAN_DO (or CAN_DO_TIMEOUT) packet and submit it.
-
#generate_id ⇒ Object
A random string of 30 characters from a-z.
-
#handle_job_assign(data, connection) ⇒ Object
Handle a job_assign packet.
-
#handle_work_message(type, data, connection) ⇒ Object
Handle a message for the worker.
-
#initialize(job_servers = nil, opts = {}) ⇒ Worker
constructor
Create a new worker.
-
#remove_ability(func) ⇒ Object
Let job servers know that we’re no longer able to do something via CANT_DO.
-
#sleep(time_fell_asleep) ⇒ Object
Sleep and poll until timeout occurs or a NO_OP packet is received.
-
#work ⇒ Object
Repeatedly grab and run jobs from every job server, sleeping between polls; returns false once the worker is disabled.
Methods included from Logging
Constructor Details
#initialize(job_servers = nil, opts = {}) ⇒ Worker
Create a new worker.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/gearman/worker.rb', line 20
#
# Create a new worker.
#
# @param job_servers [Object, nil] handed unchanged to ConnectionPool.new
# @param opts [Hash] optional settings read by this constructor:
#   :client_id           - id announced to servers (defaults to generate_id)
#   :network_timeout_sec - defaults to 10
#   :reconnect_sec       - defaults to 30
def initialize(job_servers = nil, opts = {})
  @abilities = {}
  @client_id = opts[:client_id] || generate_id
  @connection_pool = ConnectionPool.new(job_servers)
  @network_timeout_sec = opts[:network_timeout_sec] || 10
  @reconnect_sec = opts[:reconnect_sec] || 30
  @status = :preparing
  @worker_enabled = true

  # Each time a connection is established, send our client id and
  # (re)announce every ability registered so far.
  @connection_pool.on_connection do |conn|
    conn.send_update(Packet.pack_request(:set_client_id, @client_id))
    @abilities.each_pair do |name, ability|
      announce_ability(name, ability.timeout, conn)
    end
  end
end
Instance Attribute Details
#client_id ⇒ Object
Returns the value of attribute client_id.
38 39 40 |
# File 'lib/gearman/worker.rb', line 38
#
# Returns the value of attribute client_id.
def client_id
  @client_id
end
#network_timeout_sec ⇒ Object
Returns the value of attribute network_timeout_sec.
38 39 40 |
# File 'lib/gearman/worker.rb', line 38
#
# Returns the value of attribute network_timeout_sec.
def network_timeout_sec
  @network_timeout_sec
end
#reconnect_sec ⇒ Object
Returns the value of attribute reconnect_sec.
38 39 40 |
# File 'lib/gearman/worker.rb', line 38
#
# Returns the value of attribute reconnect_sec.
def reconnect_sec
  @reconnect_sec
end
#status ⇒ Object
Returns the value of attribute status.
38 39 40 |
# File 'lib/gearman/worker.rb', line 38
#
# Returns the value of attribute status.
def status
  @status
end
#worker_enabled ⇒ Object
Returns the value of attribute worker_enabled.
38 39 40 |
# File 'lib/gearman/worker.rb', line 38
#
# Returns the value of attribute worker_enabled.
def worker_enabled
  @worker_enabled
end
Instance Method Details
#add_ability(func_name, timeout = nil, &block) ⇒ Object
Add a new ability, announcing it to job servers.
The passed-in block of code will be executed for jobs of this function type. It’ll receive two arguments, the data supplied by the client and a Job object. If it returns nil or false, the server will be informed that the job has failed; otherwise the return value of the block will be passed back to the client in String form.
69 70 71 72 73 74 |
# File 'lib/gearman/worker.rb', line 69
#
# Add a new ability, announcing it to job servers.
#
# The block is stored in an Ability together with the function name and
# timeout, then the ability is announced on every connection in the pool.
#
# @param func_name [Object] function name used as the key in @abilities
# @param timeout [Object, nil] when non-nil, announced via CAN_DO_TIMEOUT
# @yield the code to execute for jobs of this function type
def add_ability(func_name, timeout = nil, &block)
  ability = Ability.new(func_name, block, timeout)
  @abilities[func_name] = ability
  @connection_pool.with_all_connections { |conn| announce_ability(func_name, timeout, conn) }
end
#after_ability(func, &block) ⇒ Object
Callback for after an ability runs
78 79 80 |
# File 'lib/gearman/worker.rb', line 78
#
# Callback for after an ability runs.
#
# @param func [Object] function name the ability was registered under
# @yield block handed to the ability's after_complete
# @return whatever the ability's after_complete returns
# @raise [ArgumentError] if no ability is registered under +func+
def after_ability(func, &block)
  # Read the @abilities ivar directly: the class exposes no `abilities`
  # reader, so the bare call would fail with NameError.
  ability = @abilities[func]
  raise ArgumentError, "No ability registered for #{func}" unless ability

  ability.after_complete(block)
end
#announce_ability(func_name, timeout, connection) ⇒ Object
Generate CAN_DO (or CAN_DO_TIMEOUT) packet and submit it
49 50 51 52 53 54 |
# File 'lib/gearman/worker.rb', line 49
#
# Generate a CAN_DO packet (or CAN_DO_TIMEOUT when a timeout is given) and
# submit it on the supplied connection.
#
# @param func_name [Object] function name to announce
# @param timeout [Object, nil] when truthy, appended after a NUL separator
# @param connection [Object] must respond to send_update
def announce_ability(func_name, timeout, connection)
  if timeout
    command = :can_do_timeout
    payload = "#{func_name}\0#{timeout}"
  else
    command = :can_do
    payload = func_name
  end

  connection.send_update(Packet.pack_request(command, payload))
  logger.debug "Announced ability #{func_name}"
end
#generate_id ⇒ Object
Returns a random string of 30 characters from a-z.
42 43 44 45 |
# File 'lib/gearman/worker.rb', line 42
#
# Build a random identifier.
#
# @return [String] a random string of 30 characters from a-z
def generate_id
  letters = ('a'..'z').to_a
  Array.new(30) { letters.sample }.join
end
#handle_job_assign(data, connection) ⇒ Object
Handle a job_assign packet.
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/gearman/worker.rb', line 99
#
# Handle a job_assign packet.
#
# Splits the packet into handle, function name and payload (NUL separated),
# runs the matching ability and reports the outcome on +connection+:
# WORK_COMPLETE when the ability returns a truthy value, WORK_FAIL when it
# returns nil/false, WORK_EXCEPTION when it raises.
#
# @param data [String] raw packet body ("handle\0func\0payload")
# @param connection [Object] must respond to send_update
# @return [Boolean] true when a job was run and a result packet was sent;
#   false when the packet was ignored (malformed or unsupported function)
def handle_job_assign(data, connection)
  handle, func, data = data.split("\0", 3)

  # Check the handle first: a packet with no handle has no function either,
  # so testing the function first would make this branch unreachable.
  unless handle
    logger.error "Ignoring JOB_ASSIGN with no job handle from #{connection}"
    return false
  end

  unless func
    logger.error "Ignoring JOB_ASSIGN with no function from #{connection}"
    return false
  end

  # A packet with only one NUL has no payload section; treat it as empty
  # rather than crashing on nil.size below.
  data ||= ''

  logger.info "Got JOB_ASSIGN with handle #{handle} and #{data.size} byte(s) from #{connection}"

  ability = @abilities[func]
  unless ability
    logger.error "Ignoring JOB_ASSIGN for unsupported function #{func} with handle #{handle} from #{connection}"
    connection.send_update(Packet.pack_request(:work_fail, handle))
    return false
  end

  exception = nil
  ret = nil
  begin
    ret = ability.run(data, Job.new(connection, handle))
  rescue SystemExit, SignalException
    # Never swallow shutdown requests (exit, Ctrl-C, TERM).
    raise
  rescue Exception => e # rubocop:disable Lint/RescueException
    # Any other failure inside the job is reported to the server instead of
    # taking the worker down.
    exception = e
    logger.debug "Exception: #{e}\n#{Array(e.backtrace).join("\n")}\n"
  end

  packet =
    if exception
      logger.debug "Sending WORK_EXCEPTION for #{handle} to #{connection}"
      run_work_exception_callback
      Packet.pack_request(:work_exception, "#{handle}\0#{exception.message}")
    elsif ret
      logger.debug "Sending WORK_COMPLETE for #{handle} with #{ret.to_s.size} byte(s) to #{connection}"
      run_work_complete_callback
      Packet.pack_request(:work_complete, "#{handle}\0#{ret}")
    else
      logger.debug "Sending WORK_FAIL for #{handle} to #{connection}"
      run_work_fail_callback
      Packet.pack_request(:work_fail, handle)
    end

  connection.send_update(packet)
  true
end
#handle_work_message(type, data, connection) ⇒ Object
Handle a message for the worker
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/gearman/worker.rb', line 159
#
# Handle a message for the worker.
#
# @param type [Symbol] packet type (:no_job, :job_assign, :noop, or other)
# @param data [Object] packet body, forwarded to handle_job_assign
# @param connection [Object] the connection the packet arrived on
# @return for :job_assign, worker_enabled when the job was handled and nil
#   otherwise; for other types, the value of the last callback/log call
def handle_work_message(type, data, connection)
  case type
  when :no_job
    logger.info "Got NO_JOB from #{connection}"
    run_no_job_callback
  when :job_assign
    @status = :working
    run_job_assign_callback
    worker_enabled if handle_job_assign(data, connection)
  when :noop
    # We'll have to read again
    logger.debug "Received NOOP while polling. Ignoring NOOP"
  else
    logger.error "Got unexpected #{type} from #{connection}"
  end
end
#remove_ability(func) ⇒ Object
Let job servers know that we’re no longer able to do something via CANT_DO
86 87 88 89 90 91 92 |
# File 'lib/gearman/worker.rb', line 86
#
# Let job servers know that we're no longer able to do something via CANT_DO.
#
# Drops the ability from @abilities, then sends one CANT_DO packet on every
# connection in the pool.
#
# @param func [Object] function name the ability was registered under
def remove_ability(func)
  @abilities.delete(func)
  cant_do_packet = Packet.pack_request(:cant_do, func)
  @connection_pool.with_all_connections { |conn| conn.send_update(cant_do_packet) }
end
#sleep(time_fell_asleep) ⇒ Object
Sleep and poll until timeout occurs or a NO_OP packet is received
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/gearman/worker.rb', line 215
#
# Sleep and poll until timeout occurs or a NO_OP packet is received.
#
# Sets @status to :wakeup once 30 seconds have elapsed since
# +time_fell_asleep+, or as soon as any connection delivers a NOOP packet.
#
# @param time_fell_asleep [Time] moment the worker started waiting
def sleep(time_fell_asleep)
  wake_after = 30 # seconds
  remaining = wake_after - (Time.now - time_fell_asleep).to_i

  # Use IO::select to wait for available connection data
  @connection_pool.poll_connections(remaining) if remaining > 0

  # If 30 seconds have passed, then wakeup
  elapsed = (Time.now - time_fell_asleep).to_f
  @status = :wakeup if elapsed >= wake_after

  # Still waiting means we woke early, so check each connection for a NOOP
  return unless @status == :waiting

  @connection_pool.with_all_connections do |conn|
    begin
      type, _data = conn.read_response(@network_timeout_sec)

      # Wake up if we receive a NOOP packet
      if type == :noop
        logger.debug "Received NOOP while sleeping... waking up!"
        @status = :wakeup
      else
        logger.warn "Received something other than a NOOP packet while sleeping: #{type}"
      end
    rescue SocketTimeoutError
      # This is okay here.
    end
  end
end
#work ⇒ Object
Repeatedly grab and run jobs from every job server, sleeping between polls; returns false once the worker is disabled.
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/gearman/worker.rb', line 178
#
# Main work loop.
#
# On each pass: ask every connection for jobs (repeating on a connection for
# as long as it answers with JOB_ASSIGN), send PRE_SLEEP to every connection,
# then either return or wait until @status leaves :waiting.
#
# @return [false] once worker_enabled is falsy after a pass
def work
  grab_job_req = Packet.pack_request(:grab_job)

  loop do
    @status = :preparing

    @connection_pool.with_all_connections do |connection|
      # Keep asking this server for work for as long as it hands out jobs.
      loop do
        logger.debug "Sending GRAB_JOB to #{connection}"
        run_grab_job_callback
        type, data = connection.send_request(grab_job_req, @network_timeout_sec)
        handle_work_message(type, data, connection)
        break unless type == :job_assign
      end
    end

    logger.info "Sending PRE_SLEEP and going to sleep for #{@reconnect_sec} second(s)"
    @connection_pool.with_all_connections do |connection|
      connection.send_update(Packet.pack_request(:pre_sleep))
    end

    return false unless worker_enabled

    @status = :waiting
    time_asleep = Time.now

    sleep(time_asleep) while @status == :waiting
  end
end