Class: Fluent::DdOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::DdOutput
- Defined in:
- lib/fluent/plugin/out_dd.rb
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ DdOutput
constructor
A new instance of DdOutput.
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ DdOutput
Returns a new instance of DdOutput.
14 15 16 17 18 19 |
# File 'lib/fluent/plugin/out_dd.rb', line 14 def initialize super require 'dogapi' require 'socket' require 'thread' end |
Instance Method Details
#configure(conf) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/out_dd.rb', line 51 def configure(conf) super unless @dd_api_key raise Fluent::ConfigError, '`dd_api_key` is required' end if !@emit_in_background && @concurrency raise Fluent::ConfigError, '`concurrency` should be used with `emit_in_background`' end @concurrency ||= 1 unless @host @host = %x[hostname -f 2> /dev/null].strip @host = Socket.gethostname if @host.empty? end @dog = Dogapi::Client.new(@dd_api_key, nil, @host) end |
#format(tag, time, record) ⇒ Object
71 72 73 |
# File 'lib/fluent/plugin/out_dd.rb', line 71 def format(tag, time, record) [tag, time, record].to_msgpack end |
#shutdown ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/fluent/plugin/out_dd.rb', line 38 def shutdown super if @emit_in_background @threads.size.times do @queue.push(false) end @threads.each do |thread| thread.join end end end |
#start ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/fluent/plugin/out_dd.rb', line 21 def start super if @emit_in_background @queue = Queue.new @threads = @concurrency.times.map do Thread.start do while (job = @queue.pop) emit_points(*job) Thread.pass end end end end end |
#write(chunk) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/fluent/plugin/out_dd.rb', line 75 def write(chunk) jobs = chunk_to_jobs(chunk) jobs.each do |job| if @emit_in_background @queue.push(job) else emit_points(*job) end end end |