Class: Fluent::UnomalyOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::UnomalyOutput
- Defined in:
- lib/fluent/plugin/out_unomaly.rb
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
This method is called before starting.
- #flatten(data, prefix) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd.
- #send_batch(events) ⇒ Object
-
#shutdown ⇒ Object
This method is called when shutting down.
-
#start ⇒ Object
This method is called when starting.
-
#write(chunk) ⇒ Object
This method is called every flush interval.
Instance Method Details
#configure(conf) ⇒ Object
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
38 39 40 41 42 |
# File 'lib/fluent/plugin/out_unomaly.rb', line 38 def configure(conf) super conf["buffer_chunk_limit"] ||= "500k" conf["flush_interval"] ||= "1s" end |
#flatten(data, prefix) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/fluent/plugin/out_unomaly.rb', line 125 def flatten(data, prefix) ret = {} if data.is_a? Hash data.each { |key, value| if prefix.to_s.empty? ret.merge! flatten(value, "#{key.to_s}") else ret.merge! flatten(value, "#{prefix}.#{key.to_s}") end } elsif data.is_a? Array data.each_with_index {|val,index | ret.merge! flatten(val, "#{prefix}.#{index}")} else return {prefix => data.to_s} end ret end |
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd. Convert the event to a raw string.
58 59 60 |
# File 'lib/fluent/plugin/out_unomaly.rb', line 58 def format(tag, time, record) [tag, time, record].to_msgpack end |
#send_batch(events) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/fluent/plugin/out_unomaly.rb', line 98 def send_batch(events) url = @host + @api_path body = events.to_json ssl = url.start_with?('https') uri = URI.parse(url) header = {'Content-Type' => 'application/json'} http = Net::HTTP.new(uri.host, uri.port) if ssl http.use_ssl = true if @accept_self_signed_certs http.verify_mode = OpenSSL::SSL::VERIFY_NONE end end log.info "Sending #{events.length} events to unomaly at #{@host}#{@api_path} (batch_size=#{@batch_size})" request = Net::HTTP::Post.new(uri.request_uri, header) request.body = body resp = http.request(request) if !resp.kind_of? Net::HTTPSuccess log.error "Error sending batch #{resp.to_s}" end end |
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
52 53 54 |
# File 'lib/fluent/plugin/out_unomaly.rb', line 52 def shutdown super end |
#start ⇒ Object
This method is called when starting. Open sockets or files here.
46 47 48 |
# File 'lib/fluent/plugin/out_unomaly.rb', line 46 def start super end |
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/fluent/plugin/out_unomaly.rb', line 69 def write(chunk) documents = [] chunk.msgpack_each do |(tag, time, record)| unomaly_event = { message: record[].to_s, source: record[@source_key].to_s, timestamp: Time.at(time).utc.to_datetime.rfc3339 } = record.to_hash .delete(@source_key) .delete() ["tag"]=tag unomaly_event["metadata"]= if @debug log.info "Event #{unomaly_event.to_json}" end documents.push(unomaly_event) if documents.length >= @batch_size send_batch(documents) documents = [] end end send_batch(documents) end |