Class: LogStash::Codecs::Protobuf
- Inherits: Base
  - Object
  - Base
  - LogStash::Codecs::Protobuf
- Defined in: lib/logstash/codecs/protobuf.rb
Overview
This codec converts protobuf encoded messages into Logstash events and vice versa.
It requires the protobuf definitions to be compiled to Ruby files. You can create those using the [ruby-protoc compiler](github.com/codekitchen/ruby-protocol-buffers).
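For illustration, a protobuf 2 definition compiled with ruby-protoc looks roughly like the sketch below. The `Animal::Unicorn` message and its fields are made up for this example and only show the expected shape of the generated Ruby file.
[source,ruby]
# /path/to/protobuf/definitions/UnicornProtobuf.pb.rb (illustrative sketch, not real generated output)
require 'protocol_buffers'

module Animal
  class Unicorn < ProtocolBuffers::Message
    optional :string, :name,   1
    optional :int32,  :age,    2
    optional :string, :colour, 3
  end
end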
The following shows a usage example for decoding protobuf 2 encoded events from a kafka stream:
[source,ruby]
kafka {
  zk_connect => "127.0.0.1"
  topic_id => "your_topic_goes_here"
  key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  codec => protobuf {
    class_name => "Animal::Unicorn"
    include_path => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
  }
}
Same example for protobuf 3:
[source,ruby]
kafka {
  zk_connect => "127.0.0.1"
  topic_id => "your_topic_goes_here"
  key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  codec => protobuf {
    class_name => "Animal.Unicorn"
    include_path => ['/path/to/protobuf/definitions/UnicornProtobuf_pb.rb']
    protobuf_version => 3
  }
}
Specifically for the kafka input: please set the deserializer classes as shown above.
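The codec also works in the encoding direction, for example on an output. A minimal sketch using the protobuf 3 class from above (the kafka output options are abbreviated and illustrative):
[source,ruby]
kafka {
  topic_id => "your_topic_goes_here"
  codec => protobuf {
    class_name => "Animal.Unicorn"
    include_path => ['/path/to/protobuf/definitions/UnicornProtobuf_pb.rb']
    protobuf_version => 3
  }
}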
Instance Attribute Summary
- #execution_context ⇒ Object (readonly)
  Returns the value of attribute execution_context.
Instance Method Summary
- #decode(data) ⇒ Object
- #encode(event) ⇒ Object
- #pipeline_id ⇒ Object
  Id of the pipeline whose events you want to read from.
- #register ⇒ Object
- #reloadable? ⇒ Boolean
  Pipelines using this plugin cannot be reloaded.
Instance Attribute Details
#execution_context ⇒ Object (readonly)
Returns the value of attribute execution_context.
# File 'lib/logstash/codecs/protobuf.rb', line 155

def execution_context
  @execution_context
end
Instance Method Details
#decode(data) ⇒ Object
# File 'lib/logstash/codecs/protobuf.rb', line 212

def decode(data)
  if @protobuf_version == 3
    decoded = @pb_builder.decode(data.to_s)
    if @pb3_set_oneof_metainfo
      meta = pb3_get_oneof_metainfo(decoded, @class_name)
    end
    h = pb3_deep_to_hash(decoded)
  else
    decoded = @pb_builder.parse(data.to_s)
    h = decoded.to_hash
  end
  e = LogStash::Event.new(h)
  if @protobuf_version == 3 and @pb3_set_oneof_metainfo
    e.set("[@metadata][pb_oneof]", meta)
  end
  yield e if block_given?
rescue => ex
  @logger.warn("Couldn't decode protobuf: #{ex.inspect}.")
  if stop_on_error
    raise ex
  else # keep original message so that the user can debug it.
    yield LogStash::Event.new("message" => data, "tags" => ["_protobufdecodefailure"])
  end
end
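As a rough sketch of exercising #decode directly (for instance from a spec), assuming the `Animal::Unicorn` class from the overview and a binary payload `raw_bytes` that matches it:
[source,ruby]
require 'logstash/codecs/protobuf'

# Build and register the codec the same way a pipeline would.
codec = LogStash::Codecs::Protobuf.new(
  "class_name"   => "Animal::Unicorn",
  "include_path" => ['/path/to/protobuf/definitions/UnicornProtobuf.pb.rb']
)
codec.register

# raw_bytes is assumed to hold a serialized Animal::Unicorn message.
codec.decode(raw_bytes) do |event|
  puts event.get("name")   # decoded protobuf fields become event fields
end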
#encode(event) ⇒ Object
# File 'lib/logstash/codecs/protobuf.rb', line 238

def encode(event)
  if @protobuf_version == 3
    protobytes = pb3_encode(event)
  else
    protobytes = pb2_encode(event)
  end
  unless protobytes.nil? or protobytes.empty?
    @on_event.call(event, protobytes)
  end
end
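And a corresponding sketch for #encode, reusing the codec from the decode example; the `sink` object is hypothetical and only stands in for whatever consumes the serialized bytes:
[source,ruby]
# The serialized bytes are handed to the block registered via on_event.
codec.on_event do |event, protobytes|
  sink.write(protobytes)   # sink: hypothetical destination (socket, file, ...)
end

codec.encode(LogStash::Event.new("name" => "Glitter", "age" => 4))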
#pipeline_id ⇒ Object
id of the pipeline whose events you want to read from.
# File 'lib/logstash/codecs/protobuf.rb', line 158

def pipeline_id
  respond_to?(:execution_context) && !execution_context.nil? ? execution_context.pipeline_id : "main"
end
#register ⇒ Object
# File 'lib/logstash/codecs/protobuf.rb', line 162

def register
  @metainfo_messageclasses = {}
  @metainfo_enumclasses = {}
  @metainfo_pb2_enumlist = []
  @pb3_typeconversion_tag = "_protobuf_type_converted"

  if @include_path.length > 0 and not class_file.strip.empty?
    raise LogStash::ConfigurationError, "Cannot use `include_path` and `class_file` at the same time"
  end

  if @include_path.length == 0 and class_file.strip.empty?
    raise LogStash::ConfigurationError, "Need to specify `include_path` or `class_file`"
  end

  should_register = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).nil?

  unless @protobuf_root_directory.nil? or @protobuf_root_directory.strip.empty?
    if !$LOAD_PATH.include? @protobuf_root_directory and should_register
      $LOAD_PATH.unshift(@protobuf_root_directory)
    end
  end

  @class_file = "#{@protobuf_root_directory}/#{@class_file}" unless (Pathname.new @class_file).absolute? or @class_file.empty?
  # exclusive access while loading protobuf definitions
  Google::Protobuf::DescriptorPool.with_lock.synchronize do
    # load from `class_file`
    load_protobuf_definition(@class_file) if should_register and !@class_file.empty?
    # load from `include_path`
    include_path.each { |path| load_protobuf_definition(path) } if include_path.length > 0 and should_register
    if @protobuf_version == 3
      @pb_builder = Google::Protobuf::DescriptorPool.generated_pool.lookup(class_name).msgclass
    else
      @pb_builder = pb2_create_instance(class_name)
    end
  end
end
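To illustrate the `class_file` handling in #register, here is a configuration sketch using `class_file` together with `protobuf_root_directory` instead of `include_path` (the paths are made up; the two loading options are mutually exclusive, as enforced above):
[source,ruby]
codec => protobuf {
  class_name => "Animal.Unicorn"
  class_file => "animal/unicorn_pb.rb"   # relative, so it is resolved against protobuf_root_directory
  protobuf_root_directory => "/path/to/compiled/protobuf/definitions"
  protobuf_version => 3
}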
#reloadable? ⇒ Boolean
Pipelines using this plugin cannot be reloaded; see github.com/elastic/logstash/pull/6499.
The DescriptorPool instance registers the protobuf classes (and dependencies) as global objects. This makes it very difficult to reload a pipeline, because `class_name` and all of its dependencies are already registered.
# File 'lib/logstash/codecs/protobuf.rb', line 208

def reloadable?
  return false
end