Class: Fluent::Plugin::OutMqtt

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_mqtt.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"

Instance Method Summary collapse

Constructor Details

#initialize ⇒ OutMqtt

Returns a new instance of OutMqtt.



40
41
42
43
44
45
46
# File 'lib/fluent/plugin/out_mqtt.rb', line 40

# Builds a new plugin instance in its pre-configuration state.
#
# The parent constructor runs first; afterwards three option hashes are
# seeded with their defaults.
#
# NOTE(review): none of the other methods visible in this file read these
# hashes — confirm whether they are still needed before removing them.
def initialize
  super

  @collection_options = { capped: false }
  @connection_options = {}
  @clients = {}
end

Instance Method Details

#configure(conf) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_mqtt.rb', line 48

# Applies the plugin configuration.
#
# Steps, in order:
# 1. run the compat-parameters helper for the buffer, inject and formatter
#    sections,
# 2. let the parent class process +conf+,
# 3. fall back to the raw +conf+ values for bind/topic/port when they are
#    still unset,
# 4. build the formatter,
# 5. rewrite +buffer_chunk_limit+ in +conf+ when that key is present.
#
# @param conf [#[], #[]=, #has_key?] the configuration for this plugin
# @return [Object, nil] the adjusted chunk limit, or nil when the key is absent
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject, :formatter)
  super

  # Keep whatever is already set; only otherwise read the value from conf.
  %w[bind topic port].each do |key|
    ivar = "@#{key}"
    instance_variable_set(ivar, conf[key]) unless instance_variable_get(ivar)
  end

  @formatter = formatter_create

  # check buffer_size
  # NOTE(review): has_key? is kept deliberately — the conf object may give it
  # different semantics than key?; confirm before swapping.
  conf['buffer_chunk_limit'] = available_buffer_chunk_limit(conf) if conf.has_key?('buffer_chunk_limit')
end

#format(tag, time, record) ⇒ Object



82
83
84
# File 'lib/fluent/plugin/out_mqtt.rb', line 82

# Serializes one event for buffering.
#
# @param tag [Object] the event tag; not included in the serialized output
# @param time [Object] the event time
# @param record [Object] the event record
# @return [Object] the result of calling +to_msgpack+ on the
#   +[time, record]+ pair
def format(tag, time, record)
  event = [time, record]
  event.to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



86
87
88
# File 'lib/fluent/plugin/out_mqtt.rb', line 86

# Always returns true.
#
# NOTE(review): presumably the hook the Fluentd output base class consults to
# learn that #format emits msgpack binary (which #format here does via
# to_msgpack) — confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary
  true
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


90
91
92
# File 'lib/fluent/plugin/out_mqtt.rb', line 90

# Always returns true.
#
# NOTE(review): presumably declares to Fluentd that this plugin may run under
# multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#shutdown ⇒ Object



77
78
79
80
# File 'lib/fluent/plugin/out_mqtt.rb', line 77

# Disconnects from the MQTT broker (when a connection exists) and then runs
# the parent class's shutdown.
#
# @return [Object] whatever the parent's shutdown returns
def shutdown
  # @connect is only assigned in #start. If start never ran, or connecting
  # failed, it is still nil; without this guard the nil call would raise and
  # the parent's shutdown would never run.
  @connect.disconnect if @connect
  super
end

#start ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/out_mqtt.rb', line 61

# Opens the MQTT connection and then runs the parent class's start.
#
# Host and port are always passed to the client. Each of the remaining
# settings is passed only when its instance variable holds a truthy value.
#
# @return [Object] whatever the parent's start returns
def start
  log.debug "start mqtt #{@bind}"

  # Client option name => configured value; nil/false entries are skipped.
  optional = {
    client_id: @client_id,
    username: @username,
    password: @password,
    ssl: @ssl,
    ca_file: @ca,
    cert_file: @cert,
    key_file: @key
  }

  opts = { host: @bind, port: @port }
  optional.each do |name, value|
    opts[name] = value if value
  end

  @connect = MQTT::Client.connect(opts)
  super
end

#write(chunk) ⇒ Object



94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin/out_mqtt.rb', line 94

# Publishes every event in a buffer chunk to the configured MQTT topic.
#
# For each (time, record) pair yielded by the chunk, the record is passed
# through inject_values_to_record, formatted once with the configured
# formatter, logged at debug level, and published to @topic with the
# configured retain flag.
#
# @param chunk [#metadata, #msgpack_each] the buffer chunk to flush
# @return [Object] whatever chunk.msgpack_each returns
def write(chunk)
  # The tag comes from the chunk's metadata. The previous `chunk..tag` built
  # a Range object instead of reading the tag.
  tag = chunk.metadata.tag

  chunk.msgpack_each do |time, record|
    record = inject_values_to_record(tag, time, record)
    # Format once and reuse the result for both the log line and the publish.
    payload = @formatter.format(tag, time, record)
    log.debug "write #{@topic} #{payload}"
    # retain is the third positional argument of publish.
    @connect.publish(@topic, payload, @retain)
  end
end