Class: Fluent::Plugin::MqttInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::MqttInput
show all
- Includes:
- MqttProxy, TimeMixin::Formatter
- Defined in:
- lib/fluent/plugin/in_mqtt.rb
Constant Summary
Constants included
from MqttProxy
Fluent::Plugin::MqttProxy::MQTT_PORT
Instance Method Summary
collapse
Methods included from MqttProxy
#check_retry_frequency, #connect, included, #increment_retry_interval, #init_retry_interval, #proxy, #rescue_disconnection, #retry_connect, #shutdown_proxy, #start_proxy, #update_retry_sequence
Instance Method Details
#add_recv_time(record) ⇒ Object
94
95
96
97
98
99
100
101
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 94
# Attaches the reception time to a record when monitoring asks for it.
#
# When @monitor is set and its recv_time flag is truthy, returns a copy of the
# record with the current time (rendered by @recv_time_formatter) stored under
# @monitor.recv_time_key. Otherwise the record is returned as-is.
#
# @param record [#merge] parsed event record (presumably a Hash)
# @return [Object] the original record, or a merged copy carrying the receive time
def add_recv_time(record)
  return record if @monitor.nil? || !@monitor.recv_time

  # Use a String key. The former `"#{key}":` hash literal produced a Symbol
  # key, which string-key lookups on the record (record["..."]) cannot see.
  recv_time = @recv_time_formatter.format(Fluent::EventTime.now)
  record.merge(@monitor.recv_time_key.to_s => recv_time)
end
|
#after_connection ⇒ Object
82
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 82
# Subscribes to @topic and starts the receiving thread once the client is connected.
#
# When @client reports it is connected, a thread named :in_mqtt_get is created
# via thread_create; inside it every (topic, message) pair yielded by
# @client.get is passed to #emit. When the client is not connected nothing is
# started and whatever @get_thread currently holds is returned.
#
# @return [Object, nil] the current value of @get_thread
def after_connection
  return @get_thread unless @client.connected?

  @client.subscribe(@topic)
  @get_thread = thread_create(:in_mqtt_get) do
    @client.get { |topic, message| emit(topic, message) }
  end
end
|
#configure(conf) ⇒ Object
34
35
36
37
38
39
40
41
42
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 34
# Applies the plugin configuration.
#
# After the parent class's configure, the parser is set up through
# #configure_parser. If @monitor is present, a time formatter is built from
# its time_type (converted to a Symbol) and time_format and stored in
# @recv_time_formatter.
#
# @param conf [Object] configuration passed on to super and #configure_parser
# @return [Object, nil] the created formatter, or nil when @monitor is nil
def configure(conf)
  super
  configure_parser(conf)
  return if @monitor.nil?

  time_type = @monitor.time_type.to_sym
  time_format = @monitor.time_format
  @recv_time_formatter = time_formatter_create(type: time_type, format: time_format)
end
|
#configure_parser(conf) ⇒ Object
44
45
46
47
48
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 44
# Builds the message parser and stores it in @parser.
#
# The configuration is first run through compat_parameters_convert for the
# :parser section; the parser is then created from the first 'parse' element.
#
# @param conf [Object] plugin configuration responding to #elements
# @return [Object] the parser returned by parser_create
def configure_parser(conf)
  compat_parameters_convert(conf, :parser)
  @parser = parser_create(conf: conf.elements('parse').first)
end
|
#current_plugin_name ⇒ Object
60
61
62
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 60
# Identifies this plugin by name.
#
# @return [Symbol] always :in_mqtt
def current_plugin_name
  :in_mqtt
end
|
#disconnect ⇒ Object
68
69
70
71
72
73
74
75
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 68
# Disconnects the MQTT client if it is connected, then stops the receiving thread.
#
# A StandardError raised while checking or closing the connection is logged
# and not re-raised, so #exit_thread still runs afterwards.
#
# @return [Object] whatever #exit_thread returns
def disconnect
  begin
    if @client.connected?
      @client.disconnect
    end
  rescue StandardError => error
    log.error "Error in in_mqtt#disconnect,#{error.class},#{error.message}"
  end
  # Kept outside the rescue so failures in exit_thread are not swallowed.
  exit_thread
end
|
#emit(topic, message) ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 113
# Turns one received MQTT message into Fluentd event(s) and routes them.
#
# The tag is the topic with every "/" replaced by ".". The payload is parsed
# with #parse. An Array result is emitted as one MultiEventStream, with each
# element's time taken from @parser.parse_time; any other result is emitted as
# a single event. Each record is passed through #add_recv_time exactly once so
# the logged and the emitted record are the same object.
#
# Errors are logged rather than raised.
#
# @param topic [String] MQTT topic the message arrived on
# @param message [Object] raw payload handed to #parse
def emit(topic, message)
  tag = topic.tr("/", ".")
  time, record = parse(message)

  # A parser that cannot handle the payload yields no record; there is
  # nothing meaningful to route in that case.
  if record.nil?
    log.warn "MqttInput#emit: no record could be parsed from a message on #{topic}, skipped"
    return
  end

  if record.is_a?(Array)
    mes = Fluent::MultiEventStream.new
    record.each do |single_record|
      # parse_time is evaluated before add_recv_time, as in the original call.
      record_time = @parser.parse_time(single_record)
      stamped = add_recv_time(single_record)
      log.debug "MqttInput#emit: #{tag}, #{time}, #{stamped}"
      mes.add(record_time, stamped)
    end
    router.emit_stream(tag, mes)
  else
    stamped = add_recv_time(record)
    log.debug "MqttInput#emit: #{tag}, #{time}, #{stamped}"
    router.emit(tag, time, stamped)
  end
rescue StandardError => e
  # StandardError only: rescuing Exception would also swallow signals,
  # SystemExit and similar process-level conditions.
  log.error error: e.to_s
  log.debug_backtrace(e.backtrace)
end
|
#exit_thread ⇒ Object
64
65
66
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 64
# Stops the receiving thread, if one has been stored in @get_thread.
#
# @return [Object, nil] the result of @get_thread.exit, or nil when no thread is set
def exit_thread
  @get_thread.exit unless @get_thread.nil?
end
|
#parse(message) ⇒ Object
103
104
105
106
107
108
109
110
111
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 103
# Parses a raw message with @parser and returns the first yielded result.
#
# The method returns from inside the parser's block, so only the first
# (time, record) pair the parser yields is used. A nil time is replaced by
# Fluent::EventTime.now (and a debug line is logged). If the parser never
# yields, the return value of @parser.parse itself is returned.
#
# @param message [Object] raw payload handed to @parser.parse
# @return [Array, Object] [time, record] for the first yielded pair
def parse(message)
  @parser.parse(message) do |parsed_time, parsed_record|
    return [parsed_time, parsed_record] unless parsed_time.nil?

    log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
    return [Fluent::EventTime.now, parsed_record]
  end
end
|
#shutdown ⇒ Object
55
56
57
58
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 55
# Shuts the plugin down: calls shutdown_proxy (a method included from
# MqttProxy) first, then the parent class's shutdown.
#
# @return [Object] whatever the parent class's shutdown returns
def shutdown
  shutdown_proxy
  super
end
|
#start ⇒ Object
50
51
52
53
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 50
# Starts the plugin: runs the parent class's start first, then calls
# start_proxy (a method included from MqttProxy).
#
# @return [Object] whatever start_proxy returns
def start
  super
  start_proxy
end
|
#terminate ⇒ Object
77
78
79
80
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 77
# Terminates the plugin: stops the receiving thread via #exit_thread, then
# runs the parent class's terminate.
#
# @return [Object] whatever the parent class's terminate returns
def terminate
  exit_thread
  super
end
|