Class: Fluent::Plugin::MqttInput

Inherits:
Input
  • Object
show all
Includes:
MqttProxy, TimeMixin::Formatter
Defined in:
lib/fluent/plugin/in_mqtt.rb

Constant Summary

Constants included from MqttProxy

Fluent::Plugin::MqttProxy::MQTT_PORT

Instance Method Summary collapse

Methods included from MqttProxy

#connect, included, #increment_retry_interval, #init_retry_interval, #rescue_disconnection, #retry_connect, #shutdown_proxy, #start_proxy

Instance Method Details

#add_recv_time(record) ⇒ Object



84
85
86
87
88
89
90
91
# File 'lib/fluent/plugin/in_mqtt.rb', line 84

def add_recv_time(record)
  if !@monitor.nil? && @monitor.recv_time
    # recv_time is recorded in ms
    record.merge({"#{@monitor.recv_time_key}": @recv_time_formatter.format(Fluent::EventTime.now)})
  else
    record
  end
end

#after_connectionObject



72
73
74
75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/in_mqtt.rb', line 72

def after_connection
  if @client.connected?
    @client.subscribe(@topic)
    @get_thread = thread_create(:in_mqtt_get) do
      @client.get do |topic, message|
        emit(topic, message)
      end
    end
  end
  @get_thread
end

#after_disconnectionObject



69
70
# File 'lib/fluent/plugin/in_mqtt.rb', line 69

def after_disconnection
end

#configure(conf) ⇒ Object



39
40
41
42
43
44
45
46
47
# File 'lib/fluent/plugin/in_mqtt.rb', line 39

def configure(conf)
  super
  configure_parser(conf)
  if !@monitor.nil?
    @recv_time_formatter = time_formatter_create(
      type: @monitor.time_type.to_sym, format: @monitor.time_format
    )
  end
end

#configure_parser(conf) ⇒ Object



49
50
51
52
53
# File 'lib/fluent/plugin/in_mqtt.rb', line 49

def configure_parser(conf)
  compat_parameters_convert(conf, :parser)
  parser_config = conf.elements('parse').first
  @parser = parser_create(conf: parser_config)
end

#current_plugin_nameObject



65
66
67
# File 'lib/fluent/plugin/in_mqtt.rb', line 65

def current_plugin_name
  :in_mqtt
end

#emit(topic, message) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/in_mqtt.rb', line 103

def emit(topic, message)
  begin
    tag = topic.gsub("/","\.")
    time, record = parse(message)
    if record.is_a?(Array)
      mes = Fluent::MultiEventStream.new
      record.each do |single_record|
        log.debug "MqttInput#emit: #{tag}, #{time}, #{add_recv_time(single_record)}"
        mes.add(@parser.parse_time(single_record), add_recv_time(single_record))
      end
      router.emit_stream(tag, mes)
    else
      log.debug "MqttInput#emit: #{tag}, #{time}, #{add_recv_time(record)}"
      router.emit(tag, time, add_recv_time(record))
    end
  rescue Exception => e
    log.error error: e.to_s
    log.debug_backtrace(e.backtrace)
  end
end

#parse(message) ⇒ Object



93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin/in_mqtt.rb', line 93

def parse(message)
  @parser.parse(message) do |time, record|
    if time.nil?
      log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
      time = Fluent::EventTime.now
    end
    return [time, record]
  end
end

#shutdownObject



60
61
62
63
# File 'lib/fluent/plugin/in_mqtt.rb', line 60

def shutdown
  shutdown_proxy
  super
end

#startObject



55
56
57
58
# File 'lib/fluent/plugin/in_mqtt.rb', line 55

def start
  super
  start_proxy
end