Module: Fluent::Plugin::MqttProxy
- Included in:
- MqttInput, MqttOutput
- Defined in:
- lib/fluent/plugin/mqtt_proxy.rb
Defined Under Namespace
Classes: MqttProxyError
Constant Summary collapse
- MQTT_PORT =
1883
Class Method Summary collapse
- .included(base) ⇒ Object
Instance Method Summary collapse
- #after_connection ⇒ Object
- #after_disconnection ⇒ Object
- #connect ⇒ Object
- #current_plugin_name ⇒ Object
- #increment_retry_interval ⇒ Object
- #init_retry_interval ⇒ Object
- #rescue_disconnection(*block) ⇒ Object
- #retry_connect(e, message) ⇒ Object
- #shutdown_proxy ⇒ Object
- #start_proxy ⇒ Object
Class Method Details
.included(base) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 6

# Hook run with the class that mixes this module in. Registers the :timer
# and :thread helpers on +base+ and declares the connection, retry and
# security configuration parameters.
#
# base - the including class; it is sent helpers, desc, config_param and
#        config_section.
def self.included(base)
  base.helpers :timer, :thread

  # Each row is [name, type, default, description]; rows are declared in
  # this order, description first, exactly one desc per config_param.
  [
    [:host, :string, '127.0.0.1', 'The address to connect to.'],
    [:port, :integer, MQTT_PORT, 'The port to connect to.'],
    [:client_id, :string, nil, 'Client ID of MQTT Connection'],
    [:keep_alive, :integer, 15, 'Specify keep alive interval.'],
    [:initial_interval, :integer, 1, 'Specify initial connection retry interval.'],
    [:retry_inc_ratio, :integer, 2, 'Specify increasing ratio of connection retry interval.'],
    [:max_retry_interval, :integer, 300, 'Specify maximum connection retry interval.']
  ].each do |name, type, default, description|
    base.desc description
    base.config_param name, type, default: default
  end

  base.config_section :security, required: false, multi: false do
    ### User based authentication
    desc 'The username for authentication'
    config_param :username, :string, default: nil

    desc 'The password for authentication'
    config_param :password, :string, default: nil

    desc 'Use TLS or not.'
    config_param :use_tls, :bool, default: nil

    config_section :tls, required: false, multi: false do
      desc 'Specify TLS ca file.'
      config_param :ca_file, :string, default: nil

      desc 'Specify TLS key file.'
      config_param :key_file, :string, default: nil

      desc 'Specify TLS cert file.'
      config_param :cert_file, :string, default: nil
    end
  end
end
Instance Method Details
#after_connection ⇒ Object
116 117 118 119 120 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 116

# Hook called by #connect once the client has connected. This default
# does nothing and returns nil; it should be implemented by the including
# plugin.
#
# An implementation returns the thread instance that the monitor thread
# waits on (via join) for an Exception raised by MQTT I/O.
def after_connection
  nil
end
#after_disconnection ⇒ Object
95 96 97 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 95

# Hook called by #retry_connect while a reconnection is being scheduled.
# This default does nothing and returns nil; it should be implemented by
# the including plugin.
def after_disconnection
  nil
end
#connect ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 122

# Starts the "<plugin name>_monitor" thread. Inside it, wrapped by
# #rescue_disconnection, the client connects, the retry interval is reset,
# and the thread returned by #after_connection is joined.
def connect
  monitor_name = :"#{current_plugin_name}_monitor"

  thread_create(monitor_name) do
    rescue_disconnection do
      @client.connect
      log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}"
      init_retry_interval
      # Block here until the plugin's I/O thread finishes or raises.
      after_connection.join
    end
  end
end
#current_plugin_name ⇒ Object
46 47 48 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 46

# Name of the including plugin, interpolated into log messages and into
# the monitor-thread and reconnect-timer names. This default returns nil;
# it should be implemented by the including plugin.
def current_plugin_name
  nil
end
#increment_retry_interval ⇒ Object
81 82 83 84 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 81

# Grows the retry interval by @retry_inc_ratio, never exceeding
# @max_retry_interval.
#
# Previously the product was stored unclamped, so an interval just below
# the maximum (e.g. 256 with ratio 2 and maximum 300) jumped past it.
#
# Returns the retry interval now in effect.
def increment_retry_interval
  return @retry_interval if @retry_interval >= @max_retry_interval

  @retry_interval = [@retry_interval * @retry_inc_ratio, @max_retry_interval].min
end
#init_retry_interval ⇒ Object
77 78 79 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 77

# Resets the current retry interval to the configured @initial_interval.
# Called before the first connection attempt and after each successful
# connect.
#
# Returns the interval that was assigned.
def init_retry_interval
  @retry_interval = @initial_interval
end
#rescue_disconnection(*block) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 99

# Yields to the given block (passing along any positional arguments) and
# turns connection failures into a scheduled reconnect via #retry_connect.
#
# The rescue clauses are tried in the order written; each one only selects
# the message that is logged.
#
# Returns the block's value, or the value of #retry_connect on failure.
def rescue_disconnection(*block)
  yield(*block)
rescue MQTT::ProtocolException => protocol_error
  # TODO:
  # Currently MQTT::ProtocolException cannot be caught during @client.get
  # and @client.publish. The reason must be investigated...
  retry_connect(protocol_error, "Protocol error occurs.")
rescue Timeout::Error => timeout_error
  retry_connect(timeout_error, "Timeout error occurs.")
rescue SystemCallError => system_error
  retry_connect(system_error, "System call error occurs.")
rescue StandardError => other_error
  retry_connect(other_error, "The other error occurs.")
end
#retry_connect(e, message) ⇒ Object
86 87 88 89 90 91 92 93 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 86

# Logs a connection failure and schedules a one-shot reconnect.
#
# The parameter name +message+ and the references to it were missing from
# this listing (empty parameter slot, empty interpolation, dangling "e.");
# they are restored to match the documented signature
# retry_connect(e, message).
#
# e       - the exception that interrupted the connection.
# message - short description of the failure category, logged first.
#
# The reconnect timer is armed with the current @retry_interval before the
# interval is increased, so the logged delay is the one actually used.
def retry_connect(e, message)
  log.error "#{message},#{e.class},#{e.message}"
  log.error "Retry in #{@retry_interval} sec"
  timer_execute("#{current_plugin_name}_connect".to_sym, @retry_interval, repeat: false, &method(:connect))
  increment_retry_interval
  after_disconnection
  @client.disconnect if @client.connected?
end
#shutdown_proxy ⇒ Object
73 74 75 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 73

# Disconnects the MQTT client created by #start_proxy.
#
# Does nothing when no client exists (for example when #start_proxy was
# never reached), instead of raising NoMethodError on nil.
def shutdown_proxy
  @client.disconnect if @client
end
#start_proxy ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 50

# Builds the MQTT client from the plugin configuration, resets the retry
# interval and starts connecting.
#
# Credentials are forwarded whenever the security section exposes the
# corresponding key. TLS file options are forwarded only when a tls
# section is present and the individual value is set, so enabling use_tls
# without a tls section no longer fails with NoMethodError on nil, and
# unset file paths are not handed to the client as nil.
#
# Returns whatever #connect returns.
def start_proxy
  log.debug "start mqtt proxy for #{current_plugin_name}"
  log.debug "start to connect mqtt broker #{@host}:#{@port}"

  opts = {
    host: @host,
    port: @port,
    client_id: @client_id,
    keep_alive: @keep_alive
  }

  # @security may be nil (the section is optional); nil.to_h is {}.
  security = @security.to_h
  opts[:username] = @security.username if security.key?(:username)
  opts[:password] = @security.password if security.key?(:password)

  if security.key?(:use_tls) && @security.use_tls
    opts[:ssl] = @security.use_tls
    tls = @security.tls
    if tls
      opts[:ca_file] = tls.ca_file if tls.ca_file
      opts[:cert_file] = tls.cert_file if tls.cert_file
      opts[:key_file] = tls.key_file if tls.key_file
    end
  end

  init_retry_interval
  @client = MQTT::Client.new(opts)
  connect
end