Class: Lowkiq::ShardHandler
- Inherits: Object
  - Object
  - Lowkiq::ShardHandler
- Defined in: lib/lowkiq/shard_handler.rb
Instance Attribute Summary collapse
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#shard_index ⇒ Object
readonly
Returns the value of attribute shard_index.
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Class Method Summary collapse
- .build_many(worker, wrapper) ⇒ Object
Instance Method Summary collapse
-
#initialize(shard_index, worker, wrapper) ⇒ ShardHandler
constructor
A new instance of ShardHandler.
- #process ⇒ Object
- #restore ⇒ Object
Constructor Details
#initialize(shard_index, worker, wrapper) ⇒ ShardHandler
Returns a new instance of ShardHandler.
11 12 13 14 15 16 17 18 19 20 |
# File 'lib/lowkiq/shard_handler.rb', line 11
#
# Sets up the handler for a single shard of a worker's queue.
#
# @param shard_index [Object] identifies the shard this handler serves
#   (presumably a zero-based Integer, as build_many passes 0...shards_count)
# @param worker [Object] must respond to queue_name and shards_count
# @param wrapper [Object] stored as-is; #process invokes it with `call`
def initialize(shard_index, worker, wrapper)
  # Collaborators handed in by the caller.
  @shard_index = shard_index
  @worker = worker
  @wrapper = wrapper

  # Values derived from the worker and from library-level helpers.
  @queue_name = worker.queue_name
  @timestamp = Utils::Timestamp.method(:now) # kept as a Method object, not a time value
  @queue = Queue::Queue.new(
    Lowkiq.server_redis_pool,
    worker.queue_name,
    worker.shards_count
  )
end
Instance Attribute Details
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
9 10 11 |
# File 'lib/lowkiq/shard_handler.rb', line 9
#
# Read-only accessor.
#
# @return [Object] the current value of @queue_name
def queue_name
  @queue_name
end
#shard_index ⇒ Object (readonly)
Returns the value of attribute shard_index.
9 10 11 |
# File 'lib/lowkiq/shard_handler.rb', line 9
#
# Read-only accessor.
#
# @return [Object] the current value of @shard_index
def shard_index
  @shard_index
end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.
9 10 11 |
# File 'lib/lowkiq/shard_handler.rb', line 9
#
# Read-only accessor.
#
# @return [Object] the current value of @worker
def worker
  @worker
end
Class Method Details
.build_many(worker, wrapper) ⇒ Object
3 4 5 6 7 |
# File 'lib/lowkiq/shard_handler.rb', line 3
#
# Builds one handler for every shard of the given worker.
#
# @param worker [Object] must respond to shards_count
# @param wrapper [Object] passed unchanged to every handler
# @return [Array] one instance per index in 0...worker.shards_count, in ascending order
def self.build_many(worker, wrapper)
  shard_indexes = 0...worker.shards_count
  shard_indexes.map { |index| new(index, worker, wrapper) }
end
Instance Method Details
#process ⇒ Object
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/lowkiq/shard_handler.rb', line 22
#
# Pops up to @worker.batch_size entries from this shard and runs the worker
# on them inside the wrapper.
#
# On success the popped data is acked with :success. If a StandardError is
# raised while building the batch, performing it, or acking it, the data is
# passed to fail!, split by separate, the first part is pushed back to the
# queue, the second part is pushed to the morgue, and the data is acked
# with :fail.
#
# @return [Boolean] true when the batch was performed and acked as :success;
#   false when nothing was popped or when an error was rescued
def process
  popped = @queue.pop(@shard_index, limit: @worker.batch_size)
  return false if popped.empty?

  begin
    jobs = batch_from_data(popped)
    @wrapper.call(@worker, jobs) { @worker.perform(jobs) }
    @queue.ack(@shard_index, popped, :success)
    true
  rescue StandardError => error
    fail!(popped, error)
    to_retry, to_bury = separate(popped)
    @queue.push_back(to_retry)
    @queue.push_to_morgue(to_bury)
    @queue.ack(@shard_index, popped, :fail)
    false
  end
end
#restore ⇒ Object
47 48 49 50 51 52 |
# File 'lib/lowkiq/shard_handler.rb', line 47
#
# Pushes back whatever @queue.processing_data reports for this shard and
# then acks it. Does nothing when that call returns nil.
# NOTE(review): presumably this recovers entries left "in processing" by an
# interrupted run — confirm against Queue::Queue.
#
# @return [Object, nil] nil when there was nothing to restore, otherwise the
#   result of @queue.ack
def restore
  leftover = @queue.processing_data(@shard_index)

  unless leftover.nil?
    @queue.push_back(leftover)
    @queue.ack(@shard_index, leftover)
  end
end