Class: Sidekiq::Batching::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/batching/batch.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_class, queue, redis_pool = nil) ⇒ Batch

Returns a new instance of Batch.



5
6
7
8
9
10
# File 'lib/sidekiq/batching/batch.rb', line 5

# Sets up a batch for one worker class / queue pair.
#
# @param worker_class [String] worker class name (presumably a String:
#   `underscore` is called on it here — confirm against callers)
# @param queue [Object] queue identifier; interpolated into the batch name
# @param redis_pool [Object, nil] NOTE(review): accepted but never read in
#   this method — confirm whether it was meant to reach the Redis wrapper
def initialize(worker_class, queue, redis_pool = nil)
  @worker_class = worker_class
  @queue = queue
  # Batch name has the form "<underscored worker class>:<queue>".
  @name = "#{worker_class.underscore}:#{queue}"
  @redis = Sidekiq::Batching::Redis.new
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



12
13
14
# File 'lib/sidekiq/batching/batch.rb', line 12

# Reader for the batch name held in @name.
#
# @return [Object] the current value of @name
def name
  @name
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



12
13
14
# File 'lib/sidekiq/batching/batch.rb', line 12

# Reader for the queue held in @queue.
#
# @return [Object] the current value of @queue
def queue
  @queue
end

#worker_class ⇒ Object (readonly)

Returns the value of attribute worker_class.



12
13
14
# File 'lib/sidekiq/batching/batch.rb', line 12

# Reader for the worker class name held in @worker_class.
#
# @return [Object] the current value of @worker_class
def worker_class
  @worker_class
end

Class Method Details

.all ⇒ Object



112
113
114
115
116
117
118
# File 'lib/sidekiq/batching/batch.rb', line 112

# Builds one batch object for every batch name reported by the Redis wrapper.
#
# @return [Array] one instance per batch name, constructed from the worker
#   class and queue parsed out of that name
def all
  batch_names = Sidekiq::Batching::Redis.new.batches
  batch_names.map { |batch_name| new(*extract_worker_klass_and_queue(batch_name)) }
end

.extract_worker_klass_and_queue(name) ⇒ Object



120
121
122
123
# File 'lib/sidekiq/batching/batch.rb', line 120

# Splits a stored batch name back into a worker class name and a queue.
#
# Batch names are built as "<worker_class.underscore>:<queue>", so the class
# part is restored with `camelize`, the inverse of `underscore`. `classify`
# is avoided because it additionally singularizes the name, which corrupts
# plural class names (e.g. "process_events" would become "ProcessEvent").
#
# @param name [String] batch name in "<underscored class>:<queue>" form
# @return [Array] two elements: the class name and the queue (nil when the
#   name contains no ":")
def extract_worker_klass_and_queue(name)
  # Limit the split to two parts so a queue containing ":" stays intact.
  klass, queue = name.split(':', 2)
  [klass.camelize, queue]
end

Instance Method Details

#add(msg) ⇒ Object



14
15
16
17
# File 'lib/sidekiq/batching/batch.rb', line 14

# Serializes +msg+ to JSON and pushes it onto this batch through the Redis
# wrapper, unless should_add? rejects the serialized message.
#
# @param msg [Object] anything that responds to to_json
# @return [Object, nil] whatever push_msg returns, or nil when skipped
def add(msg)
  payload = msg.to_json
  return unless should_add?(payload)

  @redis.push_msg(@name, payload, enqueue_similar_once?)
end

#chunk_size ⇒ Object



28
29
30
31
# File 'lib/sidekiq/batching/batch.rb', line 28

# Number of messages to take per pluck.
#
# @return [Object] the worker's 'batch_size' option when it is set (truthy),
#   otherwise Sidekiq::Batching::Config.max_batch_size
def chunk_size
  per_worker = worker_class_options['batch_size']
  per_worker || Sidekiq::Batching::Config.max_batch_size
end

#could_flush? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/sidekiq/batching/batch.rb', line 61

# Tells whether the batch is ready to flush: either the overflow check or
# the time check passes. The time check is only consulted when the overflow
# check is falsy.
#
# @return [Boolean]
def could_flush?
  overflow = could_flush_on_overflow?
  return overflow if overflow

  could_flush_on_time?
end

#delete ⇒ Object



77
78
79
# File 'lib/sidekiq/batching/batch.rb', line 77

# Asks the Redis wrapper to delete this batch, identified by @name.
#
# @return [Object] whatever the wrapper's delete call returns
def delete
  @redis.delete(@name)
end

#flush ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/sidekiq/batching/batch.rb', line 39

# Plucks the next chunk of messages and enqueues a single Sidekiq job that
# carries them.
#
# Nothing is enqueued, and the last-execution timestamp is left untouched,
# when pluck yields nil or an empty chunk, so workers never receive an
# empty batch.
#
# @return [Object, nil] the result of Sidekiq::Client.push, or nil when
#   there was nothing to flush
def flush
  chunk = pluck
  return unless chunk
  # An empty array is truthy in Ruby, so it needs its own check.
  return if chunk.empty?

  set_current_time_as_last
  Sidekiq::Client.push(
    'class' => @worker_class,
    'queue' => @queue,
    'args' => [true, chunk]
  )
end

#last_execution_time ⇒ Object



65
66
67
68
# File 'lib/sidekiq/batching/batch.rb', line 65

# Reads the last execution time recorded for this batch in the Redis
# wrapper and parses it.
#
# @return [Time, nil] the parsed time, or nil when nothing is stored
def last_execution_time
  stored = @redis.get_last_execution_time(@name)
  return unless stored

  Time.parse(stored)
end

#next_execution_time ⇒ Object



70
71
72
73
74
75
# File 'lib/sidekiq/batching/batch.rb', line 70

# Computes when this batch is next due to run: the last execution time plus
# the worker's 'batch_flush_interval' option (converted with `seconds`).
#
# @return [Object, nil] the next time, or nil when the worker has no
#   interval configured or no last execution time is available
def next_execution_time
  interval = worker_class_options['batch_flush_interval']
  return unless interval

  previous = last_execution_time
  return unless previous

  previous + interval.seconds
end

#pluck ⇒ Object



33
34
35
36
37
# File 'lib/sidekiq/batching/batch.rb', line 33

# Takes up to chunk_size raw messages for this batch from the Redis wrapper
# and JSON-decodes each of them. Runs only when the wrapper's lock call for
# this batch returns a truthy value.
#
# @return [Array, nil] decoded messages, or nil when the lock call is falsy
def pluck
  return unless @redis.lock(@name)

  raw_items = @redis.pluck(@name, chunk_size)
  raw_items.map { |raw| JSON.parse(raw) }
end

#should_add?(msg) ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
22
# File 'lib/sidekiq/batching/batch.rb', line 19

# Decides whether a serialized message may be pushed onto the batch.
#
# When enqueue_similar_once? is falsy every message is accepted; otherwise
# the message is accepted only if the Redis wrapper does not report it as
# already enqueued for this batch.
#
# @param msg [String] serialized message
# @return [Boolean]
def should_add?(msg)
  if enqueue_similar_once?
    !@redis.enqueued?(@name, msg)
  else
    true
  end
end

#size ⇒ Object



24
25
26
# File 'lib/sidekiq/batching/batch.rb', line 24

# Asks the Redis wrapper for the size of this batch, identified by @name.
#
# @return [Object] whatever the wrapper's batch_size call returns
#   (presumably a message count — confirm against the wrapper)
def size
  @redis.batch_size(@name)
end

#worker_class_constant ⇒ Object



51
52
53
# File 'lib/sidekiq/batching/batch.rb', line 51

# Resolves the stored worker class name to a constant by calling
# `constantize` on @worker_class.
#
# @return [Object] the result of `constantize`
def worker_class_constant
  @worker_class.constantize
end

#worker_class_options ⇒ Object



55
56
57
58
59
# File 'lib/sidekiq/batching/batch.rb', line 55

# Fetches the Sidekiq options declared on the worker class.
#
# Any NameError raised while resolving the class or reading its options is
# swallowed and an empty Hash is returned instead. NoMethodError is a
# subclass of NameError, so a resolved class that lacks get_sidekiq_options
# also yields {}.
#
# @return [Object] the worker's get_sidekiq_options result, or {} on NameError
def worker_class_options
  klass = worker_class_constant
  klass.get_sidekiq_options
rescue NameError
  {}
end