Module: RxIO::IOFilters::MsgSize

Defined in:
lib/rxio/io_filters/msg_size.rb

Overview

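Message-Size I/O Filter: Splits the incoming byte stream into messages according to a _4-byte unsigned big-endian integer_ size field, which prefixes every message and gives the length in bytes of the payload that follows (the prefix itself is not counted). Payloads are Base64-encoded when sent and decoded when received, so the size field describes the encoded payload.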
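As a rough illustration of the wire format described above, the following standalone sketch frames one payload by hand using only Ruby's standard library. It does not use RxIO; the frame helper is a hypothetical name introduced for this example, and the Base64 step mirrors what send_msg does further down.

```ruby
require 'base64'

# Hypothetical helper (not part of RxIO): builds a single framed message.
def frame(payload)
  body = Base64.encode64(payload)      # the payload is Base64-encoded first
  [body.bytesize].pack('N') + body     # 4-byte big-endian size prefix, then the body
end

framed = frame('hello')

# Read the prefix back: 'N' is a 32-bit unsigned integer in network byte order.
size = framed[0, 4].unpack1('N')
body = framed[4, size]

puts size                              # 9, the length of "aGVsbG8=\n"
puts Base64.decode64(body)             # hello
```

Note that the size counts the encoded body, including the trailing newline that Base64.encode64 appends.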

Class Method Details

.extended(base) ⇒ Object

Inject Dependencies into Extending Module



# File 'lib/rxio/io_filters/msg_size.rb', line 22

def self.extended base
  base.extend RxIO::HandlerBase
end
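The hook above relies on Ruby calling self.extended whenever a module is pulled in with extend. A minimal sketch of that mechanism, using hypothetical stand-in modules (FakeHandlerBase, FakeFilter, and MyHandler are not part of RxIO):

```ruby
# Hypothetical stand-in for RxIO::HandlerBase.
module FakeHandlerBase
  def helper
    :from_base
  end
end

# Hypothetical stand-in for the filter module.
module FakeFilter
  # Ruby invokes this hook with the object that called `extend FakeFilter`.
  def self.extended(base)
    base.extend FakeHandlerBase
  end
end

module MyHandler
  extend FakeFilter   # triggers FakeFilter.extended(MyHandler)
end

# MyHandler never mentioned FakeHandlerBase, yet its methods are available.
puts MyHandler.helper   # from_base
```

The effect is that a handler only has to extend the filter; the dependency on the base module is pulled in for it.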

Instance Method Details

#filter_input(endpoint, chunk) ⇒ Object

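Filter Input: Buffers data chunks sent by the endpoint and extracts every complete message from the buffer, according to the size field at the beginning of each message. Each extracted payload is Base64-decoded and appended to endpoint[:msgs]; an incomplete message stays in the buffer until more data arrives.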



# File 'lib/rxio/io_filters/msg_size.rb', line 30

def filter_input endpoint, chunk

  # Append the incoming chunk to the endpoint's input buffer
  buffer_input endpoint, chunk

  # Loop over Messages
  loop do

    # Check Buffer Size (can we at least determine the next message size?)
    break unless endpoint[:ibuf].bytesize >= 4

    # Acquire Message Size
    size = endpoint[:ibuf][0, 4].unpack('N')[0]

    # Check Buffer Size again (is the complete message present in the buffer?)
    break unless endpoint[:ibuf].bytesize >= 4 + size

    # Slice out Message from Input Buffer
    m = endpoint[:ibuf].slice!(0, 4 + size)[4, size]

    # Register Message
    endpoint[:msgs] << Base64.decode64(m)
  end
end
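To see how the loop above copes with messages that arrive split across chunks, here is a standalone sketch of the same extraction logic working on a plain String buffer. It assumes nothing from RxIO; extract_messages and frame are hypothetical helpers written for this example.

```ruby
require 'base64'

# Hypothetical helper: frames one payload the way send_msg does.
def frame(payload)
  body = Base64.encode64(payload)
  [body.bytesize].pack('N') + body
end

# Hypothetical stand-alone version of the extraction loop.
# Removes every complete frame from +buffer+ and returns the decoded payloads.
def extract_messages(buffer)
  msgs = []
  loop do
    break unless buffer.bytesize >= 4            # size prefix not complete yet
    size = buffer[0, 4].unpack1('N')
    break unless buffer.bytesize >= 4 + size     # body not complete yet
    msgs << Base64.decode64(buffer.slice!(0, 4 + size)[4, size])
  end
  msgs
end

stream = frame('foo') + frame('barbaz')          # 22 bytes in total

# Deliver the stream in chunks that do not line up with message boundaries.
buffer   = String.new(encoding: Encoding::BINARY)
received = []
[stream[0, 3], stream[3, 10], stream[13..-1]].each do |chunk|
  buffer << chunk
  received.concat(extract_messages(buffer))
end

p received   # both payloads, in order
```

The first chunk is too short to contain even the size prefix, so nothing is extracted until the second chunk completes the first frame.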

#send_msg(endpoint, msg) ⇒ Object

Send Message: Base64-encodes a message and buffers it to be sent to the endpoint, prefixed with a 4-byte size field that gives the length of the encoded message in bytes.



# File 'lib/rxio/io_filters/msg_size.rb', line 59

def send_msg endpoint, msg
  msg = Base64.encode64 msg
  write endpoint, [msg.bytesize].pack('N').force_encoding(msg.encoding), msg
end
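The force_encoding call in send_msg only relabels the size prefix so that it carries the same encoding as the message; the bytes themselves are untouched. A small sketch of that behaviour in isolation, using only the standard library (the variable names are illustrative, and how write combines its arguments is not shown here):

```ruby
require 'base64'

msg    = Base64.encode64('hello')
prefix = [msg.bytesize].pack('N')      # pack returns a binary (ASCII-8BIT) string

# Relabel a copy of the prefix with the message's encoding.
tagged = prefix.dup.force_encoding(msg.encoding)

puts prefix.encoding == Encoding::BINARY   # true
puts tagged.encoding == msg.encoding       # true
puts tagged.bytes == prefix.bytes          # true: only the label changed
```

Because the size is written as raw bytes, the receiving side should read it back with unpack('N') on the first four bytes rather than treating it as text.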