Class: Fluent::Plugin::DynamoOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_dynamo.rb

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
"memory"
BATCHWRITE_ITEM_LIMIT =
25
BATCHWRITE_CONTENT_SIZE_LIMIT =
1024*1024

Instance Method Summary collapse

Instance Method Details

#batch_put_records(records) ⇒ Object



136
137
138
# File 'lib/fluent/plugin/out_dynamo.rb', line 136

def batch_put_records(records)
  @dynamo_db.batch_write_item(request_items: { @dynamo_db_table => records })
end

#configure(conf) ⇒ Object



35
36
37
38
39
40
# File 'lib/fluent/plugin/out_dynamo.rb', line 35

def configure(conf)
  compat_parameters_convert(conf, :buffer)
  super

  @timef = Fluent::TimeFormatter.new(@time_format, @localtime)
end

#format(tag, time, record) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/out_dynamo.rb', line 89

def format(tag, time, record)
  if !record.key?(@hash_key.attribute_name)
    record[@hash_key.attribute_name] = UUIDTools::UUID.timestamp_create.to_s
  end
  match_type!(@hash_key, record)

  formatted_time = @timef.format(time)
  if @range_key
    if !record.key?(@range_key.attribute_name)
      record[@range_key.attribute_name] = formatted_time
    end
    match_type!(@range_key, record)
  end
  record['time'] = formatted_time if @add_time_attribute

  record.to_msgpack
end

#formatted_to_msgpack_binary?Boolean

Returns:



107
108
109
# File 'lib/fluent/plugin/out_dynamo.rb', line 107

def formatted_to_msgpack_binary?
  true
end

#match_type!(key, record) ⇒ Object



79
80
81
82
83
84
85
86
87
# File 'lib/fluent/plugin/out_dynamo.rb', line 79

def match_type!(key, record)
  if key.key_type == "NUMBER"
    potential_value = record[key.attribute_name].to_i
    if potential_value == 0
      log.fatal "Failed attempt to cast hash_key to Integer."
    end
    record[key.attribute_name] = potential_value
  end
end

#multi_workers_ready?Boolean

Returns:



111
112
113
# File 'lib/fluent/plugin/out_dynamo.rb', line 111

def multi_workers_ready?
  true
end

#restart_session(options) ⇒ Object



66
67
68
69
70
# File 'lib/fluent/plugin/out_dynamo.rb', line 66

def restart_session(options)
  @dynamo_db = Aws::DynamoDB::Client.new(options)
  @resource = Aws::DynamoDB::Resource.new(client: @dynamo_db)

end

#startObject



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/out_dynamo.rb', line 42

def start
  options = {}
  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  end
  options[:region] = @dynamo_db_region if @dynamo_db_region
  options[:endpoint] = @dynamo_db_endpoint
  options[:proxy_uri] = @proxy_uri if @proxy_uri

  super

  begin
    restart_session(options)
    valid_table(@dynamo_db_table)
  rescue Fluent::ConfigError => e
    log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{e}'"
    exit!
  rescue Exception => e
    log.fatal "UnknownError: '#{e}'"
    exit!
  end
end

#valid_table(table_name) ⇒ Object



72
73
74
75
76
77
# File 'lib/fluent/plugin/out_dynamo.rb', line 72

def valid_table(table_name)
  table = @resource.table(table_name)
  @hash_key = table.key_schema.select{|e| e.key_type == "HASH" }.first
  range_key_candidate = table.key_schema.select{|e| e.key_type == "RANGE" }
  @range_key = range_key_candidate.first if range_key_candidate
end

#write(chunk) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/fluent/plugin/out_dynamo.rb', line 115

def write(chunk)
  batch_size = 0
  batch_records = []
  chunk.msgpack_each {|record|
    batch_records << {
      put_request: {
        item: record
      }
    }
    batch_size += record.to_json.length # FIXME: heuristic
    if batch_records.size >= BATCHWRITE_ITEM_LIMIT || batch_size >= BATCHWRITE_CONTENT_SIZE_LIMIT
      batch_put_records(batch_records)
      batch_records.clear
      batch_size = 0
    end
  }
  unless batch_records.empty?
    batch_put_records(batch_records)
  end
end