Class: LogStash::Codecs::BulkEs
- Inherits:
-
Base
- Object
- Base
- LogStash::Codecs::BulkEs
- Defined in:
- lib/logstash/codecs/bulk_es.rb
Overview
This codec decodes incoming messages in the https://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk format] into individual events, and stores the bulk action metadata in the `@metadata` field.
Encoding is not supported.
Instance Method Summary collapse
-
#decode(data) ⇒ Object
Decodes Elasticsearch bulk-format data into individual events.
-
#encode(data) ⇒ Object
Encoding is not supported; always raises "Not implemented".
- #register ⇒ Object
Instance Method Details
#decode(data) ⇒ Object
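Decodes Elasticsearch bulk-format data into individual events, yielding each event with the bulk action metadata in its `@metadata` field.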
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/logstash/codecs/bulk_es.rb', line 30
#
# Decodes Elasticsearch bulk-format data into individual events.
#
# The data is split by the line codec created in #register and each line is
# parsed as JSON. A small two-state machine pairs every action line with the
# source line that follows it:
#
# * :init     - the line is an action line such as {"index": {...}}. Its single
#               top-level key is the action name and its value becomes the
#               event metadata. A "delete" action has no source line, so an
#               empty event is yielded immediately.
# * :metadata - the line is the source/document line for the pending action.
#
# For "update" actions the event is built from the "doc" entry when present;
# "doc_as_upsert" and "upsert" (re-serialized as a JSON string) are copied into
# the metadata.
#
# Lines that fail to parse as JSON are reported through log_failure and
# skipped; the state machine is left untouched.
#
# @param data [String] raw bulk payload (possibly a partial chunk)
# @yield [event] each decoded event, with the action metadata stored under "@metadata"
def decode(data)
  @lines.decode(data) do |line_event|
    begin
      line = LogStash::Json.load(line_event.get("message"))

      case @state
      when :metadata
        if @metadata["action"] == 'update'
          if line.key?("doc")
            event = LogStash::Event.new(line["doc"])
            @metadata["doc_as_upsert"] = line["doc_as_upsert"] if line.key?("doc_as_upsert")
          else
            event = LogStash::Event.new(line)
          end
          @metadata["upsert"] = LogStash::Json.dump(line["upsert"]) if line.key?("upsert")
        else
          # Any other action (index or create): the whole line is the document.
          event = LogStash::Event.new(line)
        end

        event.set("@metadata", @metadata)
        yield event
        @state = :init
      when :init
        action = line.keys[0]
        @metadata = line[action]
        @metadata["action"] = action.to_s
        @state = :metadata

        # Delete actions are not followed by a source line, so emit right away.
        if @metadata["action"] == 'delete'
          event = LogStash::Event.new()
          event.set("@metadata", @metadata)
          yield event
          @state = :init
        end
      end
    rescue LogStash::Json::ParserError => e
      log_failure(
        "messages must in be UTF-8 JSON format",
        :error => e,
        :data => data
      )
    end
  end
end
#encode(data) ⇒ Object
Encoding is not supported; always raises "Not implemented".
78 79 80 |
# File 'lib/logstash/codecs/bulk_es.rb', line 78
#
# Encoding is not supported by this codec.
#
# @param data [Object] ignored
# @raise [RuntimeError] always, with the message "Not implemented"
def encode(data)
  raise "Not implemented"
end
#register ⇒ Object
23 24 25 26 27 28 |
# File 'lib/logstash/codecs/bulk_es.rb', line 23
#
# Sets up the state used by #decode: a UTF-8 line codec that splits the
# incoming data, the parser state (:init, i.e. waiting for an action line)
# and an empty metadata hash for the pending action.
def register
  @lines = LogStash::Codecs::Line.new
  @lines.charset = "UTF-8"
  @state = :init
  @metadata = {}
end