Class: ASIR::Transport::Buffer
- Inherits:
-
ASIR::Transport
- Object
- ASIR::Transport
- ASIR::Transport::Buffer
- Includes:
- Delegation
- Defined in:
- lib/asir/transport/buffer.rb
Overview
!SLIDE Buffer Transport
Buffers Messages until #flush! Assumes One-way Messages.
Constant Summary
Constants included from ASIR::ThreadVariable
ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER
Instance Attribute Summary collapse
-
#transport ⇒ Object
Transport to send_message.
Attributes included from Delegation
#on_failed_message, #on_send_message_exception, #reraise_first_exception
Attributes inherited from ASIR::Transport
#after_invoke_message, #after_receive_message, #before_send_message, #coder_needs_result_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose
Attributes included from Log
Instance Method Summary collapse
-
#_send_message(state) ⇒ Object
If paused, queue messages, Otherwise delegate immediately to #transport.
-
#clear! ⇒ Object
Clear all pending Messages without sending them.
-
#flush! ⇒ Object
Will flush pending Messages even if ! #paused?.
-
#initialize(*args) ⇒ Buffer
constructor
A new instance of Buffer.
-
#pause! ⇒ Object
Pauses all messages until resume!.
-
#paused? ⇒ Boolean
Returns true if currently paused.
-
#process!(non_block = false) ⇒ Object
Processes queue.
-
#resume! ⇒ Object
Will automatically call #flush! when not #paused?.
-
#shift(non_block = false) ⇒ Object
Take Message from head of Queue.
- #size ⇒ Object
-
#stop! ⇒ Object
Stop processing queue.
Methods included from Delegation
#_handle_send_message_exception!, #_receive_result, #needs_message_identifier?, #needs_message_timestamp?, #transports
Methods inherited from ASIR::Transport
#_subclass_responsibility, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #with_server_signals!
Methods included from Log
#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included
Methods included from AdditionalData
#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included
Methods included from Message::Delay
#relative_message_delay!, #wait_for_delay!
Methods included from ASIR::ThreadVariable
Constructor Details
#initialize(*args) ⇒ Buffer
Returns a new instance of Buffer.
18 19 20 21 22 23 24 |
# File 'lib/asir/transport/buffer.rb', line 18 def initialize *args super @messages = Queue.new @messages_mutex = Mutex.new @paused = 0 @paused_mutex = Mutex.new end |
Instance Attribute Details
#transport ⇒ Object
Transport to send_message.
16 17 18 |
# File 'lib/asir/transport/buffer.rb', line 16 def transport @transport end |
Instance Method Details
#_send_message(state) ⇒ Object
If paused, queue messages, Otherwise delegate immediately to #transport.
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/asir/transport/buffer.rb', line 28 def state return nil if @ignore if paused? @messages_mutex.synchronize do @messages << state. end nil else @transport.(state.) end end |
#clear! ⇒ Object
Clear all pending Messages without sending them. Returns Array of Messages that would have been sent.
81 82 83 84 85 86 87 88 89 |
# File 'lib/asir/transport/buffer.rb', line 81 def clear! = [ ] @messages_mutex.synchronize do @messages.size.times do << @messages.shift(true) end end end |
#flush! ⇒ Object
Will flush pending Messages even if ! #paused?.
72 73 74 75 76 77 |
# File 'lib/asir/transport/buffer.rb', line 72 def flush! clear!.each do | | @transport.() end self end |
#pause! ⇒ Object
Pauses all messages until resume!. May be called multiple times.
48 49 50 51 52 53 |
# File 'lib/asir/transport/buffer.rb', line 48 def pause! @paused_mutex.synchronize do @paused += 1 end self end |
#paused? ⇒ Boolean
Returns true if currently paused. Messages are queued until #resume!.
42 43 44 |
# File 'lib/asir/transport/buffer.rb', line 42 def paused? @paused > 0 end |
#process!(non_block = false) ⇒ Object
Processes queue. Usually used in worker Thread.
98 99 100 101 102 103 104 |
# File 'lib/asir/transport/buffer.rb', line 98 def process! non_block=false @running = true while @running && ( = shift(non_block)) @transport.() end end |
#resume! ⇒ Object
Will automatically call #flush! when not #paused?.
56 57 58 59 60 61 62 63 |
# File 'lib/asir/transport/buffer.rb', line 56 def resume! should_flush = @paused_mutex.synchronize do @paused -= 1 if @paused > 0 @paused == 0 end flush! if should_flush self end |
#shift(non_block = false) ⇒ Object
Take Message from head of Queue.
92 93 94 |
# File 'lib/asir/transport/buffer.rb', line 92 def shift non_block=false @messages.shift(non_block) end |
#size ⇒ Object
65 66 67 68 69 |
# File 'lib/asir/transport/buffer.rb', line 65 def size @messages_mutex.synchronize do @messages.size end end |
#stop! ⇒ Object
Stop processing queue.
107 108 109 110 111 112 |
# File 'lib/asir/transport/buffer.rb', line 107 def stop! @messages_mutex.synchronize do @ignore = true; @running = false end self end |