Class: Creeper::Worker

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/creeper/worker.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(jobs = nil) ⇒ Worker

Returns a new instance of Worker.



96
97
98
99
100
101
# File 'lib/creeper/worker.rb', line 96

# Builds a worker, registers it with Creeper and logs the jobs it will handle.
#
# @param jobs [Object, nil] job selector, resolved through +self.class.jobs_for+
def initialize(jobs = nil)
  @jobs = self.class.jobs_for(jobs)
  Creeper.register_worker(self)

  # The log line is built after registration, exactly as before, because
  # +prefix+ reads the worker's +number+.
  announcement = "#{prefix} Working #{self.jobs.size} jobs: [ #{self.jobs.join(' ')} ]"
  OutLogger.info announcement
end

Instance Attribute Details

#jobs ⇒ Object (readonly)

Returns the value of attribute jobs.



94
95
96
# File 'lib/creeper/worker.rb', line 94

# Reader for the +@jobs+ instance variable, which +initialize+ assigns from
# +self.class.jobs_for+.
#
# @return [Object] the stored jobs collection
def jobs
  @jobs
end

#number ⇒ Object

Returns the value of attribute number.



93
94
95
# File 'lib/creeper/worker.rb', line 93

# Reader for the +@number+ instance variable; +prefix+ formats this value into
# the worker's log prefix.
# NOTE(review): nothing in this excerpt assigns +@number+ — presumably it is set
# through the attribute's writer; confirm where.
def number
  @number
end

Class Method Details

.jobs_for(jobs = nil) ⇒ Object

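Resolves the jobs a worker should handle: `Creeper.all_jobs` when given `:all` or `nil`, otherwise `Array(jobs)`.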



82
83
84
85
86
87
88
89
# File 'lib/creeper/worker.rb', line 82

# Resolves a job selector into the collection of jobs a worker handles.
#
# @param jobs [Object, nil] +:all+ or +nil+ selects +Creeper.all_jobs+;
#   anything else is coerced with +Array()+
# @return [Object] +Creeper.all_jobs+, or an Array built from +jobs+
def self.jobs_for(jobs = nil)
  return Creeper.all_jobs if [:all, nil].include?(jobs)

  Array(jobs)
end

.work(jobs = nil, size = nil) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/creeper/worker.rb', line 13

# Creates the worker pool and keeps dispatching +start+ to it until a shutdown
# is requested, then exits the process.
#
# The INT, TERM and QUIT handlers only flip +Creeper.shutdown+, so they are
# installed once before the dispatch loop rather than on every iteration.
#
# @param jobs [Object, nil] placed in the pool's +args+ option
# @param size [Integer, nil] pool size; falls back to +Creeper.pool_size+
# @return [void] does not return normally — ends by calling +exit+
def self.work(jobs = nil, size = nil)
  size ||= Creeper.pool_size

  options = {
    size: size,
    args: [jobs]
  }

  Creeper.worker_pool = Creeper::Worker.pool(options)

  [:INT, :TERM, :QUIT].each do |signal|
    trap(signal) { Creeper.shutdown = true }
  end

  # Dispatch at least once, then keep going until shutdown has been requested.
  loop do
    Creeper.worker_pool.start
    break if Creeper.shutdown?
  end

  exit
end

Instance Method Details

#beanstalk ⇒ Object

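Returns `Creeper.beanstalk`, the connection object that the tube helpers (`ignore`, `watch`, `reserve`, `list_tubes_watched`) call into.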



137
138
139
# File 'lib/creeper/worker.rb', line 137

# Returns whatever +Creeper.beanstalk+ returns; +ignore+, +watch+, +reserve+
# and +list_tubes_watched+ send their commands to this object.
def beanstalk
  Creeper.beanstalk
end

#dump(job, name = nil, data = nil) ⇒ Object



103
104
105
# File 'lib/creeper/worker.rb', line 103

# Renders a job, its name and its data as a single line for log messages.
#
# Each value is shown via +inspect+; if +inspect+ raises a StandardError the
# value is rendered as an empty string instead of failing the whole line.
#
# @param job [Object] the job being described
# @param name [Object, nil] the job name
# @param data [Object, nil] the job payload
# @return [String] formatted as +NAME { data: DATA, job: JOB }+
def dump(job, name = nil, data = nil)
  safe_inspect = lambda do |value|
    begin
      value.inspect
    rescue StandardError
      nil
    end
  end

  # Evaluate in the same order the values appear in the output.
  shown_name = safe_inspect.call(name)
  shown_data = safe_inspect.call(data)
  shown_job  = safe_inspect.call(job)

  "#{shown_name} { data: #{shown_data}, job: #{shown_job} }"
end

#finalize ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/creeper/worker.rb', line 198

# Runs every callback in +Creeper.finalizers+ and then disconnects.
#
# A callback whose arity is exactly 1 receives this worker; any other callback
# is called without arguments. An error raised by one callback is logged via
# +OutLogger.crash+ and does not stop the remaining callbacks.
# +Creeper.disconnect+ runs in an +ensure+, so it happens even if iteration
# itself fails.
#
# @return [Object] the collection returned by +Creeper.finalizers.each+
def finalize
  Creeper.finalizers.each do |callback|
    begin
      wants_worker = callback.arity == 1
      wants_worker ? callback.call(self) : callback.call
    rescue => error
      OutLogger.crash "#{prefix} finalizer error", error
    end
  end
ensure
  Creeper.disconnect
end

#ignore(tube) ⇒ Object



141
142
143
144
145
# File 'lib/creeper/worker.rb', line 141

# Sends +ignore+ for +tube+ to the beanstalk connection.
#
# On +Beanstalk::NotConnected+ the call is handed to +disconnected+; a truthy
# result from it becomes the return value, otherwise the original error is
# re-raised.
#
# @param tube [Object] tube identifier passed straight through
# @return [Object] result of +beanstalk.ignore+ or of +disconnected+
# @raise [Beanstalk::NotConnected] when +disconnected+ returns a falsy value
def ignore(tube)
  beanstalk.ignore(tube)
rescue Beanstalk::NotConnected
  recovered = disconnected(self, :ignore, tube)
  raise unless recovered

  recovered
end

#list_tubes_watched(cached = false) ⇒ Object



147
148
149
150
151
# File 'lib/creeper/worker.rb', line 147

# Asks the beanstalk connection for its watched tubes.
#
# On +Beanstalk::NotConnected+ the call is handed to +disconnected+; a truthy
# result from it becomes the return value, otherwise the original error is
# re-raised.
#
# NOTE(review): +cached+ is not passed to +beanstalk.list_tubes_watched+; it is
# only forwarded to +disconnected+ on the failure path — confirm that is
# intended.
#
# @param cached [Object] only used on the failure path (see note)
# @return [Object] result of +beanstalk.list_tubes_watched+ or of +disconnected+
# @raise [Beanstalk::NotConnected] when +disconnected+ returns a falsy value
def list_tubes_watched(cached = false)
  beanstalk.list_tubes_watched
rescue Beanstalk::NotConnected
  recovered = disconnected(self, :list_tubes_watched, cached)
  raise unless recovered

  recovered
end

#number_format ⇒ Object



111
112
113
# File 'lib/creeper/worker.rb', line 111

# Builds the +%Nd+ format string used to pad worker numbers, where N is the
# number of characters in +Creeper.pool_size+.
#
# @return [String] e.g. +"%2d"+ for a pool size of 10
def number_format
  digits = Creeper.pool_size.to_s.length
  "%#{digits}d"
end

#prefix ⇒ Object



107
108
109
# File 'lib/creeper/worker.rb', line 107

# Builds the tag that starts this worker's log lines: the worker number
# formatted with +number_format+, then the current thread's object id in hex.
#
# @return [String] formatted as +[NUMBER - THREAD_ID_HEX]+
def prefix
  worker_tag = number_format % number
  thread_tag = '%x' % Thread.current.object_id
  "[#{worker_tag} - #{thread_tag}]"
end

#prepared? ⇒ Boolean

Returns true when the current thread's `:creeper_prepared` flag is exactly `true`.

Returns:

  • (Boolean)


296
297
298
# File 'lib/creeper/worker.rb', line 296

# Tells whether the current thread's +:creeper_prepared+ flag is exactly +true+.
#
# @return [Boolean]
def prepared?
  flag = Thread.current[:creeper_prepared]
  flag == true
end

#process(handler, data, name, job) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/creeper/worker.rb', line 279

# Calls +handler+ with as many of (+data+, +name+, +job+) as its arity asks for.
#
# An arity of 1, 2 or 3 receives that many leading arguments; any other arity
# (zero, negative, or above three) is called with no arguments.
#
# @param handler [#arity, #call] the callback to run
# @param data [Object] job payload
# @param name [Object] job name
# @param job [Object] the job itself
# @return [Object] whatever the handler returns
def process(handler, data, name, job)
  arity = handler.arity
  wanted = [1, 2, 3].include?(arity) ? arity : 0

  handler.call(*[data, name, job].first(wanted))
end

#reserve(timeout = nil) ⇒ Object



153
154
155
156
157
# File 'lib/creeper/worker.rb', line 153

# Sends +reserve+ with +timeout+ to the beanstalk connection.
#
# On +Beanstalk::NotConnected+ the call is handed to +disconnected+; a truthy
# result from it becomes the return value, otherwise the original error is
# re-raised.
#
# @param timeout [Object, nil] passed straight through to +beanstalk.reserve+
# @return [Object] result of +beanstalk.reserve+ or of +disconnected+
# @raise [Beanstalk::NotConnected] when +disconnected+ returns a falsy value
def reserve(timeout = nil)
  beanstalk.reserve(timeout)
rescue Beanstalk::NotConnected
  recovered = disconnected(self, :reserve, timeout)
  raise unless recovered

  recovered
end

#start(short_circuit = false) ⇒ Object

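Reserves one job (waiting up to `Creeper.reserve_timeout`) and hands it to `work!`. Returns false when short-circuited or when the reserve times out, and true when the current thread is already working.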



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/creeper/worker.rb', line 169

# Reserves one job and hands it to +work!+.
#
# Returns early with +false+ when +short_circuit+ is set or the reserve times
# out, and with +true+ when the current thread is already marked as working.
# Calls +exit+ when the worker is stopped, both before and after reserving.
#
# A +SystemExit+ raised in here is rescued: the job (if any) is released and
# the worker is unregistered. Any other StandardError does the same with a
# "start loop error" reason and is then re-raised.
#
# @param short_circuit [Boolean] when truthy, do nothing and return +false+
# @return [Object] +false+ / +true+ for the early exits, otherwise the result
#   of +work!+ (or of +Creeper.unregister_worker+ after a rescued +SystemExit+)
def start(short_circuit = false)
  return false if short_circuit
  exit if stopped?
  return true if working?
  prepare unless prepared?

  job = begin
    reserve(Creeper.reserve_timeout)
  rescue Beanstalk::TimedOut
    OutLogger.debug "#{prefix} Back to the unemployment line" if $DEBUG
    return false
  end

  # A shutdown may have been requested while waiting on the reserve.
  exit if stopped?

  Thread.current[:creeper_working] = true

  OutLogger.debug "#{prefix} Got #{job.inspect}" if $DEBUG

  work!(job) # asynchronously go to work
rescue SystemExit
  # +job+ is nil when the exit happened before reserving; the modifier
  # rescue absorbs the resulting NoMethodError.
  job.release rescue nil
  Creeper.unregister_worker(self)
rescue
  job.release rescue nil
  Creeper.unregister_worker(self, "start loop error")
  raise
end

#started_at ⇒ Object



119
120
121
# File 'lib/creeper/worker.rb', line 119

# Reads the +:creeper_started_at+ value stored on the current thread.
#
# @return [Object, nil] the stored value, or +nil+ when none is set
def started_at
  current_thread = Thread.current
  current_thread[:creeper_started_at]
end

#started_at=(started_at) ⇒ Object



123
124
125
# File 'lib/creeper/worker.rb', line 123

# Stores +started_at+ under +:creeper_started_at+ on the current thread.
#
# @param started_at [Object] value to store
# @return [Object] the stored value
def started_at=(started_at)
  current_thread = Thread.current
  current_thread[:creeper_started_at] = started_at
end

#stop ⇒ Object



216
217
218
# File 'lib/creeper/worker.rb', line 216

# Sets the current thread's +:creeper_stopped+ flag, which +stopped?+ reads.
#
# @return [true]
def stop
  current_thread = Thread.current
  current_thread[:creeper_stopped] = true
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


300
301
302
# File 'lib/creeper/worker.rb', line 300

# Tells whether this worker should stop: either the current thread's
# +:creeper_stopped+ flag is exactly +true+, or Creeper reports a shutdown.
#
# @return [Object] +true+ when the thread flag is set, otherwise whatever
#   +Creeper.shutdown?+ returns
def stopped?
  return true if Thread.current[:creeper_stopped] == true

  Creeper.shutdown?
end

#stopped_at ⇒ Object



127
128
129
# File 'lib/creeper/worker.rb', line 127

# Reads the +:creeper_stopped_at+ value stored on the current thread.
#
# @return [Object, nil] the stored value, or +nil+ when none is set
def stopped_at
  current_thread = Thread.current
  current_thread[:creeper_stopped_at]
end

#stopped_at=(stopped_at) ⇒ Object



131
132
133
# File 'lib/creeper/worker.rb', line 131

# Stores +stopped_at+ under +:creeper_stopped_at+ on the current thread.
#
# @param stopped_at [Object] value to store
# @return [Object] the stored value
def stopped_at=(stopped_at)
  current_thread = Thread.current
  current_thread[:creeper_stopped_at] = stopped_at
end

#time_in_milliseconds ⇒ Object



115
116
117
# File 'lib/creeper/worker.rb', line 115

# Computes the span between +started_at+ and +stopped_at+ in whole
# milliseconds (the fractional part is dropped by +to_i+).
#
# @return [Integer]
def time_in_milliseconds
  elapsed_seconds = (stopped_at - started_at).to_f
  (elapsed_seconds * 1000).to_i
end

#watch(tube) ⇒ Object



159
160
161
162
163
# File 'lib/creeper/worker.rb', line 159

# Sends +watch+ for +tube+ to the beanstalk connection.
#
# On +Beanstalk::NotConnected+ the call is handed to +disconnected+; a truthy
# result from it becomes the return value, otherwise the original error is
# re-raised.
#
# @param tube [Object] tube identifier passed straight through
# @return [Object] result of +beanstalk.watch+ or of +disconnected+
# @raise [Beanstalk::NotConnected] when +disconnected+ returns a falsy value
def watch(tube)
  beanstalk.watch(tube)
rescue Beanstalk::NotConnected
  recovered = disconnected(self, :watch, tube)
  raise unless recovered

  recovered
end

#work(job) ⇒ Object



220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/creeper/worker.rb', line 220

# Processes one reserved job: parses its body, runs the before/main/after
# handlers for the job name, deletes the job and queues the next +start!+.
#
# Failure handling, in rescue order:
# * +Beanstalk::NotConnected+ — handed to +disconnected+; if that returns a
#   falsy value the job is released, the worker unregistered and the error
#   re-raised.
# * +SystemExit+ — the job is released and the worker unregistered; the exit
#   is not propagated.
# * any other StandardError — the job is buried, +Creeper.error_work+ and the
#   error handlers for the job name run, the worker is unregistered and the
#   error is re-raised.
#
# The per-thread started/stopped timestamps and the working flag are always
# cleared afterwards.
#
# @param job [Object] reserved job; must respond to +body+, +delete+,
#   +release+ and +bury+
# @return [Object] result of the final expression on the path taken
def work(job)
  exit if stopped?

  # The job body is a JSON array of [name, data].
  name, data = JSON.parse(job.body)

  Creeper.start_work(self, data, name, job)

  Creeper.before_handlers_for(name).each { |before| process(before, data, name, job) }
  process(Creeper.handler_for(name), data, name, job)
  Creeper.after_handlers_for(name).each { |after| process(after, data, name, job) }

  job.delete

  Creeper.stop_work(self, data, name, job)

  # continue processing, even when end of links is reached
  start! unless stopped?
rescue Beanstalk::NotConnected
  recovered = disconnected(self, :work, job)
  unless recovered
    job.release rescue nil
    Creeper.unregister_worker(self)
    raise
  end
  recovered
rescue SystemExit
  job.release rescue nil
  Creeper.unregister_worker(self)
rescue
  job.bury rescue nil

  Creeper.error_work(self, data, name, job)

  Creeper.error_handlers_for(name).each { |on_error| process(on_error, data, name, job) }

  Creeper.unregister_worker(self, "work loop error, burying #{dump(job, name, data)}")

  raise
ensure
  Thread.current[:creeper_started_at] = nil
  Thread.current[:creeper_stopped_at] = nil
  Thread.current[:creeper_working] = false
end

#working? ⇒ Boolean

Returns:

  • (Boolean)


304
305
306
# File 'lib/creeper/worker.rb', line 304

# Tells whether the current thread's +:creeper_working+ flag is exactly +true+.
#
# @return [Boolean]
def working?
  flag = Thread.current[:creeper_working]
  flag == true
end