Class: LogStash::Codecs::Protobuf

Inherits:
Base
  • Object
show all
Defined in:
lib/logstash/codecs/protobuf.rb

Overview

This codec converts protobuf-encoded messages into Logstash events and vice versa.

Requires the protobuf definitions as Ruby files. You can create those using the [ruby-protoc compiler](https://github.com/codekitchen/ruby-protocol-buffers).

The following shows a usage example for decoding protobuf 2-encoded events from a Kafka stream:

source,ruby

kafka {

zk_connect => "127.0.0.1"
topic_id => "your_topic_goes_here"
key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
codec => protobuf
{
  class_name => "Animal::Unicorn"
  include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
}

}

Same example for protobuf 3:

source,ruby

kafka {

zk_connect => "127.0.0.1"
topic_id => "your_topic_goes_here"
key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
codec => protobuf
{
  class_name => "Animal.Unicorn"
  include_path => ['/path/to/protobuf/definitions/UnicornProtobuf_pb.rb']
  protobuf_version => 3
}

}

Specifically for the kafka input: please set the deserializer classes as shown above.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#execution_contextObject (readonly)

Returns the value of attribute execution_context.



155
156
157
# File 'lib/logstash/codecs/protobuf.rb', line 155

# Read-only accessor for the execution context stored on this codec.
#
# @return [Object, nil] the value of @execution_context (nil when unset)
def execution_context; @execution_context; end

Instance Method Details

#decode(data) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/logstash/codecs/protobuf.rb', line 212

# Decodes one protobuf payload into a LogStash::Event and yields it.
#
# For protobuf version 3 the payload is decoded via @pb_builder.decode and
# converted with pb3_deep_to_hash; when @pb3_set_oneof_metainfo is set, the
# oneof meta information is stored under "[@metadata][pb_oneof]".
# For any other version the payload is parsed via @pb_builder.parse and
# converted with to_hash.
#
# @param data [#to_s] the raw protobuf payload
# @yield [LogStash::Event] the decoded event; on a decode failure (and when
#   stop_on_error is falsy) an event holding the original payload under
#   "message", tagged "_protobufdecodefailure"
# @raise [StandardError] re-raises the original error when stop_on_error is truthy
def decode(data)
  if @protobuf_version == 3
    decoded = @pb_builder.decode(data.to_s)
    meta = pb3_get_oneof_metainfo(decoded, @class_name) if @pb3_set_oneof_metainfo
    h = pb3_deep_to_hash(decoded)
  else
    h = @pb_builder.parse(data.to_s).to_hash
  end
  e = LogStash::Event.new(h)
  e.set("[@metadata][pb_oneof]", meta) if @protobuf_version == 3 && @pb3_set_oneof_metainfo
  yield e if block_given?
rescue => ex
  @logger.warn("Couldn't decode protobuf: #{ex.inspect}.")
  raise if stop_on_error

  # Keep the original message so that the user can debug it. The yield is
  # guarded like the success path: without a block there is nobody to hand
  # the fallback event to, and an unguarded yield would raise LocalJumpError.
  yield LogStash::Event.new("message" => data, "tags" => ["_protobufdecodefailure"]) if block_given?
end

#encode(event) ⇒ Object



238
239
240
241
242
243
244
245
246
247
# File 'lib/logstash/codecs/protobuf.rb', line 238

# Serializes an event to protobuf bytes and hands the result to @on_event.
#
# The encoder is chosen by @protobuf_version (pb3_encode for 3, pb2_encode
# otherwise). Nothing is emitted when the encoder returns nil or an empty
# payload.
#
# @param event [Object] the event to encode
# @return [Object, nil] whatever @on_event.call returns, or nil when nothing was emitted
def encode(event)
  payload = @protobuf_version == 3 ? pb3_encode(event) : pb2_encode(event)
  return if payload.nil? || payload.empty?

  @on_event.call(event, payload)
end

#pipeline_idObject

ID of the pipeline from the execution context; falls back to "main" when no execution context is available.



158
159
160
# File 'lib/logstash/codecs/protobuf.rb', line 158

# Resolves the ID of the pipeline from the execution context.
#
# @return [Object] execution_context.pipeline_id when an execution context
#   is available, otherwise the literal "main"
def pipeline_id
  return "main" unless respond_to?(:execution_context)

  context = execution_context
  context.nil? ? "main" : context.pipeline_id
end

#registerObject



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/logstash/codecs/protobuf.rb', line 162

# Validates the codec configuration, loads the protobuf definitions and
# prepares @pb_builder for decoding.
#
# @raise [LogStash::ConfigurationError] when both `include_path` and
#   `class_file` are given, or when neither is given
def register
  # Reset the bookkeeping containers and the type-conversion tag.
  @metainfo_messageclasses = {}
  @metainfo_enumclasses = {}
  @metainfo_pb2_enumlist = []
  @pb3_typeconversion_tag = "_protobuf_type_converted"

  # `include_path` and `class_file` are mutually exclusive ...
  # NOTE(review): assumes class_file is always a String (never nil) - confirm its configured default.
  if @include_path.length > 0 and not class_file.strip.empty?
    raise LogStash::ConfigurationError, "Cannot use `include_path` and `class_file` at the same time"
  end

  # ... but one of them has to be provided.
  if @include_path.length == 0 and class_file.strip.empty?
    raise LogStash::ConfigurationError, "Need to specify `include_path` or `class_file`"
  end

  # Definitions are only loaded when class_name is not yet known to the
  # generated descriptor pool.
  should_register = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).nil?

  # Put the root directory at the front of the load path (once), but only
  # when definitions are actually going to be loaded.
  unless @protobuf_root_directory.nil? or @protobuf_root_directory.strip.empty?
    if !$LOAD_PATH.include? @protobuf_root_directory and should_register
      $LOAD_PATH.unshift(@protobuf_root_directory)
    end
  end

  # Prefix a non-empty, non-absolute class_file with the root directory.
  # NOTE(review): when @protobuf_root_directory is nil the interpolation
  # yields "/<class_file>" - confirm that a root directory is mandatory
  # whenever a relative class_file is configured.
  @class_file = "#{@protobuf_root_directory}/#{@class_file}" unless (Pathname.new @class_file).absolute? or @class_file.empty?
  # exclusive access while loading protobuf definitions
  Google::Protobuf::DescriptorPool.with_lock.synchronize do
    # load from `class_file`
    load_protobuf_definition(@class_file) if should_register and !@class_file.empty?
    # load from `include_path`
    include_path.each { |path| load_protobuf_definition(path) } if include_path.length > 0 and should_register

    if @protobuf_version == 3
      # NOTE(review): lookup returning nil here (class_name still unknown
      # after loading) would surface as a NoMethodError on msgclass.
      @pb_builder = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).msgclass

    else
      # Any other protobuf version goes through the pb2 helper.
      @pb_builder = pb2_create_instance(class_name)
    end
  end
end

#reloadable?Boolean

Pipelines using this plugin cannot be reloaded. See https://github.com/elastic/logstash/pull/6499

The DescriptorPool instance registers the protobuf classes (and dependencies) as global objects. This makes it very difficult to reload a pipeline, because `class_name` and all of its dependencies are already registered.

Returns:

  • (Boolean)


208
209
210
# File 'lib/logstash/codecs/protobuf.rb', line 208

# Reports that pipelines using this codec cannot be reloaded.
#
# @return [Boolean] always false
def reloadable?
  false
end