Class: Fluent::UnomalyOutput
- Inherits:
- BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::UnomalyOutput
- Defined in:
- lib/fluent/plugin/out_unomaly.rb
Instance Method Summary
- #configure(conf) ⇒ Object
  This method is called before starting.
- #flatten(data, prefix) ⇒ Object
- #format(tag, time, record) ⇒ Object
  This method is called when an event reaches Fluentd.
- #send_batch(events) ⇒ Object
- #shutdown ⇒ Object
  This method is called when shutting down.
- #start ⇒ Object
  This method is called when starting.
- #write(chunk) ⇒ Object
  This method is called every flush interval.
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
    # File 'lib/fluent/plugin/out_unomaly.rb', line 40

    def configure(conf)
      super
      conf["buffer_chunk_limit"] ||= "500k"
      conf["flush_interval"] ||= "1s"
    end
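The `||=` assignments in `configure` only fill in a value when the key is absent or nil, so a setting supplied by the user is left untouched. A minimal sketch of that behavior, assuming a plain Hash in place of the Fluentd configuration object (the `apply_defaults` helper is hypothetical and not part of the plugin):

```ruby
# Hypothetical helper mirroring the defaulting done in #configure.
# A plain Hash stands in for Fluentd's configuration element.
def apply_defaults(conf)
  conf["buffer_chunk_limit"] ||= "500k"
  conf["flush_interval"] ||= "1s"
  conf
end

# The user-supplied flush_interval is kept; only the missing key is filled in.
user_conf = apply_defaults({ "flush_interval" => "5s" })
puts user_conf["buffer_chunk_limit"]
puts user_conf["flush_interval"]
```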
#flatten(data, prefix) ⇒ Object
    # File 'lib/fluent/plugin/out_unomaly.rb', line 120

    def flatten(data, prefix)
      ret = {}
      if data.is_a? Hash
        data.each { |key, value|
          if prefix.to_s.empty?
            ret.merge! flatten(value, "#{key.to_s}")
          else
            ret.merge! flatten(value, "#{prefix}.#{key.to_s}")
          end
        }
      elsif data.is_a? Array
        data.each_with_index {|val,index | ret.merge! flatten(val, "#{prefix}.#{index}")}
      else
        return {prefix => data.to_s}
      end
      ret
    end
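The `flatten` method carries no description, so a small worked example may help. The sketch below is a standalone copy of the same logic (lightly restyled, not the plugin file itself) applied to a made-up record. Nested hash keys are joined with dots, array elements are addressed by index, and every leaf value is converted to a String.

```ruby
# Standalone copy of the flatten logic, for experimentation outside Fluentd.
def flatten(data, prefix)
  ret = {}
  if data.is_a? Hash
    data.each do |key, value|
      # Top-level keys get no leading dot; nested keys are joined with "."
      child = prefix.to_s.empty? ? key.to_s : "#{prefix}.#{key}"
      ret.merge! flatten(value, child)
    end
  elsif data.is_a? Array
    data.each_with_index { |val, index| ret.merge! flatten(val, "#{prefix}.#{index}") }
  else
    # Leaf value: stored as a String under the accumulated key
    return { prefix => data.to_s }
  end
  ret
end

# Sample record invented for illustration.
record = { "kubernetes" => { "pod" => "web-1", "labels" => ["a", "b"] }, "level" => 3 }
flat = flatten(record, "")
flat.each { |key, value| puts "#{key} = #{value}" }
```

For this input the resulting keys are `kubernetes.pod`, `kubernetes.labels.0`, `kubernetes.labels.1`, and `level`, with the integer `3` stored as the string `"3"`. Note that a top-level Array passed with an empty prefix would produce keys with a leading dot such as `.0`, since the empty-prefix check only applies to hashes.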
#format(tag, time, record) ⇒ Object
This method is called when an event reaches Fluentd. Convert the event to a raw string.
    # File 'lib/fluent/plugin/out_unomaly.rb', line 60

    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end
#send_batch(events) ⇒ Object
    # File 'lib/fluent/plugin/out_unomaly.rb', line 95

    def send_batch(events)
      url = @host + @api_path
      body = events.to_json
      ssl = url.start_with?('https')
      uri = URI.parse(url)
      header = {'Content-Type' => 'application/json'}
      http = Net::HTTP.new(uri.host, uri.port)
      if ssl
        http.use_ssl = true
        if @accept_self_signed_certs
          http.verify_mode = OpenSSL::SSL::VERIFY_NONE
        end
      end

      request = Net::HTTP::Post.new(uri.request_uri, header)
      request.body = body

      resp = http.request(request)
      if !resp.kind_of? Net::HTTPSuccess
        log.error "Error sending batch #{resp.to_s}"
      end
    end
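To inspect what `send_batch` would put on the wire without contacting a server, the request construction can be separated from the send. The following is a sketch under stated assumptions: `build_batch_request` is a hypothetical helper, and the host and API path are placeholders rather than the real Unomaly endpoint. It builds the client and POST request the same way and returns both instead of calling `http.request`.

```ruby
require "net/http"
require "uri"
require "json"
require "openssl"

# Hypothetical helper: prepares the HTTP client and POST request in the same
# way as #send_batch, but returns them instead of sending anything.
def build_batch_request(host, api_path, events, accept_self_signed_certs: false)
  url = host + api_path
  uri = URI.parse(url)
  http = Net::HTTP.new(uri.host, uri.port)
  if url.start_with?("https")
    http.use_ssl = true
    # VERIFY_NONE disables certificate checks entirely; use only when the
    # server is known to present a self-signed certificate.
    http.verify_mode = OpenSSL::SSL::VERIFY_NONE if accept_self_signed_certs
  end
  request = Net::HTTP::Post.new(uri.request_uri, "Content-Type" => "application/json")
  request.body = events.to_json
  [http, request]
end

# Placeholder host and path, chosen for illustration only.
http, request = build_batch_request(
  "https://unomaly.example.com", "/v1/batch",
  [{ message: "disk full", source: "web-1" }],
  accept_self_signed_certs: true
)
puts request.path
puts request.body
```

Sending would then be `http.request(request)`, followed by the same `Net::HTTPSuccess` check that `send_batch` performs.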
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
    # File 'lib/fluent/plugin/out_unomaly.rb', line 54

    def shutdown
      super
    end
#start ⇒ Object
This method is called when starting. Open sockets or files here.
    # File 'lib/fluent/plugin/out_unomaly.rb', line 48

    def start
      super
    end
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by an internal thread, not Fluentd’s main thread, so IO waits do not affect other plugins.
    # File 'lib/fluent/plugin/out_unomaly.rb', line 71

    def write(chunk)
      documents = []
      chunk.msgpack_each do |(tag, time, record)|
        unomaly_event = {
            message: record[@message_key].to_s,
            source: record[@source_key].to_s,
            timestamp: Time.at(time).utc.to_datetime.rfc3339
        }
        # The local variable name is missing from this listing; `metadata` is assumed.
        metadata = record.to_hash
        metadata.delete(@source_key)
        metadata.delete(@message_key)
        metadata["tag"]=tag

        unomaly_event["metadata"]=metadata

        if @debug
          log.info "Event #{unomaly_event.to_json}"
        end

        documents.push(unomaly_event)
      end
      send_batch(documents)
    end
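The per-record transformation inside `write` can be tried outside Fluentd. The sketch below is a hypothetical standalone version: `build_event` is not a plugin method, and the default key names `"message"` and `"source"` are assumptions standing in for `@message_key` and `@source_key`. Unlike the listing above, it copies the record before deleting keys, so the caller's hash is left unchanged.

```ruby
require "date"  # provides Time#to_datetime
require "json"

# Hypothetical standalone version of the per-record work done in #write.
def build_event(tag, time, record, message_key: "message", source_key: "source")
  event = {
    message: record[message_key].to_s,
    source: record[source_key].to_s,
    timestamp: Time.at(time).utc.to_datetime.rfc3339
  }
  # Everything except the message and source fields becomes metadata.
  metadata = record.to_hash.dup # copy so the input record is not mutated
  metadata.delete(source_key)
  metadata.delete(message_key)
  metadata["tag"] = tag
  event["metadata"] = metadata
  event
end

# Sample record invented for illustration; time is seconds since the epoch.
sample = { "message" => "started", "source" => "web-1", "pid" => 42 }
event = build_event("app.web", 0, sample)
puts event.to_json
```

The remaining fields (`pid` here) plus the Fluentd tag end up under `metadata`, and the timestamp is rendered in RFC 3339 form in UTC.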