Class: Fluent::Plugin::MqttInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::MqttInput
show all
- Includes:
- MqttProxy, TimeMixin::Formatter
- Defined in:
- lib/fluent/plugin/in_mqtt.rb
Constant Summary
Constants included
from MqttProxy
Fluent::Plugin::MqttProxy::MQTT_PORT
Instance Method Summary
collapse
Methods included from MqttProxy
#connect, included, #increment_retry_interval, #init_retry_interval, #rescue_disconnection, #retry_connect, #shutdown_proxy, #start_proxy
Instance Method Details
#add_recv_time(record) ⇒ Object
84
85
86
87
88
89
90
91
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 84
# Attaches the message receive time to +record+ when monitoring is enabled.
#
# If +@monitor+ is set and its +recv_time+ flag is truthy, a new Hash is
# returned (via +merge+, so the argument is not mutated) containing the
# current time, formatted by +@recv_time_formatter+, under the key named by
# +@monitor.recv_time_key+. Otherwise +record+ itself is returned untouched.
#
# @param record [Hash] event record (must respond to +merge+ when monitoring
#   is enabled)
# @return [Hash] the record, with the receive-time entry added when enabled
def add_recv_time(record)
  return record if @monitor.nil? || !@monitor.recv_time

  received_at = @recv_time_formatter.format(Fluent::EventTime.now)
  # Use a String key. The former `{"#{key}": value}` literal produced a
  # Symbol key, which String lookups miss and which can end up duplicating an
  # existing String key of the same name once the record is serialized.
  record.merge(@monitor.recv_time_key.to_s => received_at)
end
|
#after_connection ⇒ Object
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 72
# Subscribes to the configured topic and starts the receive loop.
#
# Nothing happens unless +@client+ reports an active connection. When it
# does, +@topic+ is subscribed and a thread created through +thread_create+
# (named +:in_mqtt_get+) passes every topic/message pair yielded by
# +@client.get+ to #emit.
#
# @return [Object, nil] the current value of +@get_thread+
def after_connection
  return @get_thread unless @client.connected?

  @client.subscribe(@topic)
  @get_thread = thread_create(:in_mqtt_get) do
    @client.get { |topic, message| emit(topic, message) }
  end
end
|
#after_disconnection ⇒ Object
69
70
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 69
# Intentionally empty: does nothing and returns nil.
# NOTE(review): presumably a hook called by the MqttProxy mixin once the
# client has disconnected — confirm against MqttProxy.
def after_disconnection
end
|
#configure(conf) ⇒ Object
39
40
41
42
43
44
45
46
47
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 39
# Configures the plugin.
#
# Delegates to the parent implementation, builds the message parser through
# #configure_parser and, when +@monitor+ is set, creates the formatter for
# receive timestamps from the monitor's +time_type+ and +time_format+.
#
# @param conf [Object] configuration, passed to super and #configure_parser
# @return [Object, nil] the created formatter, or nil when +@monitor+ is nil
def configure(conf)
  super
  configure_parser(conf)
  return if @monitor.nil?

  formatter_type = @monitor.time_type.to_sym
  @recv_time_formatter = time_formatter_create(type: formatter_type, format: @monitor.time_format)
end
|
#configure_parser(conf) ⇒ Object
49
50
51
52
53
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 49
# Creates the message parser and stores it in +@parser+.
#
# +conf+ is first passed through +compat_parameters_convert+ for the
# +:parser+ type; the parser is then created from the first +parse+ element
# of the configuration.
#
# @param conf [Object] configuration responding to +elements+
# @return [Object] the parser returned by +parser_create+
def configure_parser(conf)
  compat_parameters_convert(conf, :parser)
  @parser = parser_create(conf: conf.elements('parse').first)
end
|
#current_plugin_name ⇒ Object
65
66
67
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 65
# Returns the symbol that identifies this plugin.
# NOTE(review): presumably read by the MqttProxy mixin (e.g. to label log
# output) — confirm against MqttProxy.
#
# @return [Symbol] always :in_mqtt
def current_plugin_name
:in_mqtt
end
|
#emit(topic, message) ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 103
# Parses one MQTT message and routes the resulting event(s).
#
# The tag is the topic with every "/" replaced by ".". A payload that parses
# to an Array is emitted as one MultiEventStream, each element timestamped
# through +@parser.parse_time+; any other record is emitted as a single event
# with the time returned by #parse. Every record passes through
# #add_recv_time exactly once, so the logged record is the one emitted.
#
# Payloads for which the parser produced no record are skipped with a
# warning instead of being emitted as nil records.
#
# Ordinary errors are logged and swallowed so one bad message does not stop
# the receive loop. Only StandardError is rescued: signals, exit requests and
# other non-StandardError exceptions propagate.
#
# @param topic [String] MQTT topic the message arrived on
# @param message [String] raw message payload
# @return [Object, nil] whatever the router or logger call returns; nil when
#   the message is skipped
def emit(topic, message)
  tag = topic.tr('/', '.')
  time, record = parse(message)

  if record.nil?
    log.warn "MqttInput#emit: #{tag}: parser returned no record, message skipped"
    return
  end

  if record.is_a?(Array)
    mes = Fluent::MultiEventStream.new
    record.each do |single_record|
      # parse_time runs before add_recv_time, the same order the emitted
      # record was built in before; parse_time may modify single_record.
      event_time = @parser.parse_time(single_record)
      stamped = add_recv_time(single_record)
      log.debug "MqttInput#emit: #{tag}, #{event_time}, #{stamped}"
      mes.add(event_time, stamped)
    end
    router.emit_stream(tag, mes)
  else
    stamped = add_recv_time(record)
    log.debug "MqttInput#emit: #{tag}, #{time}, #{stamped}"
    router.emit(tag, time, stamped)
  end
rescue StandardError => e
  log.error error: e.to_s
  log.debug_backtrace(e.backtrace)
end
|
#parse(message) ⇒ Object
93
94
95
96
97
98
99
100
101
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 93
# Parses +message+ with +@parser+ and returns the first yielded time/record
# pair.
#
# When the parser yields a nil time, a debug line is logged and the current
# Fluent::EventTime is substituted. The +return+ inside the block exits this
# method as soon as the parser yields; if the parser never yields, the
# parser's own return value is returned instead.
#
# @param message [Object] raw payload handed to +@parser.parse+
# @return [Array] +[time, record]+ when the parser yields
def parse(message)
  @parser.parse(message) do |event_time, record|
    return [event_time, record] unless event_time.nil?

    log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
    return [Fluent::EventTime.now, record]
  end
end
|
#shutdown ⇒ Object
60
61
62
63
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 60
# Stops the plugin: calls shutdown_proxy (listed among the methods included
# from MqttProxy) first, then the parent class's shutdown.
def shutdown
shutdown_proxy
super
end
|
#start ⇒ Object
55
56
57
58
|
# File 'lib/fluent/plugin/in_mqtt.rb', line 55
# Starts the plugin: runs the parent class's start first, then calls
# start_proxy (listed among the methods included from MqttProxy).
def start
super
start_proxy
end
|