Class: PulsarSdk::Protocol::Frame

Inherits:
Object
  • Object
show all
Defined in:
lib/pulsar_sdk/protocol/frame.rb

Constant Summary collapse

PREPENDED_SIZE =

预留4byte存放帧长度

4
CHECKSUM_SIZE =
4
MAGIC_NUMBER =
[0x0e, 0x01].pack('C*').freeze

Class Method Summary collapse

Class Method Details

.binary(*obj) ⇒ Object



42
43
44
# File 'lib/pulsar_sdk/protocol/frame.rb', line 42

def self.binary(*obj)
  obj.map { |x| Array(x).pack('N') }.join
end

.crc32(bytes) ⇒ Object



46
47
48
49
50
# File 'lib/pulsar_sdk/protocol/frame.rb', line 46

def self.crc32(bytes)
  crc = Digest::CRC32c.new
  crc << bytes
  crc.checksum
end

.decode(byte) ⇒ Object



38
39
40
# File 'lib/pulsar_sdk/protocol/frame.rb', line 38

def self.decode(byte)
  Pulsar::Proto::BaseCommand.decode(byte)
end

.encode(command, message = nil) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/pulsar_sdk/protocol/frame.rb', line 11

def self.encode(command, message = nil)
  raise "command MUST be Pulsar::Proto::BaseCommand but got #{command.class}" unless command.is_a?(Pulsar::Proto::BaseCommand)

  pb_cmd = command.to_proto

  # 非发送消息帧
  return encode_command(pb_cmd) if message.nil?

  # 消息发送帧
  # [TOTAL_SIZE] [CMD_SIZE] [CMD] [MAGIC_NUMBER] [CHECKSUM] [METADATA_SIZE] [METADATA] [PAYLOAD]
  raise "message MUST be PulsarSdk::Producer::Message but got #{message.class}" unless message.is_a?(PulsarSdk::Producer::Message)

   = message.
  pb_meta = .to_proto

  meta_payload = binary(pb_meta.size) + pb_meta + message.binary_string
  checksum = crc32(meta_payload)

  total_size = PREPENDED_SIZE + pb_cmd.size + MAGIC_NUMBER.size + CHECKSUM_SIZE + meta_payload.size

  binary(total_size, pb_cmd.size) + pb_cmd + MAGIC_NUMBER + binary(checksum) + meta_payload
end

.encode_command(pb_cmd) ⇒ Object



34
35
36
# File 'lib/pulsar_sdk/protocol/frame.rb', line 34

def self.encode_command(pb_cmd)
  binary(pb_cmd.size + PREPENDED_SIZE, pb_cmd.size) + pb_cmd
end