Class: ASIR::Transport::Buffer

Inherits:
ASIR::Transport show all
Includes:
Delegation
Defined in:
lib/asir/transport/buffer.rb

Overview

!SLIDE Buffer Transport

Buffers Messages until #flush! Assumes One-way Messages.

Constant Summary

Constants included from ASIR::ThreadVariable

ASIR::ThreadVariable::DEBUG, ASIR::ThreadVariable::EMPTY_HASH, ASIR::ThreadVariable::SETTER

Instance Attribute Summary collapse

Attributes included from Delegation

#on_failed_message, #on_send_message_exception, #reraise_first_exception

Attributes inherited from ASIR::Transport

#after_invoke_message, #after_receive_message, #before_send_message, #coder_needs_result_message, #decoder, #encoder, #invoker, #message_count, #needs_message_identifier, #needs_message_timestamp, #on_exception, #on_result_exception, #one_way, #running, #verbose

Attributes included from Log

#_logger

Instance Method Summary collapse

Methods included from Delegation

#_handle_send_message_exception!, #_receive_result, #needs_message_identifier?, #needs_message_timestamp?, #transports

Methods inherited from ASIR::Transport

#_subclass_responsibility, #invoke_message!, #needs_message_identifier?, #needs_message_timestamp?, #receive_message, #receive_result, #send_message, #send_result, #serve_message!, #with_server_signals!

Methods included from Log

#_log, #_log_enabled=, #_log_enabled?, #_log_format, #_log_result, enabled, enabled=, included

Methods included from AdditionalData

#[], #[]=, #_additional_data, #additional_data, #additional_data!, #additional_data=, included

Methods included from Message::Delay

#relative_message_delay!, #wait_for_delay!

Methods included from ASIR::ThreadVariable

included, setter

Constructor Details

#initialize(*args) ⇒ Buffer

Returns a new instance of Buffer.



18
19
20
21
22
23
24
# File 'lib/asir/transport/buffer.rb', line 18

def initialize *args
  super
  @messages = Queue.new
  @messages_mutex = Mutex.new
  @paused = 0
  @paused_mutex = Mutex.new
end

Instance Attribute Details

#transportObject

Transport to send_message.



16
17
18
# File 'lib/asir/transport/buffer.rb', line 16

def transport
  @transport
end

Instance Method Details

#_send_message(state) ⇒ Object

If paused, queue messages, Otherwise delegate immediately to #transport.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/asir/transport/buffer.rb', line 28

def _send_message state
  return nil if @ignore
  if paused?
    @messages_mutex.synchronize do
      @messages << state.message
    end
    nil
  else
    @transport.send_message(state.message)
  end
end

#clear!Object

Clear all pending Messages without sending them. Returns Array of Messages that would have been sent.



81
82
83
84
85
86
87
88
89
# File 'lib/asir/transport/buffer.rb', line 81

def clear!
  messages = [ ]
  @messages_mutex.synchronize do
    @messages.size.times do
      messages << @messages.shift(true)
    end
  end
  messages
end

#flush!Object

Will flush pending Messages even if ! #paused?.



72
73
74
75
76
77
# File 'lib/asir/transport/buffer.rb', line 72

def flush!
  clear!.each do | message |
    @transport.send_message(message)
  end
  self
end

#pause!Object

Pauses all messages until resume!. May be called multiple times.



48
49
50
51
52
53
# File 'lib/asir/transport/buffer.rb', line 48

def pause!
  @paused_mutex.synchronize do
    @paused += 1
  end
  self
end

#paused?Boolean

Returns true if currently paused. Messages are queued until #resume!.

Returns:

  • (Boolean)


42
43
44
# File 'lib/asir/transport/buffer.rb', line 42

def paused?
  @paused > 0
end

#process!(non_block = false) ⇒ Object

Processes queue. Usually used in worker Thread.



98
99
100
101
102
103
104
# File 'lib/asir/transport/buffer.rb', line 98

def process! non_block=false
  @running = true
  while @running && (message = shift(non_block))
    @transport.send_message(message)
  end
  message
end

#resume!Object

Will automatically call #flush! when not #paused?.



56
57
58
59
60
61
62
63
# File 'lib/asir/transport/buffer.rb', line 56

def resume!
  should_flush = @paused_mutex.synchronize do
    @paused -= 1 if @paused > 0
    @paused == 0
  end
  flush! if should_flush
  self
end

#shift(non_block = false) ⇒ Object

Take Message from head of Queue.



92
93
94
# File 'lib/asir/transport/buffer.rb', line 92

def shift non_block=false
  @messages.shift(non_block)
end

#sizeObject



65
66
67
68
69
# File 'lib/asir/transport/buffer.rb', line 65

def size
  @messages_mutex.synchronize do
    @messages.size
  end
end

#stop!Object

Stop processing queue.



107
108
109
110
111
112
# File 'lib/asir/transport/buffer.rb', line 107

def stop!
  @messages_mutex.synchronize do
    @ignore = true; @running = false
  end
  self
end