Class: ASIR::Transport::Buffer
- Inherits: ASIR::Transport
  - Object
  - ASIR::Transport
  - ASIR::Transport::Buffer
- Includes: Delegation
- Defined in: lib/asir/transport/buffer.rb
Overview
Buffer Transport
Buffers Messages until #flush! is called. Assumes one-way Messages.
Constant Summary
Constants included from ASIR::ThreadVariable
ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER
Instance Attribute Summary
- #transport ⇒ Object
  Transport to send_message.
Attributes included from Delegation
#on_failed_message, #on_send_message_exception, #reraise_first_exception
Attributes inherited from ASIR::Transport
#after_receive_message, #before_send_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose
Attributes included from Log
Instance Method Summary
- #_send_message(message, message_payload) ⇒ Object
  If paused, queues the message; otherwise delegates immediately to #transport.
- #clear! ⇒ Object
  Clears all pending Messages without sending them.
- #flush! ⇒ Object
  Flushes pending Messages, even when not #paused?.
- #initialize(*args) ⇒ Buffer (constructor)
  Returns a new instance of Buffer.
- #pause! ⇒ Object
  Pauses all messages until #resume!.
- #paused? ⇒ Boolean
  Returns true if currently paused.
- #process!(non_block = false) ⇒ Object
  Processes the queue.
- #resume! ⇒ Object
  Automatically calls #flush! when no longer #paused?.
- #shift(non_block = false) ⇒ Object
  Takes a Message from the head of the queue.
- #size ⇒ Object
- #stop! ⇒ Object
  Stops processing the queue.
Methods included from Delegation
#_handle_send_message_exception!, #_receive_result, #needs_message_identifier?, #needs_message_timestamp?, #receive_result, #transports
Methods inherited from ASIR::Transport
#_subclass_responsibility, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #with_server_signals!
Methods included from Log
#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included
Methods included from AdditionalData
#[], #[]=, #additional_data, #additional_data!, #additional_data=, included
Methods included from Message::Delay
#relative_message_delay!, #wait_for_delay!
Methods included from ASIR::ThreadVariable
Constructor Details
#initialize(*args) ⇒ Buffer
Returns a new instance of Buffer.
    # File 'lib/asir/transport/buffer.rb', line 17
    def initialize *args
      super
      @messages = Queue.new
      @messages_mutex = Mutex.new
      @paused = 0
      @paused_mutex = Mutex.new
    end
Instance Attribute Details
#transport ⇒ Object
Transport to send_message.
    # File 'lib/asir/transport/buffer.rb', line 15
    def transport
      @transport
    end
Instance Method Details
#_send_message(message, message_payload) ⇒ Object
If paused, queues the message; otherwise delegates immediately to #transport. Returns nil without queueing or sending when @ignore is set.
    # File 'lib/asir/transport/buffer.rb', line 27
    def _send_message message, message_payload
      return nil if @ignore
      if paused?
        @messages_mutex.synchronize do
          @messages << message
        end
        nil
      else
        @transport.send_message(message)
      end
    end
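The queue-or-delegate decision described above can be illustrated without the ASIR gem. The following is a minimal sketch under stated assumptions: `RecordingTransport` and `SketchBuffer` are hypothetical stand-ins invented for this example, not ASIR classes, and they omit the mutexes and the @ignore check.

```ruby
# Hypothetical downstream transport that only records what it is sent.
class RecordingTransport
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_message(message)
    @sent << message
    message
  end
end

# Hypothetical, simplified stand-in for the queue-or-delegate decision.
class SketchBuffer
  def initialize(transport)
    @transport = transport
    @messages  = Queue.new
    @paused    = 0
  end

  def pause!
    @paused += 1
    self
  end

  def paused?
    @paused > 0
  end

  def size
    @messages.size
  end

  def send_message(message)
    if paused?
      @messages << message              # held until flushed
      nil
    else
      @transport.send_message(message)  # passed straight through
    end
  end
end

downstream = RecordingTransport.new
buffer     = SketchBuffer.new(downstream)

buffer.send_message(:a)  # not paused: delivered immediately
buffer.pause!
buffer.send_message(:b)  # paused: queued

p downstream.sent  # => [:a]
p buffer.size      # => 1
```

The queued branch returns nil, which is consistent with the class's assumption of one-way Messages: the caller gets no result for a buffered Message.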
#clear! ⇒ Object
Clear all pending Messages without sending them. Returns Array of Messages that would have been sent.
    # File 'lib/asir/transport/buffer.rb', line 80
    def clear!
      messages = [ ]
      @messages_mutex.synchronize do
        @messages.size.times do
          messages << @messages.shift(true)
        end
      end
      messages
    end
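The drain pattern used by #clear! relies on the non-blocking form of Ruby's `Queue#shift`. A minimal sketch of that pattern on a plain Queue, with illustrative names (`queue`, `lock`, `drained`) that are not part of ASIR:

```ruby
queue = Queue.new
lock  = Mutex.new
[:a, :b, :c].each { |m| queue << m }

drained = []
lock.synchronize do
  # size.times bounds the loop, so shift(true) finds an element each time,
  # provided no other consumer removes elements concurrently.
  queue.size.times { drained << queue.shift(true) }
end

p drained       # => [:a, :b, :c]
p queue.empty?  # => true

# Non-blocking shift on an empty Queue raises ThreadError instead of waiting.
begin
  queue.shift(true)
rescue ThreadError => e
  puts "empty: #{e.class}"
end
```

Note that the listed #shift does not take @messages_mutex, so a worker running #process! at the same time as #clear! could plausibly empty the queue first and cause `shift(true)` to raise ThreadError.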
#flush! ⇒ Object
Flushes pending Messages, even when not #paused?. Returns self.
    # File 'lib/asir/transport/buffer.rb', line 71
    def flush!
      clear!.each do | message |
        @transport.send_message(message)
      end
      self
    end
#pause! ⇒ Object
Pauses all messages until #resume!. May be called multiple times; each call increments a pause counter.
    # File 'lib/asir/transport/buffer.rb', line 47
    def pause!
      @paused_mutex.synchronize do
        @paused += 1
      end
      self
    end
#paused? ⇒ Boolean
Returns true if currently paused. Messages are queued until #resume!.
    # File 'lib/asir/transport/buffer.rb', line 41
    def paused?
      @paused > 0
    end
#process!(non_block = false) ⇒ Object
Processes queue. Usually used in worker Thread.
    # File 'lib/asir/transport/buffer.rb', line 97
    def process! non_block=false
      @running = true
      while @running && message = shift(non_block)
        @transport.send_message(message)
      end
      message
    end
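Because #process! is usually run in a worker Thread, a rough sketch of that arrangement on a plain Queue may help. The names here (`queue`, `processed`, `worker`) are illustrative, and the nil sentinel used to end the loop is an assumption of this sketch, not something the class documents.

```ruby
queue     = Queue.new
processed = []

worker = Thread.new do
  # Blocking pop: the thread sleeps while the queue is empty.
  # A nil or false value ends the loop, because the assignment in the
  # while condition is then falsy.
  while (message = queue.pop)
    processed << message
  end
end

queue << :a
queue << :b
queue << nil  # sentinel (assumption): wakes the blocked worker and ends its loop
worker.join

p processed  # => [:a, :b]
```

With a blocking shift, clearing a running flag alone does not wake a worker that is already waiting on an empty queue, which is why this sketch pushes a sentinel.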
#resume! ⇒ Object
Decrements the pause counter and automatically calls #flush! once no longer #paused?.
    # File 'lib/asir/transport/buffer.rb', line 55
    def resume!
      should_flush = @paused_mutex.synchronize do
        @paused -= 1 if @paused > 0
        @paused == 0
      end
      flush! if should_flush
      self
    end
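Since #pause! may be called multiple times, pauses nest: a flush happens only when the counter returns to zero. The sketch below isolates that counter in a hypothetical `PauseCounter` class (not part of ASIR), with a `flushes` tally standing in for the real #flush!.

```ruby
# Hypothetical stand-in that mirrors only the pause counter.
class PauseCounter
  attr_reader :flushes

  def initialize
    @paused = 0
    @paused_mutex = Mutex.new
    @flushes = 0
  end

  def pause!
    @paused_mutex.synchronize { @paused += 1 }
    self
  end

  def paused?
    @paused > 0
  end

  def resume!
    should_flush = @paused_mutex.synchronize do
      @paused -= 1 if @paused > 0
      @paused == 0
    end
    @flushes += 1 if should_flush  # stands in for flush!
    self
  end
end

counter = PauseCounter.new
counter.pause!.pause!  # nested pause: counter is 2

counter.resume!        # counter is 1, still paused
p counter.paused?      # => true
p counter.flushes      # => 0

counter.resume!        # counter is 0, flush happens
p counter.paused?      # => false
p counter.flushes      # => 1
```

With this logic, a #resume! that has no matching #pause! leaves the counter at zero and still triggers a flush.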
#shift(non_block = false) ⇒ Object
Takes a Message from the head of the queue.
    # File 'lib/asir/transport/buffer.rb', line 91
    def shift non_block=false
      @messages.shift(non_block)
    end
#size ⇒ Object
    # File 'lib/asir/transport/buffer.rb', line 64
    def size
      @messages_mutex.synchronize do
        @messages.size
      end
    end
#stop! ⇒ Object
Stops processing the queue. Also sets @ignore, so subsequent #_send_message calls return nil without queueing or sending.
    # File 'lib/asir/transport/buffer.rb', line 106
    def stop!
      @messages_mutex.synchronize do
        @ignore = true; @running = false
      end
      self
    end