Class: Fluent::Plugin::MqttInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::MqttInput
- Defined in:
- lib/fluent/plugin/in_mqtt.rb
Constant Summary collapse
- DEFAULT_PARSER_TYPE =
'none'
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #configure_parser(conf) ⇒ Object
- #emit(topic, message, time = Fluent::Engine.now) ⇒ Object
-
#parse(message) ⇒ Object
Returns [time, record]: the time yielded by the parser (or the current time, Fluent::Engine.now, when the parser yields none) and the parsed record.
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
32 33 34 35 36 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 32
#
# Applies the plugin configuration.
#
# Steps, in order:
# 1. runs compat_parameters_convert on +conf+ for the :inject and :parser
#    sections,
# 2. hands +conf+ to the parent class through +super+,
# 3. builds the parser via #configure_parser.
#
# NOTE(review): compat_parameters_convert presumably maps legacy flat
# parameters onto the inject/parser sections -- confirm against the Fluentd
# compat_parameters helper documentation.
#
# @param conf the configuration object supplied by the caller
# @return [Object] whatever #configure_parser returns
def configure(conf)
  compat_parameters_convert(conf, :inject, :parser)
  super
  configure_parser(conf)
end
#configure_parser(conf) ⇒ Object
38 39 40 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 38
#
# Creates the parser used for incoming payloads and stores it in @parser.
#
# The parser is requested from parser_create under the usage name
# 'in_mqtt_parser', with its type taken from @format.
#
# @param conf the configuration object forwarded to parser_create
# @return [Object] the parser instance that was assigned to @parser
def configure_parser(conf)
  parser_options = { usage: 'in_mqtt_parser', type: @format, conf: conf }
  @parser = parser_create(**parser_options)
end
#emit(topic, message, time = Fluent::Engine.now) ⇒ Object
80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 80
#
# Emits one event, or one event per element of a batch, through the router.
#
# When +message+ is an Array (or an Array subclass) every element is logged
# at debug level and emitted as its own event under the same tag and time.
# Any other value is emitted as a single record. Hashes are deliberately not
# iterated: only Arrays are treated as batches.
#
# @param topic [String] tag to emit the event(s) under
# @param message [Object, Array] a single record or an Array of records
# @param time [Object] event time; defaults to Fluent::Engine.now
# @return [Object] the Array itself for batches, otherwise the result of
#   router.emit
def emit(topic, message, time = Fluent::Engine.now)
  # is_a? rather than `.class == Array` so Array subclasses are still
  # treated as batches instead of being emitted as one oversized record.
  if message.is_a?(Array)
    message.each do |data|
      log.debug "#{topic}: #{data}"
      router.emit(topic, time, data)
    end
  else
    router.emit(topic, time, message)
  end
end
#parse(message) ⇒ Object
Returns [time, record]: the time yielded by the parser (or the current time, Fluent::Engine.now, when the parser yields none) and the parsed record.
43 44 45 46 47 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 43
#
# Parses a raw payload with the configured parser (@parser).
#
# The parser is expected to yield a (time, record) pair; the first pair
# yielded becomes the return value of this method. When the yielded time is
# nil, Fluent::Engine.now is substituted.
#
# @param message [Object] raw payload handed to @parser.parse
# @return [Array] a two-element array [time, record]; if the parser never
#   yields, the return value of @parser.parse is returned instead
def parse(message)
  @parser.parse(message) do |time, record|
    # `return` inside the block exits #parse itself with the first result.
    return [time || Fluent::Engine.now, record]
  end
end
#shutdown ⇒ Object
91 92 93 94 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 91
#
# Disconnects the client stored in @connect (if any) and then runs the
# parent class's shutdown.
#
# @connect is only assigned in #start, so it is nil when #start never
# reached the connect call. The guard keeps shutdown from raising
# NoMethodError in that case, which would also skip +super+.
#
# @return [Object] the result of the parent class's shutdown
def shutdown
  @connect.disconnect if @connect
  super
end
#start ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 49
#
# Connects to the MQTT broker, subscribes to @topic and starts the worker
# that turns incoming messages into events.
#
# Connection options always contain :host (@bind) and :port (@port); the
# optional settings (client id, credentials, SSL flag and certificate
# files) are only added when their instance variable is truthy.
#
# For every received message the worker:
# 1. derives the tag from the topic by replacing "/" with ".",
# 2. parses the payload with #parse and passes the record through
#    inject_values_to_record,
# 3. emits the result with #emit.
#
# A message whose parsing or injection raises a StandardError is logged and
# skipped, so no event with a nil record/time is emitted. Only StandardError
# is rescued so that signals and SystemExit are not swallowed by the worker.
#
# @return [Object] the result of thread_create
def start
  super
  log.debug "start mqtt #{@bind}"

  opts = { host: @bind, port: @port }
  {
    client_id: @client_id,
    username: @username,
    password: @password,
    ssl: @ssl,
    ca_file: @ca,
    cert_file: @cert,
    key_file: @key
  }.each { |key, value| opts[key] = value if value }

  @connect = MQTT::Client.connect(opts)
  @connect.subscribe(@topic)

  thread_create(:in_mqtt_worker) do
    @connect.get do |topic, message|
      # Build the tag without mutating the string owned by the MQTT client.
      tag = topic.tr('/', '.')
      log.debug "#{tag}: #{message}"
      begin
        time, record = parse(message)
        record = inject_values_to_record(tag, time, record)
      rescue StandardError => e
        log.error e
        # Nothing usable was produced for this message; do not emit it.
        next
      end
      emit tag, record, time
    end
  end
end