Class: Fluent::Plugin::MqttOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::MqttOutput
show all
- Includes:
- MqttProxy, TimeMixin::Formatter
- Defined in:
- lib/fluent/plugin/out_mqtt.rb
Constant Summary
Constants included
from MqttProxy
Fluent::Plugin::MqttProxy::MQTT_PORT
Instance Method Summary
collapse
Methods included from MqttProxy
#check_retry_frequency, #connect, included, #increment_retry_interval, #init_retry_interval, #proxy, #rescue_disconnection, #retry_connect, #shutdown_proxy, #start_proxy, #update_retry_sequence
Instance Method Details
#add_send_time(record) ⇒ Object
125
126
127
128
129
130
131
132
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 125
def add_send_time(record)
if !@monitor.nil? && @monitor.send_time
record.merge({"#{@monitor.send_time_key}": @send_time_formatter.format(Fluent::EventTime.now)})
else
record
end
end
|
#after_connection ⇒ Object
114
115
116
117
118
119
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 114
def after_connection
@dummy_thread = thread_create(:out_mqtt_dummy) do
Thread.stop
end
@dummy_thread
end
|
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 52
def configure(conf)
super
compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")
formatter_config = conf.elements(name: 'format').first
@formatter = formatter_create(conf: formatter_config)
@has_buffer_section = conf.elements(name: 'buffer').size > 0
if !@monitor.nil?
@send_time_formatter = time_formatter_create(
type: @monitor.time_type.to_sym, format: @monitor.time_format
)
end
end
|
#current_plugin_name ⇒ Object
121
122
123
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 121
def current_plugin_name
:out_mqtt
end
|
#disconnect ⇒ Object
100
101
102
103
104
105
106
107
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 100
def disconnect
begin
@client.disconnect if @client.connected?
rescue => e
log.error "Error in out_mqtt#disconnect,#{e.class},#{e.message}"
end
exit_thread
end
|
#exit_thread ⇒ Object
96
97
98
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 96
def exit_thread
@dummy_thread.exit if !@dummy_thread.nil?
end
|
149
150
151
152
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 149
def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
[tag, time, record].to_msgpack
end
|
154
155
156
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 154
def formatted_to_msgpack_binary
true
end
|
#prefer_buffered_processing ⇒ Object
73
74
75
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 73
def prefer_buffered_processing
@has_buffer_section
end
|
#prefer_delayed_commit ⇒ Object
77
78
79
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 77
def prefer_delayed_commit
@has_buffer_section && @buffer_config.async
end
|
#process(tag, es) ⇒ Object
145
146
147
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 145
def process(tag, es)
publish_event_stream(tag, es)
end
|
#publish(tag, time, record) ⇒ Object
158
159
160
161
162
163
164
165
166
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 158
def publish(tag, time, record)
log.debug "MqttOutput::#{caller_locations(1,1)[0].label}: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
@client.publish(
rewrite_tag(tag),
@formatter.format(tag, time, add_send_time(record)),
@retain,
@qos
)
end
|
#publish_event_stream(tag, es) ⇒ Object
134
135
136
137
138
139
140
141
142
143
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 134
def publish_event_stream(tag, es)
log.debug "publish_event_stream: #{es.class}"
es = inject_values_to_event_stream(tag, es)
es.each do |time, record|
rescue_disconnection do
publish(tag, time, record)
end
end
log.flush
end
|
#rewrite_tag(tag) ⇒ Object
65
66
67
68
69
70
71
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 65
def rewrite_tag(tag)
if @topic_rewrite_pattern.nil?
tag.gsub("\.", "/")
else
tag.gsub("\.", "/").gsub(Regexp.new(@topic_rewrite_pattern), @topic_rewrite_replacement)
end
end
|
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
90
91
92
93
94
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 90
def shutdown
shutdown_proxy
exit_thread
super
end
|
#start ⇒ Object
This method is called when starting. Open sockets or files here.
83
84
85
86
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 83
def start
super
start_proxy
end
|
#terminate ⇒ Object
109
110
111
112
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 109
def terminate
exit_thread
super
end
|
#try_write(chunk) ⇒ Object
177
178
179
180
181
182
183
184
185
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 177
def try_write(chunk)
return if chunk.empty?
rescue_disconnection do
chunk.each do |tag, time, record|
publish(tag, time, record)
end
commit_write(chunk.unique_id)
end
end
|
#write(chunk) ⇒ Object
168
169
170
171
172
173
174
175
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 168
def write(chunk)
return if chunk.empty?
chunk.each do |tag, time, record|
rescue_disconnection do
publish(tag, time, record)
end
end
end
|