Module: Fluent::Plugin::MqttProxy

Included in:
MqttInput, MqttOutput
Defined in:
lib/fluent/plugin/mqtt_proxy.rb

Defined Under Namespace

Classes: ExceedRetryFrequencyThresholdException, MqttError

Constant Summary collapse

MQTT_PORT =
1883

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 6

def self.included(base)
  base.helpers :timer, :thread

  base.desc 'The address to connect to.'
  base.config_param :host, :string, default: '127.0.0.1'
  base.desc 'The port to connect to.'
  base.config_param :port, :integer, default: MQTT_PORT
  base.desc 'Client ID of MQTT Connection'
  base.config_param :client_id, :string, default: nil
  base.desc 'Specify clean session value.'
  base.config_param :clean_session, :bool, default: true
  base.desc 'Specify keep alive interval.'
  base.config_param :keep_alive, :integer, default: 15
  base.desc 'Specify initial connection retry interval.'
  base.config_param :initial_interval, :integer, default: 1
  base.desc 'Specify increasing ratio of connection retry interval.'
  base.config_param :retry_inc_ratio, :integer, default: 2
  base.desc 'Specify the maximum connection retry interval.'
  base.config_param :max_retry_interval, :integer, default: 300
  base.desc 'Specify threshold of retry frequency as number of retries per minutes. Frequency is monitored per retry.'
  base.config_param :max_retry_freq, :integer, default: 10

  base.config_section :security, required: false, multi: false do
    ### User based authentication
    desc 'The username for authentication'
    config_param :username, :string, default: nil
    desc 'The password for authentication'
    config_param :password, :string, default: nil
    desc 'Use TLS or not.'
    config_param :use_tls, :bool, default: nil
    config_section :tls, required: false, multi: false do
      desc 'Specify TLS ca file.'
      config_param :ca_file, :string, default: nil
      desc 'Specify TLS key file.'
      config_param :key_file, :string, default: nil
      desc 'Specify TLS cert file.'
      config_param :cert_file, :string, default: nil
    end
  end
end

Instance Method Details

#after_connectionObject



161
162
163
164
165
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 161

def after_connection
  # should be implemented
  # returns thread instance for monitor thread to wait
  # for Exception raised by MQTT I/O
end

#check_retry_frequencyObject



108
109
110
111
112
113
114
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 108

def check_retry_frequency
  return if @retry_sequence.size <= 1
  if @retry_sequence.size > @max_retry_freq
    log.error "Retry frequency threshold is exceeded: #{@retry_sequence}"
    raise ExceedRetryFrequencyThresholdException
  end
end

#connectObject



167
168
169
170
171
172
173
174
175
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 167

def connect
  rescue_disconnection do
    @client.connect
    log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}"
    init_retry_interval
    thread = after_connection
    thread.join
  end
end

#current_plugin_nameObject



51
52
53
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 51

def current_plugin_name
  # should be implemented
end

#disconnectObject



128
129
130
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 128

def disconnect
  # should be implemented
end

#increment_retry_intervalObject



95
96
97
98
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 95

def increment_retry_interval
  return @max_retry_interval if @retry_interval >= @max_retry_interval
  @retry_interval = @retry_interval * @retry_inc_ratio
end

#init_retry_intervalObject



91
92
93
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 91

def init_retry_interval
  @retry_interval = @initial_interval
end

#proxyObject



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 62

def proxy
  log.debug "start mqtt proxy for #{current_plugin_name}"
  log.debug "start to connect mqtt broker #{@host}:#{@port}"
  opts = {
    host: @host,
    port: @port,
    client_id: @client_id,
    clean_session: @clean_session,
    keep_alive: @keep_alive
  }
  opts[:username] = @security.username if @security.to_h.has_key?(:username)
  opts[:password] = @security.password if @security.to_h.has_key?(:password)
  if @security.to_h.has_key?(:use_tls) && @security.use_tls
    opts[:ssl] = @security.use_tls
    opts[:ca_file] = @security.tls.ca_file
    opts[:cert_file] = @security.tls.cert_file
    opts[:key_file] = @security.tls.key_file
  end

  init_retry_interval
  @retry_sequence = []
  @client = MQTT::Client.new(opts)
  connect
end

#rescue_disconnectionObject



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 135

def rescue_disconnection
  # Errors cannot be caught by fluentd core must be caught here.
  # Since fluentd core retries write method for buffered output 
  # when it caught Errors during the previous write,
  # caughtable Error, e.g. MqttError, should be raised here.
  begin
    yield
  rescue MQTT::ProtocolException => e
    retry_connect(e, "Protocol error occurs in #{current_plugin_name}.")
  rescue Timeout::Error => e
    retry_connect(e, "Timeout error occurs in #{current_plugin_name}.")
  rescue SystemCallError => e
    retry_connect(e, "System call error occurs in #{current_plugin_name}.")
  rescue StandardError=> e
    retry_connect(e, "The other error occurs in #{current_plugin_name}.")
  rescue MQTT::NotConnectedException=> e
    # Since MQTT::NotConnectedException is raised only on publish,
    # connection error should be catched before this error.
    # So, reconnection process is omitted for this Exception
    # to prevent waistful increment of retry interval.
    #log.error "MQTT not connected exception occurs.,#{e.class},#{e.message}"
    #retry_connect(e, "MQTT not connected exception occurs.")
    #raise MqttError, "MQTT not connected exception occurs in #{current_plugin_name}."
  end
end

#retry_connect(e, message) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 116

def retry_connect(e, message)
  log.error "#{message},#{e.class},#{e.message}"
  log.error "Retry in #{@retry_interval} sec"
  update_retry_sequence(e)
  check_retry_frequency
  disconnect
  sleep @retry_interval
  increment_retry_interval
  connect
  # never reach here
end

#shutdown_proxyObject



87
88
89
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 87

def shutdown_proxy
  disconnect
end

#start_proxyObject



55
56
57
58
59
60
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 55

def start_proxy
  # Start a thread from main thread for handling a thread generated
  # by MQTT::Client#get (in_mqtt). Dummy thread is used for out_mqtt
  # to keep the same implementation style.
  @proxy_thread = thread_create("#{current_plugin_name}_proxy".to_sym, &method(:proxy))
end

#terminateObject



132
133
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 132

def terminate
end

#update_retry_sequence(e) ⇒ Object



100
101
102
103
104
105
106
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 100

def update_retry_sequence(e) 
  @retry_sequence << {time: Time.now, error: "#{e.class}: #{e.message}"}
  # delete old retry records
  while @retry_sequence[0][:time] < Time.now - 60
    @retry_sequence.shift
  end 
end