Class: MsgBatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/msg-batcher.rb

Defined Under Namespace

Classes: Error

Constant Summary collapse

DEBUG =
false

Instance Method Summary collapse

Constructor Details

#initialize(max_length, max_time_msecs, on_error = nil, &block) ⇒ MsgBatcher

Returns a new instance of MsgBatcher.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/msg-batcher.rb', line 14

def initialize(max_length, max_time_msecs, on_error=nil, &block)
  @max_length = max_length
  @max_time_msecs = max_time_msecs
  @on_error = on_error
  @on_error ||= lambda { |ex| raise ex }
  @block = block

  @closed = false

  @storage = []
  @m = Mutex.new
  @m2 = Mutex.new # used besides @m mutex. Used because of timer thread.

  @timer_start_cv = ConditionVariable.new
  @timer_started_cv = ConditionVariable.new
  @timer_full_cycle_cv = ConditionVariable.new
  @timer_release_cv = ConditionVariable.new


  # It is important that push invocation start after full completion of this method.
  # So initialize instance of this class first and only then start pushing.
  start_timer
end

Instance Method Details

#kill(blocking: true) ⇒ Object



38
39
40
41
42
43
44
# File 'lib/msg-batcher.rb', line 38

def kill(blocking: true)
  @closed = true
  # releasing timer thread
  @timer_start_cv.signal
  @timer_release_cv.signal
  @timer_thread.join if blocking
end

#push(entry) ⇒ Object

Thread-safe

Raises:

  • when invoked when batcher has been closed



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/msg-batcher.rb', line 48

def push(entry)
  raise Error, "Batcher is closed - cannot push" if @closed

  @m.lock
  @m2.lock

  # Start timer
  # Timer thread must be in TT1 position
  if @storage.empty?
    @timer_start_cv.signal
    @timer_started_cv.wait @m2 # waiting for timer thread to be in position TT2
  end

  @storage.push entry
  # curr_size = @storage.inject(0) { |sum, e| s}
  if @storage.size == @max_length
    # unlocks @m inside release method
    release
  else
    @m2.unlock
    @m.unlock
  end
end