Class: Fluent::DdOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::DdOutput
- Defined in:
- lib/fluent/plugin/out_dd.rb
Instance Method Summary
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #initialize ⇒ DdOutput (constructor): A new instance of DdOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ DdOutput
Returns a new instance of DdOutput.
18 19 20 21 22 23 |
# File 'lib/fluent/plugin/out_dd.rb', line 18
# Builds the plugin instance: runs the parent constructor, then loads the
# libraries this plugin uses. The libraries are required here, at
# instantiation time, rather than when the file itself is loaded.
#
# @return [DdOutput] a new instance of DdOutput
def initialize
  super
  # Order is kept: dogapi first, then the two standard-library pieces.
  %w[dogapi socket thread].each { |library| require library }
end
Instance Method Details
#configure(conf) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/fluent/plugin/out_dd.rb', line 55
# Validates the plugin settings, fills in defaults and builds the Dogapi client.
#
# @param conf the configuration object; handed to the parent `configure` via `super`
# @raise [Fluent::ConfigError] when `dd_api_key` is not set, or when
#   `concurrency` is set while `emit_in_background` is not
# @return [Object] the Dogapi client that was stored in `@dog`
def configure(conf)
  super

  raise Fluent::ConfigError, '`dd_api_key` is required' unless @dd_api_key

  # `concurrency` is only accepted together with `emit_in_background`.
  if @concurrency && !@emit_in_background
    raise Fluent::ConfigError, '`concurrency` should be used with `emit_in_background`'
  end

  @concurrency ||= 1

  # No host configured: ask `hostname -f` first and fall back to
  # Socket.gethostname when that command prints nothing.
  @host ||= begin
    reported_name = `hostname -f 2> /dev/null`.strip
    reported_name.empty? ? Socket.gethostname : reported_name
  end

  @dog = Dogapi::Client.new(@dd_api_key, @dd_app_key, @host, @device, @silent, @timeout)
end
#format(tag, time, record) ⇒ Object
75 76 77 |
# File 'lib/fluent/plugin/out_dd.rb', line 75
# Serializes a single event as a `[tag, time, record]` triple using `to_msgpack`.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling `to_msgpack` on the three-element array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
#shutdown ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/fluent/plugin/out_dd.rb', line 42
# Runs the parent shutdown and, when background emitting is enabled, stops the
# worker threads and waits for them to finish.
def shutdown
  super
  return unless @emit_in_background

  # Push one `false` per worker; a falsy job ends the `while (job = @queue.pop)`
  # loop that each worker runs (see #start).
  @threads.each { @queue.push(false) }
  @threads.each(&:join)
end
#start ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/fluent/plugin/out_dd.rb', line 25
# Runs the parent start and, when background emitting is enabled, creates the
# job queue and `@concurrency` worker threads that consume it.
def start
  super
  return unless @emit_in_background

  @queue = Queue.new

  # Each worker pops jobs until it receives a falsy one, passing every job's
  # elements to emit_points and yielding to other threads after each job.
  drain_queue = lambda do
    while (job = @queue.pop)
      emit_points(*job)
      Thread.pass
    end
  end

  @threads = @concurrency.times.map { Thread.start(&drain_queue) }
end
#write(chunk) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/fluent/plugin/out_dd.rb', line 79
# Converts a buffer chunk into jobs and dispatches each one: onto the queue
# when background emitting is enabled, otherwise straight to emit_points.
#
# @param chunk the buffer chunk, passed to `chunk_to_jobs`
def write(chunk)
  chunk_to_jobs(chunk).each do |job|
    # Synchronous mode: emit right away on the calling thread.
    next emit_points(*job) unless @emit_in_background

    @queue.push(job)
  end
end