Class: Creeper::Worker
- Inherits:
-
Object
- Object
- Creeper::Worker
- Includes:
- Celluloid
- Defined in:
- lib/creeper/worker.rb
Instance Attribute Summary collapse
-
#jobs ⇒ Object
readonly
Returns the value of attribute jobs.
-
#number ⇒ Object
Returns the value of attribute number.
Class Method Summary collapse
Instance Method Summary collapse
-
#beanstalk ⇒ Object
beanstalk ##.
- #dump(job, name = nil, data = nil) ⇒ Object
- #finalize ⇒ Object
- #ignore(tube) ⇒ Object
-
#initialize(jobs = nil) ⇒ Worker
constructor
A new instance of Worker.
- #list_tubes_watched(cached = false) ⇒ Object
- #number_format ⇒ Object
- #prefix ⇒ Object
-
#prepared? ⇒ Boolean
flags ##.
- #process(handler, data, name, job) ⇒ Object
- #reserve(timeout = nil) ⇒ Object
-
#start(short_circuit = false) ⇒ Object
work ##.
- #started_at ⇒ Object
- #started_at=(started_at) ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
- #stopped_at ⇒ Object
- #stopped_at=(stopped_at) ⇒ Object
- #time_in_milliseconds ⇒ Object
- #watch(tube) ⇒ Object
- #work(job) ⇒ Object
- #working? ⇒ Boolean
Constructor Details
#initialize(jobs = nil) ⇒ Worker
Returns a new instance of Worker.
96 97 98 99 100 101 |
# File 'lib/creeper/worker.rb', line 96 def initialize(jobs = nil) @jobs = self.class.jobs_for(jobs) Creeper.register_worker(self) OutLogger.info "#{prefix} Working #{self.jobs.size} jobs: [ #{self.jobs.join(' ')} ]" end |
Instance Attribute Details
#jobs ⇒ Object (readonly)
Returns the value of attribute jobs.
94 95 96 |
# File 'lib/creeper/worker.rb', line 94 def jobs @jobs end |
#number ⇒ Object
Returns the value of attribute number.
93 94 95 |
# File 'lib/creeper/worker.rb', line 93 def number @number end |
Class Method Details
.jobs_for(jobs = nil) ⇒ Object
end
82 83 84 85 86 87 88 89 |
# File 'lib/creeper/worker.rb', line 82 def self.jobs_for(jobs = nil) case jobs when :all, nil Creeper.all_jobs else Array(jobs) end end |
.work(jobs = nil, size = nil) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/creeper/worker.rb', line 13 def self.work(jobs = nil, size = nil) size ||= Creeper.pool_size = { size: size, args: [jobs] } Creeper.worker_pool = Creeper::Worker.pool() begin trap(:INT) { Creeper.shutdown = true } trap(:TERM) { Creeper.shutdown = true } trap(:QUIT) { Creeper.shutdown = true } Creeper.worker_pool.start end until Creeper.shutdown? exit end |
Instance Method Details
#beanstalk ⇒ Object
beanstalk ##
137 138 139 |
# File 'lib/creeper/worker.rb', line 137 def beanstalk Creeper.beanstalk end |
#dump(job, name = nil, data = nil) ⇒ Object
103 104 105 |
# File 'lib/creeper/worker.rb', line 103 def dump(job, name = nil, data = nil) "#{name.inspect rescue nil} { data: #{(data.inspect rescue nil)}, job: #{(job.inspect rescue nil)} }" end |
#finalize ⇒ Object
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/creeper/worker.rb', line 198 def finalize Creeper.finalizers.each do |finalizer| begin case finalizer.arity when 1 finalizer.call(self) else finalizer.call end rescue => e OutLogger.crash "#{prefix} finalizer error", e end end ensure Creeper.disconnect end |
#ignore(tube) ⇒ Object
141 142 143 144 145 |
# File 'lib/creeper/worker.rb', line 141 def ignore(tube) beanstalk.ignore(tube) rescue Beanstalk::NotConnected => e disconnected(self, :ignore, tube) || raise end |
#list_tubes_watched(cached = false) ⇒ Object
147 148 149 150 151 |
# File 'lib/creeper/worker.rb', line 147 def list_tubes_watched(cached = false) beanstalk.list_tubes_watched rescue Beanstalk::NotConnected => e disconnected(self, :list_tubes_watched, cached) || raise end |
#number_format ⇒ Object
111 112 113 |
# File 'lib/creeper/worker.rb', line 111 def number_format "%#{Creeper.pool_size.to_s.length}d" end |
#prefix ⇒ Object
107 108 109 |
# File 'lib/creeper/worker.rb', line 107 def prefix "[#{number_format % number} - #{'%x' % Thread.current.object_id}]" end |
#prepared? ⇒ Boolean
flags ##
296 297 298 |
# File 'lib/creeper/worker.rb', line 296 def prepared? Thread.current[:creeper_prepared] == true end |
#process(handler, data, name, job) ⇒ Object
279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/creeper/worker.rb', line 279 def process(handler, data, name, job) case handler.arity when 3 handler.call(data, name, job) when 2 handler.call(data, name) when 1 handler.call(data) else handler.call end end |
#reserve(timeout = nil) ⇒ Object
153 154 155 156 157 |
# File 'lib/creeper/worker.rb', line 153 def reserve(timeout = nil) beanstalk.reserve(timeout) rescue Beanstalk::NotConnected => e disconnected(self, :reserve, timeout) || raise end |
#start(short_circuit = false) ⇒ Object
work ##
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/creeper/worker.rb', line 169 def start(short_circuit = false) return false if short_circuit exit if stopped? return true if working? prepare if not prepared? begin job = reserve Creeper.reserve_timeout rescue Beanstalk::TimedOut OutLogger.debug "#{prefix} Back to the unemployment line" if $DEBUG return false end exit if stopped? Thread.current[:creeper_working] = true OutLogger.debug "#{prefix} Got #{job.inspect}" if $DEBUG work! job # asynchronously go to work rescue SystemExit => e job.release rescue nil Creeper.unregister_worker(self) rescue => e job.release rescue nil Creeper.unregister_worker(self, "start loop error") raise end |
#started_at ⇒ Object
119 120 121 |
# File 'lib/creeper/worker.rb', line 119 def started_at Thread.current[:creeper_started_at] end |
#started_at=(started_at) ⇒ Object
123 124 125 |
# File 'lib/creeper/worker.rb', line 123 def started_at=(started_at) Thread.current[:creeper_started_at] = started_at end |
#stop ⇒ Object
216 217 218 |
# File 'lib/creeper/worker.rb', line 216 def stop Thread.current[:creeper_stopped] = true end |
#stopped? ⇒ Boolean
300 301 302 |
# File 'lib/creeper/worker.rb', line 300 def stopped? Thread.current[:creeper_stopped] == true || Creeper.shutdown? end |
#stopped_at ⇒ Object
127 128 129 |
# File 'lib/creeper/worker.rb', line 127 def stopped_at Thread.current[:creeper_stopped_at] end |
#stopped_at=(stopped_at) ⇒ Object
131 132 133 |
# File 'lib/creeper/worker.rb', line 131 def stopped_at=(stopped_at) Thread.current[:creeper_stopped_at] = stopped_at end |
#time_in_milliseconds ⇒ Object
115 116 117 |
# File 'lib/creeper/worker.rb', line 115 def time_in_milliseconds ((stopped_at - started_at).to_f * 1000).to_i end |
#watch(tube) ⇒ Object
159 160 161 162 163 |
# File 'lib/creeper/worker.rb', line 159 def watch(tube) beanstalk.watch(tube) rescue Beanstalk::NotConnected => e disconnected(self, :watch, tube) || raise end |
#work(job) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/creeper/worker.rb', line 220 def work(job) exit if stopped? name, data = JSON.parse(job.body) Creeper.start_work(self, data, name, job) begin Creeper.before_handlers_for(name).each do |handler| process(handler, data, name, job) end Creeper.handler_for(name).tap do |handler| process(handler, data, name, job) end Creeper.after_handlers_for(name).each do |handler| process(handler, data, name, job) end end job.delete Creeper.stop_work(self, data, name, job) # start! unless stopped? or EM.reactor_running? start! unless stopped? # continue processing, even when end of links is reached rescue Beanstalk::NotConnected => e disconnected(self, :work, job) || begin job.release rescue nil Creeper.unregister_worker(self) raise end rescue SystemExit => e job.release rescue nil Creeper.unregister_worker(self) rescue => e job.bury rescue nil Creeper.error_work(self, data, name, job) begin Creeper.error_handlers_for(name).each do |handler| process(handler, data, name, job) end end Creeper.unregister_worker(self, "work loop error, burying #{dump(job, name, data)}") raise ensure Thread.current[:creeper_started_at] = nil Thread.current[:creeper_stopped_at] = nil Thread.current[:creeper_working] = false end |
#working? ⇒ Boolean
304 305 306 |
# File 'lib/creeper/worker.rb', line 304 def working? Thread.current[:creeper_working] == true end |