Class: Fluent::Plugin::TreasureDataLogOutput
- Inherits: Output
  - Object
  - Output
  - Fluent::Plugin::TreasureDataLogOutput
- Defined in: lib/fluent/plugin/out_tdlog.rb
Constant Summary
- IMPORT_SIZE_LIMIT = 32 * 1024 * 1024
- UPLOAD_EXT = 'msgpack.gz'.freeze
Instance Method Summary
- #check_table_exists(key) ⇒ Object
- #configure(conf) ⇒ Object
- #ensure_database_and_table(database, table) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #gzip_by_command(chunk, tmp) ⇒ Object
  TODO: Share this routine with s3 compressors.
- #gzip_by_writer(chunk, tmp) ⇒ Object
- #initialize ⇒ TreasureDataLogOutput (constructor)
  A new instance of TreasureDataLogOutput.
- #multi_workers_ready? ⇒ Boolean
- #start ⇒ Object
- #summarize_record(record) ⇒ Object
- #upload(database, table, io, size, unique_id) ⇒ Object
- #validate_database_and_table_name(database, table) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ TreasureDataLogOutput
Returns a new instance of TreasureDataLogOutput.
# File 'lib/fluent/plugin/out_tdlog.rb', line 41

def initialize
  super
  @key = nil
  @key_num_limit = 512 # TODO: Our one-time import has a restriction on the number of record keys.
  @record_size_limit = 32 * 1024 * 1024 # TODO
  @table_list = {}
  @empty_gz_data = TreasureData::API.create_empty_gz_data
  @user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze
end
Instance Method Details
#check_table_exists(key) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 234

def check_table_exists(key)
  unless @table_list.has_key?(key)
    database, table = key.split('.', 2)
    log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
    io = StringIO.new(@empty_gz_data)
    begin
      @client.import(database, table, UPLOAD_EXT, io, io.size)
      @table_list[key] = true
    rescue TreasureData::NotFoundError
      raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
    rescue => e
      log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.inspect
      log.debug_backtrace e.backtrace
    end
  end
end
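The probe imports an empty gzip payload and caches success per key, where the key follows the same "database.table" convention as @key. A tiny illustrative sketch (values hypothetical):

key = 'mydb.access'                  # same "#{database}.#{table}" form as @key
database, table = key.split('.', 2)
# database => "mydb", table => "access"
# A successful probe records @table_list[key] = true, so the check runs once per table.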
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 51

def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')

  super

  if @use_gzip_command
    require 'open3'

    begin
      Open3.capture3("gzip -V")
    rescue Errno::ENOENT
      raise ConfigError, "'gzip' utility must be in PATH for use_gzip_command parameter"
    end
  end

  FileUtils.mkdir_p(@tmpdir) if @tmpdir

  if @database && @table
    validate_database_and_table_name(@database, @table)
    @key = "#{@database}.#{@table}"
  else
    unless @chunk_key_tag
      raise Fluent::ConfigError, "'tag' must be included in <buffer ARG> when database and table are not specified"
    end
  end
end
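As a usage illustration, here is a minimal sketch of driving #configure through Fluentd's bundled test driver; the apikey value is a placeholder and the other parameter values are hypothetical:

require 'fluent/test'
require 'fluent/test/driver/output'
require 'fluent/plugin/out_tdlog'

conf = %[
  apikey YOUR_TD_API_KEY
  database mydb
  table mytable
  auto_create_table true
]

driver = Fluent::Test::Driver::Output.new(Fluent::Plugin::TreasureDataLogOutput)
driver.configure(conf) # raises Fluent::ConfigError on an invalid database/table name

Because database and table are both given, #configure validates the names and sets @key to "mydb.mytable"; omitting them instead requires 'tag' among the buffer chunk keys.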
#ensure_database_and_table(database, table) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 264

def ensure_database_and_table(database, table)
  log.info "Creating table #{database}.#{table} on TreasureData"
  begin
    @api_client.create_log_table(database, table)
  rescue TreasureData::NotFoundError
    @api_client.create_database(database)
    @api_client.create_log_table(database, table)
  rescue TreasureData::AlreadyExistsError
  end
end
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 104

def format(tag, time, record)
  begin
    record['time'] = time.to_i
    record.delete(:time) if record.has_key?(:time)

    if record.size > @key_num_limit
      # TODO: include summary of the record
      router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)"))
      return nil
    end
  rescue => e
    router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}"))
    return nil
  end

  begin
    result = record.to_msgpack
  rescue RangeError
    result = TreasureData::API.normalized_msgpack(record)
  rescue => e
    router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("can't convert record to msgpack: #{e}"))
    return nil
  end

  if result.bytesize > @record_size_limit
    # Don't raise error. Large size is not critical for streaming import
    log.warn "Size of a record too large (#{result.bytesize} bytes): #{summarize_record(record)}"
  end

  result
end
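For orientation, a short standalone sketch (not from the plugin) of the transformation #format applies to one event: the event time is injected under the 'time' key and the record becomes a single msgpack row.

require 'msgpack'

record = { 'host' => 'web01', 'status' => 200 } # hypothetical event record
record['time'] = Time.now.to_i                  # the plugin injects the event time as an integer
row = record.to_msgpack                         # one msgpack-encoded row per event
# In the plugin, records with more than 512 keys are routed to the error stream,
# while rows over 32 MiB only produce a warning, since streaming import tolerates them.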
#formatted_to_msgpack_binary ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 100

def formatted_to_msgpack_binary
  true
end
#gzip_by_command(chunk, tmp) ⇒ Object
TODO: Share this routine with s3 compressors.
# File 'lib/fluent/plugin/out_tdlog.rb', line 172

def gzip_by_command(chunk, tmp)
  chunk_is_file = @buffer_config['@type'] == 'file'
  path = if chunk_is_file
           chunk.path
         else
           w = Tempfile.new("gzip-tdlog-#{chunk.metadata.tag}-", @tmpdir)
           w.binmode
           chunk.write_to(w)
           w.close
           w.path
         end
  res = system "gzip -c #{path} > #{tmp.path}"
  unless res
    log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
    begin
      tmp.truncate(0)
      return gzip_by_writer(chunk, tmp)
    end
  end
  File.size(tmp.path)
ensure
  unless chunk_is_file
    w.close(true) rescue nil
  end
end
#gzip_by_writer(chunk, tmp) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 198

def gzip_by_writer(chunk, tmp)
  w = Zlib::GzipWriter.new(tmp)
  chunk.write_to(w)
  w.finish
  w = nil
  tmp.pos
ensure
  if w
    w.close rescue nil
  end
end
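A self-contained sketch of the GzipWriter pattern used above: #finish writes the gzip trailer without closing the underlying IO, so tmp.pos afterwards is the compressed size. A plain string stands in for the buffer chunk.

require 'zlib'
require 'tempfile'

tmp = Tempfile.new('gz-sketch')
tmp.binmode
w = Zlib::GzipWriter.new(tmp)
w.write("hello world\n" * 100) # stands in for chunk.write_to(w)
w.finish                       # flushes the gzip trailer; tmp itself stays open
puts tmp.pos                   # compressed size in bytes
tmp.close(true)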
#multi_workers_ready? ⇒ Boolean
# File 'lib/fluent/plugin/out_tdlog.rb', line 96

def multi_workers_ready?
  true
end
#start ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 78

def start
  super

  client_opts = {
    :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent,
    :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout
  }
  @client = TreasureData::Client.new(@apikey, client_opts.merge({:endpoint => @import_endpoint}))
  @api_client = TreasureData::Client.new(@apikey, client_opts.merge({:endpoint => @api_endpoint}))

  if @key
    if @auto_create_table
      ensure_database_and_table(@database, @table)
    else
      check_table_exists(@key)
    end
  end
end
#summarize_record(record) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 136

def summarize_record(record)
  json = Yajl.dump(record)
  if json.size > 100
    json[0..97] + "..."
  else
    json
  end
end
#upload(database, table, io, size, unique_id) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 210

def upload(database, table, io, size, unique_id)
  unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join
  log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }

  begin
    begin
      start = Time.now
      @client.import(database, table, UPLOAD_EXT, io, size, unique_str)
    rescue TreasureData::NotFoundError => e
      unless @auto_create_table
        raise e
      end
      ensure_database_and_table(database, table)
      io.pos = 0
      retry
    end
  rescue => e
    elapsed = Time.now - start
    ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{e.inspect} (#{size} bytes; #{elapsed} seconds)")
    ne.set_backtrace(e.backtrace)
    raise ne
  end
end
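The unique_id is the chunk's binary identifier; hex-encoding it, as in the method's first line, yields a stable key passed to the import API so retried uploads of the same chunk can be deduplicated server-side. A standalone sketch of that conversion:

unique_id = "\x01\xab\xcd\xef".b # stands in for chunk.unique_id
unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join
# unique_str => "01abcdef"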
#validate_database_and_table_name(database, table) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 251

def validate_database_and_table_name(database, table)
  begin
    TreasureData::API.validate_database_name(database)
  rescue => e
    raise ConfigError, "Invalid database name #{database.inspect}: #{e}"
  end
  begin
    TreasureData::API.validate_table_name(table)
  rescue => e
    raise ConfigError, "Invalid table name #{table.inspect}: #{e}"
  end
end
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/out_tdlog.rb', line 145

def write(chunk)
  unique_id = chunk.unique_id

  if @key
    database, table = @database, @table
  else
    database, table = chunk.metadata.tag.split('.')[-2, 2]
    database = TreasureData::API.normalize_database_name(database)
    table = TreasureData::API.normalize_table_name(table)
  end

  FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
  f = Tempfile.new("tdlog-#{chunk.metadata.tag}-", @tmpdir)
  f.binmode

  size = if @use_gzip_command
           gzip_by_command(chunk, f)
         else
           gzip_by_writer(chunk, f)
         end
  f.pos = 0
  upload(database, table, f, size, unique_id)
ensure
  f.close(true) if f
end
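When database and table are not fixed in the configuration, the destination is derived from the last two dot-separated components of the chunk's tag. An illustrative sketch (tag value hypothetical):

tag = 'td.production.access_log' # a hypothetical event tag
database, table = tag.split('.')[-2, 2]
# database => "production", table => "access_log"
# Both names then pass through TreasureData::API.normalize_database_name and
# normalize_table_name before the upload.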