Class: MsgBatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/msg-batcher.rb

Defined Under Namespace

Classes: Error

Constant Summary collapse

DEBUG =
false

Instance Method Summary collapse

Constructor Details

#initialize(max_length, max_time_msecs, on_error = nil, &block) ⇒ MsgBatcher



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/msg-batcher.rb', line 14

# Sets up an open, empty batcher and launches its timer.
#
# NOTE: callers must not invoke #push until this constructor has fully returned;
# build the instance first and only then start pushing.
#
# @param max_length [Integer] batch size at which #push triggers a release
# @param max_time_msecs [Integer] stored for the timer; presumably the maximum
#   batch age in milliseconds (the timer logic is not in this method - confirm)
# @param on_error [#call, nil] error handler; when nil (or false) a lambda that
#   re-raises the exception it receives is used instead
# @param block [Proc] stored as @block; presumably the batch consumer (confirm)
def initialize(max_length, max_time_msecs, on_error = nil, &block)
  @max_length, @max_time_msecs = max_length, max_time_msecs
  @block = block
  # Fall back to a handler that simply re-raises.
  @on_error = on_error || ->(ex) { raise ex }

  @storage = []
  @closed = false

  # @m2 is used in addition to @m because of the timer thread.
  @m, @m2 = Mutex.new, Mutex.new

  @timer_start_cv,
  @timer_started_cv,
  @timer_full_cycle_cv,
  @timer_release_cv = Array.new(4) { ConditionVariable.new }

  # Must stay last: all state above has to exist before the timer is started.
  start_timer
end

Instance Method Details

#kill(blocking: true) ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/msg-batcher.rb', line 38

# Marks the batcher as closed and wakes the timer thread so it can finish.
#
# @param blocking [Boolean] when true, wait (still holding @m) until the timer
#   thread has terminated
# @return [Thread, nil] the result of Thread#join when blocking, otherwise nil
def kill(blocking: true)
  # Holding @m keeps any concurrent #push out for the whole shutdown.
  @m.synchronize do
    # @m2 is taken as well so that the timer thread is parked in a wait
    # (per the original author's note) while the flag is flipped.
    @m2.synchronize do
      @closed = true
      # Wake the timer thread on both condition variables, start first.
      # The release signal may go unheard: if the thread was waiting on
      # @timer_start_cv it has not reached the @timer_release_cv wait position
      # yet, so it may still sit out its timeout there.
      [@timer_start_cv, @timer_release_cv].each(&:signal)
    end
    @timer_thread.join if blocking
  end
end

#push(entry) ⇒ Object

Thread-safe

Raises:

  • (Error)

    when invoked after the batcher has been closed



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/msg-batcher.rb', line 56

# Adds +entry+ to the pending batch. Thread-safe.
#
# If the batch was empty, the timer thread is signalled first and this call
# waits until it reports that it has started. When the batch reaches
# @max_length entries, #release is invoked.
#
# @param entry [Object] item to append to the current batch
# @raise [Error] when the batcher has been closed, including when it is closed
#   while this call is still waiting to acquire the locks
def push(entry)
  # Cheap unlocked check so callers of an already-closed batcher fail fast.
  raise Error, 'Batcher is closed - cannot push' if @closed

  @m.lock
  @m2.lock

  # #kill flips @closed while holding both mutexes, so it may have run between
  # the check above and the locks being granted here. Without this re-check an
  # empty batch would signal a timer thread that was told to stop and then wait
  # on @timer_started_cv with no timeout - presumably forever.
  if @closed
    @m2.unlock
    @m.unlock
    raise Error, 'Batcher is closed - cannot push'
  end

  # Start timer
  # Timer thread must be in TT1 position
  if @storage.empty?
    @timer_start_cv.signal
    @timer_started_cv.wait @m2 # waiting for timer thread to be in position TT2
  end

  @storage.push entry
  if @storage.size == @max_length
    # unlocks @m inside release method
    release
  else
    @m2.unlock
    @m.unlock
  end
end