Class: Fluent::Plugin::TreasureDataLogOutput
- Inherits: Output
  (ancestry: Object → Output → Fluent::Plugin::TreasureDataLogOutput)
- Defined in:
- lib/fluent/plugin/out_tdlog.rb
Constant Summary collapse
- IMPORT_SIZE_LIMIT =
32 * 1024 * 1024
- UPLOAD_EXT =
'msgpack.gz'.freeze
Instance Method Summary collapse
- #check_table_exists(key) ⇒ Object
- #configure(conf) ⇒ Object
- #ensure_database_and_table(database, table) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary ⇒ Object
-
#gzip_by_command(chunk, tmp) ⇒ Object
TODO: Share this routine with s3 compressors.
- #gzip_by_writer(chunk, tmp) ⇒ Object
-
#initialize ⇒ TreasureDataLogOutput
constructor
A new instance of TreasureDataLogOutput.
- #multi_workers_ready? ⇒ Boolean
- #start ⇒ Object
- #summarize_record(record) ⇒ Object
- #upload(database, table, io, size, unique_id) ⇒ Object
- #validate_database_and_table_name(database, table) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ TreasureDataLogOutput
Returns a new instance of TreasureDataLogOutput.
40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 40 def initialize super @key = nil @key_num_limit = 512 # TODO: Our one-time import has the restriction about the number of record keys. @record_size_limit = 32 * 1024 * 1024 # TODO @table_list = {} @empty_gz_data = TreasureData::API.create_empty_gz_data @user_agent = "fluent-plugin-td: #{TreasureDataPlugin::VERSION}".freeze end |
Instance Method Details
#check_table_exists(key) ⇒ Object
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 233

# Verifies that the table behind +key+ ("database.table") exists on
# Treasure Data, caching positive results in @table_list.
# Raises when the table is known to be missing; other probe failures are
# only logged (best-effort check).
def check_table_exists(key)
  return if @table_list.has_key?(key)

  database, table = key.split('.', 2)
  log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
  # Import an empty gzip payload: succeeds iff the table exists.
  probe = StringIO.new(@empty_gz_data)
  begin
    @client.import(database, table, UPLOAD_EXT, probe, probe.size)
    @table_list[key] = true
  rescue TreasureData::NotFoundError
    raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
  rescue => e
    # Transient API errors must not fail startup; just log them.
    log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.inspect
    log.debug_backtrace e.backtrace
  end
end
#configure(conf) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 50

# Validates plugin configuration: gzip availability, tmpdir creation, and
# the database/table vs. tag-chunking destination rules.
def configure(conf)
  compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')
  super

  if @use_gzip_command
    require 'open3'
    begin
      # Probe the external gzip binary up front so failures surface at startup.
      Open3.capture3("gzip -V")
    rescue Errno::ENOENT
      raise ConfigError, "'gzip' utility must be in PATH for use_gzip_command parameter"
    end
  end

  FileUtils.mkdir_p(@tmpdir) if @tmpdir

  if @database && @table
    # Fixed destination: validate once and remember it as "db.table".
    validate_database_and_table_name(@database, @table)
    @key = "#{@database}.#{@table}"
  elsif !@chunk_key_tag
    # Without a fixed destination, the tag decides db/table, so chunks
    # must be keyed by tag.
    raise Fluent::ConfigError, "'tag' must be included in <buffer ARG> when database and table are not specified"
  end
end
#ensure_database_and_table(database, table) ⇒ Object
263 264 265 266 267 268 269 270 271 272 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 263

# Creates the log table (and, if needed, its database) on Treasure Data.
# Idempotent: an already-existing table is not an error.
def ensure_database_and_table(database, table)
  log.info "Creating table #{database}.#{table} on TreasureData"
  @client.create_log_table(database, table)
rescue TreasureData::NotFoundError
  # Database missing: create it, then retry the table creation once.
  @client.create_database(database)
  @client.create_log_table(database, table)
rescue TreasureData::AlreadyExistsError
  # Someone else created the table concurrently — nothing to do.
end
#format(tag, time, record) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 103

# Serializes one event into msgpack for buffering.
# Injects the event time as the 'time' column, rejects records with too
# many keys, and routes broken records to the error stream instead of
# raising. Returns the msgpack string, or nil when the record is dropped.
def format(tag, time, record)
  begin
    record['time'] = time.to_i
    record.delete(:time) if record.key?(:time)
    if record.size > @key_num_limit
      # TODO include summary of the record
      router.emit_error_event(tag, time, record, RuntimeError.new("too many number of keys (#{record.size} keys)"))
      return nil
    end
  rescue => e
    router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("skipped a broken record: #{e}"))
    return nil
  end

  packed = begin
    record.to_msgpack
  rescue RangeError
    # Out-of-range integers: fall back to TD's normalizing packer.
    TreasureData::API.normalized_msgpack(record)
  rescue => e
    router.emit_error_event(tag, time, {'record' => record}, RuntimeError.new("can't convert record to msgpack: #{e}"))
    return nil
  end

  if packed.bytesize > @record_size_limit
    # Don't raise error. Large size is not critical for streaming import
    log.warn "Size of a record too large (#{packed.bytesize} bytes): #{summarize_record(record)}"
  end
  packed
end
#formatted_to_msgpack_binary ⇒ Object
99 100 101 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 99

# Tells the output core that #format already produces msgpack binary,
# so buffered chunks need no further serialization.
def formatted_to_msgpack_binary
  true
end
#gzip_by_command(chunk, tmp) ⇒ Object
TODO: Share this routine with s3 compressors
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 171

# TODO: Share this routine with s3 compressors
#
# Compresses +chunk+ into +tmp+ via the external `gzip` command,
# falling back to #gzip_by_writer when the command fails.
# Returns the compressed size in bytes.
def gzip_by_command(chunk, tmp)
  chunk_is_file = @buffer_config['@type'] == 'file'
  path = if chunk_is_file
           # File buffers expose the chunk directly on disk.
           chunk.path
         else
           # Stage in-memory chunks into a tempfile so gzip can read them.
           # NOTE(review): was garbled as `chunk..tag` in the extracted source;
           # `chunk.metadata.tag` is the Fluentd v1 chunk-metadata accessor.
           w = Tempfile.new("gzip-tdlog-#{chunk.metadata.tag}-", @tmpdir)
           w.binmode
           chunk.write_to(w)
           w.close
           w.path
         end
  res = system "gzip -c #{path} > #{tmp.path}"
  unless res
    log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
    tmp.truncate(0)
    return gzip_by_writer(chunk, tmp)
  end
  File.size(tmp.path)
ensure
  # Delete the staging tempfile; ignore close errors (w may be nil or closed).
  unless chunk_is_file
    w.close(true) rescue nil
  end
end
#gzip_by_writer(chunk, tmp) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 197

# Compresses +chunk+ into +tmp+ with Zlib::GzipWriter (pure-Ruby path).
# Returns the number of compressed bytes written to +tmp+.
def gzip_by_writer(chunk, tmp)
  gz = Zlib::GzipWriter.new(tmp)
  chunk.write_to(gz)
  gz.finish
  # Clear the reference so the ensure block skips the (already finished)
  # writer on the success path.
  gz = nil
  tmp.pos
ensure
  if gz
    gz.close rescue nil
  end
end
#multi_workers_ready? ⇒ Boolean
95 96 97 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 95

# This output holds no per-worker exclusive resource, so it may run
# under Fluentd's multi-worker mode.
def multi_workers_ready?
  true
end
#start ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 77

# Builds the Treasure Data API client and, for a fixed destination,
# ensures (or verifies) the target table up front.
def start
  super
  @client = TreasureData::Client.new(@apikey,
    :ssl             => @use_ssl,
    :http_proxy      => @http_proxy,
    :user_agent      => @user_agent,
    :endpoint        => @endpoint,
    :connect_timeout => @connect_timeout,
    :read_timeout    => @read_timeout,
    :send_timeout    => @send_timeout)

  # Dynamic (tag-derived) destinations are resolved per-chunk in #write.
  return unless @key

  if @auto_create_table
    ensure_database_and_table(@database, @table)
  else
    check_table_exists(@key)
  end
end
#summarize_record(record) ⇒ Object
135 136 137 138 139 140 141 142 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 135

# Renders +record+ as JSON for log messages, truncated to at most
# 98 characters plus a "..." marker so oversized records don't flood logs.
def summarize_record(record)
  serialized = Yajl.dump(record)
  return serialized if serialized.size <= 100

  serialized[0..97] + "..."
end
#upload(database, table, io, size, unique_id) ⇒ Object
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 209 def upload(database, table, io, size, unique_id) unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" } begin begin start = Time.now @client.import(database, table, UPLOAD_EXT, io, size, unique_str) rescue TreasureData::NotFoundError => e unless @auto_create_table raise e end ensure_database_and_table(database, table) io.pos = 0 retry end rescue => e elapsed = Time.now - start ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{e.inspect} (#{size} bytes; #{elapsed} seconds)") ne.set_backtrace(e.backtrace) raise ne end end |
#validate_database_and_table_name(database, table) ⇒ Object
250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 250 def validate_database_and_table_name(database, table) begin TreasureData::API.validate_database_name(database) rescue => e raise ConfigError, "Invalid database name #{database.inspect}: #{e}" end begin TreasureData::API.validate_table_name(table) rescue => e raise ConfigError, "Invalid table name #{table.inspect}: #{e}" end end |
#write(chunk) ⇒ Object
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/fluent/plugin/out_tdlog.rb', line 144

# Flushes one buffer chunk: resolves the destination (fixed @key, or
# database/table derived from the last two tag components), gzips the
# chunk into a tempfile, and uploads it. The tempfile is always removed.
def write(chunk)
  unique_id = chunk.unique_id
  if @key
    database, table = @database, @table
  else
    # NOTE(review): was garbled as `chunk..tag` in the extracted source;
    # `chunk.metadata.tag` is the Fluentd v1 chunk-metadata accessor.
    database, table = chunk.metadata.tag.split('.')[-2, 2]
    database = TreasureData::API.normalize_database_name(database)
    table = TreasureData::API.normalize_table_name(table)
  end

  FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
  f = Tempfile.new("tdlog-#{chunk.metadata.tag}-", @tmpdir)
  f.binmode
  size = if @use_gzip_command
           gzip_by_command(chunk, f)
         else
           gzip_by_writer(chunk, f)
         end
  # Rewind so upload streams the compressed data from the start.
  f.pos = 0
  upload(database, table, f, size, unique_id)
ensure
  f.close(true) if f
end