Class: PulsarSdk::Protocol::Structure
- Inherits:
-
Object
- Object
- PulsarSdk::Protocol::Structure
- Includes:
- Tweaks::CleanInspect
- Defined in:
- lib/pulsar_sdk/protocol/structure.rb
Constant Summary collapse
- MAGIC_NUMBER =
- MAGIC_NUMBER
- CHECKSUM
- METADATA_SIZE
- METADATA
- PAYLOAD
[0x0e, 0x01].pack('C*').freeze
- MAGIC_NUMBER_LEN =
MAGIC_NUMBER.size
- CHECKSUM_LEN =
4
- METADATA_SIZE_LEN =
4
Instance Method Summary collapse
- #decode ⇒ Object
-
#initialize(buff) ⇒ Structure
constructor
A new instance of Structure.
-
#read_checksum ⇒ Object
crc32.
- #read_magic_number ⇒ Object
- #read_metadata ⇒ Object
- #read_remaining ⇒ Object
-
#rewind(x = nil) ⇒ Object
回退若干字节,方便处理非连续段.
Methods included from Tweaks::CleanInspect
Constructor Details
#initialize(buff) ⇒ Structure
Returns a new instance of Structure.
12 13 14 15 |
# File 'lib/pulsar_sdk/protocol/structure.rb', line 12 def initialize(buff) @buff = buff rewind end |
Instance Method Details
#decode ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/pulsar_sdk/protocol/structure.rb', line 17 def decode = nil = PulsarSdk::Protocol::Message.new mn_bytes = read_magic_number if mn_bytes == MAGIC_NUMBER _checksum = read_checksum # TODO 可能需要校验一下,防止错误消息 = else rewind(MAGIC_NUMBER_LEN) = end msg = read_remaining # NOTE 同为Ruby SDK时可以根据Content-Type预先还原 # 复杂类型依旧为string,需要特别注意 .properties.each do |x| next unless x.key.to_s =~ /Content-Type/i next unless x.value.to_s =~ /json/i PulsarSdk.logger.info("#{self.class}::#{__method__}"){"Found json encode remark, parse JSON mesaage!"} msg = JSON.parse(msg) end .assign_attributes( publish_time: .publish_time, event_time: .event_time, partition_key: .partition_key, properties: .properties, payload: msg ) end |
#read_checksum ⇒ Object
crc32
66 67 68 |
# File 'lib/pulsar_sdk/protocol/structure.rb', line 66 def read_checksum read(CHECKSUM_LEN) end |
#read_magic_number ⇒ Object
61 62 63 |
# File 'lib/pulsar_sdk/protocol/structure.rb', line 61 def read_magic_number read(MAGIC_NUMBER_LEN) end |
#read_metadata ⇒ Object
70 71 72 73 74 |
# File 'lib/pulsar_sdk/protocol/structure.rb', line 70 def = read(METADATA_SIZE_LEN, 'N') = read() Pulsar::Proto::MessageMetadata.decode() end |
#read_remaining ⇒ Object
76 77 78 79 80 |
# File 'lib/pulsar_sdk/protocol/structure.rb', line 76 def read_remaining payload_size = @buff.size - @readed return if payload_size <= 0 read(payload_size) end |
#rewind(x = nil) ⇒ Object
回退若干字节,方便处理非连续段
55 56 57 58 59 |
# File 'lib/pulsar_sdk/protocol/structure.rb', line 55 def rewind(x = nil) return @readed = 0 if x.nil? @readed -= x end |