Class: Fluent::Plugin::MqttOutput

Inherits:
Output
  • Object
show all
Includes:
MqttProxy, TimeMixin::Formatter
Defined in:
lib/fluent/plugin/out_mqtt.rb

Constant Summary

Constants included from MqttProxy

Fluent::Plugin::MqttProxy::MQTT_PORT

Instance Method Summary collapse

Methods included from MqttProxy

#after_disconnection, #connect, included, #increment_retry_interval, #init_retry_interval, #rescue_disconnection, #retry_connect, #shutdown_proxy, #start_proxy

Instance Method Details

#add_send_time(record) ⇒ Object



92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_mqtt.rb', line 92

# Adds the send timestamp to +record+ when send-time monitoring is enabled.
#
# @param record [Hash] event record; it is not modified
# @return [Hash] a merged copy of +record+ holding the formatted current time
#   under +@monitor.send_time_key+ when +@monitor+ is set and
#   +@monitor.send_time+ is truthy; otherwise +record+ itself
def add_send_time(record)
  return record if @monitor.nil? || !@monitor.send_time

  # Use a String key. The `"...":` hash-literal form builds a Symbol key, which
  # neither matches nor overrides a string key of the same name in +record+.
  # The textual form of the timestamp is decided by @send_time_formatter.
  record.merge(@monitor.send_time_key.to_s => @send_time_formatter.format(Fluent::EventTime.now))
end

#after_connection ⇒ Object



81
82
83
84
85
86
# File 'lib/fluent/plugin/out_mqtt.rb', line 81

# Creates a thread named :out_mqtt_dummy via +thread_create+ whose body only
# calls +Thread.stop+, stores it in +@dummy_thread+ and returns it.
#
# @return [Object] whatever +thread_create+ returned
def after_connection
  @dummy_thread = thread_create(:out_mqtt_dummy) { Thread.stop }
end

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash-like configuration object that holds the plugin's parameters and its sub-sections (such as ‘format’ and ‘buffer’). If the configuration is invalid, raise Fluent::ConfigError.



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin/out_mqtt.rb', line 42

# Configures the plugin from +conf+.
#
# Sets +@formatter+ from the first 'format' element of +conf+, records in
# +@has_buffer_section+ whether +conf+ has any 'buffer' element, and, when
# +@monitor+ is set, builds +@send_time_formatter+ from the monitor's
# +time_type+ and +time_format+.
#
# @param conf [Object] configuration object responding to +elements(name:)+
def configure(conf)
  # Convert legacy flat parameters into format/inject/buffer sections BEFORE
  # calling super, so the parent class configures itself from the converted
  # sections (this is the order documented for the compat_parameters helper).
  compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")
  super

  formatter_config = conf.elements(name: 'format').first
  @formatter = formatter_create(conf: formatter_config)
  @has_buffer_section = conf.elements(name: 'buffer').size > 0

  # NOTE(review): @monitor is presumably populated by super from the config.
  return if @monitor.nil?

  @send_time_formatter = time_formatter_create(
    type: @monitor.time_type.to_sym, format: @monitor.time_format
  )
end

#current_plugin_name ⇒ Object



88
89
90
# File 'lib/fluent/plugin/out_mqtt.rb', line 88

# Returns the symbol identifying this plugin.
#
# NOTE(review): presumably consumed by the MqttProxy mixin (e.g. for log
# messages) — confirm against the mixin.
#
# @return [Symbol] always +:out_mqtt+
def current_plugin_name
  :out_mqtt
end

#format(tag, time, record) ⇒ Object



128
129
130
131
# File 'lib/fluent/plugin/out_mqtt.rb', line 128

# Serializes one event as a msgpack-encoded +[tag, time, record]+ triple.
# The record is first passed through +inject_values_to_record+.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] result of +Array#to_msgpack+ on the triple
def format(tag, time, record)
  injected = inject_values_to_record(tag, time, record)
  event = [tag, time, injected]
  event.to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



133
134
135
# File 'lib/fluent/plugin/out_mqtt.rb', line 133

# Always returns +true+.
#
# NOTE(review): presumably signals to the framework that #format emits
# msgpack (it calls +to_msgpack+) — confirm against the Output plugin API.
#
# @return [Boolean] +true+
def formatted_to_msgpack_binary
  true
end

#prefer_buffered_processing ⇒ Object



63
64
65
# File 'lib/fluent/plugin/out_mqtt.rb', line 63

# Returns +@has_buffer_section+, which #configure sets to whether the
# configuration contains at least one 'buffer' element.
#
# NOTE(review): presumably used by the framework to choose between buffered
# (#write) and non-buffered (#process) delivery — confirm.
#
# @return [Boolean, nil] the stored flag (+nil+ if #configure has not run)
def prefer_buffered_processing
  @has_buffer_section
end

#process(tag, es) ⇒ Object



124
125
126
# File 'lib/fluent/plugin/out_mqtt.rb', line 124

# Handles an event stream by delegating to #publish_event_stream.
#
# @param tag [Object] tag of the events in +es+
# @param es [Object] event stream, forwarded unchanged
# @return [Object] whatever #publish_event_stream returns
def process(tag, es)
  publish_event_stream(tag, es)
end

#publish_event_stream(tag, es) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/out_mqtt.rb', line 101

# Publishes the events of +es+ to the MQTT client.
#
# When +es+ is exactly a Fluent::OneEventStream, each event is formatted and
# published on its own. Otherwise all records are collected into one array,
# which is formatted with the current time and published as a single message.
# The topic is +rewrite_tag(tag)+ in both cases.
#
# Each record is passed through #add_send_time exactly once, so the record
# written to the debug log is the same object that is published.
#
# @param tag [String] event tag; also the source of the MQTT topic
# @param es [Object] event stream yielding +time, record+ pairs from +each+
# @return [Object] result of +log.flush+
def publish_event_stream(tag, es)
  # The topic depends only on the tag, so compute it once.
  topic = rewrite_tag(tag)
  # Inspect the class before injection replaces +es+.
  # NOTE(review): injection may return a different stream object — keep this
  # check first.
  single_event = es.class == Fluent::OneEventStream
  es = inject_values_to_event_stream(tag, es)

  if single_event
    es.each do |time, record|
      stamped = add_send_time(record)
      log.debug "MqttOutput#publish_event_stream: #{topic}, #{time}, #{stamped}"
      rescue_disconnection do
        @client.publish(topic, @formatter.format(tag, time, stamped))
      end
    end
  else
    array = []
    es.each do |time, record|
      stamped = add_send_time(record)
      log.debug "MqttOutput#publish_event_stream: #{topic}, #{time}, #{stamped}"
      array << stamped
    end
    rescue_disconnection do
      @client.publish(topic, @formatter.format(tag, Fluent::EventTime.now, array))
    end
  end
  log.flush
end

#rewrite_tag(tag) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/fluent/plugin/out_mqtt.rb', line 55

# Converts a tag into an MQTT topic.
#
# Every "." in +tag+ is replaced by "/". When +@topic_rewrite_pattern+ is set,
# the result is additionally rewritten by substituting every match of that
# pattern (compiled with +Regexp.new+) with +@topic_rewrite_replacement+.
#
# @param tag [String] tag to convert
# @return [String] the resulting topic
def rewrite_tag(tag)
  topic = tag.gsub('.', '/')
  return topic if @topic_rewrite_pattern.nil?

  pattern = Regexp.new(@topic_rewrite_pattern)
  topic.gsub(pattern, @topic_rewrite_replacement)
end

#shutdown ⇒ Object

This method is called when shutting down. Shutdown the thread and close sockets or files here.



76
77
78
79
# File 'lib/fluent/plugin/out_mqtt.rb', line 76

# Shutdown hook: calls +shutdown_proxy+ (provided by MqttProxy) first, then
# hands over to the parent class's +shutdown+.
def shutdown
  shutdown_proxy
  super
end

#start ⇒ Object

This method is called when starting. Open sockets or files here.



69
70
71
72
# File 'lib/fluent/plugin/out_mqtt.rb', line 69

# Start hook: runs the parent class's +start+ first, then calls +start_proxy+
# (provided by MqttProxy).
def start
  super
  start_proxy
end

#write(chunk) ⇒ Object



137
138
139
140
141
142
143
144
145
# File 'lib/fluent/plugin/out_mqtt.rb', line 137

# Publishes every event of a buffer chunk to the MQTT client.
#
# For each +tag, time, record+ triple yielded by +chunk.each+, the record is
# passed through #add_send_time once, formatted with +@formatter+ and
# published to the topic +rewrite_tag(tag)+. The debug log reports the same
# topic and record that are published. All per-event work runs inside
# +rescue_disconnection+.
#
# @param chunk [Object] buffer chunk responding to +empty?+ and +each+
# @return [Object, nil] +nil+ when the chunk is empty, otherwise the result
#   of +chunk.each+
def write(chunk)
  return if chunk.empty?
  chunk.each do |tag, time, record|
    rescue_disconnection do
      # Apply the rewrite exactly once: a second application would log a
      # topic different from the published one for non-idempotent patterns.
      topic = rewrite_tag(tag)
      stamped = add_send_time(record)
      log.debug "MqttOutput#write: #{topic}, #{time}, #{stamped}"
      @client.publish(topic, @formatter.format(tag, time, stamped))
    end
  end
end