Class: Sidekiq::Grouping::Batch

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/grouping/batch.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker_class, queue, redis_pool = nil) ⇒ Batch

Returns a new instance of Batch.



4
5
6
7
8
9
# File 'lib/sidekiq/grouping/batch.rb', line 4

# Sets up a batch for one worker class / queue pair.
#
# The batch name is the underscored worker class name and the queue joined
# by a colon. A new Sidekiq::Grouping::Redis instance is created and kept
# for this batch.
#
# @param worker_class [#underscore] worker class name (presumably a String;
#   other methods of this class call +constantize+ on it)
# @param queue [Object] queue name; interpolated into the batch name
# @param redis_pool [Object, nil] accepted but not used by this constructor
def initialize(worker_class, queue, redis_pool = nil)
  @worker_class = worker_class
  @queue = queue
  underscored = worker_class.underscore
  @name = "#{underscored}:#{queue}"
  @redis = Sidekiq::Grouping::Redis.new
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



11
12
13
# File 'lib/sidekiq/grouping/batch.rb', line 11

# Reader for the batch name held in @name. The constructor builds it as
# "<underscored worker class>:<queue>".
#
# @return [String] the batch name
def name
  @name
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



11
12
13
# File 'lib/sidekiq/grouping/batch.rb', line 11

# Reader for @queue, the queue value passed to the constructor.
#
# @return [Object] the queue this batch was created with
def queue
  @queue
end

#worker_class ⇒ Object (readonly)

Returns the value of attribute worker_class.



11
12
13
# File 'lib/sidekiq/grouping/batch.rb', line 11

# Reader for @worker_class, the worker class value passed to the constructor
# (stored as given, not resolved to a constant).
#
# @return [Object] the worker class this batch was created with
def worker_class
  @worker_class
end

Class Method Details

.all ⇒ Object



116
117
118
119
120
121
122
# File 'lib/sidekiq/grouping/batch.rb', line 116

# Builds one batch object per batch name reported by a new
# Sidekiq::Grouping::Redis instance.
#
# Each name is turned into constructor arguments through
# extract_worker_klass_and_queue and passed to +new+.
#
# @return [Array] one instance per name returned by +batches+
def all
  batch_names = Sidekiq::Grouping::Redis.new.batches
  batch_names.map { |batch_name| new(*extract_worker_klass_and_queue(batch_name)) }
end

.extract_worker_klass_and_queue(name) ⇒ Object



124
125
126
127
# File 'lib/sidekiq/grouping/batch.rb', line 124

# Splits a batch name back into a worker class name and a queue.
#
# Batch names are built by the constructor as
# "<underscored worker class>:<queue>". Only the FIRST colon is treated as
# the separator, so a queue name that itself contains colons is returned
# whole rather than being cut off at its first colon.
#
# @param name [String] batch name
# @return [Array] two elements: the camelized class part and the queue part
#   (the queue is nil when +name+ contains no colon)
def extract_worker_klass_and_queue(name)
  # Limit of 2: everything after the first colon belongs to the queue.
  klass, queue = name.split(':', 2)
  [klass.camelize, queue]
end

Instance Method Details

#add(msg) ⇒ Object



13
14
15
16
# File 'lib/sidekiq/grouping/batch.rb', line 13

# Serializes +msg+ to JSON and pushes it onto this batch, unless should_add?
# rejects the serialized message.
#
# @param msg [#to_json] message to enqueue
# @return [Object, nil] whatever the Redis wrapper's +push_msg+ returns, or
#   nil when the message is not added
def add(msg)
  payload = msg.to_json
  return unless should_add?(payload)

  @redis.push_msg(@name, payload, enqueue_similar_once?)
end

#chunk_size ⇒ Object



27
28
29
30
# File 'lib/sidekiq/grouping/batch.rb', line 27

# Size of the slices that flush pushes as individual jobs.
#
# Uses the worker's 'batch_size' option when it is set (truthy); otherwise
# falls back to Sidekiq::Grouping::Config.max_batch_size.
#
# @return [Object] the configured or default batch size
def chunk_size
  configured = worker_class_options['batch_size']
  configured || Sidekiq::Grouping::Config.max_batch_size
end

#could_flush? ⇒ Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/sidekiq/grouping/batch.rb', line 67

# Whether this batch may be flushed: either the overflow check or the time
# check passes. The time check is only consulted when the overflow check is
# falsy.
#
# @return [Boolean] result of the first check that is truthy, otherwise the
#   result of the time check
def could_flush?
  overflow = could_flush_on_overflow?
  overflow || could_flush_on_time?
end

#delete ⇒ Object



83
84
85
# File 'lib/sidekiq/grouping/batch.rb', line 83

# Asks the Redis wrapper to delete the data stored under this batch's name.
#
# @return [Object] whatever the wrapper's +delete+ returns
def delete
  batch_name = @name
  @redis.delete(batch_name)
end

#flush ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/sidekiq/grouping/batch.rb', line 43

# Plucks pending messages and pushes them to Sidekiq in slices.
#
# Does nothing (and returns nil) when pluck yields nothing truthy.
# Otherwise each slice of at most chunk_size messages becomes one job for
# the worker class on this batch's queue, with args +[true, slice]+, and
# the last-execution marker is updated afterwards.
#
# @return [Object, nil] result of set_current_time_as_last, or nil when
#   nothing was plucked
def flush
  messages = pluck
  return unless messages

  messages.each_slice(chunk_size) do |slice|
    job = {
      'class' => @worker_class,
      'queue' => @queue,
      'args' => [true, slice]
    }
    Sidekiq::Client.push(job)
  end
  set_current_time_as_last
end

#last_execution_time ⇒ Object



71
72
73
74
# File 'lib/sidekiq/grouping/batch.rb', line 71

# Reads the stored last-execution timestamp for this batch and parses it.
#
# @return [Time, nil] the parsed time, or nil when nothing is stored
def last_execution_time
  stored = @redis.get_last_execution_time(@name)
  return unless stored

  Time.parse(stored)
end

#next_execution_time ⇒ Object



76
77
78
79
80
81
# File 'lib/sidekiq/grouping/batch.rb', line 76

# Time at which the next interval-based flush is due.
#
# Computed as the last execution time plus the worker's
# 'batch_flush_interval' option (taken as seconds via +interval.seconds+).
#
# @return [Object, nil] last execution time shifted by the interval; nil
#   when the option is not set or no last execution time is recorded
def next_execution_time
  interval = worker_class_options['batch_flush_interval']
  return unless interval

  previous = last_execution_time
  return unless previous

  previous + interval.seconds
end

#pluck ⇒ Object



37
38
39
40
41
# File 'lib/sidekiq/grouping/batch.rb', line 37

# Takes up to pluck_size messages out of this batch and decodes them.
#
# The Redis wrapper's +lock+ is consulted first; when it returns a falsy
# value nothing is plucked.
#
# @return [Array, nil] JSON-decoded messages, or nil when the lock was not
#   obtained
def pluck
  return unless @redis.lock(@name)

  raw_messages = @redis.pluck(@name, pluck_size)
  raw_messages.map { |raw| JSON.parse(raw) }
end

#pluck_size ⇒ Object



32
33
34
35
# File 'lib/sidekiq/grouping/batch.rb', line 32

# Number of messages requested from Redis in one pluck.
#
# Uses the worker's 'batch_flush_size' option when it is set (truthy);
# otherwise falls back to chunk_size.
#
# @return [Object] the configured flush size or the chunk size
def pluck_size
  flush_size = worker_class_options['batch_flush_size']
  flush_size || chunk_size
end

#should_add?(msg) ⇒ Boolean

Returns:

  • (Boolean)


18
19
20
21
# File 'lib/sidekiq/grouping/batch.rb', line 18

# Decides whether a serialized message should be pushed onto the batch.
#
# Always true when enqueue_similar_once? is falsy. Otherwise the message is
# accepted only if the Redis wrapper does not report it as already
# enqueued; the wrapper is not queried in the first case.
#
# @param msg [Object] serialized message, passed to the wrapper's +enqueued?+
# @return [Boolean]
def should_add?(msg)
  !(enqueue_similar_once? && @redis.enqueued?(@name, msg))
end

#size ⇒ Object



23
24
25
# File 'lib/sidekiq/grouping/batch.rb', line 23

# Asks the Redis wrapper for the size of the batch stored under this
# batch's name.
#
# @return [Object] whatever the wrapper's +batch_size+ returns
def size
  batch_name = @name
  @redis.batch_size(batch_name)
end

#worker_class_constant ⇒ Object



57
58
59
# File 'lib/sidekiq/grouping/batch.rb', line 57

# Resolves the stored worker class name by calling +constantize+ on it.
#
# @return [Object] result of +constantize+ on the stored worker class
def worker_class_constant
  class_name = @worker_class
  class_name.constantize
end

#worker_class_options ⇒ Object



61
62
63
64
65
# File 'lib/sidekiq/grouping/batch.rb', line 61

# Sidekiq options of the worker class behind this batch.
#
# Falls back to an empty hash when resolving the class or reading its
# options raises NameError. NoMethodError is a NameError subclass, so a
# resolved object without +get_sidekiq_options+ also yields an empty hash.
#
# @return [Object] the worker's options, or {} on NameError
def worker_class_options
  worker = worker_class_constant
  worker.get_sidekiq_options
rescue NameError
  {}
end