Class: Fluent::NcmbOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::NcmbOutput
- Defined in:
- lib/fluent/plugin/out_ncmb.rb
Instance Attribute Summary collapse
-
#ncmb ⇒ Object
readonly
Returns the value of attribute ncmb.
Instance Method Summary collapse
- #bulk_insert(records) ⇒ Object
-
#configure(conf) ⇒ Object
This method is called before starting.
- #format(tag, time, record) ⇒ Object
-
#shutdown ⇒ Object
This method is called when shutting down.
-
#start ⇒ Object
This method is called when starting.
- #write(chunk) ⇒ Object
- #write_failed_log(records) ⇒ Object
Instance Attribute Details
#ncmb ⇒ Object (readonly)
Returns the value of attribute ncmb.
17 18 19 |
# File 'lib/fluent/plugin/out_ncmb.rb', line 17 def ncmb @ncmb end |
Instance Method Details
#bulk_insert(records) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/fluent/plugin/out_ncmb.rb', line 92 def bulk_insert(records) requests = records.map {|tag, time, record| {path: @path, method: :POST, body: {tag: tag, time: time, record: record}} } cnt = 0 while cnt < @retry_limit begin res = @ncmb.post("/#{@api_version}/batch", requests: requests) rescue # リクエストが失敗した場合 next cnt += 1 end # レスポンスがエラーだった場合 if res.is_a?(Hash) && res.has_key?(:code) next cnt += 1 else return end end # retry上限を超えても送信できなかった場合、送信失敗logに書き込む write_failed_log(records) return end |
#configure(conf) ⇒ Object
This method is called before starting.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/fluent/plugin/out_ncmb.rb', line 20 def configure(conf) super raise Fluent::ConfigError.new("ConfigError: Please input application_key") if @application_key.nil? raise Fluent::ConfigError.new("ConfigError: Please input client_key") if @client_key.nil? raise Fluent::ConfigError.new("ConfigError: Please input class_name") if @class_name.nil? if @buffer_type == 'file' if Dir.exist?(@config['buffer_path']) unless File.writable?(@config['buffer_path']) raise Fluent::ConfigError.new("ConfigError: Permission denied => buffer_path: #{@buffer_path}") end else begin FileUtils.mkdir_p(@config['buffer_path']) rescue raise Fluent::ConfigError.new("ConfigError: Permission denied => buffer_path: #{@buffer_path}") end end end if Dir.exist?(@failed_log_path) unless File.writable?(@failed_log_path) raise Fluent::ConfigError.new("ConfigError: Permission denied => failed_log_path: #{@failed_log_path}") end else begin FileUtils.mkdir_p(@failed_log_path) rescue raise Fluent::ConfigError.new("ConfigError: Permission denied => failed_log_path: #{@failed_log_path}") end end require 'ncmb' @ncmb = NCMB.initialize(application_key: @application_key, client_key: @client_key) end |
#format(tag, time, record) ⇒ Object
70 71 72 |
# File 'lib/fluent/plugin/out_ncmb.rb', line 70 def format(tag, time, record) [tag, time, record].to_msgpack end |
#shutdown ⇒ Object
This method is called when shutting down.
66 67 68 |
# File 'lib/fluent/plugin/out_ncmb.rb', line 66 def shutdown super end |
#start ⇒ Object
This method is called when starting.
58 59 60 61 62 63 |
# File 'lib/fluent/plugin/out_ncmb.rb', line 58 def start super @path = "/#{@api_version}/classes/#{@class_name}" @mutex = Mutex.new end |
#write(chunk) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/fluent/plugin/out_ncmb.rb', line 74 def write(chunk) records = [] @mutex.lock begin chunk.msgpack_each do |tag, time, record| records << [tag, time, record] end ensure @mutex.unlock end # batchAPIの件数上限が50のため、50件毎に実行する records.each_slice(50) do |slice| bulk_insert(slice) end end |
#write_failed_log(records) ⇒ Object
118 119 120 121 122 123 124 |
# File 'lib/fluent/plugin/out_ncmb.rb', line 118 def write_failed_log(records) msgpack = records.map{|record| format(*record)}.to_msgpack File.open("#{@failed_log_path}/failed.log", 'a+') do |file| file.sync = true file.write(msgpack) end end |