Class: Fluent::UnomalyOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_unomaly.rb

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



38
39
40
41
42
# File 'lib/fluent/plugin/out_unomaly.rb', line 38

# Applies this plugin's buffer defaults and then hands the configuration to
# the parent class.
#
# The defaults are written into conf BEFORE calling super so that the parent's
# configure sees them. Values already present in conf are left alone (||=).
#
# @param conf [Hash] configuration parameters
# @return [Object] whatever the parent configure returns
def configure(conf)
  conf["buffer_chunk_limit"] ||= "500k"
  conf["flush_interval"] ||= "1s"
  super
end

#flatten(data, prefix) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/fluent/plugin/out_unomaly.rb', line 125

# Flattens a nested structure of Hashes and Arrays into a single-level Hash
# whose keys are dot-separated paths and whose values are strings.
#
# Hash keys and Array indexes become path segments. An empty (or nil) prefix
# contributes no leading segment, for Hashes and Arrays alike, so a top-level
# Array yields "0", "1", ... rather than ".0", ".1", ...
# Empty Hashes and Arrays contribute no entries.
#
# @param data [Object] value to flatten; non-collection values are leaves
# @param prefix [String, nil] path accumulated so far
# @return [Hash] path => data.to_s for every leaf reachable from data
def flatten(data, prefix)
  # Leaf: emit it under the path accumulated so far.
  return { prefix => data.to_s } unless data.is_a?(Hash) || data.is_a?(Array)

  # Treat an Array as index => value pairs so both collections share one path.
  pairs = data.is_a?(Hash) ? data : data.each_with_index.map { |value, index| [index, value] }

  pairs.each_with_object({}) do |(key, value), flat|
    path = prefix.to_s.empty? ? key.to_s : "#{prefix}.#{key}"
    flat.merge!(flatten(value, path))
  end
end

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd. Convert the event to a raw string.



58
59
60
# File 'lib/fluent/plugin/out_unomaly.rb', line 58

# Serialises one event for buffering.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] result of calling to_msgpack on the [tag, time, record] triple
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#send_batch(events) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_unomaly.rb', line 98

# Posts a batch of events to the Unomaly API as one JSON document.
#
# The endpoint is @host + @api_path. TLS is switched on when that URL starts
# with "https"; certificate verification is turned off only when
# @accept_self_signed_certs is set. A response that is not a
# Net::HTTPSuccess is logged as an error; nothing is raised for it here.
#
# @param events [Array] events to serialise with to_json and send
def send_batch(events)
  endpoint = @host + @api_path
  payload = events.to_json
  target = URI.parse(endpoint)
  headers = { 'Content-Type' => 'application/json' }

  client = Net::HTTP.new(target.host, target.port)
  if endpoint.start_with?('https')
    client.use_ssl = true
    client.verify_mode = OpenSSL::SSL::VERIFY_NONE if @accept_self_signed_certs
  end

  log.info "Sending #{events.length} events to unomaly at #{@host}#{@api_path} (batch_size=#{@batch_size})"

  post = Net::HTTP::Post.new(target.request_uri, headers)
  post.body = payload

  response = client.request(post)
  log.error "Error sending batch #{response.to_s}" unless response.kind_of?(Net::HTTPSuccess)
end

#shutdown ⇒ Object

This method is called when shutting down. Shutdown the thread and close sockets or files here.



52
53
54
# File 'lib/fluent/plugin/out_unomaly.rb', line 52

# Shutdown hook. Adds no plugin-specific teardown; it only delegates to the
# parent class implementation via super.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting. Open sockets or files here.



46
47
48
# File 'lib/fluent/plugin/out_unomaly.rb', line 46

# Start hook. Opens no resources of its own; it only delegates to the parent
# class implementation via super.
def start
  super
end

#write(chunk) ⇒ Object

This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.

NOTE! This method is called by an internal thread, not Fluentd’s main thread, so IO wait doesn’t affect other plugins.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_unomaly.rb', line 69

# Converts every event in a buffer chunk into an Unomaly event and passes the
# events to send_batch in groups of at most @batch_size.
#
# Each Unomaly event carries the record's message and source fields (looked up
# via @message_key and @source_key), an RFC 3339 UTC timestamp, and a
# "metadata" hash holding the remaining record fields plus the Fluentd tag.
#
# @param chunk [#msgpack_each] buffer chunk yielding [tag, time, record] triples
def write(chunk)
  documents = []
  chunk.msgpack_each do |(tag, time, record)|
    unomaly_event = {
      message: record[@message_key].to_s,
      source: record[@source_key].to_s,
      timestamp: Time.at(time).utc.to_datetime.rfc3339
    }

    # Everything except the message and source fields travels as metadata.
    metadata = record.to_hash
    metadata.delete(@source_key)
    metadata.delete(@message_key)
    metadata["tag"] = tag
    unomaly_event["metadata"] = metadata

    log.info "Event #{unomaly_event.to_json}" if @debug
    documents.push(unomaly_event)

    next if documents.length < @batch_size

    send_batch(documents)
    documents = []
  end

  # Flush the remainder, but never post an empty batch (empty chunk, or a
  # chunk whose size is an exact multiple of @batch_size).
  send_batch(documents) unless documents.empty?
end