Class: Fluent::TreasureDataLogOutput
- Inherits: BufferedOutput
  - Object
  - BufferedOutput
  - Fluent::TreasureDataLogOutput
- Defined in: lib/fluent/plugin/out_tdlog.rb
Constant Summary
- IMPORT_SIZE_LIMIT = 32 * 1024 * 1024
Instance Method Summary
- #check_table_exists(key) ⇒ Object
- #configure(conf) ⇒ Object
- #emit(tag, es, chain) ⇒ Object
- #ensure_database_and_table(database, table) ⇒ Object
- #format_stream(tag, es) ⇒ Object
- #gzip_by_command(chunk, tmp) ⇒ Object
  TODO: Share this routine with s3 compressors.
- #gzip_by_writer(chunk, tmp) ⇒ Object
- #initialize ⇒ TreasureDataLogOutput (constructor)
  A new instance of TreasureDataLogOutput.
- #start ⇒ Object
- #summarize_record(record) ⇒ Object
- #upload(database, table, io, size, unique_id) ⇒ Object
- #validate_database_and_table_name(database, table, conf) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ TreasureDataLogOutput
Returns a new instance of TreasureDataLogOutput.
# File 'lib/fluent/plugin/out_tdlog.rb', line 27

def initialize
  require 'fileutils'
  require 'tempfile'
  require 'zlib'
  require 'net/http'
  require 'json'
  require 'cgi' # CGI.escape
  require 'time' # Time#rfc2822
  require 'digest/md5'
  require 'stringio'
  super
  @tmpdir = nil
  @key = nil
  @key_num_limit = 512  # TODO: Our one-time import has the restriction about the number of record keys.
  @record_size_limit = 32 * 1024 * 1024  # TODO
  @table_list = {}
  @empty_gz_data = TreasureData::API.create_empty_gz_data
  @user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze
end
Instance Method Details
#check_table_exists(key) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 255

def check_table_exists(key)
  unless @table_list.has_key?(key)
    database, table = key.split('.', 2)
    log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
    io = StringIO.new(@empty_gz_data)
    begin
      @client.import(database, table, "msgpack.gz", io, io.size)
      @table_list[key] = true
    rescue TreasureData::NotFoundError
      raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
    rescue => e
      log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.inspect
      log.debug_backtrace e.backtrace
    end
  end
end
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 47

def configure(conf)
  # overwrite default value of buffer_chunk_limit
  unless conf.has_key?('buffer_chunk_limit')
    conf['buffer_chunk_limit'] = IMPORT_SIZE_LIMIT
  end

  # v0.14 seems to have a bug of config_set_default: https://github.com/treasure-data/fluent-plugin-td/pull/22#issuecomment-230782005
  unless conf.has_key?('buffer_type')
    conf['buffer_type'] = 'file'
  end

  super

  if @use_gzip_command
    require 'open3'

    begin
      Open3.capture3("gzip -V")
    rescue Errno::ENOENT
      raise ConfigError, "'gzip' utility must be in PATH for use_gzip_command parameter"
    end
  end

  if conf.has_key?('tmpdir')
    @tmpdir = conf['tmpdir']
    FileUtils.mkdir_p(@tmpdir)
  end

  database = conf['database']
  table = conf['table']
  if database && table
    validate_database_and_table_name(database, table, conf)
    @key = "#{database}.#{table}"
  end

  @http_proxy = conf['http_proxy']
end
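For context, a minimal td-agent match section might look like the sketch below. It is illustrative only: the directive names shown here mirror the parameters this method reads (database, table, tmpdir, use_gzip_command, and the buffer defaults it forces), but the exact directive syntax depends on your Fluentd and plugin versions.

<match td.*.*>
  @type tdlog                 # hypothetical example; adjust to your deployment
  apikey YOUR_API_KEY         # placeholder
  database production         # illustrative names; validated by #validate_database_and_table_name
  table web_access
  auto_create_table true
  use_gzip_command false
  buffer_type file            # the default this plugin forces when unset
  tmpdir /var/tmp/fluent-td   # optional scratch directory, created if missing
</match>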
#emit(tag, es, chain) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 104

def emit(tag, es, chain)
  if @key
    key = @key
  else
    database, table = tag.split('.')[-2,2]
    database = TreasureData::API.normalize_database_name(database)
    table = TreasureData::API.normalize_table_name(table)
    key = "#{database}.#{table}"
  end

  unless @auto_create_table
    check_table_exists(key)
  end

  super(tag, es, chain, key)
end
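When no static database/table pair is configured, the destination is derived from the last two tag segments, which are then normalized by TreasureData::API. A small illustration with a made-up tag:

# Hypothetical tag, assuming the td.DATABASE.TABLE tagging convention
tag = "td.production.web_access"
database, table = tag.split('.')[-2, 2]  # => ["production", "web_access"]
key = "#{database}.#{table}"             # => "production.web_access", used as the buffer chunk key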
#ensure_database_and_table(database, table) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 285

def ensure_database_and_table(database, table)
  log.info "Creating table #{database}.#{table} on TreasureData"
  begin
    @client.create_log_table(database, table)
  rescue TreasureData::NotFoundError
    @client.create_database(database)
    @client.create_log_table(database, table)
  rescue TreasureData::AlreadyExistsError
  end
end
#format_stream(tag, es) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 121

def format_stream(tag, es)
  out = $use_msgpack_5 ? MessagePack::Buffer.new : ''.force_encoding('ASCII-8BIT') # this condition will be removed after removed msgpack v0.4 support
  off = out.size # size is same as bytesize in ASCII-8BIT string
  es.each { |time, record|
    # Applications may send non-hash record or broken chunk may generate non-hash record so such records should be skipped
    next unless record.is_a?(Hash)

    begin
      record['time'] = time
      record.delete(:time) if record.has_key?(:time)

      if record.size > @key_num_limit
        raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record
      end
    rescue => e
      # TODO (a) Remove the transaction mechanism of fluentd
      #      or (b) keep transaction boundaries in in/out_forward.
      # This code disables the transaction mechanism (a).
      log.warn "Skipped a broken record (#{e}): #{summarize_record(record)}"
      log.warn_backtrace e.backtrace
      next
    end

    begin
      record.to_msgpack(out)
    rescue RangeError
      # In msgpack v0.5, 'out' becomes String, not Buffer. This is not a problem because Buffer has a compatibility with String
      out = out.to_s[0, off]
      TreasureData::API.normalized_msgpack(record, out)
    end

    noff = out.size
    sz = noff - off
    if sz > @record_size_limit
      # TODO don't raise error
      #raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record
      log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
    end
    off = noff
  }

  out.to_s
end
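In effect, each event becomes a MessagePack map whose 'time' key carries the event timestamp. A rough sketch with made-up values:

# Illustrative record; field names and the timestamp are invented
time   = 1_464_845_000                          # Unix seconds from the Fluentd event stream
record = { "host" => "web01", "status" => 200 }
record['time'] = time                           # event time is folded into the record itself
packed = record.to_msgpack                      # MessagePack bytes appended to the chunk buffer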
#gzip_by_command(chunk, tmp) ⇒ Object
TODO: Share this routine with s3 compressors
# File 'lib/fluent/plugin/out_tdlog.rb', line 193

def gzip_by_command(chunk, tmp)
  chunk_is_file = @buffer_type == 'file'
  path = if chunk_is_file
           chunk.path
         else
           w = Tempfile.new("gzip-tdlog-#{chunk.key}-", @tmpdir)
           w.binmode
           chunk.write_to(w)
           w.close
           w.path
         end
  res = system "gzip -c #{path} > #{tmp.path}"
  unless res
    log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
    begin
      tmp.truncate(0)
      return gzip_by_writer(chunk, tmp)
    end
  end
  File.size(tmp.path)
ensure
  unless chunk_is_file
    w.close(true) rescue nil
  end
end
#gzip_by_writer(chunk, tmp) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 219

def gzip_by_writer(chunk, tmp)
  w = Zlib::GzipWriter.new(tmp)
  chunk.write_to(w)
  w.finish
  w = nil
  tmp.pos
ensure
  if w
    w.close rescue nil
  end
end
#start ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 85

def start
  super

  client_opts = {
    :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent,
    :endpoint => @endpoint, :connect_timeout => @connect_timeout,
    :read_timeout => @read_timeout, :send_timeout => @send_timeout
  }
  @client = TreasureData::Client.new(@apikey, client_opts)

  if @key
    if @auto_create_table
      database, table = @key.split('.',2)
      ensure_database_and_table(database, table)
    else
      check_table_exists(@key)
    end
  end
end
#summarize_record(record) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 164

def summarize_record(record)
  json = Yajl.dump(record)
  if json.size > 100
    json[0..97] + "..."
  else
    json
  end
end
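Illustrative behaviour (values made up): records whose JSON form exceeds 100 characters are cut down so warning lines stay readable.

summarize_record({ "status" => 200 })     # => "{\"status\":200}" (short records pass through unchanged)
summarize_record({ "msg" => "x" * 200 })  # => first 98 characters of the JSON followed by "..."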
#upload(database, table, io, size, unique_id) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 231

def upload(database, table, io, size, unique_id)
  unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join
  log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }

  begin
    begin
      start = Time.now
      @client.import(database, table, "msgpack.gz", io, size, unique_str)
    rescue TreasureData::NotFoundError => e
      unless @auto_create_table
        raise e
      end
      ensure_database_and_table(database, table)
      io.pos = 0
      retry
    end
  rescue => e
    elapsed = Time.now - start
    ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{e.inspect} (#{size} bytes; #{elapsed} seconds)")
    ne.set_backtrace(e.backtrace)
    raise ne
  end
end
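The binary chunk id is rendered as a hex string and passed to the import call. A small illustration with a made-up id:

# Hypothetical 4-byte chunk id
unique_id  = [0xde, 0xad, 0xbe, 0xef].pack('C*')
unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join  # => "deadbeef"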
#validate_database_and_table_name(database, table, conf) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 272

def validate_database_and_table_name(database, table, conf)
  begin
    TreasureData::API.validate_database_name(database)
  rescue => e
    raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}"
  end
  begin
    TreasureData::API.validate_table_name(table)
  rescue => e
    raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}"
  end
end
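A hypothetical failure case, assuming TreasureData::API rejects the hyphen in the database name: an invalid name becomes a ConfigError at startup rather than an upload failure later.

# 'conf' stands for the element passed to #configure; the names are made up
validate_database_and_table_name('my-db', 'access_log', conf)
# raises Fluent::ConfigError along the lines of: Invalid database name "my-db": ...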
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 173

def write(chunk)
  unique_id = chunk.unique_id
  database, table = chunk.key.split('.', 2)

  FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
  f = Tempfile.new("tdlog-#{chunk.key}-", @tmpdir)
  f.binmode

  size = if @use_gzip_command
           gzip_by_command(chunk, f)
         else
           gzip_by_writer(chunk, f)
         end
  f.pos = 0
  upload(database, table, f, size, unique_id)
ensure
  f.close(true) if f
end