Class: Fluent::DynamoDrcOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::DynamoDrcOutput
- Includes:
- DetachMultiProcessMixin
- Defined in:
- lib/fluent/plugin/out_dynamo_drc.rb
Constant Summary collapse
- BATCHWRITE_ITEM_LIMIT =
25- BATCHWRITE_CONTENT_SIZE_LIMIT =
1024*1024
Instance Method Summary collapse
- #batch_put_records(records) ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ DynamoDrcOutput
constructor
A new instance of DynamoDrcOutput.
- #match_type!(key, record) ⇒ Object
- #restart_session(options) ⇒ Object
- #start ⇒ Object
- #valid_table(table_name) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ DynamoDrcOutput
Returns a new instance of DynamoDrcOutput.
13 14 15 16 17 18 19 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 13 def initialize super require 'aws-sdk' require 'msgpack' require 'time' require 'uuidtools' end |
Instance Method Details
#batch_put_records(records) ⇒ Object
123 124 125 126 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 123 def batch_put_records(records) @batch.put(@dynamo_db_table, records) @batch.process! end |
#configure(conf) ⇒ Object
29 30 31 32 33 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 29 def configure(conf) super @timef = TimeFormatter.new(@time_format, @localtime) end |
#format(tag, time, record) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 83 def format(tag, time, record) if !record.key?(@hash_key.name) record[@hash_key.name] = UUIDTools::UUID..to_s end match_type!(@hash_key, record) formatted_time = @timef.format(time) if @range_key if !record.key?(@range_key.name) record[@range_key.name] = formatted_time end match_type!(@range_key, record) end record['time'] = formatted_time record.to_msgpack end |
#match_type!(key, record) ⇒ Object
73 74 75 76 77 78 79 80 81 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 73 def match_type!(key, record) if key.type == :number potential_value = record[key.name].to_i if potential_value == 0 $log.fatal "Failed attempt to cast hash_key to Integer." end record[key.name] = potential_value end end |
#restart_session(options) ⇒ Object
60 61 62 63 64 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 60 def restart_session() config = AWS.config() @batch = AWS::DynamoDB::BatchWrite.new(config) @dynamo_db = AWS::DynamoDB.new() end |
#start ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 35 def start = {} if @aws_key_id && @aws_sec_key [:access_key_id] = @aws_key_id [:secret_access_key] = @aws_sec_key end [:dynamo_db_endpoint] = @dynamo_db_endpoint [:proxy_uri] = @proxy_uri if @proxy_uri detach_multi_process do super begin restart_session() valid_table(@dynamo_db_table) rescue ConfigError => e $log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{e}'" exit! rescue Exception => e $log.fatal "UnknownError: '#{e}'" exit! end end end |
#valid_table(table_name) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 66 def valid_table(table_name) table = @dynamo_db.tables[table_name] table.load_schema @hash_key = table.hash_key @range_key = table.range_key unless table.simple_key? end |
#write(chunk) ⇒ Object
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 101 def write(chunk) batch_size = 0 batch_records = [] chunk.msgpack_each {|record| time = Time.parse(record["time"]) rescue Time.now = "#{time.to_i}#{rand(1000000000..9999999999)}".to_i record["timestamp"] = #Ensuring no empty attributes to avoid Dynamodb errors record.each{|k,v| record[k] = "nil" if v == ""} batch_records << record batch_size += record.to_json.length # FIXME: heuristic if batch_records.size >= BATCHWRITE_ITEM_LIMIT || batch_size >= BATCHWRITE_CONTENT_SIZE_LIMIT batch_put_records(batch_records) batch_records.clear batch_size = 0 end } unless batch_records.empty? batch_put_records(batch_records) end end |