Module: EMRPC::FastMessageProtocol

Defined in:
lib/emrpc/fast_message_protocol.rb

Overview

Sends and receives messages framed with a 4-byte unsigned integer size prefix (network byte order). The underlying protocol must implement #send_data and invoke #receive_data. The user’s protocol must call #send_fast_message to send a message and implement the #receive_fast_message callback to receive one.

Constant Summary collapse

LENGTH_FORMAT =
"N".freeze
LENGTH_FORMAT_LENGTH =
4

Instance Method Summary collapse

Instance Method Details

#err ⇒ Object

def receive_data



87
88
89
# File 'lib/emrpc/fast_message_protocol.rb', line 87

# Writes one diagnostic line to STDERR, prefixed with the protocol name.
# The text is taken from the block, so the caller must pass one.
# Returns whatever STDERR.write returns.
def err
  line = "FastMessageProtocol: #{yield}\n"
  STDERR.write(line)
end

#log ⇒ Object



90
91
92
# File 'lib/emrpc/fast_message_protocol.rb', line 90

# Prints one trace line via puts, prefixed with the protocol name.
# The text is taken from the block, so the caller must pass one.
def log
  text = yield
  puts("FastMessageProtocol: #{text}")
end

#post_init ⇒ Object



6
7
8
9
10
11
12
# File 'lib/emrpc/fast_message_protocol.rb', line 6

# Resets the framing state and then hands control to the next
# post_init in the ancestor chain.
def post_init
  # Payload bytes collected so far for the message in progress.
  @fmp_data = ""
  # Partially received size prefix.
  @fmp_size_chunk = ""
  # Announced payload size; 0 means "waiting for a new message",
  # any other value means "accumulating data".
  @fmp_size = 0
  super
end

#receive_data(next_chunk) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/emrpc/fast_message_protocol.rb', line 24

# Feeds one chunk of incoming data into the framing state machine and calls
# #receive_fast_message once for every message completed by that chunk.
#
# State lives in three instance variables:
#   @fmp_size       - announced payload size; 0 while the size prefix is
#                     still being read
#   @fmp_size_chunk - size-prefix bytes collected so far
#   @fmp_data       - payload bytes collected so far
#
# A chunk may hold a fragment of a message, exactly one message, or several
# messages back to back; leftovers are processed in the same call.
#
# A zero-length message (a prefix that unpacks to 0) is delivered as "" and
# the prefix buffer is reset. Without that reset the method would keep a
# full prefix buffer with @fmp_size == 0 and re-process the same input
# forever on the next chunk.
#
# next_chunk - String with the newly received data.
#              NOTE(review): String#size is used as the byte count, so
#              chunks are presumed to be binary strings - confirm with the
#              underlying protocol.
def receive_data(next_chunk)
  while true # helps fight deep recursion when receiving many messages in a single buffer.
    data = next_chunk

    if @fmp_size > 0
      # Payload state: collect until @fmp_size bytes are available.
      left = @fmp_size - @fmp_data.size
      now  = data.size
      log { "Need more #{left} bytes, got #{now} for now." }

      if left > now
        @fmp_data << data
        break
      end

      # Payload complete: keep the expected part, carry the rest over.
      @fmp_data << data[0, left]
      next_chunk = data[left, now]

      # Reset the state before the callback so it observes a clean parser.
      message = @fmp_data
      @fmp_data = ""
      @fmp_size = 0
      @fmp_size_chunk = ""

      log { "Returning #{message.size} bytes (#{message[0..32]})" } unless next_chunk.empty?
      receive_fast_message(message)
      break if next_chunk.empty?
      # (see while true: processing next chunk without recursive calls)
    else
      # Prefix state: collect LENGTH_FORMAT_LENGTH bytes of size prefix.
      left = LENGTH_FORMAT_LENGTH - @fmp_size_chunk.size
      now  = data.size
      log { "Need more #{left} bytes for size_chunk, got #{now} for now." }

      if left > now
        @fmp_size_chunk << data
        break
      end

      # Prefix complete: decode it, carry the rest over as payload input.
      @fmp_size_chunk << data[0, left]
      next_chunk = data[left, now]
      @fmp_size = @fmp_size_chunk.unpack(LENGTH_FORMAT)[0]
      log { "Ready to receive #{@fmp_size} bytes."}

      if @fmp_size == 0
        # Empty message: there is no payload to wait for, so deliver it now
        # and start reading a fresh prefix.
        @fmp_size_chunk = ""
        receive_fast_message("")
      end

      break if next_chunk.empty?
    end
  end
end

#send_fast_message(data) ⇒ Object



17
18
19
20
21
22
# File 'lib/emrpc/fast_message_protocol.rb', line 17

# Sends +data+ as one framed message: first the payload length packed with
# LENGTH_FORMAT, then the payload itself, both through #send_data.
#
# data - String payload.
#
# Returns whatever the second #send_data call returns.
def send_fast_message(data)
  # The prefix must count bytes, not characters: String#size counts
  # characters for multibyte encodings, which would announce fewer bytes
  # than are actually sent.
  packed_size = [data.bytesize].pack(LENGTH_FORMAT)
  send_data packed_size
  send_data data
end