Class: DBus::MessageQueue
- Inherits: Object
  - Object
    - DBus::MessageQueue
- Defined in: lib/dbus/message_queue.rb
Overview
Constant Summary
- MSG_BUF_SIZE = 4096
  The buffer size for messages, in bytes.
Instance Attribute Summary
- #socket ⇒ Object (readonly)
  The socket that is used to connect with the bus.
Instance Method Summary
- #buffer_from_socket_nonblock ⇒ void
  Fill (append) the buffer from data that might be available on the socket.
- #initialize(address) ⇒ MessageQueue (constructor)
  A new instance of MessageQueue.
- #message_from_buffer_nonblock ⇒ Message?
  Get and remove one message from the buffer.
- #pop(blocking: true) ⇒ Message?
  One message or nil if unavailable.
- #push(message) ⇒ Object (also: #<<)
Constructor Details
#initialize(address) ⇒ MessageQueue
Returns a new instance of MessageQueue.
    # File 'lib/dbus/message_queue.rb', line 24
    def initialize(address)
      DBus.logger.debug "MessageQueue: #{address}"
      @address = address
      @buffer = ""
      # Reduce allocations by using a single buffer for our socket
      @read_buffer = String.new(capacity: MSG_BUF_SIZE)
      @is_tcp = false
      @mutex = Mutex.new
      connect
    end
Instance Attribute Details
#socket ⇒ Object (readonly)
The socket that is used to connect with the bus.
    # File 'lib/dbus/message_queue.rb', line 19
    def socket
      @socket
    end
Instance Method Details
#buffer_from_socket_nonblock ⇒ void
This method returns an undefined value.
Fill (append) the buffer from data that might be available on the socket.
    # File 'lib/dbus/message_queue.rb', line 169
    def buffer_from_socket_nonblock
      @buffer += @socket.read_nonblock(MSG_BUF_SIZE, @read_buffer)
    rescue EOFError
      raise # the caller expects it
    rescue Errno::EAGAIN
      # fine, would block
    rescue Exception => e
      puts "Oops:", e
      raise if @is_tcp # why?

      puts "WARNING: read_nonblock failed, falling back to .recv"
      @buffer += @socket.recv(MSG_BUF_SIZE)
    end
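The append-if-available pattern used by #buffer_from_socket_nonblock can be tried in isolation. The following is a minimal sketch, not the library's code: `drain_nonblock`, `BUF_SIZE` and the use of a pipe in place of the bus socket are assumptions made for illustration. It uses the `exception: false` form of `read_nonblock`, which returns `:wait_readable` instead of raising when no data is ready, and `nil` at end of file.

```ruby
BUF_SIZE = 4096 # hypothetical constant mirroring MSG_BUF_SIZE

# Append whatever is currently readable on +io+ to +buffer+, reusing
# +scratch+ as the read target to avoid allocating a string per call.
# Returns the number of bytes appended (0 if the read would block).
def drain_nonblock(io, buffer, scratch)
  chunk = io.read_nonblock(BUF_SIZE, scratch, exception: false)
  return 0 if chunk == :wait_readable # nothing to read right now
  raise EOFError, "peer closed the connection" if chunk.nil?

  buffer << chunk # copies the bytes, so scratch can be reused next time
  chunk.bytesize
end

# Usage: a pipe stands in for the bus socket.
reader, writer = IO.pipe
buffer = +""
scratch = String.new(capacity: BUF_SIZE)
drain_nonblock(reader, buffer, scratch) # pipe is empty, nothing appended
writer.write("hello")
drain_nonblock(reader, buffer, scratch) # appends the five bytes
```

Returning a byte count instead of rescuing `Errno::EAGAIN` is a design choice of this sketch; the listing above handles the same condition with a `rescue` clause.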
#message_from_buffer_nonblock ⇒ Message?
Get and remove one message from the buffer.
    # File 'lib/dbus/message_queue.rb', line 152
    def message_from_buffer_nonblock
      return nil if @buffer.empty?

      ret = nil
      begin
        ret, size = Message.new.unmarshall_buffer(@buffer)
        @buffer.slice!(0, size)
      rescue IncompleteBufferException
        # fall through, let ret remain nil
      end
      ret
    end
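The "take one complete message or leave the buffer alone" behaviour can be illustrated without the D-Bus marshalling code. The sketch below assumes a made-up framing (a 4-byte big-endian length followed by the payload), which is not the D-Bus wire format; `take_frame` and `HEADER_SIZE` are hypothetical names. Where the listing above signals a partial message with IncompleteBufferException, this sketch simply returns nil.

```ruby
# Hypothetical framing for illustration only: a 4-byte big-endian length
# followed by that many payload bytes. This is NOT the D-Bus wire format.
HEADER_SIZE = 4

# Remove and return the payload of one complete frame from +buffer+,
# or return nil and leave +buffer+ untouched if the frame is incomplete.
def take_frame(buffer)
  return nil if buffer.bytesize < HEADER_SIZE

  length = buffer.byteslice(0, HEADER_SIZE).unpack1("N")
  total = HEADER_SIZE + length
  return nil if buffer.bytesize < total # wait for more data

  payload = buffer.byteslice(HEADER_SIZE, length)
  # Keep only the bytes that follow the frame we just consumed.
  buffer.replace(buffer.byteslice(total, buffer.bytesize - total))
  payload
end

# Usage: a partial frame yields nil until the remaining bytes arrive.
buffer = [5].pack("N") + "hel"
take_frame(buffer) # nil, buffer unchanged
buffer << "lo"
take_frame(buffer) # "hello", buffer now empty
```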
#pop(blocking: true) ⇒ Message?
TODO: failure modes
Returns one message or nil if unavailable.
    # File 'lib/dbus/message_queue.rb', line 41
    def pop(blocking: true)
      # FIXME: this is not enough, the R/W test deadlocks on shared connections
      @mutex.synchronize do
        buffer_from_socket_nonblock
        message = message_from_buffer_nonblock
        if blocking
          # we can block
          while message.nil?
            r, _d, _d = IO.select([@socket])
            if r && r[0] == @socket
              buffer_from_socket_nonblock
              message = message_from_buffer_nonblock
            end
          end
        end
        message
      end
    end
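The control flow of #pop (try once without blocking, then wait in `IO.select` until a full message has arrived) can be sketched with a simpler message format. In this sketch, `LineQueue` is a hypothetical class and messages are assumed to be newline-terminated strings; neither is part of ruby-dbus.

```ruby
# Minimal stand-in for the pop loop. Messages are newline-terminated
# strings here (an assumption for this sketch, not the D-Bus format).
class LineQueue
  def initialize(io)
    @io = io
    @buffer = +""
    @mutex = Mutex.new
  end

  # Returns one line, or nil if none is available and blocking is false.
  def pop(blocking: true)
    @mutex.synchronize do
      fill_buffer
      line = take_line
      while blocking && line.nil?
        IO.select([@io]) # sleep until the descriptor becomes readable
        fill_buffer
        line = take_line
      end
      line
    end
  end

  private

  # Append any bytes that are ready; raise at end of file so that a
  # blocking pop cannot spin forever on a closed stream.
  def fill_buffer
    chunk = @io.read_nonblock(4096, exception: false)
    raise EOFError, "peer closed the connection" if chunk.nil?

    @buffer << chunk unless chunk == :wait_readable
  end

  # Remove and return the first complete line, without its newline.
  def take_line
    @buffer.slice!(/\A[^\n]*\n/)&.chomp
  end
end

# Usage: a pipe stands in for the bus socket.
reader, writer = IO.pipe
queue = LineQueue.new(reader)
queue.pop(blocking: false) # nil, nothing has been written yet
writer.write("one\n")
queue.pop # "one"
```

As in the listing above, the lock is held for the whole wait, so a second thread calling pop on the same queue stays blocked until the first call returns.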
#push(message) ⇒ Object Also known as: <<
    # File 'lib/dbus/message_queue.rb', line 60
    def push(message)
      @mutex.synchronize do
        @socket.write(message.marshall)
      end
    end
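Wrapping the write in a mutex keeps the bytes of one message from being interleaved with those of another when several threads share a connection. A minimal sketch of that idea follows; `SerializedWriter` is a hypothetical class, and a StringIO stands in for the socket.

```ruby
require "stringio"

# Hypothetical wrapper that serializes writes to any IO-like object.
class SerializedWriter
  def initialize(io)
    @io = io
    @mutex = Mutex.new
  end

  # Write +bytes+ while holding the lock so that concurrent callers
  # cannot interleave their output on the shared stream.
  def push(bytes)
    @mutex.synchronize { @io.write(bytes) }
    self # returning self is a choice of this sketch; it allows chaining
  end
  alias << push
end

# Usage: several threads share one writer.
out = StringIO.new
writer = SerializedWriter.new(out)
threads = 3.times.map do |i|
  Thread.new { 10.times { writer.push("thread #{i}\n") } }
end
threads.each(&:join)
```

Note that the lock only protects a single call: a message must be passed to push in one piece, because two chained `<<` calls can still be separated by another thread's write.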