Class: EventMachine::Kafka::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/em-kafka/message.rb

Overview

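Wire format of a message: a 1-byte “magic” identifier to allow format changes, followed by a 4-byte CRC32 of the payload, followed by the payload itself (N - 5 bytes, where N is the length of the whole message).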

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(payload, magic = 0, checksum = nil, size = nil) ⇒ Message

Returns a new instance of Message.



10
11
12
13
14
# File 'lib/em-kafka/message.rb', line 10

# Creates a message around +payload+.
#
# @param payload [String] message body
# @param magic [Integer] format identifier; defaults to 0
# @param checksum [Integer, nil] CRC32 of the payload; computed with
#   Zlib.crc32 when nil
# @param size [Integer, nil] size to expose through #size; stored as given
def initialize(payload, magic = 0, checksum = nil, size = nil)
  self.payload  = payload
  self.magic    = magic
  self.checksum = checksum || Zlib.crc32(payload)
  # This argument used to be accepted and then dropped, which left #size
  # permanently nil no matter what the caller passed.
  self.size     = size
end

Instance Attribute Details

#checksum ⇒ Object

Returns the value of attribute checksum.



8
9
10
# File 'lib/em-kafka/message.rb', line 8

# Reader for the checksum attribute. The constructor sets it to the checksum
# it was given or, when none was given, to Zlib.crc32 of the payload.
#
# @return [Object] the current value of @checksum
def checksum
  @checksum
end

#magic ⇒ Object

Returns the value of attribute magic.



8
9
10
# File 'lib/em-kafka/message.rb', line 8

# Reader for the magic attribute (the constructor defaults it to 0).
#
# @return [Object] the current value of @magic
def magic
  @magic
end

#payload ⇒ Object

Returns the value of attribute payload.



8
9
10
# File 'lib/em-kafka/message.rb', line 8

# Reader for the payload attribute, i.e. the message body handed to the
# constructor.
#
# @return [Object] the current value of @payload
def payload
  @payload
end

#size ⇒ Object

Returns the value of attribute size.



8
9
10
# File 'lib/em-kafka/message.rb', line 8

# Reader for the size attribute.
#
# @return [Object, nil] the current value of @size; nil when it was never
#   assigned
def size
  @size
end

Class Method Details

.decode(size, binary) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/em-kafka/message.rb', line 25

# Builds a message from a raw frame.
#
# The offsets used here imply this layout: bytes 0-3 are skipped (presumably
# the length prefix -- confirm against the caller), byte 4 is the magic value,
# bytes 5-8 are the checksum as a 32-bit big-endian unsigned integer, and
# everything from byte 9 onwards is the payload.
#
# @param size [Integer, nil] size reported for the frame; forwarded to the
#   constructor as its fourth argument
# @param binary [String, nil] raw frame bytes
# @return [Message, nil] nil when +binary+ is nil or shorter than the 9-byte
#   header
def self.decode(size, binary)
  # A frame without a complete header cannot yield a magic byte and checksum.
  return unless binary && binary.bytesize >= 9

  # Slice by byte offset so the header is read correctly even if the string
  # is not tagged as binary. The checksum field is exactly four bytes (5..8).
  magic, checksum = binary.byteslice(4, 5).unpack("CN")
  payload = binary.byteslice(9, binary.bytesize - 9)
  new(payload, magic, checksum, size)
end

Instance Method Details

#encode ⇒ Object



20
21
22
23
# File 'lib/em-kafka/message.rb', line 20

# Serializes the message: the magic value as one unsigned byte, the checksum
# as a 32-bit big-endian unsigned integer, then the payload bytes.
#
# @return [String] the encoded message, tagged ASCII-8BIT
def encode
  header = [magic, checksum].pack("CN")
  # Re-tag a copy of the payload. Calling force_encoding on the payload itself
  # would change the encoding of the caller's string and raise on a frozen one.
  header + payload.to_s.dup.force_encoding(Encoding::ASCII_8BIT)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/em-kafka/message.rb', line 16

# Compares the stored checksum with a CRC32 freshly computed from the payload.
#
# @return [Boolean] true when the two values are equal
def valid?
  computed = Zlib.crc32(payload)
  checksum == computed
end