Module: Fluent::TDPluginUtil

Defined in:
lib/fluent/plugin/td_plugin_util.rb

Instance Method Summary collapse

Instance Method Details

#check_table_existence(database, table) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/td_plugin_util.rb', line 41

# Probes Treasure Data once per "database.table" pair to confirm the table
# exists, remembering successful probes in @table_list.
#
# The probe is an import of the empty gzip payload held in @empty_gz_data:
# an import-only user cannot read the table status, so the table type
# (item table or not) is not checked here.
#
# @param database database name
# @param table table name
# @raise [RuntimeError] when the client reports TreasureData::NotFoundError
def check_table_existence(database, table)
  @table_list ||= {}
  table_key = "#{database}.#{table}"
  return if @table_list.key?(table_key)

  log.debug "checking whether table '#{table_key}' exists on Treasure Data"
  probe = StringIO.new(@empty_gz_data)
  begin
    @client.import(database, table, "msgpack.gz", probe, probe.size)
    @table_list[table_key] = true
  rescue TreasureData::NotFoundError
    # Item outputs need the extra flag in the suggested create command.
    create_args = ''
    create_args = ' -t item' if self.class == TreasureDataItemOutput
    raise "Table #{table_key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}#{create_args}' to create it."
  rescue => error
    # Any other failure is logged and swallowed; the key is left uncached so
    # the next call probes again.
    log.warn "failed to check table existence on Treasure Data", :error => error.inspect
    log.debug_backtrace error
  end
end

#parse_bool_parameter(param) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/fluent/plugin/td_plugin_util.rb', line 22

# Parses the raw value of a boolean configuration option.
#
# An empty value is treated as +true+; anything else is handed to
# Config.bool_value, and a nil result from it is rejected.
#
# @param param raw option value (must respond to +empty?+)
# @param key optional option name, used only in the error message
# @return the value produced by Config.bool_value, or +true+ for empty input
# @raise [ConfigError] when Config.bool_value returns nil for +param+
def parse_bool_parameter(param, key = nil)
  return true if param.empty?

  value = Config.bool_value(param)
  if value.nil?
    # The message used to interpolate a `key` that was never defined in this
    # method, so an invalid value surfaced as NameError instead of
    # ConfigError. The name is now an optional argument with a fallback.
    target = key ? "#{key} option" : "boolean option"
    raise ConfigError, "'true' or 'false' is required for #{target} on tdlog output"
  end
  value
end

#summarize_record(record) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/fluent/plugin/td_plugin_util.rb', line 32

# Renders +record+ as JSON, shortened to at most 100 characters.
#
# JSON longer than 100 characters is cut to its first 97 characters followed
# by "...". (The previous slice kept 98 characters, so the shortened form was
# 101 characters long, exceeding the limit it was checked against.)
#
# @param record object responding to +to_json+
# @return [String] the full JSON, or a 100-character truncated form
def summarize_record(record)
  json = record.to_json
  return json if json.size <= 100

  "#{json[0, 97]}..."
end

#upload(database, table, io, size, unique_id) ⇒ Object

Assumes that the @client and @auto_create_table instance variables exist.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/td_plugin_util.rb', line 83

# Imports a gzipped msgpack stream into the given table through @client.
#
# Relies on the @client and @auto_create_table instance variables. When the
# import fails with TreasureData::NotFoundError and @auto_create_table is
# set, ensure_database_and_table is called, +io+ is rewound and the import is
# attempted one more time.
#
# @param database database name
# @param table table name
# @param io readable, seekable stream holding the payload
# @param size payload size, passed to the client and shown in messages
# @param unique_id byte string; sent to the client as lowercase hex
# @return whatever @client.import returns
# @raise [RuntimeError] wrapping any StandardError raised during the upload
def upload(database, table, io, size, unique_id)
  unique_str = unique_id.unpack('C*').map {|x| "%02x" % x }.join
  log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }

  # Auto-creation is attempted at most once per call. The retry used to be
  # unbounded, so an import that kept reporting NotFoundError after the table
  # was created would loop forever instead of failing.
  creation_attempted = false
  begin
    begin
      start = Time.now
      @client.import(database, table, "msgpack.gz", io, size, unique_str)
    rescue TreasureData::NotFoundError
      raise if !@auto_create_table || creation_attempted
      ensure_database_and_table(database, table)
      creation_attempted = true
      io.pos = 0 # the failed attempt may have consumed the stream
      retry
    end
  rescue => e
    elapsed = Time.now - start
    ne = RuntimeError.new("Failed to upload to TreasureData: #{e} (#{size} bytes; #{elapsed} seconds)")
    ne.set_backtrace(e.backtrace)
    raise ne
  end
end

#validate_database_and_table_name(database, table, conf) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/fluent/plugin/td_plugin_util.rb', line 9

# Runs the database name and then the table name through the
# TreasureData::API validators, turning any failure into a ConfigError.
#
# @param database database name to validate
# @param table table name to validate
# @param conf configuration, appended to the error message for context
# @return whatever TreasureData::API.validate_table_name returns
# @raise [ConfigError] when either validator raises a StandardError
def validate_database_and_table_name(database, table, conf)
  begin
    TreasureData::API.validate_database_name(database)
  rescue => database_error
    message = "Invalid database name #{database.inspect}: #{database_error}: #{conf}"
    raise ConfigError, message
  end

  begin
    TreasureData::API.validate_table_name(table)
  rescue => table_error
    message = "Invalid table name #{table.inspect}: #{table_error}: #{conf}"
    raise ConfigError, message
  end
end

#write(chunk) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/td_plugin_util.rb', line 62

# Gzips a buffer chunk into a temporary file and hands it to #upload.
#
# The chunk key is split on the first "." into database and table. The
# temporary file is created under @tmpdir (created first when set) using
# @tmpdir_prefix as its basename.
#
# @param chunk object providing +unique_id+, +key+ and +write_to(io)+
# @return whatever #upload returns
def write(chunk)
  unique_id = chunk.unique_id
  database, table = chunk.key.split('.', 2)

  FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
  f = Tempfile.new(@tmpdir_prefix, @tmpdir)
  f.binmode # gzip output is binary; avoid newline translation on text-mode platforms
  w = Zlib::GzipWriter.new(f)

  chunk.write_to(w)
  w.finish # flush the gzip trailer but keep the underlying file open
  w = nil  # finished cleanly; nothing left for the ensure clause to close

  size = f.pos
  f.pos = 0
  upload(database, table, f, size, unique_id)
ensure
  w.close if w
  # Close AND unlink: a plain close leaves the file on disk until the
  # Tempfile object is garbage-collected, so spool files could pile up.
  f.close(true) if f
end