Class: Fluent::Plugin::MqttOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::MqttOutput
show all
- Includes:
- MqttProxy, TimeMixin::Formatter
- Defined in:
- lib/fluent/plugin/out_mqtt.rb
Constant Summary
Constants included
from MqttProxy
Fluent::Plugin::MqttProxy::MQTT_PORT
Instance Method Summary
collapse
Methods included from MqttProxy
#after_disconnection, #connect, included, #increment_retry_interval, #init_retry_interval, #rescue_disconnection, #retry_connect, #shutdown_proxy, #start_proxy
Instance Method Details
#add_send_time(record) ⇒ Object
92
93
94
95
96
97
98
99
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 92
def add_send_time(record)
if !@monitor.nil? && @monitor.send_time
record.merge({"#{@monitor.send_time_key}": @send_time_formatter.format(Fluent::EventTime.now)})
else
record
end
end
|
#after_connection ⇒ Object
81
82
83
84
85
86
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 81
def after_connection
@dummy_thread = thread_create(:out_mqtt_dummy) do
Thread.stop
end
@dummy_thread
end
|
This method is called before starting. ‘conf’ is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 42
def configure(conf)
super
compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")
formatter_config = conf.elements(name: 'format').first
@formatter = formatter_create(conf: formatter_config)
@has_buffer_section = conf.elements(name: 'buffer').size > 0
if !@monitor.nil?
@send_time_formatter = time_formatter_create(
type: @monitor.time_type.to_sym, format: @monitor.time_format
)
end
end
|
#current_plugin_name ⇒ Object
88
89
90
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 88
def current_plugin_name
:out_mqtt
end
|
128
129
130
131
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 128
def format(tag, time, record)
record = inject_values_to_record(tag, time, record)
[tag, time, record].to_msgpack
end
|
133
134
135
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 133
def formatted_to_msgpack_binary
true
end
|
#prefer_buffered_processing ⇒ Object
63
64
65
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 63
def prefer_buffered_processing
@has_buffer_section
end
|
#process(tag, es) ⇒ Object
124
125
126
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 124
def process(tag, es)
publish_event_stream(tag, es)
end
|
#publish_event_stream(tag, es) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 101
def publish_event_stream(tag, es)
if es.class == Fluent::OneEventStream
es = inject_values_to_event_stream(tag, es)
es.each do |time, record|
log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
rescue_disconnection do
@client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record)))
end
end
else
es = inject_values_to_event_stream(tag, es)
array = []
es.each do |time, record|
log.debug "MqttOutput#publish_event_stream: #{rewrite_tag(tag)}, #{time}, #{add_send_time(record)}"
array << add_send_time(record)
end
rescue_disconnection do
@client.publish(rewrite_tag(tag), @formatter.format(tag, Fluent::EventTime.now, array))
end
end
log.flush
end
|
#rewrite_tag(tag) ⇒ Object
55
56
57
58
59
60
61
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 55
def rewrite_tag(tag)
if @topic_rewrite_pattern.nil?
tag.gsub("\.", "/")
else
tag.gsub("\.", "/").gsub(Regexp.new(@topic_rewrite_pattern), @topic_rewrite_replacement)
end
end
|
#shutdown ⇒ Object
This method is called when shutting down. Shutdown the thread and close sockets or files here.
76
77
78
79
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 76
def shutdown
shutdown_proxy
super
end
|
#start ⇒ Object
This method is called when starting. Open sockets or files here.
69
70
71
72
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 69
def start
super
start_proxy
end
|
#write(chunk) ⇒ Object
137
138
139
140
141
142
143
144
145
|
# File 'lib/fluent/plugin/out_mqtt.rb', line 137
def write(chunk)
return if chunk.empty?
chunk.each do |tag, time, record|
rescue_disconnection do
log.debug "MqttOutput#write: #{rewrite_tag(rewrite_tag(tag))}, #{time}, #{add_send_time(record)}"
@client.publish(rewrite_tag(tag), @formatter.format(tag, time, add_send_time(record)))
end
end
end
|