Class: Fluent::UnomalyOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_unomaly.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



40
41
42
43
44
# File 'lib/fluent/plugin/out_unomaly.rb', line 40

def configure(conf)
  super
  conf["buffer_chunk_limit"] ||= "500k"
  conf["flush_interval"] ||= "1s"
end

#flatten(data, prefix) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/fluent/plugin/out_unomaly.rb', line 120

def flatten(data, prefix)
  ret = {}
  if data.is_a? Hash
    data.each { |key, value|
      if prefix.to_s.empty?
        ret.merge! flatten(value, "#{key.to_s}")
      else
        ret.merge! flatten(value, "#{prefix}.#{key.to_s}")
      end
    }
  elsif data.is_a? Array
    data.each_with_index {|val,index | ret.merge! flatten(val, "#{prefix}.#{index}")}
  else
    return {prefix => data.to_s}
  end

  ret
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches to Fluentd. Convert the event to a raw string.



60
61
62
# File 'lib/fluent/plugin/out_unomaly.rb', line 60

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#send_batch(events) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fluent/plugin/out_unomaly.rb', line 95

def send_batch(events)
  url = @host + @api_path
  body = events.to_json
  ssl = url.start_with?('https')
  uri = URI.parse(url)
  header = {'Content-Type' => 'application/json'}

  http = Net::HTTP.new(uri.host, uri.port)
  if ssl
    http.use_ssl = true
    if @accept_self_signed_certs
      http.verify_mode = OpenSSL::SSL::VERIFY_NONE
    end
  end

  request = Net::HTTP::Post.new(uri.request_uri, header)
  request.body = body

  resp = http.request(request)
  if !resp.kind_of? Net::HTTPSuccess
    log.error "Error sending batch #{resp.to_s}"
  end
end

#shutdownObject

This method is called when shutting down. Shutdown the thread and close sockets or files here.



54
55
56
# File 'lib/fluent/plugin/out_unomaly.rb', line 54

def shutdown
  super
end

#startObject

This method is called when starting. Open sockets or files here.



48
49
50
# File 'lib/fluent/plugin/out_unomaly.rb', line 48

def start
  super
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.

NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/out_unomaly.rb', line 71

def write(chunk)
  documents = []
  chunk.msgpack_each do |(tag, time, record)|
    unomaly_event = {
        message: record[@message_key].to_s,
        source: record[@source_key].to_s,
        timestamp: Time.at(time).utc.to_datetime.rfc3339
    }
     = record.to_hash

    .delete(@source_key)
    .delete(@message_key)
    ["tag"]=tag

    unomaly_event["metadata"]=

    if @debug
      log.info "Event #{unomaly_event.to_json}"
    end
    documents.push(unomaly_event)
  end
  send_batch(documents)
end