Class: Fluent::DynamoDBOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/out_dynamodb.rb

Constant Summary

BATCHWRITE_ITEM_LIMIT =
25
BATCHWRITE_CONTENT_SIZE_LIMIT =
1024*1024

Instance Method Summary

Constructor Details

#initialize ⇒ DynamoDBOutput

Returns a new instance of DynamoDBOutput.



18
19
20
21
22
23
24
# File 'lib/fluent/plugin/out_dynamodb.rb', line 18

# Initializes the base output, then loads the libraries this plugin uses
# (AWS SDK, MessagePack, Time extensions and UUIDTools), in that order.
def initialize
  super
  %w[aws-sdk msgpack time uuidtools].each { |library| require library }
end

Instance Method Details

#batch_put_records(records) ⇒ Object



129
130
131
# File 'lib/fluent/plugin/out_dynamodb.rb', line 129

# Writes +records+ (BatchWriteItem write requests) to @dynamo_db_table.
#
# DynamoDB may accept only part of a batch and return the rest in
# +unprocessed_items+; those are resubmitted with exponential backoff
# instead of being silently dropped.
#
# @param records [Array<Hash>] write requests, e.g. { put_request: { item: ... } }
# @param max_retries [Integer] how many times leftover items are resubmitted
# @param base_delay [Numeric] seconds slept before the first resubmission;
#   doubled on every further attempt
# @return the response of the last batch_write_item call
# @raise [RuntimeError] if items are still unprocessed after +max_retries+
#   resubmissions, so the caller (and the buffer) sees the failure
def batch_put_records(records, max_retries = 5, base_delay = 0.05)
  request_items = { @dynamo_db_table => records }
  attempts = 0
  loop do
    response = @dynamo_db.batch_write_item(request_items: request_items)
    unprocessed = response.unprocessed_items
    return response if unprocessed.nil? || unprocessed.empty?

    if attempts >= max_retries
      pending = unprocessed.values.inject(0) { |total, requests| total + requests.size }
      raise "DynamoDB left #{pending} item(s) unprocessed after #{max_retries} retries"
    end

    sleep(base_delay * (2**attempts))
    attempts += 1
    # unprocessed_items has the same table => requests shape as request_items.
    request_items = unprocessed
  end
end

#configure(conf) ⇒ Object



35
36
37
38
39
# File 'lib/fluent/plugin/out_dynamodb.rb', line 35

# Applies the inherited configuration handling to +conf+, then builds the
# formatter used to render event times from @time_format and @localtime.
def configure(conf)
  super(conf)

  pattern, use_localtime = @time_format, @localtime
  @timef = TimeFormatter.new(pattern, use_localtime)
end

#format(tag, time, record) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/fluent/plugin/out_dynamodb.rb', line 90

# Prepares one event for buffering and serializes it with MessagePack.
#
# - Fills in the hash key with a timestamp-based UUID when it is missing.
# - When the table has a range key, fills it with the formatted event
#   time when it is missing.
# - Passes both keys through match_type!.
# - Stores the formatted time under 'time'.
#
# Works on a shallow copy so that the caller's record is not modified; the
# same record object may also be handed to other outputs.
#
# @param tag [String] event tag (not used here)
# @param time event time, rendered with @timef.format
# @param record [Hash] event record; left unchanged
# @return the result of to_msgpack on the prepared copy
def format(tag, time, record)
  record = record.dup

  hash_name = @hash_key.attribute_name
  record[hash_name] = UUIDTools::UUID.timestamp_create.to_s unless record.key?(hash_name)
  match_type!(@hash_key, record)

  formatted_time = @timef.format(time)
  if @range_key
    range_name = @range_key.attribute_name
    record[range_name] = formatted_time unless record.key?(range_name)
    match_type!(@range_key, record)
  end
  record['time'] = formatted_time

  record.to_msgpack
end

#match_type!(key, record) ⇒ Object



80
81
82
83
84
85
86
87
88
# File 'lib/fluent/plugin/out_dynamodb.rb', line 80

# Coerces record[key.attribute_name] to an Integer when that key attribute
# is declared numeric ("N") in the table's attribute definitions.
#
# The type is looked up in attribute definitions because a key schema
# element's key_type is "HASH" or "RANGE", never a data type. Values that
# are already Numeric are kept as-is. When the value cannot be parsed as a
# base-10 integer, a fatal message is logged and the String#to_i result is
# stored instead.
#
# @param key [#attribute_name] key schema element (hash or range key)
# @param record [Hash] event record; modified in place
def match_type!(key, record)
  name = key.attribute_name
  return unless key_attribute_types[name] == "N"

  value = record[name]
  return if value.is_a?(Numeric)

  begin
    record[name] = Integer(value.to_s, 10)
  rescue ArgumentError
    log.fatal "Failed attempt to cast #{name} to Integer."
    record[name] = value.to_s.to_i
  end
end

# Maps attribute name => attribute type ("S", "N" or "B") for
# @dynamo_db_table. It is read from the table description on first use and
# memoized.
def key_attribute_types
  @key_attribute_types ||= @resource.table(@dynamo_db_table).attribute_definitions
                                    .each_with_object({}) { |d, types| types[d.attribute_name] = d.attribute_type }
end

#restart_session(options) ⇒ Object



67
68
69
70
71
# File 'lib/fluent/plugin/out_dynamodb.rb', line 67

# Builds a new low-level DynamoDB client from +options+ and stores it in
# @dynamo_db. Also stores a DynamoDB resource wrapping that same client in
# @resource.
#
# @param options [Hash] client constructor options
# @return the new resource object
def restart_session(options)
  client = Aws::DynamoDB::Client.new(options)
  @dynamo_db = client
  @resource = Aws::DynamoDB::Resource.new(client: client)
end

#start ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/fluent/plugin/out_dynamodb.rb', line 41

# Builds the DynamoDB client options from the plugin configuration. Then,
# inside detach_multi_process, it:
# - calls the inherited start,
# - opens the DynamoDB session,
# - loads the table's key schema.
#
# Any failure while opening the session or reading the table is logged as
# fatal and the process exits immediately with exit!.
def start
  options = {}
  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  end
  options[:region] = @dynamo_db_region if @dynamo_db_region
  # Pass :endpoint only when configured, so an unset endpoint does not
  # override the SDK's own region-based default with nil.
  options[:endpoint] = @dynamo_db_endpoint if @dynamo_db_endpoint
  # The Aws:: (v2) client names this option :http_proxy. The v1 name
  # :proxy_uri is not a valid option for that client.
  options[:http_proxy] = @proxy_uri if @proxy_uri

  detach_multi_process do
    super

    begin
      restart_session(options)
      valid_table(@dynamo_db_table)
    rescue ConfigError => e
      log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{e}'"
      exit!
    rescue StandardError => e
      log.fatal "UnknownError: #{e.class}: '#{e}'"
      exit!
    end
  end
end

#valid_table(table_name) ⇒ Object



73
74
75
76
77
78
# File 'lib/fluent/plugin/out_dynamodb.rb', line 73

# Reads the key schema of +table_name+ via @resource. Stores the HASH
# (partition) key element in @hash_key and the RANGE (sort) key element in
# @range_key; @range_key is nil when the table has no sort key.
#
# Reading key_schema raises whatever error the SDK raises when the table
# cannot be described.
#
# @param table_name [String]
# @return the RANGE key schema element, or nil
def valid_table(table_name)
  key_schema = @resource.table(table_name).key_schema
  @hash_key = key_schema.find { |element| element.key_type == "HASH" }
  @range_key = key_schema.find { |element| element.key_type == "RANGE" }
end

#write(chunk) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/out_dynamodb.rb', line 108

# Sends every record in +chunk+ to DynamoDB as BatchWriteItem put requests.
#
# Each batch holds at most BATCHWRITE_ITEM_LIMIT items. By the estimate
# below, it also stays within BATCHWRITE_CONTENT_SIZE_LIMIT bytes: a batch
# is flushed before adding a record that would push it over the limit. A
# single record larger than the limit is sent in a batch of its own.
#
# @param chunk buffer chunk whose records are read with msgpack_each
def write(chunk)
  batch = []
  batch_bytes = 0
  flush = lambda do
    batch_put_records(batch)
    batch = []
    batch_bytes = 0
  end

  chunk.msgpack_each do |record|
    # FIXME: heuristic. The JSON byte size only approximates DynamoDB's
    # item-size accounting.
    record_bytes = record.to_json.bytesize
    flush.call if !batch.empty? && batch_bytes + record_bytes > BATCHWRITE_CONTENT_SIZE_LIMIT

    batch << { put_request: { item: record } }
    batch_bytes += record_bytes
    flush.call if batch.size >= BATCHWRITE_ITEM_LIMIT || batch_bytes >= BATCHWRITE_CONTENT_SIZE_LIMIT
  end

  batch_put_records(batch) unless batch.empty?
end