Class: BatchQueue
- Inherits: Object
  - Object
  - BatchQueue
- Defined in: lib/batch_queue/version.rb, lib/batch_queue/batch_queue.rb
Constant Summary
- VERSION = "0.1.0"
Instance Method Summary
- #initialize(max_batch_size: nil, max_interval_seconds: nil, &block) ⇒ BatchQueue (constructor)
  Starts the queue. Either max_batch_size or max_interval_seconds, or both, must be set.
- #on_error(&block) ⇒ Object
  Registers a block that takes an exception as its parameter.
- #push(object) ⇒ Object (also: #<<)
- #size ⇒ Object
- #stop ⇒ Object
  Stops the queue and signals it to flush any remaining items, blocking until done.
Constructor Details
#initialize(max_batch_size: nil, max_interval_seconds: nil, &block) ⇒ BatchQueue
Starts the queue. Either max_batch_size or max_interval_seconds, or both, must be set.
# File 'lib/batch_queue/batch_queue.rb', line 4

def initialize(max_batch_size: nil, max_interval_seconds: nil, &block)
  if max_batch_size.nil? && max_interval_seconds.nil?
    raise 'either max_batch_size or max_interval_seconds or both must be set'
  end
  @is_running = true
  @queue = Queue.new
  @block = block
  @max_batch_size = max_batch_size
  @max_interval_seconds = max_interval_seconds
  @mutex = Mutex.new
  @cond_var = ConditionVariable.new
  @runner = Thread.new { run }
  at_exit do
    stop
  end
end
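The constructor starts a thread that calls run, but run itself is not listed on this page. The following is a minimal sketch of how such a consumer loop could work; it is not the gem's implementation. The class name SketchBatchQueue and the helpers batch_full? and monotonic_now are hypothetical, a plain Array stands in for Queue, and the sketch assumes the block receives each batch as an Array.

```ruby
# Hypothetical stand-in, NOT the gem's code: one possible shape for a `run` loop.
class SketchBatchQueue
  def initialize(max_batch_size: nil, max_interval_seconds: nil, &block)
    if max_batch_size.nil? && max_interval_seconds.nil?
      raise 'either max_batch_size or max_interval_seconds or both must be set'
    end
    @is_running = true
    @items = []
    @block = block
    @max_batch_size = max_batch_size
    @max_interval_seconds = max_interval_seconds
    @mutex = Mutex.new
    @cond_var = ConditionVariable.new
    @runner = Thread.new { run }
  end

  def push(object)
    @mutex.synchronize do
      raise 'queue is stopped' unless @is_running
      @items << object
      @cond_var.signal
    end
    object
  end

  def stop
    @mutex.synchronize do
      @is_running = false
      @cond_var.signal
    end
    @runner.join
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def batch_full?
    !@max_batch_size.nil? && @items.size >= @max_batch_size
  end

  def run
    loop do
      batch = nil
      @mutex.synchronize do
        deadline = @max_interval_seconds && (monotonic_now + @max_interval_seconds)
        # Sleep until the batch is full, the interval elapses, or stop is called.
        while @is_running && !batch_full?
          if deadline
            remaining = deadline - monotonic_now
            break if remaining <= 0
            @cond_var.wait(@mutex, remaining)
          else
            @cond_var.wait(@mutex)
          end
        end
        batch = @items.shift(@max_batch_size || @items.size)
        # Nothing left to flush after stop: end the thread.
        return if batch.empty? && !@is_running
      end
      # Call the user block outside the lock so push is not blocked meanwhile.
      @block.call(batch) unless batch.empty?
    end
  end
end

batches = []
queue = SketchBatchQueue.new(max_batch_size: 2) { |batch| batches << batch }
5.times { |i| queue.push(i) }
queue.stop
p batches # => [[0, 1], [2, 3], [4]]
```

In this sketch the last, partial batch is delivered only because stop wakes the runner and the loop keeps draining until the buffer is empty. Whether the real run method behaves the same way would need to be checked against the gem's source.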
Instance Method Details
#on_error(&block) ⇒ Object
Registers a block that takes an exception as its parameter.
# File 'lib/batch_queue/batch_queue.rb', line 23

def on_error(&block)
  @on_error = block
end
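This page shows only that on_error stores the block; it does not show where the block is called. A plausible reading is that the runner thread passes it any exception raised while processing a batch. The sketch below illustrates that pattern under this assumption. ErrorAwareProcessor and its process method are hypothetical names, not part of the gem.

```ruby
# Hypothetical helper, NOT the gem's code: routes exceptions from a
# processing block to an optional, previously registered handler.
class ErrorAwareProcessor
  def initialize(&block)
    @block = block
    @on_error = nil
  end

  # Same shape as BatchQueue#on_error: remember the handler block.
  def on_error(&block)
    @on_error = block
  end

  # Runs the processing block for one batch. If it raises, the exception
  # goes to the handler when one is set and is otherwise swallowed.
  def process(batch)
    @block.call(batch)
  rescue StandardError => e
    @on_error&.call(e)
  end
end

errors = []
processor = ErrorAwareProcessor.new do |batch|
  raise ArgumentError, 'bad batch' if batch.empty?
end
processor.on_error { |e| errors << e.message }
processor.process([])
p errors # => ["bad batch"]
```

Rescuing inside the worker matters in a design like this because an unhandled exception would otherwise end the background thread and silently stop batch delivery.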
#push(object) ⇒ Object Also known as: <<
# File 'lib/batch_queue/batch_queue.rb', line 27

def push(object)
  @mutex.synchronize do
    raise 'BatchQueue is stopped' unless @is_running
    @queue.push(object)
    @cond_var.signal
  end
  object
end
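Because push returns the pushed object rather than the queue, chaining << does not behave as it does with Array#<<. The stand-in class below (ReturnsPushedObject, a hypothetical name) reproduces only that return-value convention to show the effect; it does no batching or threading.

```ruby
# Hypothetical stand-in that mimics only the return value of BatchQueue#push.
class ReturnsPushedObject
  attr_reader :items

  def initialize
    @items = []
  end

  def push(object)
    @items.push(object)
    object # the pushed object is returned, not self
  end
  alias << push
end

sink = ReturnsPushedObject.new
# Parsed as (sink << 1) << 2. The first call returns 1, so the second
# call is Integer#<< (a bit shift) and never reaches the sink.
result = sink << 1 << 2
p result     # => 4
p sink.items # => [1]
```

With this convention, pushing several objects is safer written as separate statements, one push per object.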
#size ⇒ Object
# File 'lib/batch_queue/batch_queue.rb', line 37

def size
  @mutex.synchronize do
    @queue.size
  end
end
#stop ⇒ Object
Stops the queue and signals it to flush any remaining items, blocking until done.
# File 'lib/batch_queue/batch_queue.rb', line 44

def stop
  @mutex.synchronize do
    @is_running = false
    @cond_var.signal
  end
  @runner.join
end