Module: Rubarb::FastMessageProtocol

Defined in:
lib/rubarb/fast_message_protocol.rb

Overview

Frames messages with a 4-byte integer size prefix (network byte order). The underlying protocol must implement #send_data and invoke #receive_data. The user's protocol must call #send_message to send a message and implement the #receive_message callback to receive one.

Constant Summary collapse

LENGTH_FORMAT =
"N".freeze
LENGTH_FORMAT_LENGTH =
4

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.install(object) ⇒ Object



7
8
9
10
# File 'lib/rubarb/fast_message_protocol.rb', line 7

# Mixes the protocol into +object+ and initialises its framing state.
#
# @param object [Object] the object to extend with Rubarb::FastMessageProtocol
# @return [Object] whatever the object's #post_init returns
def self.install(object)
  protocol = Rubarb::FastMessageProtocol
  # Object#extend returns the receiver, so the call can be chained.
  object.extend(protocol).post_init
end

Instance Method Details

#err ⇒ Object





95
96
97
# File 'lib/rubarb/fast_message_protocol.rb', line 95

# Writes one diagnostic line to STDERR, prefixed with the protocol name.
# The text of the line is whatever the given block returns.
#
# @yieldreturn [#to_s] the message to report
# @return the result of STDERR.write
def err
  line = "FastMessageProtocol: #{yield}\n"
  STDERR.write(line)
end

#log ⇒ Object



99
100
101
# File 'lib/rubarb/fast_message_protocol.rb', line 99

# Prints one trace line, prefixed with the protocol name, via #puts.
# The text of the line is whatever the given block returns.
#
# @yieldreturn [#to_s] the message to print
# @return the result of #puts
def log
  text = yield
  puts "FastMessageProtocol: #{text}"
end

#post_init ⇒ Object



12
13
14
15
16
17
# File 'lib/rubarb/fast_message_protocol.rb', line 12

# Resets the receiver-side framing state used by #receive_data.
#
# @return [String] the fresh, empty payload buffer (value of the last assignment)
def post_init
  # Payload length announced by the current size prefix.
  # 0 means no message is in progress and a new prefix is expected.
  @fmp_size = 0

  # Bytes of a size prefix that has only partially arrived so far.
  @fmp_size_chunk = ""

  # Payload bytes accumulated for the message in progress.
  # Kept as a separate String object from the prefix buffer: both are appended to in place.
  @fmp_data = ""
end

#receive_data(next_chunk) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/rubarb/fast_message_protocol.rb', line 29

# Consumes a chunk of the incoming stream and reassembles length-prefixed
# messages, calling #receive_message once per completed message.
#
# A chunk may hold a fragment of a size prefix, a fragment of a payload, or
# several whole messages; partial input is buffered in @fmp_size_chunk and
# @fmp_data until the rest arrives in a later call.
#
# A size prefix of 0 denotes an empty message: it is delivered immediately as
# "" and the prefix buffer is cleared. Without this, a zero prefix left
# @fmp_size at 0 with a full prefix buffer, so any following bytes were never
# consumed and the loop below never terminated.
#
# NOTE(review): lengths are measured with String#size, which equals the byte
# count only for binary-encoded strings -- presumably what the transport
# delivers; confirm against the caller of #receive_data.
#
# @param next_chunk [String] bytes just received from the underlying protocol
# @return [nil]
def receive_data(next_chunk)
  log { "fmp rx:  #{next_chunk}" }

  # Iterating (instead of recursing) keeps the stack flat when one buffer
  # carries many messages. `while true` is deliberate: Kernel#loop would
  # silently swallow a StopIteration raised from #receive_message.
  while true
    data = next_chunk
    now  = data.size

    if @fmp_size > 0
      # A payload is in progress: accumulate until @fmp_size is reached.
      left = @fmp_size - @fmp_data.size
      log { "Need more #{left} bytes, got #{now} for now." }

      if left > now
        @fmp_data << data
        break
      end

      # The chunk completes the payload; whatever follows belongs to the
      # next message and is handled by the next iteration.
      @fmp_data << data[0, left]
      next_chunk = data[left, now]

      # Reset the state before the callback so that it observes a clean
      # "waiting for a new message" state.
      message = @fmp_data
      @fmp_data = ""
      @fmp_size = 0
      @fmp_size_chunk = ""
      log { "Returning #{message.size} bytes (#{message[0..32]})" } if now > left
      receive_message(message)
      break if now == left
    else
      # Waiting for (the rest of) a size prefix.
      left = LENGTH_FORMAT_LENGTH - @fmp_size_chunk.size
      log { "Need more #{left} bytes for size_chunk, got #{now} for now." }

      if left > now
        @fmp_size_chunk << data
        break
      end

      # The prefix is complete: decode it and keep the remainder for the
      # next iteration.
      @fmp_size_chunk << data[0, left]
      next_chunk = data[left, now]
      @fmp_size = @fmp_size_chunk.unpack(LENGTH_FORMAT)[0]
      log { "Ready to receive #{@fmp_size} bytes." }

      if @fmp_size.zero?
        # Empty message: nothing to accumulate, so deliver it right away and
        # clear the prefix buffer to make room for the next prefix.
        @fmp_size_chunk = ""
        receive_message("")
      end
      break if now == left
    end
  end
end

#send_message(data) ⇒ Object



22
23
24
25
26
27
# File 'lib/rubarb/fast_message_protocol.rb', line 22

# Sends +data+ as one framed message: a length prefix packed with
# LENGTH_FORMAT, followed by the payload, each passed to #send_data.
#
# The prefix carries the payload's length in bytes (String#bytesize), not in
# characters; String#size under-counts multibyte strings and would make the
# prefix shorter than the payload that follows it.
#
# @param data [String] the message payload
# @return whatever #send_data returns for the payload
# @raise [ArgumentError] if the payload length does not fit in 32 bits
def send_message(data)
  byte_count = data.bytesize

  # pack("N") silently keeps only the low 32 bits of a larger value, which
  # would announce a wrong length; refuse before anything is sent.
  if byte_count > 0xFFFFFFFF
    raise ArgumentError, "message of #{byte_count} bytes does not fit in a 32-bit length prefix"
  end

  send_data [byte_count].pack(LENGTH_FORMAT)
  send_data data
end