Class: Fluent::DynamoDBOutput
- Inherits:
-
BufferedOutput
- Object
- BufferedOutput
- Fluent::DynamoDBOutput
- Includes:
- DetachMultiProcessMixin
- Defined in:
- lib/fluent/plugin/out_dynamodb.rb
Constant Summary collapse
- BATCHWRITE_ITEM_LIMIT =
25- BATCHWRITE_CONTENT_SIZE_LIMIT =
1024*1024
Instance Method Summary collapse
- #batch_put_records(records) ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#initialize ⇒ DynamoDBOutput
constructor
A new instance of DynamoDBOutput.
- #match_type!(key, record) ⇒ Object
- #restart_session(options) ⇒ Object
- #start ⇒ Object
- #valid_table(table_name) ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ DynamoDBOutput
Returns a new instance of DynamoDBOutput.
18 19 20 21 22 23 24 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 18 def initialize super require 'aws-sdk' require 'msgpack' require 'time' require 'uuidtools' end |
Instance Method Details
#batch_put_records(records) ⇒ Object
129 130 131 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 129 def batch_put_records(records) @dynamo_db.batch_write_item(request_items: { @dynamo_db_table => records }) end |
#configure(conf) ⇒ Object
35 36 37 38 39 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 35 def configure(conf) super @timef = TimeFormatter.new(@time_format, @localtime) end |
#format(tag, time, record) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 90 def format(tag, time, record) if !record.key?(@hash_key.attribute_name) record[@hash_key.attribute_name] = UUIDTools::UUID..to_s end match_type!(@hash_key, record) formatted_time = @timef.format(time) if @range_key if !record.key?(@range_key.attribute_name) record[@range_key.attribute_name] = formatted_time end match_type!(@range_key, record) end record['time'] = formatted_time record.to_msgpack end |
#match_type!(key, record) ⇒ Object
80 81 82 83 84 85 86 87 88 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 80 def match_type!(key, record) if key.key_type == "NUMBER" potential_value = record[key.attribute_name].to_i if potential_value == 0 log.fatal "Failed attempt to cast hash_key to Integer." end record[key.attribute_name] = potential_value end end |
#restart_session(options) ⇒ Object
67 68 69 70 71 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 67 def restart_session() @dynamo_db = Aws::DynamoDB::Client.new() @resource = Aws::DynamoDB::Resource.new(client: @dynamo_db) end |
#start ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 41 def start = {} if @aws_key_id && @aws_sec_key [:access_key_id] = @aws_key_id [:secret_access_key] = @aws_sec_key end [:region] = @dynamo_db_region if @dynamo_db_region [:endpoint] = @dynamo_db_endpoint [:proxy_uri] = @proxy_uri if @proxy_uri detach_multi_process do super begin restart_session() valid_table(@dynamo_db_table) rescue ConfigError => e log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{e}'" exit! rescue Exception => e log.fatal "UnknownError: '#{e}'" exit! end end end |
#valid_table(table_name) ⇒ Object
73 74 75 76 77 78 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 73 def valid_table(table_name) table = @resource.table(table_name) @hash_key = table.key_schema.select{|e| e.key_type == "HASH" }.first range_key_candidate = table.key_schema.select{|e| e.key_type == "RANGE" } @range_key = range_key_candidate.first if range_key_candidate end |
#write(chunk) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/fluent/plugin/out_dynamodb.rb', line 108 def write(chunk) batch_size = 0 batch_records = [] chunk.msgpack_each {|record| batch_records << { put_request: { item: record } } batch_size += record.to_json.length # FIXME: heuristic if batch_records.size >= BATCHWRITE_ITEM_LIMIT || batch_size >= BATCHWRITE_CONTENT_SIZE_LIMIT batch_put_records(batch_records) batch_records.clear batch_size = 0 end } unless batch_records.empty? batch_put_records(batch_records) end end |