Class: MsgBatcher
- Inherits: Object
  - Object
  - MsgBatcher
- Defined in: lib/msg-batcher.rb
Defined Under Namespace
Classes: Error
Constant Summary
- DEBUG = false
Instance Method Summary
- #initialize(max_length, max_time_msecs, on_error = nil, &block) ⇒ MsgBatcher (constructor)
  A new instance of MsgBatcher.
- #kill(blocking: true) ⇒ Object
- #push(entry) ⇒ Object
  Thread-safe.
Constructor Details
#initialize(max_length, max_time_msecs, on_error = nil, &block) ⇒ MsgBatcher
# File 'lib/msg-batcher.rb', line 14
def initialize(max_length, max_time_msecs, on_error=nil, &block)
  @max_length = max_length
  @max_time_msecs = max_time_msecs
  @on_error = on_error
  @on_error ||= lambda { |ex| raise ex }
  @block = block

  @closed = false

  @storage = []
  @m = Mutex.new
  @m2 = Mutex.new # used besides @m mutex. Used because of timer thread.
  @timer_start_cv = ConditionVariable.new
  @timer_started_cv = ConditionVariable.new
  @timer_full_cycle_cv = ConditionVariable.new
  @timer_release_cv = ConditionVariable.new

  # It is important that push invocation start after full completion of this method.
  # So initialize instance of this class first and only then start pushing.
  start_timer
end
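The constructor stores a size limit, an error handler that defaults to re-raising, and a block. A minimal sketch of how those pieces might fit together follows. SketchBatcher is a hypothetical stand-in, not the gem's class. It omits the timer entirely, and it assumes (the source does not show this) that the block receives the array of batched entries and that on_error is called with any exception the block raises.

```ruby
# Hypothetical stand-in for illustration only; not the MsgBatcher implementation.
class SketchBatcher
  def initialize(max_length, on_error = nil, &block)
    @max_length = max_length
    @on_error = on_error || ->(ex) { raise ex } # same default as the source
    @block = block
    @storage = []
    @m = Mutex.new
  end

  # Collect entries; hand off a full batch once max_length is reached.
  def push(entry)
    batch = nil
    @m.synchronize do
      @storage << entry
      batch = @storage.shift(@max_length) if @storage.size >= @max_length
    end
    deliver(batch) if batch # call the block outside the lock
  end

  private

  # Assumption: errors raised by the block are routed to on_error.
  def deliver(batch)
    @block.call(batch)
  rescue StandardError => ex
    @on_error.call(ex)
  end
end

batches = []
errors = []
batcher = SketchBatcher.new(2, ->(ex) { errors << ex.message }) do |batch|
  raise 'boom' if batch.include?(:bad)
  batches << batch
end

[1, 2, :bad, 3].each { |entry| batcher.push(entry) }
```

With a limit of 2, the first two pushes produce one delivered batch and the second pair triggers the error handler instead of raising into the caller.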
Instance Method Details
#kill(blocking: true) ⇒ Object
# File 'lib/msg-batcher.rb', line 38
def kill(blocking: true)
  # no #push will interfere
  @m.synchronize do
    # want to make sure that timer thread is in a waiting position. Hence, acquiring @m2
    @m2.synchronize do
      @closed = true
      # releasing timer thread
      @timer_start_cv.signal
      # This can happen, however, that timer thread will wait timeout on @timer_release_cv.
      # If it was waiting on @timer_start_cv. Because timer thread won't reach @timer_release_cv wait position
      @timer_release_cv.signal
    end
    @timer_thread.join if blocking
  end
end
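The shutdown pattern in #kill (set a closed flag under a lock, signal the waiting thread, then optionally join it) can be sketched in isolation. SketchTimer below is a hypothetical class with a single mutex and a single condition variable, so it is simpler than the source, which coordinates two mutexes and several condition variables. Unlike the source, this sketch joins after releasing its lock, because its worker needs that same lock to wake up.

```ruby
# Hypothetical illustration of "flag + signal + join"; not the gem's timer thread.
class SketchTimer
  def initialize
    @m = Mutex.new
    @cv = ConditionVariable.new
    @closed = false
    @thread = Thread.new do
      @m.synchronize do
        # Re-check the flag in a loop to tolerate spurious wakeups.
        # The 60-second timeout is an arbitrary value for this sketch.
        @cv.wait(@m, 60) until @closed
      end
      :stopped
    end
  end

  def kill(blocking: true)
    @m.synchronize do
      @closed = true
      @cv.signal # wake the worker if it is parked on the condition variable
    end
    # Join outside the lock so the worker can re-acquire it and exit.
    @thread.join if blocking
  end

  def alive?
    @thread.alive?
  end
end

timer = SketchTimer.new
timer.kill
```

Because the worker checks the flag before waiting, the shutdown still works if kill runs before the worker has reached its wait.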
#push(entry) ⇒ Object
Thread-safe
# File 'lib/msg-batcher.rb', line 56
def push(entry)
  raise Error, 'Batcher is closed - cannot push' if @closed

  @m.lock
  @m2.lock

  # Start timer
  # Timer thread must be in TT1 position
  if @storage.empty?
    @timer_start_cv.signal
    @timer_started_cv.wait @m2 # waiting for timer thread to be in position TT2
  end

  @storage.push entry
  # curr_size = @storage.inject(0) { |sum, e| s}
  if @storage.size == @max_length
    # unlocks @m inside release method
    release
  else
    @m2.unlock
    @m.unlock
  end
end
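When the storage is empty, #push signals the timer thread and then waits until that thread confirms it has started. A minimal sketch of such a two-step handshake follows. The variable names and the boolean predicates (start_requested, started) are assumptions added here to guard against lost or spurious wakeups; the excerpt above does not show whether the gem uses equivalent flags. "TT1" and "TT2" are borrowed from the source comments and taken to mean "parked" and "running".

```ruby
# Hypothetical handshake sketch; names and flags are not from the gem.
m = Mutex.new
start_cv = ConditionVariable.new
started_cv = ConditionVariable.new
start_requested = false
started = false

timer = Thread.new do
  m.synchronize do
    # "TT1": parked until a producer asks the timer to start.
    start_cv.wait(m) until start_requested
    # "TT2": the timer is now running; acknowledge to the producer.
    started = true
    started_cv.signal
  end
end

# Producer side, analogous to the empty-storage branch of #push.
m.synchronize do
  start_requested = true
  start_cv.signal
  # ConditionVariable#wait releases the mutex while blocked,
  # which lets the timer thread acquire it and acknowledge.
  started_cv.wait(m) until started
end

timer.join
```

The producer leaves the synchronized block only after the timer thread has acknowledged, which is the ordering the comments in #push describe.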