Module: Fluent::Plugin::MqttProxy
- Included in:
- MqttInput, MqttOutput
- Defined in:
- lib/fluent/plugin/mqtt_proxy.rb
Defined Under Namespace
Classes: ExceedRetryFrequencyThresholdException, MqttError
Constant Summary collapse
- MQTT_PORT =
1883
Class Method Summary collapse
Instance Method Summary collapse
- #after_connection ⇒ Object
- #check_retry_frequency ⇒ Object
- #connect ⇒ Object
- #current_plugin_name ⇒ Object
- #disconnect ⇒ Object
- #increment_retry_interval ⇒ Object
- #init_retry_interval ⇒ Object
- #proxy ⇒ Object
- #rescue_disconnection ⇒ Object
- #retry_connect(e, message) ⇒ Object
- #shutdown_proxy ⇒ Object
- #start_proxy ⇒ Object
- #terminate ⇒ Object
- #update_retry_sequence(e) ⇒ Object
Class Method Details
.included(base) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 6 def self.included(base) base.helpers :timer, :thread base.desc 'The address to connect to.' base.config_param :host, :string, default: '127.0.0.1' base.desc 'The port to connect to.' base.config_param :port, :integer, default: MQTT_PORT base.desc 'Client ID of MQTT Connection' base.config_param :client_id, :string, default: nil base.desc 'Specify clean session value.' base.config_param :clean_session, :bool, default: true base.desc 'Specify keep alive interval.' base.config_param :keep_alive, :integer, default: 15 base.desc 'Specify initial connection retry interval.' base.config_param :initial_interval, :integer, default: 1 base.desc 'Specify increasing ratio of connection retry interval.' base.config_param :retry_inc_ratio, :integer, default: 2 base.desc 'Specify the maximum connection retry interval.' base.config_param :max_retry_interval, :integer, default: 300 base.desc 'Specify threshold of retry frequency as number of retries per minutes. Frequency is monitored per retry.' base.config_param :max_retry_freq, :integer, default: 10 base.config_section :security, required: false, multi: false do ### User based authentication desc 'The username for authentication' config_param :username, :string, default: nil desc 'The password for authentication' config_param :password, :string, default: nil desc 'Use TLS or not.' config_param :use_tls, :bool, default: nil config_section :tls, required: false, multi: false do desc 'Specify TLS ca file.' config_param :ca_file, :string, default: nil desc 'Specify TLS key file.' config_param :key_file, :string, default: nil desc 'Specify TLS cert file.' config_param :cert_file, :string, default: nil end end end |
Instance Method Details
#after_connection ⇒ Object
161 162 163 164 165 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 161 def after_connection # should be implemented # returns thread instance for monitor thread to wait # for Exception raised by MQTT I/O end |
#check_retry_frequency ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 108 def check_retry_frequency return if @retry_sequence.size <= 1 if @retry_sequence.size > @max_retry_freq log.error "Retry frequency threshold is exceeded: #{@retry_sequence}" raise ExceedRetryFrequencyThresholdException end end |
#connect ⇒ Object
167 168 169 170 171 172 173 174 175 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 167 def connect rescue_disconnection do @client.connect log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}" init_retry_interval thread = after_connection thread.join end end |
#current_plugin_name ⇒ Object
51 52 53 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 51 def current_plugin_name # should be implemented end |
#disconnect ⇒ Object
128 129 130 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 128 def disconnect # should be implemented end |
#increment_retry_interval ⇒ Object
95 96 97 98 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 95 def increment_retry_interval return @max_retry_interval if @retry_interval >= @max_retry_interval @retry_interval = @retry_interval * @retry_inc_ratio end |
#init_retry_interval ⇒ Object
91 92 93 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 91 def init_retry_interval @retry_interval = @initial_interval end |
#proxy ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 62 def proxy log.debug "start mqtt proxy for #{current_plugin_name}" log.debug "start to connect mqtt broker #{@host}:#{@port}" opts = { host: @host, port: @port, client_id: @client_id, clean_session: @clean_session, keep_alive: @keep_alive } opts[:username] = @security.username if @security.to_h.has_key?(:username) opts[:password] = @security.password if @security.to_h.has_key?(:password) if @security.to_h.has_key?(:use_tls) && @security.use_tls opts[:ssl] = @security.use_tls opts[:ca_file] = @security.tls.ca_file opts[:cert_file] = @security.tls.cert_file opts[:key_file] = @security.tls.key_file end init_retry_interval @retry_sequence = [] @client = MQTT::Client.new(opts) connect end |
#rescue_disconnection ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 135 def rescue_disconnection # Errors cannot be caught by fluentd core must be caught here. # Since fluentd core retries write method for buffered output # when it caught Errors during the previous write, # caughtable Error, e.g. MqttError, should be raised here. begin yield rescue MQTT::ProtocolException => e retry_connect(e, "Protocol error occurs in #{current_plugin_name}.") rescue Timeout::Error => e retry_connect(e, "Timeout error occurs in #{current_plugin_name}.") rescue SystemCallError => e retry_connect(e, "System call error occurs in #{current_plugin_name}.") rescue StandardError=> e retry_connect(e, "The other error occurs in #{current_plugin_name}.") rescue MQTT::NotConnectedException=> e # Since MQTT::NotConnectedException is raised only on publish, # connection error should be catched before this error. # So, reconnection process is omitted for this Exception # to prevent waistful increment of retry interval. #log.error "MQTT not connected exception occurs.,#{e.class},#{e.message}" #retry_connect(e, "MQTT not connected exception occurs.") #raise MqttError, "MQTT not connected exception occurs in #{current_plugin_name}." end end |
#retry_connect(e, message) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 116 def retry_connect(e, ) log.error "#{},#{e.class},#{e.}" log.error "Retry in #{@retry_interval} sec" update_retry_sequence(e) check_retry_frequency disconnect sleep @retry_interval increment_retry_interval connect # never reach here end |
#shutdown_proxy ⇒ Object
87 88 89 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 87 def shutdown_proxy disconnect end |
#start_proxy ⇒ Object
55 56 57 58 59 60 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 55 def start_proxy # Start a thread from main thread for handling a thread generated # by MQTT::Client#get (in_mqtt). Dummy thread is used for out_mqtt # to keep the same implementation style. @proxy_thread = thread_create("#{current_plugin_name}_proxy".to_sym, &method(:proxy)) end |
#terminate ⇒ Object
132 133 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 132 def terminate end |
#update_retry_sequence(e) ⇒ Object
100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 100 def update_retry_sequence(e) @retry_sequence << {time: Time.now, error: "#{e.class}: #{e.}"} # delete old retry records while @retry_sequence[0][:time] < Time.now - 60 @retry_sequence.shift end end |