Class: Sidekiq::Grouping::Batch
- Inherits:
-
Object
- Object
- Sidekiq::Grouping::Batch
- Defined in:
- lib/sidekiq/grouping/batch.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#worker_class ⇒ Object
readonly
Returns the value of attribute worker_class.
Class Method Summary collapse
Instance Method Summary collapse
- #add(msg) ⇒ Object
- #chunk_size ⇒ Object
- #could_flush? ⇒ Boolean
- #delete ⇒ Object
- #flush ⇒ Object
-
#initialize(worker_class, queue, redis_pool = nil) ⇒ Batch
constructor
A new instance of Batch.
- #last_execution_time ⇒ Object
- #next_execution_time ⇒ Object
- #pluck ⇒ Object
- #pluck_size ⇒ Object
- #should_add?(msg) ⇒ Boolean
- #size ⇒ Object
- #worker_class_constant ⇒ Object
- #worker_class_options ⇒ Object
Constructor Details
#initialize(worker_class, queue, redis_pool = nil) ⇒ Batch
Returns a new instance of Batch.
4 5 6 7 8 9 |
# File 'lib/sidekiq/grouping/batch.rb', line 4 def initialize(worker_class, queue, redis_pool = nil) @worker_class = worker_class @queue = queue @name = "#{worker_class.underscore}:#{queue}" @redis = Sidekiq::Grouping::Redis.new end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
11 12 13 |
# File 'lib/sidekiq/grouping/batch.rb', line 11 def name @name end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
11 12 13 |
# File 'lib/sidekiq/grouping/batch.rb', line 11 def queue @queue end |
#worker_class ⇒ Object (readonly)
Returns the value of attribute worker_class.
11 12 13 |
# File 'lib/sidekiq/grouping/batch.rb', line 11 def worker_class @worker_class end |
Class Method Details
.all ⇒ Object
116 117 118 119 120 121 122 |
# File 'lib/sidekiq/grouping/batch.rb', line 116 def all redis = Sidekiq::Grouping::Redis.new redis.batches.map do |name| new(*extract_worker_klass_and_queue(name)) end end |
.extract_worker_klass_and_queue(name) ⇒ Object
124 125 126 127 |
# File 'lib/sidekiq/grouping/batch.rb', line 124 def extract_worker_klass_and_queue(name) klass, queue = name.split(':') [klass.camelize, queue] end |
Instance Method Details
#add(msg) ⇒ Object
13 14 15 16 |
# File 'lib/sidekiq/grouping/batch.rb', line 13 def add(msg) msg = msg.to_json @redis.push_msg(@name, msg, enqueue_similar_once?) if should_add? msg end |
#chunk_size ⇒ Object
27 28 29 30 |
# File 'lib/sidekiq/grouping/batch.rb', line 27 def chunk_size ['batch_size'] || Sidekiq::Grouping::Config.max_batch_size end |
#could_flush? ⇒ Boolean
67 68 69 |
# File 'lib/sidekiq/grouping/batch.rb', line 67 def could_flush? could_flush_on_overflow? || could_flush_on_time? end |
#delete ⇒ Object
83 84 85 |
# File 'lib/sidekiq/grouping/batch.rb', line 83 def delete @redis.delete(@name) end |
#flush ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/sidekiq/grouping/batch.rb', line 43 def flush chunk = pluck return unless chunk chunk.each_slice(chunk_size) do |subchunk| Sidekiq::Client.push( 'class' => @worker_class, 'queue' => @queue, 'args' => [true, subchunk] ) end set_current_time_as_last end |
#last_execution_time ⇒ Object
71 72 73 74 |
# File 'lib/sidekiq/grouping/batch.rb', line 71 def last_execution_time last_time = @redis.get_last_execution_time(@name) Time.parse(last_time) if last_time end |
#next_execution_time ⇒ Object
76 77 78 79 80 81 |
# File 'lib/sidekiq/grouping/batch.rb', line 76 def next_execution_time if interval = ['batch_flush_interval'] last_time = last_execution_time last_time + interval.seconds if last_time end end |
#pluck ⇒ Object
37 38 39 40 41 |
# File 'lib/sidekiq/grouping/batch.rb', line 37 def pluck if @redis.lock(@name) @redis.pluck(@name, pluck_size).map { |value| JSON.parse(value) } end end |
#pluck_size ⇒ Object
32 33 34 35 |
# File 'lib/sidekiq/grouping/batch.rb', line 32 def pluck_size ['batch_flush_size'] || chunk_size end |
#should_add?(msg) ⇒ Boolean
18 19 20 21 |
# File 'lib/sidekiq/grouping/batch.rb', line 18 def should_add? msg return true unless enqueue_similar_once? !@redis.enqueued?(@name, msg) end |
#size ⇒ Object
23 24 25 |
# File 'lib/sidekiq/grouping/batch.rb', line 23 def size @redis.batch_size(@name) end |
#worker_class_constant ⇒ Object
57 58 59 |
# File 'lib/sidekiq/grouping/batch.rb', line 57 def worker_class_constant @worker_class.constantize end |
#worker_class_options ⇒ Object
61 62 63 64 65 |
# File 'lib/sidekiq/grouping/batch.rb', line 61 def worker_class_constant. rescue NameError {} end |