Class: BucketProcessor

Inherits: Object
Defined in:
lib/s3-object-processor.rb

Instance Method Summary

Constructor Details

#initialize(key_id, key_secret, bucket, options = {}, &callback) ⇒ BucketProcessor

Returns a new instance of BucketProcessor.



204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/s3-object-processor.rb', line 204

# Wires up the processing pipeline: a lister pushes bucket keys into a bounded
# queue, a pool of workers pops keys from it and hands each one to the given
# block, and a reporter aggregates per-key events into periodic and final
# log summaries.
#
# @param key_id [String] AWS access key id passed to RightAws::S3.new
# @param key_secret [String] AWS secret key passed to RightAws::S3.new
# @param bucket [String] name of the bucket to process
# @param options [Hash]
# @option options [Boolean] :no_https (false) connect over http on port 80 instead of https on 443
# @option options [Logger] :log (Logger.new(STDERR)) logger shared by all components
# @option options [Integer] :workers (10) number of Worker instances to create
# @option options [Integer] :lister_fetch_size (200) page size handed to Lister; unused with :key_list
# @option options [Integer] :lister_backlog (1000) capacity of the SizedQueue between lister and workers
# @option options [Integer, nil] :max_keys passed through to the lister (presumably a cap on keys listed)
# @option options [Integer] :reporter_backlog (1000) passed to Reporter.new (presumably its backlog size)
# @option options [Integer] :reporter_summary_interval (100) log a progress line every N processed keys
# @option options [Float] :reporter_average_contribution (0.10) weight of the newest sample in the
#   exponential moving average of the op/s rate
# @option options [Hash] :reports ({}) custom report name => object responding to
#   #update(value), #to_s and #final; unknown report events are logged and ignored
# @option options :key_list keys (anything responding to #length) to feed through
#   ListLister instead of listing the bucket
# @yield [bucket, key, reporter] invoked from a worker for every key; presumably an
#   exception raised here is routed to the worker's on_error handler and counted as failed
def initialize(key_id, key_secret, bucket, options = {}, &callback)
  protocol = options[:no_https] ? 'http' : 'https'
  port = options[:no_https] ? 80 : 443
  @log = options[:log] || Logger.new(STDERR)
  workers = options[:workers] || 10
  lister_fetch_size = options[:lister_fetch_size] || 200
  lister_backlog = options[:lister_backlog] || 1000
  max_keys = options[:max_keys]
  reporter_backlog = options[:reporter_backlog] || 1000
  reporter_summary_interval = options[:reporter_summary_interval] || 100
  reporter_average_contribution = options[:reporter_average_contribution] || 0.10
  # Must be a Hash: it is read with #each_value and #[] below (an Array
  # default raises NoMethodError on the first summary line).
  custom_reports = options[:reports] || {}
  @key_list = options[:key_list]

  s3 = RightAws::S3.new(key_id, key_secret, multi_thread: true, logger: @log, protocol: protocol, port: port)
  s3_bucket = s3.bucket(bucket)

  @key_queue = SizedQueue.new(lister_backlog)

  @reporter = Reporter.new(reporter_backlog) do |reports|
    total_listed_keys = 0
    total_processed_keys = 0
    total_succeeded_keys = 0
    total_failed_keys = 0
    total_handled_keys = 0
    total_skipped_keys = 0
    total_nooped_keys = 0

    processed_avg = 0.0
    last_time = nil
    last_total = 0

    reports.each do |event, value|
      case event
      when :new_keys_count
        total_listed_keys += value
      when :processed_key
        total_processed_keys += 1
        if total_processed_keys % reporter_summary_interval == 0
          now = Time.now.to_f
          # Exponential moving average of the processing rate. The first summary
          # only records a starting point; a non-positive elapsed time would
          # yield Infinity/NaN, so such samples are skipped.
          if last_time && now > last_time
            rate = (total_processed_keys - last_total).to_f / (now - last_time)
            processed_avg = processed_avg * (1.0 - reporter_average_contribution) +
                            rate * reporter_average_contribution
          end
          last_time = now
          last_total = total_processed_keys

          log_line = "-[%s]- processed %6d: failed: %6d (%6.2f %%) handled: %6d skipped: %6d (%6.2f %%)" % [
            value.to_s[0...2].ljust(2),
            total_processed_keys,
            total_failed_keys,
            total_failed_keys.to_f / total_processed_keys * 100,
            total_handled_keys,
            total_skipped_keys,
            total_skipped_keys.to_f / total_processed_keys * 100
          ]
          log_line << custom_reports.each_value.map { |report| ' ' + report.to_s }.join
          log_line << " [backlog: %4d] @ %.1f op/s" % [
            @key_queue.size,
            processed_avg
          ]

          @log.info log_line
        end
      when :succeeded_key
        total_succeeded_keys += 1
      when :failed_key
        failed_key, error = *value
        @log.error "Key processing failed: #{failed_key}: #{error.class.name}, #{error.message}"
        total_failed_keys += 1
      when :handled_key
        total_handled_keys += 1
      when :skipped_key
        total_skipped_keys += 1
      when :noop_key
        total_nooped_keys += 1
      else
        # Any other event is routed to the custom report registered under its
        # name; an unregistered name would otherwise crash the reporter on nil.
        report = custom_reports[event]
        if report
          report.update(value)
        else
          @log.warn "Unknown report event: #{event}"
        end
      end
    end

    reports.on_finish do
      @log.info("total listed keys:                      #{total_listed_keys}")
      @log.info("total processed keys:                   #{total_processed_keys}")
      @log.info("total succeeded keys:                   #{total_succeeded_keys}")
      @log.info("total failed keys:                      #{total_failed_keys}")
      @log.info("total handled keys:                     #{total_handled_keys}")
      @log.info("total skipped keys:                     #{total_skipped_keys}")
      @log.info("total nooped keys:                      #{total_nooped_keys}")
      custom_reports.each_value do |report|
        @log.info report.final
      end
    end
  end

  # create lister: an explicit key list bypasses bucket listing
  lister =
    if @key_list
      @log.info "processing #{@key_list.length} keys from list file"
      ListLister.new(s3_bucket, @key_queue, max_keys)
    else
      Lister.new(s3_bucket, @key_queue, lister_fetch_size, max_keys)
    end

  @lister = lister
    .on_keys_chunk do |keys_chunk|
      @log.debug "Got #{keys_chunk.length} new keys"
      @reporter.report(:new_keys_count, keys_chunk.length)
    end
    .on_finish do
      @log.debug "Done listing keys"
      # one :end sentinel per worker so that every worker sees end of input
      workers.times { @key_queue << :end }
    end

  # create workers
  @log.info "Launching #{workers} workers"
  @workers = (1..workers).map do |worker_no|
    Worker.new(worker_no, @key_queue) do |key|
      @log.debug "Worker[#{worker_no}]: Processing key #{key}"
      yield s3_bucket, key, @reporter
      @reporter.report :processed_key, key
      @reporter.report :succeeded_key, key
    end
    .on_error do |key, error|
      @reporter.report :processed_key, key
      @reporter.report :failed_key, [key, error]
    end
    .on_finish do
      @log.debug "Worker #{worker_no} done"
    end
  end
end

Instance Method Details

#run(prefix = nil) ⇒ Object



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# File 'lib/s3-object-processor.rb', line 337

# Starts the reporter, the lister and every worker, then blocks until all
# workers, the lister and finally the reporter have finished.
#
# @param prefix [String, nil] key prefix handed to the lister; ignored when
#   @key_list is set, in which case the key list is handed to the lister instead
# @return the value returned by the final @reporter.join
#
# On Interrupt (e.g. Ctrl-C in the calling thread) a warning is logged, the key
# queue's capacity is lifted so a thread blocked pushing into it can proceed,
# the reporter is joined, and the method returns normally without re-raising.
def run(prefix = nil)
  lister_input = @key_list || prefix

  @reporter.run
  @lister.run(lister_input)
  @workers.each(&:run)

  # block until every worker has finished consuming the queue
  @workers.each(&:join)
  @log.info "All workers done"

  @lister.join
  @reporter.join
rescue Interrupt
  @log.warn 'Interrupted'
  # lift the backlog limit so any thread waiting on a full @key_queue is released
  @key_queue.max = 999999
  @reporter.join
end