Class: Fluent::Plugin::TreasureDataLogOutput
- Inherits: Output
- Object
- Output
- Fluent::Plugin::TreasureDataLogOutput
- Defined in:
- lib/fluent/plugin/out_tdlog.rb
Constant Summary collapse
- IMPORT_SIZE_LIMIT =
32 * 1024 * 1024
- IMPORT_RECORDS_LIMIT =
8096
- UPLOAD_EXT =
'msgpack.gz'.freeze
Instance Method Summary collapse
- #check_table_exists(key) ⇒ Object
- #configure(conf) ⇒ Object
- #ensure_database_and_table(database, table) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
- #gzip_by_command(chunk, tmp) ⇒ Object
- #gzip_by_writer(chunk, tmp) ⇒ Object
-
#initialize ⇒ TreasureDataLogOutput
constructor
A new instance of TreasureDataLogOutput.
- #multi_workers_ready? ⇒ Boolean
- #start ⇒ Object
- #summarize_record(record) ⇒ Object
- #upload(database, table, io, size, unique_id) ⇒ Object
- #validate_database_and_table_name(database, table) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ TreasureDataLogOutput
Returns a new instance of TreasureDataLogOutput.
44 45 46 47 48 49 50 51 52 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 44

# Initializes in-memory plugin state; no I/O happens here.
def initialize
  super
  # Static "db.table" destination; resolved in #configure when database
  # and table are both set, otherwise stays nil and #write derives it.
  @key = nil
  # Cache of "db.table" keys already confirmed to exist on Treasure Data.
  @table_list = {}
  # Per-record guardrails applied in #format.
  @key_num_limit = 512
  @record_size_limit = 32 * 1024 * 1024
  # Minimal valid gzip payload, used by #check_table_exists as a probe.
  @empty_gz_data = TreasureData::API.create_empty_gz_data
  @user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze
end
Instance Method Details
#check_table_exists(key) ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 237

# Verifies that the table behind +key+ ("db.table") exists on Treasure
# Data by importing an empty gzip payload. Successful probes are cached in
# @table_list; transient API failures are logged and tolerated.
def check_table_exists(key)
  return if @table_list.has_key?(key)

  database, table = key.split('.', 2)
  log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
  io = StringIO.new(@empty_gz_data)
  begin
    @client.import(database, table, UPLOAD_EXT, io, io.size)
    @table_list[key] = true
  rescue TreasureData::NotFoundError
    raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
  rescue => e
    # Best-effort check: anything other than "not found" is only a warning.
    log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.inspect
    log.debug_backtrace e.backtrace
  end
end
#configure(conf) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 54

# Validates configuration: checks the external gzip binary when
# use_gzip_command is on, prepares @tmpdir, and resolves the static
# destination key or requires 'tag' in the buffer chunk keys.
#
# Raises Fluent::ConfigError on invalid configuration.
def configure(conf)
  # Map v0.12-style buffer options onto the v1 buffer API; chunks are
  # keyed by tag unless the config overrides it.
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')
  super

  if @use_gzip_command
    require 'open3'

    begin
      # Probe the gzip binary up front so misconfiguration fails fast.
      Open3.capture3("gzip -V")
    rescue Errno::ENOENT
      # Was bare ConfigError; use the fully-qualified constant for
      # consistency with the raise below.
      raise Fluent::ConfigError, "'gzip' utility must be in PATH for use_gzip_command parameter"
    end
  end

  FileUtils.mkdir_p(@tmpdir) if @tmpdir

  if @database && @table
    # Static destination: every chunk goes to this db.table pair.
    validate_database_and_table_name(@database, @table)
    @key = "#{@database}.#{@table}"
  else
    # Dynamic destination: the tag must be part of the chunk key so
    # #write can derive database/table from it.
    unless @chunk_key_tag
      raise Fluent::ConfigError, "'tag' must be included in <buffer ARG> when database and table are not specified"
    end
  end
end
#ensure_database_and_table(database, table) ⇒ Object
267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 267

# Creates the log table (and, if needed, the database) on Treasure Data.
# "Already exists" is treated as success everywhere, including the retry
# path after creating the database — the original let an
# AlreadyExistsError raised inside the NotFoundError rescue escape when
# another worker created the table concurrently.
def ensure_database_and_table(database, table)
  log.info "Creating table #{database}.#{table} on TreasureData"
  begin
    @api_client.create_log_table(database, table)
  rescue TreasureData::NotFoundError
    # Database missing: create it, then retry the table. Another worker
    # may win either race, so tolerate "already exists" here too.
    begin
      @api_client.create_database(database)
      @api_client.create_log_table(database, table)
    rescue TreasureData::AlreadyExistsError
      # ignored
    end
  rescue TreasureData::AlreadyExistsError
    # ignored
  end
end
#format(tag, time, record) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 107 def format(tag, time, record) begin record['time'] = time.to_i record.delete(:time) if record.has_key?(:time) if record.size > @key_num_limit router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)")) return nil end rescue => e router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}")) return nil end begin result = record.to_msgpack rescue RangeError result = TreasureData::API.normalized_msgpack(record) rescue => e router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("can't convert record to msgpack: #{e}")) return nil end if result.bytesize > @record_size_limit # Don't raise error. Large size is not critical for streaming import log.warn "Size of a record too large (#{result.bytesize} bytes): #{summarize_record(record)}" end result end |
#formatted_to_msgpack_binary ⇒ Object
103 104 105 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 103 def formatted_to_msgpack_binary true end |
#gzip_by_command(chunk, tmp) ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 173

# Compresses +chunk+ into the open Tempfile +tmp+ using the external gzip
# binary; falls back to #gzip_by_writer when the command fails. Returns
# the compressed size in bytes.
#
# Fix: the original built a shell string ("gzip -c #{path} > #{tmp.path}")
# from unescaped paths, which breaks (or worse) when @tmpdir or the chunk
# path contains spaces/shell metacharacters. Use the argv form of
# Kernel#system with an :out redirection so no shell is involved.
def gzip_by_command(chunk, tmp)
  chunk_is_file = @buffer_config['@type'] == 'file'
  path = if chunk_is_file
           # File buffer: gzip can read the chunk file directly.
           chunk.path
         else
           # Memory buffer: spill the chunk to a temp file for gzip.
           w = Tempfile.new("gzip-tdlog-#{chunk.metadata.tag}-", @tmpdir)
           w.binmode
           chunk.write_to(w)
           w.close
           w.path
         end
  # "w" truncates tmp.path first, matching the old `>` redirection.
  res = system("gzip", "-c", path, out: [tmp.path, "w"])
  unless res
    log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
    tmp.truncate(0)
    return gzip_by_writer(chunk, tmp)
  end
  File.size(tmp.path)
ensure
  unless chunk_is_file
    # Remove the spill file; ignore errors (w may be nil if Tempfile.new failed).
    w.close(true) rescue nil
  end
end
#gzip_by_writer(chunk, tmp) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 197

# Compresses +chunk+ into the open IO +tmp+ with Zlib::GzipWriter.
# Returns the number of compressed bytes written (tmp.pos after finish).
def gzip_by_writer(chunk, tmp)
  writer = Zlib::GzipWriter.new(tmp)
  chunk.write_to(writer)
  writer.finish
  # Success: drop the reference so the ensure block leaves tmp open.
  writer = nil
  tmp.pos
ensure
  # Only reached with a live writer when compression failed midway;
  # close it (best effort) so the descriptor isn't leaked.
  writer.close rescue nil if writer
end
#multi_workers_ready? ⇒ Boolean
99 100 101 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 99 def multi_workers_ready? true end |
#start ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 81

# Builds the Treasure Data clients and, for a static destination,
# creates or verifies the target table before accepting traffic.
def start
  super

  client_opts = {
    :ssl => @use_ssl,
    :http_proxy => @http_proxy,
    :user_agent => @user_agent,
    :connect_timeout => @connect_timeout,
    :read_timeout => @read_timeout,
    :send_timeout => @send_timeout
  }
  # Two clients: bulk import traffic and control-plane API calls may be
  # pointed at different endpoints.
  @client = TreasureData::Client.new(@apikey, client_opts.merge({:endpoint => @import_endpoint}))
  @api_client = TreasureData::Client.new(@apikey, client_opts.merge({:endpoint => @api_endpoint}))

  if @key
    if @auto_create_table
      ensure_database_and_table(@database, @table)
    else
      check_table_exists(@key)
    end
  end
end
#summarize_record(record) ⇒ Object
138 139 140 141 142 143 144 145 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 138

# Renders +record+ as JSON for log messages, truncated to at most 100
# characters. The original sliced json[0..98th char] + "..." which
# produced 101 characters despite the `> 100` guard; json[0, 97] keeps
# the summary at exactly 100.
def summarize_record(record)
  json = Yajl.dump(record)
  if json.size > 100
    json[0, 97] + "..."
  else
    json
  end
end
#upload(database, table, io, size, unique_id) ⇒ Object
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 209

# Uploads the gzipped msgpack stream +io+ (of +size+ bytes) to
# database.table, using the chunk's +unique_id+ (hex-encoded) for
# server-side import deduplication.
#
# Fix: restores the truncated `e.message` argument to
# Fluent::UnrecoverableError.new (the source read `e.`, which is not
# valid Ruby).
def upload(database, table, io, size, unique_id)
  unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join
  log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }

  start = Time.now
  begin
    begin
      @client.import(database, table, UPLOAD_EXT, io, size, unique_str)
    rescue TreasureData::NotFoundError
      unless @auto_create_table
        raise
      end
      # Create the missing database/table, rewind the stream, and retry.
      ensure_database_and_table(database, table)
      io.pos = 0
      retry
    end
  rescue TreasureData::TooManyRequestsError
    # Rate limiting: let fluentd's retry machinery back off.
    raise
  rescue TreasureData::ClientError => e
    # 4xx-style client errors will not succeed on retry; give up.
    raise Fluent::UnrecoverableError.new(e.message)
  rescue => e
    elapsed = Time.now - start
    ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{e.inspect} (#{size} bytes; #{elapsed} seconds)")
    ne.set_backtrace(e.backtrace)
    raise ne
  end
end
#validate_database_and_table_name(database, table) ⇒ Object
254 255 256 257 258 259 260 261 262 263 264 265 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 254

# Validates both names against Treasure Data's naming rules, converting
# validation failures into ConfigError so bad names are rejected at
# configure time.
def validate_database_and_table_name(database, table)
  begin
    TreasureData::API.validate_database_name(database)
  rescue => error
    raise ConfigError, "Invalid database name #{database.inspect}: #{error}"
  end

  begin
    TreasureData::API.validate_table_name(table)
  rescue => error
    raise ConfigError, "Invalid table name #{table.inspect}: #{error}"
  end
end
#write(chunk) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 147

# Compresses one buffer chunk to a temp file and uploads it.
#
# Fix: restores `chunk.metadata.tag` where the source read `chunk..tag`
# (extraction dropped the receiver), consistent with the Tempfile name
# built from chunk.metadata.tag below.
def write(chunk)
  unique_id = chunk.unique_id

  if @key
    # Static destination configured via database/table parameters.
    database, table = @database, @table
  else
    # Dynamic destination: last two dot-separated tag components are
    # database and table, normalized to TD naming rules.
    database, table = chunk.metadata.tag.split('.')[-2, 2]
    database = TreasureData::API.normalize_database_name(database)
    table = TreasureData::API.normalize_table_name(table)
  end

  FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
  f = Tempfile.new("tdlog-#{chunk.metadata.tag}-", @tmpdir)
  f.binmode
  size = if @use_gzip_command
           gzip_by_command(chunk, f)
         else
           gzip_by_writer(chunk, f)
         end
  f.pos = 0
  upload(database, table, f, size, unique_id)
ensure
  # Always unlink the temp file, even when compression or upload raised.
  f.close(true) if f
end