Class: Fluent::Plugin::MqttOutput

Inherits:
Output
  • Object
show all
Includes:
MqttProxy, TimeMixin::Formatter
Defined in:
lib/fluent/plugin/out_mqtt.rb

Constant Summary

Constants included from MqttProxy

Fluent::Plugin::MqttProxy::MQTT_PORT

Instance Method Summary collapse

Methods included from MqttProxy

#check_retry_frequency, #connect, included, #increment_retry_interval, #init_retry_interval, #proxy, #rescue_disconnection, #retry_connect, #shutdown_proxy, #start_proxy, #update_retry_sequence

Instance Method Details

#add_send_time(record) ⇒ Object



125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/out_mqtt.rb', line 125

# Returns the record to publish, optionally stamped with the send time.
#
# When @monitor is set and its send_time flag is truthy, a copy of +record+
# is returned with the current time (rendered by @send_time_formatter) stored
# under the monitor's send_time_key. Otherwise +record+ itself is returned.
# The hash passed in is never mutated (Hash#merge builds a new hash).
#
# @param record [Hash] event record
# @return [Hash] +record+ itself, or a stamped copy of it
def add_send_time(record)
  return record if @monitor.nil? || !@monitor.send_time

  # The key is a String on purpose: a `"#{key}":` hash literal would create a
  # Symbol key, which does not replace an existing String key of the same
  # name and would leave a record with mixed key types.
  # The rendered time format is whatever @send_time_formatter was built with.
  record.merge(@monitor.send_time_key.to_s => @send_time_formatter.format(Fluent::EventTime.now))
end

#after_connection ⇒ Object



114
115
116
117
118
119
# File 'lib/fluent/plugin/out_mqtt.rb', line 114

# Creates the plugin's placeholder thread and remembers it in @dummy_thread.
#
# The thread is requested from thread_create under the name :out_mqtt_dummy
# and its body does nothing but call Thread.stop.
#
# @return [Object] whatever thread_create returned (also kept in @dummy_thread)
def after_connection
  @dummy_thread = thread_create(:out_mqtt_dummy) { Thread.stop }
end

#configure(conf) ⇒ Object

This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.



52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_mqtt.rb', line 52

# Configures the plugin from the parsed configuration.
#
# Sets up @formatter from the first <format> section, records in
# @has_buffer_section whether at least one <buffer> section is present, and,
# when @monitor is set, builds @send_time_formatter from the monitor's
# time_type and time_format.
#
# @param conf [Object] parsed configuration; must respond to #elements(name:)
# @return [Object, nil] the send-time formatter when @monitor is set, else nil
def configure(conf)
  # Legacy flat parameters have to be rewritten into <format>/<inject>/<buffer>
  # sections *before* `super`, otherwise the base class and the plugin helpers
  # configure themselves without ever seeing the converted sections
  # (this is the order shown in Fluentd's compat_parameters helper docs).
  compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")
  super
  formatter_config = conf.elements(name: 'format').first
  @formatter = formatter_create(conf: formatter_config)
  @has_buffer_section = conf.elements(name: 'buffer').size > 0
  unless @monitor.nil?
    @send_time_formatter = time_formatter_create(
      type: @monitor.time_type.to_sym, format: @monitor.time_format
    )
  end
end

#current_plugin_name ⇒ Object



121
122
123
# File 'lib/fluent/plugin/out_mqtt.rb', line 121

# Name under which this plugin identifies itself.
# NOTE(review): presumably consumed by the MqttProxy mixin — confirm there.
#
# @return [Symbol] always :out_mqtt
def current_plugin_name
  :out_mqtt
end

#disconnect ⇒ Object



100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_mqtt.rb', line 100

# Closes the MQTT client connection if it is open, then stops the
# placeholder thread.
#
# A StandardError raised while checking or closing the connection is logged
# and not re-raised; exit_thread is called afterwards.
#
# @return [Object] the result of exit_thread
def disconnect
  begin
    if @client.connected?
      @client.disconnect
    end
  rescue StandardError => e
    log.error "Error in out_mqtt#disconnect,#{e.class},#{e.message}"
  end

  exit_thread
end

#exit_thread ⇒ Object



96
97
98
# File 'lib/fluent/plugin/out_mqtt.rb', line 96

# Terminates the placeholder thread stored in @dummy_thread, if there is one.
#
# @return [Object, nil] the result of calling #exit on @dummy_thread, or nil
#   when @dummy_thread is nil
def exit_thread
  @dummy_thread.exit unless @dummy_thread.nil?
end

#format(tag, time, record) ⇒ Object



149
150
151
152
# File 'lib/fluent/plugin/out_mqtt.rb', line 149

# Serializes one event for buffering.
#
# The record is first passed through inject_values_to_record, then the
# [tag, time, record] triple is packed with Array#to_msgpack.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] the result of to_msgpack on the triple
def format(tag, time, record)
  [tag, time, inject_values_to_record(tag, time, record)].to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



154
155
156
# File 'lib/fluent/plugin/out_mqtt.rb', line 154

# Always answers true.
# NOTE(review): presumably signals to the Output base class that #format
# emits msgpack (this class's #format does call to_msgpack) — confirm
# against the Fluentd output plugin API.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary
  true
end

#prefer_buffered_processing ⇒ Object



73
74
75
# File 'lib/fluent/plugin/out_mqtt.rb', line 73

# Reports whether the configuration contained a <buffer> section.
#
# @return [Boolean, nil] the value of @has_buffer_section, which #configure
#   sets to true when at least one 'buffer' element is present
def prefer_buffered_processing
  @has_buffer_section
end

#prefer_delayed_commit ⇒ Object



77
78
79
# File 'lib/fluent/plugin/out_mqtt.rb', line 77

# Truthy only when a <buffer> section was configured AND the buffer
# configuration's `async` value is truthy.
# @buffer_config is only consulted when @has_buffer_section is truthy.
#
# @return [Object] @has_buffer_section when it is falsy, otherwise
#   @buffer_config.async
def prefer_delayed_commit
  @has_buffer_section && @buffer_config.async
end

#process(tag, es) ⇒ Object



145
146
147
# File 'lib/fluent/plugin/out_mqtt.rb', line 145

# Handles an event stream by delegating directly to publish_event_stream.
#
# @param tag [Object] tag of the event stream
# @param es [Object] event stream
# @return [Object] whatever publish_event_stream returns
def process(tag, es)
  publish_event_stream(tag, es)
end

#publish(tag, time, record) ⇒ Object



158
159
160
161
162
163
164
165
166
# File 'lib/fluent/plugin/out_mqtt.rb', line 158

# Publishes a single event through the MQTT client.
#
# The topic is derived from +tag+ via rewrite_tag and the payload is the
# output of @formatter for the record returned by add_send_time. Both are
# computed exactly once, so the debug log line shows the same topic and the
# same send time that are actually published.
#
# @param tag [String] event tag
# @param time [Object] event time
# @param record [Hash] event record
# @return [Object] whatever @client.publish returns
def publish(tag, time, record)
  topic = rewrite_tag(tag)
  stamped_record = add_send_time(record)
  # caller_locations is evaluated at method level (not inside a block) so
  # that frame 1 is the method that called publish.
  log.debug "MqttOutput::#{caller_locations(1,1)[0].label}: #{topic}, #{time}, #{stamped_record}"
  @client.publish(
    topic,
    @formatter.format(tag, time, stamped_record),
    @retain,
    @qos
  )
end

#publish_event_stream(tag, es) ⇒ Object



134
135
136
137
138
139
140
141
142
143
# File 'lib/fluent/plugin/out_mqtt.rb', line 134

# Publishes every event of an event stream, one at a time.
#
# The stream is first passed through inject_values_to_event_stream; each
# resulting (time, record) pair is then published inside its own
# rescue_disconnection block. The logger is flushed at the end.
#
# @param tag [Object] tag shared by all events in the stream
# @param es [Object] event stream; must yield (time, record) pairs from #each
# @return [Object] the result of log.flush
def publish_event_stream(tag, es)
  log.debug "publish_event_stream: #{es.class}"
  injected = inject_values_to_event_stream(tag, es)
  injected.each do |event_time, event_record|
    rescue_disconnection { publish(tag, event_time, event_record) }
  end
  log.flush
end

#rewrite_tag(tag) ⇒ Object



65
66
67
68
69
70
71
# File 'lib/fluent/plugin/out_mqtt.rb', line 65

# Converts a tag into a topic string.
#
# Every "." in +tag+ is replaced with "/". When @topic_rewrite_pattern is
# set, it is compiled with Regexp.new and every match in the slash-separated
# string is replaced with @topic_rewrite_replacement.
#
# @param tag [String] event tag
# @return [String] the resulting topic
def rewrite_tag(tag)
  topic = tag.gsub('.', '/')
  return topic if @topic_rewrite_pattern.nil?

  topic.gsub(Regexp.new(@topic_rewrite_pattern), @topic_rewrite_replacement)
end

#shutdown ⇒ Object

This method is called when shutting down. Shut down the thread and close sockets or files here.



90
91
92
93
94
# File 'lib/fluent/plugin/out_mqtt.rb', line 90

# Shutdown hook: stops the proxy, stops the placeholder thread, then hands
# over to the parent implementation.
#
# @return [Object] whatever the parent #shutdown returns
def shutdown
  # shutdown_proxy is provided by the MqttProxy mixin.
  shutdown_proxy
  exit_thread
  super
end

#start ⇒ Object

This method is called when starting. Open sockets or files here.



83
84
85
86
# File 'lib/fluent/plugin/out_mqtt.rb', line 83

# Start hook: runs the parent implementation first, then starts the proxy.
#
# @return [Object] whatever start_proxy returns
def start
  super
  # start_proxy is provided by the MqttProxy mixin.
  start_proxy
end

#terminate ⇒ Object



109
110
111
112
# File 'lib/fluent/plugin/out_mqtt.rb', line 109

# Terminate hook: stops the placeholder thread (a no-op when there is none),
# then hands over to the parent implementation.
#
# @return [Object] whatever the parent #terminate returns
def terminate
  exit_thread
  super
end

#try_write(chunk) ⇒ Object



177
178
179
180
181
182
183
184
185
# File 'lib/fluent/plugin/out_mqtt.rb', line 177

# Publishes every event of a chunk and then commits the chunk.
#
# Nothing happens for an empty chunk. Otherwise all events are published and
# commit_write is called with the chunk's unique_id, all inside a single
# rescue_disconnection block.
#
# @param chunk [Object] buffer chunk; must respond to #empty?, #each
#   (yielding tag, time, record) and #unique_id
# @return [Object, nil] nil for an empty chunk, otherwise whatever
#   rescue_disconnection returns
def try_write(chunk)
  return if chunk.empty?

  rescue_disconnection do
    chunk.each { |event_tag, event_time, event_record| publish(event_tag, event_time, event_record) }
    commit_write(chunk.unique_id)
  end
end

#write(chunk) ⇒ Object



168
169
170
171
172
173
174
175
# File 'lib/fluent/plugin/out_mqtt.rb', line 168

# Publishes every event of a chunk.
#
# Nothing happens for an empty chunk. Each event is published inside its own
# rescue_disconnection block.
#
# @param chunk [Object] buffer chunk; must respond to #empty? and #each
#   (yielding tag, time, record)
# @return [Object, nil] nil for an empty chunk, otherwise the result of
#   chunk.each
def write(chunk)
  return if chunk.empty?

  chunk.each do |event_tag, event_time, event_record|
    rescue_disconnection { publish(event_tag, event_time, event_record) }
  end
end