Class: Fluent::Plugin::OutMqtt
- Inherits: Output
  - Object
    - Output
      - Fluent::Plugin::OutMqtt
- Defined in:
- lib/fluent/plugin/out_mqtt.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE = "memory"
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #initialize ⇒ OutMqtt (constructor): A new instance of OutMqtt.
- #multi_workers_ready? ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ OutMqtt
Returns a new instance of OutMqtt.
40 41 42 43 44 45 46 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 40
#
# Builds a new OutMqtt instance: runs the inherited initializer, then seeds
# three option containers.
#
# NOTE(review): the extracted listing lost the targets of the second and third
# assignments (it read "= {} = {:capped => false}"). The names
# @connection_options and @collection_options are reconstructed; confirm them
# against lib/fluent/plugin/out_mqtt.rb.
def initialize
  super

  @clients = {}
  @connection_options = {}
  @collection_options = {:capped => false}
end
Instance Method Details
#configure(conf) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 48
#
# Applies the plugin configuration.
#
# Converts compatibility parameters for the buffer, inject and formatter
# sections, runs the inherited configure step, fills @bind, @topic and @port
# from +conf+ when they are still unset, and creates the formatter.
#
# @param conf [#[], #has_key?] configuration, read with string keys
# @return [Object, nil] the adjusted 'buffer_chunk_limit' value, or nil when
#   that key is absent
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject, :formatter)
  super

  # Only fall back to the raw config entries when nothing set these already.
  @bind ||= conf['bind']
  @topic ||= conf['topic']
  @port ||= conf['port']
  @formatter = formatter_create

  # has_key? is kept as-is: the config object may override it.
  return unless conf.has_key?('buffer_chunk_limit')

  # check buffer_size
  conf['buffer_chunk_limit'] = available_buffer_chunk_limit(conf)
end
#format(tag, time, record) ⇒ Object
82 83 84 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 82
#
# Serializes one event as the msgpack encoding of the pair [time, record].
#
# @param tag [Object] event tag; not part of the serialized output
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] whatever Array#to_msgpack returns for [time, record]
def format(tag, time, record)
  [time, record].to_msgpack
end
#formatted_to_msgpack_binary ⇒ Object
86 87 88 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 86
#
# Always returns true.
#
# NOTE(review): presumably tells the host framework that #format emits
# msgpack binary, which #format does via to_msgpack; confirm against the
# Fluentd output plugin API.
#
# @return [Boolean] true
def formatted_to_msgpack_binary
  true
end
#multi_workers_ready? ⇒ Boolean
90 91 92 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 90
#
# Always returns true.
#
# NOTE(review): presumably declares the plugin usable with multiple worker
# processes; confirm against the Fluentd output plugin API.
#
# @return [Boolean] true
def multi_workers_ready?
  true
end
#shutdown ⇒ Object
77 78 79 80 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 77
#
# Disconnects the MQTT client held in @connect, then continues the inherited
# shutdown sequence via +super+.
#
# @connect is only assigned in #start, so it is nil when start never ran or
# failed before connecting; the guard keeps shutdown from raising
# NoMethodError in that case.
def shutdown
  @connect.disconnect if @connect
  super
end
#start ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 61
#
# Opens the MQTT connection stored in @connect, then continues the inherited
# start sequence via +super+.
#
# The connection options always carry the host (@bind) and port (@port).
# Each optional setting is added only when its instance variable is truthy.
def start
  log.debug "start mqtt #{@bind}"

  opts = {host: @bind, port: @port}

  # MQTT option key => configured value; nil/false values are left out.
  {
    client_id: @client_id,
    username: @username,
    password: @password,
    ssl: @ssl,
    ca_file: @ca,
    cert_file: @cert,
    key_file: @key
  }.each { |key, value| opts[key] = value if value }

  @connect = MQTT::Client.connect(opts)
  super
end
#write(chunk) ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/fluent/plugin/out_mqtt.rb', line 94
#
# Publishes every event of a buffered chunk to the MQTT topic @topic.
#
# For each (time, record) pair the record is passed through
# inject_values_to_record, formatted with @formatter, logged at debug level
# and published on @connect with the retain flag @retain.
#
# @param chunk [#metadata, #msgpack_each] buffered chunk; +metadata+ must
#   respond to +tag+, and +msgpack_each+ must yield time/record pairs
def write(chunk)
  tag = chunk.metadata.tag

  chunk.msgpack_each do |time, record|
    enriched = inject_values_to_record(tag, time, record)
    # Format once and reuse the payload for both the log line and the publish.
    payload = @formatter.format(tag, time, enriched)
    log.debug "write #{@topic} #{payload}"
    @connect.publish(@topic, payload, @retain)
  end
end