Module: RxIO::IOFilters::MsgSize

Defined in:
lib/rxio/io_filters/msg_size.rb

Overview

Message-Size I/O Filter: Splits messages according to a _4-byte unsigned big-endian integer_ size field, which prefixes every message and indicates its length in bytes.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.extended(base) ⇒ Object

Inject Dependencies into Extending Module

Parameters:

  • base (Module)


22
23
24
# File 'lib/rxio/io_filters/msg_size.rb', line 22

def self.extended base
	base.extend RxIO::HandlerBase
end

Instance Method Details

#filter_input(endpoint, chunk) ⇒ Object

Filter Input: Buffers data chunks sent by the endpoint and extracts messages from them, according to the size field present at the beginning of each message.

Parameters:

  • endpoint (Hash)
  • chunk (String)


30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/rxio/io_filters/msg_size.rb', line 30

def filter_input endpoint, chunk

	# Buffer dat shit
	buffer_input endpoint, chunk

	# Loop over Messages
	while true

		# Check Buffer Size (can we at least determine the next message size?)
		break unless endpoint[:ibuf].bytesize >= 4

		# Acquire Message Size
		size = endpoint[:ibuf][0, 4].unpack('N')[0]

		# Check Buffer Size again (is the complete message present in the buffer?)
		break unless endpoint[:ibuf].bytesize >= 4 + size

		# Slice out Message from Input Buffer
		m = endpoint[:ibuf].slice!(0, 4 + size)[4, size]

		# Register Message
		# endpoint[:msgs] << Base64.decode64(m)
		endpoint[:msgs] << m
	end
end

#send_msg(endpoint, msg) ⇒ Object

Send Message: Buffers a message to be sent to the endpoint, after prefixing it with a size field.

Parameters:

  • endpoint (Hash)
  • msg (String)


60
61
62
63
64
# File 'lib/rxio/io_filters/msg_size.rb', line 60

def send_msg endpoint, msg
	# msg = Base64.encode64 msg
	msg = msg.to_s
	write endpoint, [msg.bytesize].pack('N').force_encoding(msg.encoding), msg
end