Class: Kafka::Protocol::Encoder

Inherits:
Object
Defined in:
lib/kafka/protocol/encoder.rb

Overview

An encoder wraps an IO object, making it easy to write specific data types to it.

Constant Summary

VARINT_MASK = 0b10000000

Constructor Details

#initialize(io) ⇒ Encoder

Initializes a new encoder.

Parameters:

  • io (IO)

    an object that acts as an IO. Its encoding is set to binary.



# File 'lib/kafka/protocol/encoder.rb', line 15

def initialize(io)
  @io = io
  @io.set_encoding(Encoding::BINARY)
end

Class Method Details

.encode_with(object) ⇒ String

Encodes an object into a new buffer.

Parameters:

  • object (#encode)

    the object that will encode itself.

Returns:

  • (String)

    the encoded data.



# File 'lib/kafka/protocol/encoder.rb', line 179

def self.encode_with(object)
  buffer = StringIO.new
  encoder = new(buffer)

  object.encode(encoder)

  buffer.string
end
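
As a rough usage sketch, any object that responds to `encode(encoder)` can be passed to `encode_with`. The `MiniEncoder` class below is a hypothetical stand-in that copies only the methods it needs from the listings on this page, so the example runs without the gem installed; `Heartbeat` and its fields are invented for illustration.

```ruby
require "stringio"

# Hypothetical stand-in mirroring a few methods of Kafka::Protocol::Encoder.
class MiniEncoder
  def initialize(io)
    @io = io
    @io.set_encoding(Encoding::BINARY)
  end

  # Same shape as Encoder.encode_with: the object writes itself into a buffer.
  def self.encode_with(object)
    buffer = StringIO.new
    object.encode(new(buffer))
    buffer.string
  end

  def write(bytes)
    @io.write(bytes)
    nil
  end

  def write_int16(int)
    write([int].pack("s>"))
  end

  def write_int32(int)
    write([int].pack("l>"))
  end

  def write_string(string)
    if string.nil?
      write_int16(-1)
    else
      write_int16(string.bytesize)
      write(string)
    end
  end
end

# Invented request type: it only needs to respond to #encode.
Heartbeat = Struct.new(:group_id, :generation_id) do
  def encode(encoder)
    encoder.write_string(group_id)
    encoder.write_int32(generation_id)
  end
end

encoded = MiniEncoder.encode_with(Heartbeat.new("g1", 7))
```

With the real class, the call would presumably look the same with `Kafka::Protocol::Encoder` in place of `MiniEncoder`.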

Instance Method Details

#write(bytes) ⇒ nil

Writes bytes directly to the IO object.

Parameters:

  • bytes (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 24

def write(bytes)
  @io.write(bytes)

  nil
end

#write_array(array, &block) ⇒ nil

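Writes an array to the IO object, prefixed with its size as a 32-bit integer. A nil array is written as a size of -1, which is distinct from an empty array (size 0).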

Each item in the specified array will be yielded to the provided block; it's the responsibility of the block to write those items using the encoder.

Parameters:

  • array (Array)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 78

def write_array(array, &block)
  if array.nil?
    # An array can be null, which is different from it being empty.
    write_int32(-1)
  else
    write_int32(array.size)
    array.each(&block)
  end
end
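
The difference between a null array and an empty one can be sketched with plain `Array#pack`. `encode_array` is a hypothetical helper, not part of the library: it returns the bytes instead of writing them through an encoder, and its block returns the encoded item rather than writing it.

```ruby
# Hypothetical helper reproducing the byte layout of write_array.
def encode_array(array)
  return [-1].pack("l>") if array.nil?        # null array: size -1, no items

  header = [array.size].pack("l>")            # 32-bit big-endian item count
  body = array.map { |item| yield(item) }.join
  header + body
end

null_bytes  = encode_array(nil)    { |n| [n].pack("s>") }
empty_bytes = encode_array([])     { |n| [n].pack("s>") }
two_items   = encode_array([1, 2]) { |n| [n].pack("s>") }
```

Here `null_bytes` is four `0xFF` bytes, while `empty_bytes` is four zero bytes, so a reader can tell the two cases apart from the size field alone.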

#write_boolean(boolean) ⇒ nil

Writes an 8-bit boolean to the IO object: 1 for a truthy value, 0 otherwise.

Parameters:

  • boolean (Boolean)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 34

def write_boolean(boolean)
  boolean ? write_int8(1) : write_int8(0)
end

#write_bytes(bytes) ⇒ nil

Writes a byte string to the IO object, prefixed with its size in bytes as a 32-bit integer. A nil value is written as a size of -1.

Parameters:

  • bytes (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 153

def write_bytes(bytes)
  if bytes.nil?
    write_int32(-1)
  else
    write_int32(bytes.bytesize)
    write(bytes)
  end
end

#write_int16(int) ⇒ nil

Writes a signed 16-bit integer to the IO object in big-endian byte order.

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 50

def write_int16(int)
  write([int].pack("s>"))
end

#write_int32(int) ⇒ nil

Writes a signed 32-bit integer to the IO object in big-endian byte order.

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 58

def write_int32(int)
  write([int].pack("l>"))
end

#write_int64(int) ⇒ nil

Writes a signed 64-bit integer to the IO object in big-endian byte order.

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 66

def write_int64(int)
  write([int].pack("q>"))
end

#write_int8(int) ⇒ nil

Writes an 8-bit integer to the IO object.

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 42

def write_int8(int)
  write([int].pack("C"))
end
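
The fixed-width writers differ only in their `pack` directive. As a small illustration (the variable names are just for this sketch), these are the byte layouts the directives produce:

```ruby
# Byte layouts produced by the pack directives used by the integer writers.
int8  = [1].pack("C")      # one byte
int16 = [258].pack("s>")   # 0x0102, most significant byte first
int32 = [-1].pack("l>")    # two's complement, all bits set
int64 = [1].pack("q>")     # eight bytes, most significant byte first
```

The `>` modifier selects big-endian order; `"C"` is a single byte and needs no byte-order modifier.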

#write_string(string) ⇒ nil

Writes a string to the IO object, prefixed with its size in bytes as a 16-bit integer. A nil value is written as a size of -1.

Parameters:

  • string (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 106

def write_string(string)
  if string.nil?
    write_int16(-1)
  else
    write_int16(string.bytesize)
    write(string)
  end
end
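
Because the prefix uses `bytesize` rather than `length`, multibyte characters count by their encoded size. A minimal sketch, where `encode_string` is a hypothetical helper that returns the bytes instead of writing them:

```ruby
# Hypothetical helper reproducing the byte layout of write_string.
def encode_string(string)
  return [-1].pack("s>") if string.nil?       # null string: size -1

  # 16-bit big-endian byte count, then the raw bytes of the string.
  [string.bytesize].pack("s>") + string.b
end

ascii    = encode_string("abc")
accented = encode_string("\u00e9")   # one character, two bytes in UTF-8
missing  = encode_string(nil)
```

The accented string is one character long but is written with a size prefix of 2.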

#write_varint(int) ⇒ nil

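Writes an integer to the IO object using variable-length (varint) encoding. The value is zigzag-encoded first, so negative numbers of small magnitude also stay short. See https://developers.google.com/protocol-buffers/docs/encoding#varints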

Parameters:

  • int (Integer)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 133

def write_varint(int)
  int = int << 1
  int = ~int | 1 if int < 0

  loop do
    chunk = int & (~VARINT_MASK)
    int = int >> 7
    if int == 0
      write_int8(chunk)
      return
    else
      write_int8(chunk | VARINT_MASK)
    end
  end
end
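
To make the encoding concrete, the sketch below follows the same arithmetic but returns the bytes as an array. `encode_varint` is a hypothetical helper. It masks each chunk with `0x7F` explicitly, whereas the listing above clears only bit 7 and appears to rely on `pack("C")` keeping just the low 8 bits; the resulting bytes should be the same.

```ruby
# Hypothetical helper: zigzag + varint encoding, returned as an array of bytes.
def encode_varint(int)
  int = int << 1
  int = ~int | 1 if int < 0            # zigzag: negatives map to odd numbers

  bytes = []
  loop do
    chunk = int & 0x7F                 # low 7 bits of the remaining value
    int >>= 7
    if int == 0
      bytes << chunk                   # last byte: continuation bit clear
      return bytes
    end
    bytes << (chunk | 0x80)            # more bytes follow: set continuation bit
  end
end

zero       = encode_varint(0)     # [0]
minus_one  = encode_varint(-1)    # [1]
one        = encode_varint(1)     # [2]
sixty_four = encode_varint(64)    # [128, 1]
three_h    = encode_varint(300)   # [216, 4]
```

Zigzag interleaves signs (0, -1, 1, -2, ... become 0, 1, 2, 3, ...), which is why -1 fits in one byte while 64 already needs two.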

#write_varint_array(array, &block) ⇒ nil

Writes an array to the IO object. Works like #write_array, except that the size is written in varint format.

Parameters:

  • array (Array)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 93

def write_varint_array(array, &block)
  if array.nil?
    write_varint(-1)
  else
    write_varint(array.size)
    array.each(&block)
  end
end

#write_varint_bytes(bytes) ⇒ nil

Writes a byte string to the IO object, prefixed with its size in varint format. A nil value is written as a size of -1.

Parameters:

  • bytes (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 166

def write_varint_bytes(bytes)
  if bytes.nil?
    write_varint(-1)
  else
    write_varint(bytes.bytesize)
    write(bytes)
  end
end
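
One consequence worth seeing in bytes: since #write_varint zigzag-encodes its argument, the size prefix on the wire is not the raw length. A sketch under that reading of the listings, with `zigzag_varint` and `encode_varint_bytes` as hypothetical helpers that return strings instead of writing to an IO:

```ruby
# Hypothetical helper: zigzag + varint encoding as a binary string.
def zigzag_varint(int)
  value = int >= 0 ? int << 1 : ~(int << 1) | 1
  out = []
  while value >= 0x80
    out << ((value & 0x7F) | 0x80)   # 7 payload bits plus continuation bit
    value >>= 7
  end
  out << value
  out.pack("C*")
end

# Hypothetical helper reproducing the byte layout of write_varint_bytes.
def encode_varint_bytes(bytes)
  return zigzag_varint(-1) if bytes.nil?   # null: size -1, a single 0x01 byte
  zigzag_varint(bytes.bytesize) + bytes.b
end

null_value = encode_varint_bytes(nil)
short      = encode_varint_bytes("abc")        # prefix byte is 6, not 3
longer     = encode_varint_bytes("x" * 100)    # prefix takes two bytes
```

A decoder therefore has to undo the zigzag step before interpreting the prefix as a length.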

#write_varint_string(string) ⇒ nil

Writes a string to the IO object, prefixed with its size in bytes in varint format. A nil value is written as a size of -1.

Parameters:

  • string (String)

Returns:

  • (nil)


# File 'lib/kafka/protocol/encoder.rb', line 119

def write_varint_string(string)
  if string.nil?
    write_varint(-1)
  else
    write_varint(string.bytesize)
    write(string)
  end
end