Class: Fluent::Plugin::DynamoOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::DynamoOutput
- Defined in:
- lib/fluent/plugin/out_dynamo.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"- BATCHWRITE_ITEM_LIMIT =
25- BATCHWRITE_CONTENT_SIZE_LIMIT =
1024*1024
Instance Method Summary collapse
- #batch_put_records(records) ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
- #match_type!(key, record) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #restart_session(options) ⇒ Object
- #start ⇒ Object
- #valid_table(table_name) ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#batch_put_records(records) ⇒ Object
136 137 138 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 136 def batch_put_records(records) @dynamo_db.batch_write_item(request_items: { @dynamo_db_table => records }) end |
#configure(conf) ⇒ Object
35 36 37 38 39 40 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 35 def configure(conf) compat_parameters_convert(conf, :buffer) super @timef = Fluent::TimeFormatter.new(@time_format, @localtime) end |
#format(tag, time, record) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 89 def format(tag, time, record) if !record.key?(@hash_key.attribute_name) record[@hash_key.attribute_name] = UUIDTools::UUID..to_s end match_type!(@hash_key, record) formatted_time = @timef.format(time) if @range_key if !record.key?(@range_key.attribute_name) record[@range_key.attribute_name] = formatted_time end match_type!(@range_key, record) end record['time'] = formatted_time if @add_time_attribute record.to_msgpack end |
#formatted_to_msgpack_binary? ⇒ Boolean
107 108 109 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 107 def formatted_to_msgpack_binary? true end |
#match_type!(key, record) ⇒ Object
79 80 81 82 83 84 85 86 87 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 79 def match_type!(key, record) if key.key_type == "NUMBER" potential_value = record[key.attribute_name].to_i if potential_value == 0 log.fatal "Failed attempt to cast hash_key to Integer." end record[key.attribute_name] = potential_value end end |
#multi_workers_ready? ⇒ Boolean
111 112 113 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 111 def multi_workers_ready? true end |
#restart_session(options) ⇒ Object
66 67 68 69 70 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 66 def restart_session() @dynamo_db = Aws::DynamoDB::Client.new() @resource = Aws::DynamoDB::Resource.new(client: @dynamo_db) end |
#start ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 42 def start = {} if @aws_key_id && @aws_sec_key [:access_key_id] = @aws_key_id [:secret_access_key] = @aws_sec_key end [:region] = @dynamo_db_region if @dynamo_db_region [:endpoint] = @dynamo_db_endpoint [:proxy_uri] = @proxy_uri if @proxy_uri super begin restart_session() valid_table(@dynamo_db_table) rescue Fluent::ConfigError => e log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{e}'" exit! rescue Exception => e log.fatal "UnknownError: '#{e}'" exit! end end |
#valid_table(table_name) ⇒ Object
72 73 74 75 76 77 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 72 def valid_table(table_name) table = @resource.table(table_name) @hash_key = table.key_schema.select{|e| e.key_type == "HASH" }.first range_key_candidate = table.key_schema.select{|e| e.key_type == "RANGE" } @range_key = range_key_candidate.first if range_key_candidate end |
#write(chunk) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/fluent/plugin/out_dynamo.rb', line 115 def write(chunk) batch_size = 0 batch_records = [] chunk.msgpack_each {|record| batch_records << { put_request: { item: record } } batch_size += record.to_json.length # FIXME: heuristic if batch_records.size >= BATCHWRITE_ITEM_LIMIT || batch_size >= BATCHWRITE_CONTENT_SIZE_LIMIT batch_put_records(batch_records) batch_records.clear batch_size = 0 end } unless batch_records.empty? batch_put_records(batch_records) end end |