Class: DBus::MessageQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/dbus/message_queue.rb

Overview

Encapsulates a socket so that we can #push and #pop Messages.

Constant Summary collapse

MSG_BUF_SIZE =

The buffer size for messages.

4096

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(address) ⇒ MessageQueue

Returns a new instance of MessageQueue.



24
25
26
27
28
29
30
31
32
33
# File 'lib/dbus/message_queue.rb', line 24

def initialize(address)
  DBus.logger.debug "MessageQueue: #{address}"
  @address = address
  @buffer = ""
  # Reduce allocations by using a single buffer for our socket
  @read_buffer = String.new(capacity: MSG_BUF_SIZE)
  @is_tcp = false
  @mutex = Mutex.new
  connect
end

Instance Attribute Details

#socketObject (readonly)

The socket that is used to connect with the bus.



19
20
21
# File 'lib/dbus/message_queue.rb', line 19

def socket
  @socket
end

Instance Method Details

#buffer_from_socket_nonblockvoid

This method returns an undefined value.

Fill (append) the buffer from data that might be available on the socket.

Raises:

  • EOFError



169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/dbus/message_queue.rb', line 169

def buffer_from_socket_nonblock
  @buffer += @socket.read_nonblock(MSG_BUF_SIZE, @read_buffer)
rescue EOFError
  raise # the caller expects it
rescue Errno::EAGAIN
  # fine, would block
rescue Exception => e
  puts "Oops:", e
  raise if @is_tcp # why?

  puts "WARNING: read_nonblock failed, falling back to .recv"
  @buffer += @socket.recv(MSG_BUF_SIZE)
end

#message_from_buffer_nonblockMessage?

Get and remove one message from the buffer.

Returns:

  • (Message, nil)

    the message or nil if unavailable



152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/dbus/message_queue.rb', line 152

def message_from_buffer_nonblock
  return nil if @buffer.empty?

  ret = nil
  begin
    ret, size = Message.new.unmarshall_buffer(@buffer)
    @buffer.slice!(0, size)
  rescue IncompleteBufferException
    # fall through, let ret remain nil
  end
  ret
end

#pop(blocking: true) ⇒ Message?

TODO:

failure modes

Returns one message or nil if unavailable.

Parameters:

  • blocking (Boolean) (defaults to: true)

    true: wait to return a DBus::Message; false: may return ‘nil`

Returns:

  • (Message, nil)

    one message or nil if unavailable

Raises:

  • EOFError



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/dbus/message_queue.rb', line 41

def pop(blocking: true)
  # FIXME: this is not enough, the R/W test deadlocks on shared connections
  @mutex.synchronize do
    buffer_from_socket_nonblock
    message = message_from_buffer_nonblock
    if blocking
      # we can block
      while message.nil?
        r, _d, _d = IO.select([@socket])
        if r && r[0] == @socket
          buffer_from_socket_nonblock
          message = message_from_buffer_nonblock
        end
      end
    end
    message
  end
end

#push(message) ⇒ Object Also known as: <<



60
61
62
63
64
# File 'lib/dbus/message_queue.rb', line 60

def push(message)
  @mutex.synchronize do
    @socket.write(message.marshall)
  end
end