Module: Fluent::Plugin::MqttProxy

Included in:
MqttInput, MqttOutput
Defined in:
lib/fluent/plugin/mqtt_proxy.rb

Defined Under Namespace

Classes: MqttProxyError

Constant Summary collapse

MQTT_PORT =
1883

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 6

# Hook invoked when this module is mixed into a plugin class.
#
# Registers the plugin helpers and the connection-related configuration
# parameters on +base+, so every including plugin shares the same options.
#
# @param base [Class] the including plugin class; it must respond to the
#   configuration DSL used below (helpers, desc, config_param, config_section)
def self.included(base)
  base.helpers :timer, :thread

  base.desc 'The address to connect to.'
  base.config_param :host, :string, default: '127.0.0.1'
  base.desc 'The port to connect to.'
  base.config_param :port, :integer, default: MQTT_PORT
  base.desc 'Client ID of MQTT Connection'
  base.config_param :client_id, :string, default: nil
  base.desc 'Specify keep alive interval.'
  base.config_param :keep_alive, :integer, default: 15
  base.desc 'Specify initial connection retry interval.'
  base.config_param :initial_interval, :integer, default: 1
  base.desc 'Specify increasing ratio of connection retry interval.'
  base.config_param :retry_inc_ratio, :integer, default: 2
  base.desc 'Specify maximum connection retry interval.'
  base.config_param :max_retry_interval, :integer, default: 300

  base.config_section :security, required: false, multi: false do
    ### User based authentication
    desc 'The username for authentication'
    config_param :username, :string, default: nil
    desc 'The password for authentication'
    # secret: true asks Fluentd to mask this value when the configuration
    # is dumped (e.g. in startup logs), so the credential is not leaked.
    config_param :password, :string, default: nil, secret: true
    desc 'Use TLS or not.'
    config_param :use_tls, :bool, default: nil
    config_section :tls, required: false, multi: false do
      desc 'Specify TLS ca file.'
      config_param :ca_file, :string, default: nil
      desc 'Specify TLS key file.'
      config_param :key_file, :string, default: nil
      desc 'Specify TLS cert file.'
      config_param :cert_file, :string, default: nil
    end
  end
end

Instance Method Details

#after_connection ⇒ Object



116
117
118
119
120
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 116

# Hook for including plugins; should be implemented there.
#
# An implementation returns the thread instance that the monitor thread
# waits on (connect calls +join+ on the returned value) so that an
# Exception raised by MQTT I/O reaches the monitor thread.
#
# @return [nil] this default implementation does nothing
def after_connection
  nil
end

#after_disconnection ⇒ Object



95
96
97
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 95

# Hook for including plugins; should be implemented there.
# It is invoked by retry_connect before the client is disconnected.
#
# @return [nil] this default implementation does nothing
def after_disconnection
  nil
end

#connect ⇒ Object



122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 122

# Starts the monitor thread that connects to the broker.
#
# Inside the thread (created through thread_create and named
# "<plugin name>_monitor") the client connects, the retry interval is
# reset, and the thread then joins on whatever after_connection returns.
# Everything runs inside rescue_disconnection, so errors raised there are
# handed to its retry handling.
#
# @return [Object] whatever thread_create returns
def connect
  monitor_name = :"#{current_plugin_name}_monitor"
  thread_create(monitor_name) do
    rescue_disconnection do
      @client.connect
      log.debug "connected to mqtt broker #{@host}:#{@port} for #{current_plugin_name}"
      init_retry_interval
      # Block here until the I/O thread finishes (or raises).
      after_connection.join
    end
  end
end

#current_plugin_name ⇒ Object



46
47
48
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 46

# Hook for including plugins; should be implemented there.
# The value is interpolated into log messages and into the names of the
# monitor thread and the reconnect timer.
#
# @return [nil] this default implementation does nothing
def current_plugin_name
  nil
end

#increment_retry_interval ⇒ Object



81
82
83
84
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 81

# Grows the connection retry interval by @retry_inc_ratio, never letting
# the result exceed @max_retry_interval.
#
# Multiplying alone could overshoot the maximum (e.g. 256 * 2 = 512 with a
# maximum of 300), so the product is clamped to the maximum.
# An interval that is already at or above the maximum is left untouched.
#
# @return [Integer] the retry interval now in effect
def increment_retry_interval
  return @retry_interval if @retry_interval >= @max_retry_interval

  @retry_interval = [@retry_interval * @retry_inc_ratio, @max_retry_interval].min
end

#init_retry_interval ⇒ Object



77
78
79
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 77

# Resets the connection retry interval to the configured initial value.
#
# @return [Object] the value of @initial_interval
def init_retry_interval
  initial = @initial_interval
  @retry_interval = initial
end

#rescue_disconnection(*block) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 99

# Runs the given block and hands any connection-related error to
# retry_connect together with a message describing the error category.
#
# @param block [Array] arguments forwarded to the yielded block
# @return [Object] the block's value, or the value of retry_connect when
#   an error was rescued
def rescue_disconnection(*block)
  yield(*block)
rescue MQTT::ProtocolException, StandardError => e
  # The branches are checked most-specific first, in the same order as the
  # error categories are reported.
  message =
    case e
    when MQTT::ProtocolException
      # TODO:
      # Currently MQTT::ProtocolException cannot be caught during @client.get
      # and @client.publish. The reason must be investigated...
      "Protocol error occurs."
    when Timeout::Error
      "Timeout error occurs."
    when SystemCallError
      "System call error occurs."
    else
      "The other error occurs."
    end
  retry_connect(e, message)
end

#retry_connect(e, message) ⇒ Object



86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 86

# Logs a connection failure and schedules a one-shot reconnect.
#
# The reconnect timer uses the current retry interval; the interval is
# grown afterwards for the next failure. The after_disconnection hook then
# runs, and finally the client is disconnected if it is still connected.
#
# @param e [Exception] the error that broke the connection
# @param message [String] a description of the error category
# @return [Object, nil] the result of the disconnect call, or nil when the
#   client was not connected
def retry_connect(e, message)
  log.error "#{message},#{e.class},#{e.message}"
  delay = @retry_interval
  log.error "Retry in #{delay} sec"

  timer_name = :"#{current_plugin_name}_connect"
  timer_execute(timer_name, delay, repeat: false, &method(:connect))

  increment_retry_interval
  after_disconnection
  if @client.connected?
    @client.disconnect
  end
end

#shutdown_proxy ⇒ Object



73
74
75
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 73

# Disconnects the MQTT client held in @client.
#
# @return [Object] whatever the client's disconnect call returns
def shutdown_proxy
  client = @client
  client.disconnect
end

#start_proxy ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/fluent/plugin/mqtt_proxy.rb', line 50

# Builds the MQTT client from the plugin configuration and starts the
# connection.
#
# Credentials and TLS settings are taken from the optional security
# section. TLS file paths are forwarded only when they are configured, so
# enabling use_tls without a tls section (or with only some of the files)
# no longer fails on a missing section or passes nil paths to the client.
#
# @return [Object] whatever connect returns
def start_proxy
  log.debug "start mqtt proxy for #{current_plugin_name}"
  log.debug "start to connect mqtt broker #{@host}:#{@port}"
  opts = {
    host: @host,
    port: @port,
    client_id: @client_id,
    keep_alive: @keep_alive
  }

  # nil.to_h is {}, so a missing security section simply yields no keys.
  security = @security.to_h
  opts[:username] = @security.username if security.key?(:username)
  opts[:password] = @security.password if security.key?(:password)
  if security.key?(:use_tls) && @security.use_tls
    opts[:ssl] = @security.use_tls
    tls = @security.tls
    # The tls section is optional; skip it when absent and skip unset paths.
    if tls
      opts[:ca_file] = tls.ca_file if tls.ca_file
      opts[:cert_file] = tls.cert_file if tls.cert_file
      opts[:key_file] = tls.key_file if tls.key_file
    end
  end

  init_retry_interval
  @client = MQTT::Client.new(opts)
  connect
end