Class: Fluent::NcmbOutput

Inherits:
BufferedOutput
  • Object
show all
Defined in:
lib/fluent/plugin/out_ncmb.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#ncmb ⇒ Object (readonly)

Returns the value of attribute ncmb.



17
18
19
# File 'lib/fluent/plugin/out_ncmb.rb', line 17

# Reader for the @ncmb instance variable.
#
# @return [Object] whatever is currently stored in @ncmb
#   (`configure` assigns it the result of NCMB.initialize)
def ncmb
  @ncmb
end

Instance Method Details

#bulk_insert(records) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/out_ncmb.rb', line 92

# Sends one slice of events to the batch endpoint, retrying on failure.
#
# Each event becomes a POST sub-request against @path. The batch call is
# attempted up to @retry_limit times; an attempt counts as failed when the
# client raises or when the response is a Hash carrying a :code key. When no
# attempt succeeds the events are handed to write_failed_log.
#
# @param records [Array<Array>] events as [tag, time, record] triples
# @return [nil]
def bulk_insert(records)
  requests = records.map do |tag, time, record|
    { path: @path, method: :POST, body: { tag: tag, time: time, record: record } }
  end

  attempts = 0
  until attempts >= @retry_limit
    attempts += 1

    begin
      response = @ncmb.post("/#{@api_version}/batch", requests: requests)
    rescue
      # The request itself failed: move on to the next attempt.
      next
    end

    # A Hash with a :code key is an error response; anything else is success.
    return unless response.is_a?(Hash) && response.has_key?(:code)
  end

  # Still undelivered once the retry limit is reached: record the events in
  # the failed-send log.
  write_failed_log(records)
  nil
end

#configure(conf) ⇒ Object

This method is called before starting.

Raises:

  • (Fluent::ConfigError)


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/out_ncmb.rb', line 20

# Validates the plugin settings and creates the NCMB client.
#
# After delegating to the superclass this checks that the application key,
# client key and class name are set, makes sure the file-buffer directory
# (only when buffer_type is 'file') and the failed-log directory exist and are
# writable, then loads the ncmb library and stores the client in @ncmb.
#
# @param conf [Object] configuration, passed through to the superclass
# @return [Object] the value assigned to @ncmb
# @raise [Fluent::ConfigError] when a required setting is missing or a
#   directory cannot be used
def configure(conf)
  super

  raise Fluent::ConfigError, "ConfigError: Please input application_key" if @application_key.nil?
  raise Fluent::ConfigError, "ConfigError: Please input client_key" if @client_key.nil?
  raise Fluent::ConfigError, "ConfigError: Please input class_name" if @class_name.nil?

  # The error message names the same value that is checked
  # (@config['buffer_path']), so it always shows the offending path.
  ensure_writable_directory(@config['buffer_path'], 'buffer_path') if @buffer_type == 'file'
  ensure_writable_directory(@failed_log_path, 'failed_log_path')

  # Loaded only once the configuration has been validated.
  require 'ncmb'
  @ncmb = NCMB.initialize(application_key: @application_key, client_key: @client_key)
end

# Ensures +path+ is a writable directory, creating it when it does not exist.
#
# @param path [String, nil] directory to check or create
# @param setting [String] configuration key name used in error messages
# @raise [Fluent::ConfigError] when +path+ is nil, not writable, or cannot be
#   created
def ensure_writable_directory(path, setting)
  # Without this guard Dir.exist?(nil) would surface as a raw TypeError.
  raise Fluent::ConfigError, "ConfigError: Please input #{setting}" if path.nil?

  denied = "ConfigError: Permission denied => #{setting}: #{path}"
  if Dir.exist?(path)
    raise Fluent::ConfigError, denied unless File.writable?(path)
  else
    begin
      FileUtils.mkdir_p(path)
    rescue StandardError
      raise Fluent::ConfigError, denied
    end
  end
end
private :ensure_writable_directory

#format(tag, time, record) ⇒ Object



70
71
72
# File 'lib/fluent/plugin/out_ncmb.rb', line 70

# Serializes a single event for buffering.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event payload
# @return [Object] result of calling to_msgpack on the [tag, time, record] array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#shutdown ⇒ Object

This method is called when shutting down.



66
67
68
# File 'lib/fluent/plugin/out_ncmb.rb', line 66

# Shutdown hook. Adds no behaviour of its own; it only delegates to the
# superclass implementation.
def shutdown
  super
end

#start ⇒ Object

This method is called when starting.



58
59
60
61
62
63
# File 'lib/fluent/plugin/out_ncmb.rb', line 58

# Start hook. After the superclass has started, prepares the state used while
# writing: the per-record request path built from @api_version and
# @class_name, and the mutex taken by `write`.
#
# @return [Mutex] the newly created mutex (value of the last assignment)
def start
  super

  @path = ['', @api_version, 'classes', @class_name].join('/')
  @mutex = Mutex.new
end

#write(chunk) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/out_ncmb.rb', line 74

# Flushes a buffered chunk: decodes every event from the chunk while holding
# @mutex, then submits the events in slices via bulk_insert.
#
# @param chunk [#msgpack_each] buffer chunk yielding tag, time and record
# @return [Object] result of Array#each_slice
def write(chunk)
  events = []

  @mutex.synchronize do
    chunk.msgpack_each { |tag, time, record| events << [tag, time, record] }
  end

  # The batch API accepts at most 50 requests per call, so send 50 at a time.
  events.each_slice(50) { |batch| bulk_insert(batch) }
end

#write_failed_log(records) ⇒ Object



118
119
120
121
122
123
124
# File 'lib/fluent/plugin/out_ncmb.rb', line 118

# Appends events that could not be delivered to "<@failed_log_path>/failed.log".
#
# Every event is serialized with `format`, and the resulting array is
# serialized once more with to_msgpack before being written.
#
# @param records [Array<Array>] events as [tag, time, record] triples
# @return [Integer] number of bytes written
def write_failed_log(records)
  packed = records.map { |record| format(*record) }.to_msgpack

  # The payload is binary, so open in binary append mode: this avoids any
  # newline translation in text mode and drops the unneeded read access of 'a+'.
  File.open("#{@failed_log_path}/failed.log", 'ab') do |file|
    file.sync = true
    file.write(packed)
  end
end