Top Level Namespace

Defined Under Namespace

Modules: Kafka

Constant Summary collapse

PRODUCE_REQUEST_ID =
0

Instance Method Summary collapse

Instance Method Details

#encode_message(message) ⇒ Object



7
8
9
10
11
12
13
# File 'lib/test.rb', line 7

# Encodes a single message payload into its wire frame.
#
# Frame layout: <MAGIC_BYTE: char> <CRC32: int> <PAYLOAD: bytes>
# The magic byte is always 0 and the CRC32 is written as a 32-bit
# big-endian unsigned integer computed over the payload bytes.
#
# @param message [String] the payload to frame (any encoding)
# @return [String] the framed message as a binary (ASCII-8BIT) string
def encode_message(message)
    # Work on a binary copy of the payload: appending a UTF-8 string with
    # non-ASCII characters to the packed (binary) header would raise
    # Encoding::CompatibilityError whenever the CRC contains a byte >= 0x80.
    payload = message.b
    [0, Zlib.crc32(payload)].pack("CN") + payload
end

#encode_produce_request(topic, partition, messages) ⇒ Object

encode_message(“ale”)



16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/test.rb', line 16

def encode_produce_request(topic, partition, messages)
    encoded = messages.collect { |m| encode_message(m) }
    message_set = encoded.collect { |e| puts "Message size #{e.length}"; [e.length].pack("N") + e }.join("")

    puts "MESSAGE"
    puts message_set.inspect

    data = [PRODUCE_REQUEST_ID].pack("n") + \
           [topic.length].pack("n") + topic.to_s + \
           [partition].pack("N") + \
           [message_set.length].pack("N") + message_set
    puts "DATA " + message_set.length.to_s
    return [data.length].pack("N") + data
end