Class: BatchQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/batch_queue/version.rb,
lib/batch_queue/batch_queue.rb

Constant Summary collapse

VERSION =
"1.0.0"

Instance Method Summary collapse

Constructor Details

#initialize(max_batch_size: nil, max_interval_seconds: nil, &block) ⇒ BatchQueue

Starts the queue. Either max_batch_size or max_interval_seconds (or both) must be set.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/batch_queue/batch_queue.rb', line 4

# Sets up the queue state and launches the background worker thread.
#
# @param max_batch_size [Object, nil] stored in @max_batch_size; presumably
#   the item count that triggers a flush (the `run` loop is not visible
#   here, confirm against it)
# @param max_interval_seconds [Object, nil] stored in @max_interval_seconds;
#   presumably the longest wait between flushes (confirm against `run`)
# @param block stored in @block; presumably invoked by `run` (confirm)
# @raise [RuntimeError] when both max_batch_size and max_interval_seconds
#   are nil
def initialize(max_batch_size: nil, max_interval_seconds: nil, &block)
  limits = [max_batch_size, max_interval_seconds]
  raise 'either max_batch_size or max_interval_seconds or both must be set' if limits.all?(&:nil?)

  @max_batch_size = max_batch_size
  @max_interval_seconds = max_interval_seconds
  @block = block

  @queue = Queue.new
  @mutex = Mutex.new
  @cond_var = ConditionVariable.new
  @is_running = true

  # The worker is started only after all state above is in place.
  @runner = Thread.new { run }

  # Make sure the queue is stopped when the process exits.
  at_exit { stop }
end

Instance Method Details

#push(object) ⇒ Object Also known as: <<



22
23
24
25
26
27
28
29
# File 'lib/batch_queue/batch_queue.rb', line 22

# Appends an item to the queue and signals the condition variable.
#
# @param object [Object] the item to enqueue
# @return [Object] the same object that was passed in
# @raise [RuntimeError] when the queue is no longer running
def push(object)
  @mutex.synchronize do
    if @is_running
      @queue << object
      # Signal while still holding the mutex that guards @is_running.
      @cond_var.signal
    else
      raise 'BatchQueue is stopped'
    end
  end

  object
end

#size ⇒ Object



32
33
34
35
36
# File 'lib/batch_queue/batch_queue.rb', line 32

# Reports how many items are currently in the underlying queue.
# The read happens while holding the instance mutex.
#
# @return [Integer] the current number of queued items
def size
  @mutex.synchronize { @queue.length }
end

#stop ⇒ Object

Stops the queue and signals it to flush any remaining items, blocking until done.



39
40
41
42
43
44
45
# File 'lib/batch_queue/batch_queue.rb', line 39

# Marks the queue as stopped, signals the condition variable, and waits
# for the worker thread to finish.
#
# Safe to call more than once. When called from the worker thread itself,
# the wait is skipped, because a thread cannot join itself.
#
# @return [Thread] the worker thread
def stop
  @mutex.synchronize do
    @is_running = false
    @cond_var.signal
  end

  # Thread#join raises ThreadError when the target is the current thread,
  # so only wait when called from a different thread.
  @runner.join unless @runner.equal?(Thread.current)
  @runner
end