Class: Fluent::MqttOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::MqttOutput
- Includes:
- MqttOutputMixin
- Defined in:
- lib/fluent/plugin/out_mqtt.rb,
lib/fluent/plugin/out_mqtt_buf.rb
Instance Method Summary collapse
- #emit(tag, es, chain) ⇒ Object
-
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd.
-
#write(chunk) ⇒ Object
This method is called every flush interval.
Methods included from MqttOutputMixin
#configure, #format_time, included, #json_parse, #rewrite_tag, #shutdown, #start
Instance Method Details
#emit(tag, es, chain) ⇒ Object
10 11 12 13 14 15 16 17 18 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 10 def emit(tag, es, chain) es.each {|time,record| $log.debug "#{tag}, #{format_time(time)}, #{record}" @connect.publish(rewrite_tag(tag), record.merge(@time_key => format_time(time)).to_json) } $log.flush chain.next end |
#format(tag, time, record) ⇒ Object
This method is called when an event reaches to Fluentd. Convert the event to a raw string.
12 13 14 |
# File 'lib/fluent/plugin/out_mqtt_buf.rb', line 12 def format(tag, time, record) [tag, time, record].to_json + "\n" end |
#write(chunk) ⇒ Object
This method is called every flush interval. Write the buffer chunk to files or databases here. ‘chunk’ is a buffer chunk that includes multiple formatted events. You can use ‘data = chunk.read’ to get all events and ‘chunk.open {|io| … }’ to get IO objects.
NOTE! This method is called by internal thread, not Fluentd’s main thread. So IO wait doesn’t affect other plugins.
23 24 25 26 27 28 |
# File 'lib/fluent/plugin/out_mqtt_buf.rb', line 23 def write(chunk) json = json_parse(chunk.read) $log.debug "#{json[0]}, #{format_time(json[1])}, #{json[2]}" @connect.publish(rewrite_tag(json[0]), (json[2].merge(@time_key => format_time(json[1]))).to_json) $log.flush end |