Class: Fluent::MqttInput
- Inherits:
- Input
- Inheritance chain: Object → Input → Fluent::MqttInput
- Includes:
- SetTagKeyMixin, SetTimeKeyMixin
- Defined in:
- lib/fluent/plugin/in_mqtt.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #emit(topic, message, time = Fluent::Engine.now) ⇒ Object
- #json_parse(message) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
17 18 19 20 21 22 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 17
#
# Picks up the MQTT connection settings from the plugin configuration.
# After delegating to the parent implementation, each of @bind, @topic and
# @port is filled from +conf+ only when it does not already hold a truthy
# value.
#
# @param conf configuration object, indexed with the string keys
#   'bind', 'topic' and 'port'
def configure(conf)
  super
  @bind = conf['bind'] unless @bind
  @topic = conf['topic'] unless @topic
  @port = conf['port'] unless @port
end
#emit(topic, message, time = Fluent::Engine.now) ⇒ Object
38 39 40 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 38
#
# Hands one record to the Fluentd engine.
#
# @param topic   value passed to Fluent::Engine.emit as its first argument
# @param message record passed to Fluent::Engine.emit as its third argument
# @param time    event time; defaults to Fluent::Engine.now, evaluated at
#   call time when the argument is omitted
# @return whatever Fluent::Engine.emit returns
def emit(topic, message, time = Fluent::Engine.now)
  # Note the argument order expected by the engine: time comes before the record.
  Fluent::Engine.emit(topic, time, message)
end
#json_parse(message) ⇒ Object
42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 42
#
# Decodes a JSON payload with Yajl.
#
# Any StandardError raised while parsing is logged through $log (message,
# error text, error class and backtrace) instead of being propagated.
#
# @param message the raw payload handed to Yajl::Parser#parse
# @return the parsed value, or nil when parsing raised
def json_parse(message)
  Yajl::Parser.new.parse(message)
rescue => e
  $log.error "JSON parse error", :error => e.to_s, :error_class => e.class.to_s
  $log.warn_backtrace e.backtrace
  # Make the failure result explicit rather than leaking the logger's return value.
  nil
end
#shutdown ⇒ Object
51 52 53 54 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 51
#
# Stops the receiving thread and closes the MQTT connection.
#
# Both steps are skipped when the corresponding instance variable was never
# assigned (for example when #start was not called or raised before the
# assignment), so shutting down does not fail with NoMethodError on nil.
def shutdown
  @thread.kill if @thread
  @connect.disconnect if @connect
end
#start ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/in_mqtt.rb', line 24
#
# Connects to the MQTT broker at @bind/@port, subscribes to @topic and
# spawns a background thread (kept in @thread) that converts every received
# message into an emitted event.
def start
  $log.debug "start mqtt"
  @connect = MQTT::Client.connect({remote_host: @bind, remote_port: @port})
  @connect.subscribe(@topic)

  @thread = Thread.new do
    @connect.get do |topic, message|
      # Replace every "/" in the topic with "." to build the tag. tr returns
      # a new string, so the string yielded by the client is left untouched.
      tag = topic.tr("/", ".")
      $log.debug "#{tag}: #{message}"

      record = json_parse(message)
      # NOTE(review): json_parse is expected to yield nil when decoding
      # fails (it logs the error itself) - confirm; such payloads are
      # skipped instead of being emitted as an empty record.
      next unless record

      emit tag, record
    end
  end
end