Class: Fluent::MqttOutput

Inherits:
BufferedOutput
  • Object
Includes:
MqttOutputMixin
Defined in:
lib/fluent/plugin/out_mqtt.rb,
lib/fluent/plugin/out_mqtt_buf.rb

Instance Method Summary

Methods included from MqttOutputMixin

#configure, #format_time, included, #json_parse, #rewrite_tag, #shutdown, #start

Instance Method Details

#emit(tag, es, chain) ⇒ Object



# File 'lib/fluent/plugin/out_mqtt.rb', line 10

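def emit(tag, es, chain)
  # Publish each event in the stream as one JSON message.
  es.each do |time, record|
    $log.debug "#{tag}, #{format_time(time)}, #{record}"
    # Add the formatted timestamp under @time_key before serializing.
    @connect.publish(rewrite_tag(tag), record.merge(@time_key => format_time(time)).to_json)
  end
  $log.flush

  # Hand the events on to the next output in the chain.
  chain.next
end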
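
The payload that #emit publishes can be illustrated with a minimal standalone sketch. FakeClient, format_time, and rewrite_tag below are hypothetical stand-ins written for this example only; the real client and helpers come from the plugin and MqttOutputMixin and may behave differently.

```ruby
require 'json'
require 'time'

# Hypothetical stand-in for the client held in @connect.
class FakeClient
  attr_reader :published

  def initialize
    @published = []
  end

  # Record each publish call instead of sending it anywhere.
  def publish(topic, payload)
    @published << [topic, payload]
  end
end

# Assumed behavior: render epoch seconds as an ISO 8601 UTC string.
def format_time(time)
  Time.at(time).utc.iso8601
end

# Assumed behavior: turn a dotted tag into a slash-separated topic.
def rewrite_tag(tag)
  tag.tr('.', '/')
end

client = FakeClient.new
time_key = 'time'
events = [[0, { 'temp' => 21 }], [60, { 'temp' => 22 }]]

# Same shape as the loop in #emit: one publish call per event.
events.each do |time, record|
  payload = record.merge(time_key => format_time(time)).to_json
  client.publish(rewrite_tag('sensor.room1'), payload)
end

client.published.each { |topic, payload| puts "#{topic} #{payload}" }
```

Each event becomes its own message, with the formatted timestamp merged into the record under the configured time key.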

#format(tag, time, record) ⇒ Object

This method is called when an event reaches Fluentd. It converts the event to a raw string.



# File 'lib/fluent/plugin/out_mqtt_buf.rb', line 12

def format(tag, time, record)
  [tag, time, record].to_json + "\n"
end
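
As a rough illustration of the string #format produces, the sketch below reproduces its body under a different name (format_event, a hypothetical name chosen here to avoid shadowing Kernel#format) and parses the line back into its three parts.

```ruby
require 'json'

# Same expression as the body of #format, under a hypothetical name.
def format_event(tag, time, record)
  [tag, time, record].to_json + "\n"
end

line = format_event('sensor.room1', 1_400_000_000, { 'temp' => 21 })

# Each formatted event is one JSON array terminated by a newline,
# so it can be parsed back into tag, time, and record.
tag, time, record = JSON.parse(line)
puts line
```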

#write(chunk) ⇒ Object

This method is called at every flush interval and is where the buffer chunk is written out; in this plugin the data is published through @connect. ‘chunk’ is a buffer chunk that contains multiple formatted events. Use ‘data = chunk.read’ to get all events at once, or ‘chunk.open {|io| … }’ to get an IO object.

NOTE: This method is called from an internal thread, not Fluentd’s main thread, so waiting on IO does not affect other plugins.



# File 'lib/fluent/plugin/out_mqtt_buf.rb', line 23

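def write(chunk)
  # Parse the chunk contents; the result is read as [tag, time, record].
  json = json_parse(chunk.read)
  $log.debug "#{json[0]}, #{format_time(json[1])}, #{json[2]}"
  # Add the formatted timestamp under @time_key and publish as JSON.
  @connect.publish(rewrite_tag(json[0]), json[2].merge(@time_key => format_time(json[1])).to_json)
  $log.flush
end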
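
Because #format writes one JSON array per line and a chunk can hold several events, the chunk data may contain more than one line. How json_parse treats such input is not shown on this page. Assuming it parses a single JSON document, one possible way to handle a multi-event chunk is to split the data by line first. each_event below is a hypothetical helper, not part of the plugin.

```ruby
require 'json'

# Hypothetical helper: yields tag, time, and record for each non-empty
# line of chunk data, assuming one JSON array per line as written by #format.
def each_event(data)
  return enum_for(:each_event, data) unless block_given?

  data.each_line do |line|
    line = line.strip
    next if line.empty?

    tag, time, record = JSON.parse(line)
    yield tag, time, record
  end
end

# Sample chunk data built the same way #format builds each line.
data = [
  ['sensor.room1', 0, { 'temp' => 21 }],
  ['sensor.room2', 60, { 'temp' => 19 }]
].map { |event| event.to_json + "\n" }.join

events = each_event(data).to_a
events.each { |tag, time, record| puts "#{tag} #{time} #{record}" }
```

Inside #write, a loop of this shape would publish once per event instead of once per chunk.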