Class: Sidekiq::Batching::Batch
- Inherits:
-
Object
- Object
- Sidekiq::Batching::Batch
- Defined in:
- lib/sidekiq/batching/batch.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#worker_class ⇒ Object
readonly
Returns the value of attribute worker_class.
Class Method Summary collapse
Instance Method Summary collapse
- #add(msg) ⇒ Object
- #chunk_size ⇒ Object
- #could_flush? ⇒ Boolean
- #delete ⇒ Object
- #flush ⇒ Object
-
#initialize(worker_class, queue, redis_pool = nil) ⇒ Batch
constructor
A new instance of Batch.
- #last_execution_time ⇒ Object
- #next_execution_time ⇒ Object
- #pluck ⇒ Object
- #should_add?(msg) ⇒ Boolean
- #size ⇒ Object
- #worker_class_constant ⇒ Object
- #worker_class_options ⇒ Object
Constructor Details
#initialize(worker_class, queue, redis_pool = nil) ⇒ Batch
Returns a new instance of Batch.
5 6 7 8 9 10 |
# File 'lib/sidekiq/batching/batch.rb', line 5 def initialize(worker_class, queue, redis_pool = nil) @worker_class = worker_class @queue = queue @name = "#{worker_class.underscore}:#{queue}" @redis = Sidekiq::Batching::Redis.new end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
12 13 14 |
# File 'lib/sidekiq/batching/batch.rb', line 12 def name @name end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
12 13 14 |
# File 'lib/sidekiq/batching/batch.rb', line 12 def queue @queue end |
#worker_class ⇒ Object (readonly)
Returns the value of attribute worker_class.
12 13 14 |
# File 'lib/sidekiq/batching/batch.rb', line 12 def worker_class @worker_class end |
Class Method Details
.all ⇒ Object
112 113 114 115 116 117 118 |
# File 'lib/sidekiq/batching/batch.rb', line 112 def all redis = Sidekiq::Batching::Redis.new redis.batches.map do |name| new(*extract_worker_klass_and_queue(name)) end end |
.extract_worker_klass_and_queue(name) ⇒ Object
120 121 122 123 |
# File 'lib/sidekiq/batching/batch.rb', line 120 def extract_worker_klass_and_queue(name) klass, queue = name.split(':') [klass.classify, queue] end |
Instance Method Details
#add(msg) ⇒ Object
14 15 16 17 |
# File 'lib/sidekiq/batching/batch.rb', line 14 def add(msg) msg = msg.to_json @redis.push_msg(@name, msg, enqueue_similar_once?) if should_add? msg end |
#chunk_size ⇒ Object
28 29 30 31 |
# File 'lib/sidekiq/batching/batch.rb', line 28 def chunk_size ['batch_size'] || Sidekiq::Batching::Config.max_batch_size end |
#could_flush? ⇒ Boolean
61 62 63 |
# File 'lib/sidekiq/batching/batch.rb', line 61 def could_flush? could_flush_on_overflow? || could_flush_on_time? end |
#delete ⇒ Object
77 78 79 |
# File 'lib/sidekiq/batching/batch.rb', line 77 def delete @redis.delete(@name) end |
#flush ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/sidekiq/batching/batch.rb', line 39 def flush chunk = pluck if chunk set_current_time_as_last Sidekiq::Client.push( 'class' => @worker_class, 'queue' => @queue, 'args' => [true, chunk] ) end end |
#last_execution_time ⇒ Object
65 66 67 68 |
# File 'lib/sidekiq/batching/batch.rb', line 65 def last_execution_time last_time = @redis.get_last_execution_time(@name) Time.parse(last_time) if last_time end |
#next_execution_time ⇒ Object
70 71 72 73 74 75 |
# File 'lib/sidekiq/batching/batch.rb', line 70 def next_execution_time if interval = ['batch_flush_interval'] last_time = last_execution_time last_time + interval.seconds if last_time end end |
#pluck ⇒ Object
33 34 35 36 37 |
# File 'lib/sidekiq/batching/batch.rb', line 33 def pluck if @redis.lock(@name) @redis.pluck(@name, chunk_size).map { |value| JSON.parse(value) } end end |
#should_add?(msg) ⇒ Boolean
19 20 21 22 |
# File 'lib/sidekiq/batching/batch.rb', line 19 def should_add? msg return true unless enqueue_similar_once? !@redis.enqueued?(@name, msg) end |
#size ⇒ Object
24 25 26 |
# File 'lib/sidekiq/batching/batch.rb', line 24 def size @redis.batch_size(@name) end |
#worker_class_constant ⇒ Object
51 52 53 |
# File 'lib/sidekiq/batching/batch.rb', line 51 def worker_class_constant @worker_class.constantize end |
#worker_class_options ⇒ Object
55 56 57 58 59 |
# File 'lib/sidekiq/batching/batch.rb', line 55 def worker_class_constant. rescue NameError {} end |