Class: PulsarSdk::Protocol::Reader

Inherits:
Object
  • Object
show all
Includes:
Tweaks::CleanInspect
Defined in:
lib/pulsar_sdk/protocol/reader.rb

Constant Summary collapse

FRAME_SIZE_LEN =
4
CMD_SIZE_LEN =
4

Instance Method Summary collapse

Methods included from Tweaks::CleanInspect

#inspect

Constructor Details

#initialize(io) ⇒ Reader

Returns a new instance of Reader.



9
10
11
12
13
14
# File 'lib/pulsar_sdk/protocol/reader.rb', line 9

def initialize(io)
  ensure_interface_implemented!(io)
  @io = io

  @readed = 0
end

Instance Method Details

#read_commandObject



35
36
37
38
39
# File 'lib/pulsar_sdk/protocol/reader.rb', line 35

def read_command
  cmd_size = read(CMD_SIZE_LEN, 'N')
  cmd_bytes = read(cmd_size)
  Pulsar::Proto::BaseCommand.decode(cmd_bytes)
end

#read_frame_sizeObject



28
29
30
31
32
33
# File 'lib/pulsar_sdk/protocol/reader.rb', line 28

def read_frame_size
  frame_size = read(FRAME_SIZE_LEN, 'N')
  # reset cursor! let's read the frame
  @readed = 0
  frame_size
end

#read_fullyObject

TODO add timeout?



17
18
19
20
21
22
23
24
25
26
# File 'lib/pulsar_sdk/protocol/reader.rb', line 17

def read_fully
  frame_szie = read_frame_size
  raise "IO reader is empty! maybe server error, please check server log for help." if frame_szie.nil?

  base_cmd = read_command

  buffer = read_remaining(frame_szie)

  [base_cmd, buffer]
end

#read_remaining(frame_szie) ⇒ Object



41
42
43
44
45
# File 'lib/pulsar_sdk/protocol/reader.rb', line 41

def read_remaining(frame_szie)
  meta_and_payload_size = frame_szie - @readed
  return if meta_and_payload_size <= 0
  read(meta_and_payload_size)
end