Class: Fluent::DynamoDrcOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
DetachMultiProcessMixin
Defined in:
lib/fluent/plugin/out_dynamo_drc.rb

Constant Summary collapse

# Maximum number of records accumulated by #write before a batch is flushed.
BATCHWRITE_ITEM_LIMIT =
25
# Threshold for the estimated serialized size of the pending records; #write
# uses it together with the item limit to decide when a batch is flushed.
BATCHWRITE_CONTENT_SIZE_LIMIT =
1024*1024

Instance Method Summary collapse

Constructor Details

#initialize ⇒ DynamoDrcOutput

Returns a new instance of DynamoDrcOutput.



13
14
15
16
17
18
19
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 13

# Builds the plugin instance and loads the libraries the plugin depends on.
#
# The libraries are required here, at construction time, in the listed order.
def initialize
  super
  %w[aws-sdk msgpack time uuidtools].each { |library| require library }
end

Instance Method Details

#batch_put_records(records) ⇒ Object



123
124
125
126
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 123

# Queues +records+ as puts against the configured table on the batch writer
# and then processes the batch.
#
# @param records [Array<Hash>] records to store
# @return [Object] whatever the batch writer's +process!+ returns
def batch_put_records(records)
  writer = @batch
  writer.put(@dynamo_db_table, records)
  writer.process!
end

#configure(conf) ⇒ Object



29
30
31
32
33
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 29

# Applies the configuration via the parent class, then builds the time
# formatter from the +@time_format+ and +@localtime+ settings.
#
# @param conf [Object] plugin configuration, passed on to the parent class
def configure(conf)
  super

  time_format = @time_format
  use_localtime = @localtime
  @timef = TimeFormatter.new(time_format, use_localtime)
end

#format(tag, time, record) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 83

# Prepares one event for buffering and serializes it with MessagePack.
#
# A missing hash key is filled with a timestamp-based UUID, a missing range
# key (when the table has one) is filled with the formatted event time, and
# the 'time' field is always set to the formatted event time. The record is
# modified in place.
#
# @param tag [String] event tag (not used here)
# @param time [Object] event time, passed to the time formatter
# @param record [Hash] event record
# @return [String] the result of +record.to_msgpack+
def format(tag, time, record)
  hash_name = @hash_key.name
  record[hash_name] = UUIDTools::UUID.timestamp_create.to_s unless record.key?(hash_name)
  match_type!(@hash_key, record)

  time_text = @timef.format(time)
  range_key = @range_key
  if range_key
    range_name = range_key.name
    record[range_name] = time_text unless record.key?(range_name)
    match_type!(range_key, record)
  end

  record['time'] = time_text
  record.to_msgpack
end

#match_type!(key, record) ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 73

# Coerces the record's value for +key+ to an Integer when the key is numeric.
#
# Keys whose type is not +:number+ are left untouched. For numeric keys the
# value is replaced by its +to_i+ result. A fatal log entry is written when
# the value could not be parsed as a number at all; the record is still
# updated (with 0) in that case.
#
# @param key [#name, #type] key descriptor (hash key or range key)
# @param record [Hash] event record, modified in place
# @return [Integer, nil] the coerced value, or nil when the key is not numeric
def match_type!(key, record)
  return unless key.type == :number

  raw_value = record[key.name]
  numeric_value = raw_value.to_i
  # to_i returns 0 both for a real zero and for unparsable input. Only treat
  # 0 as a failure when the source was not numeric and does not start with a
  # digit (after optional whitespace and sign), i.e. nothing was parsed.
  parsed = raw_value.is_a?(Numeric) || raw_value.to_s =~ /\A\s*[+-]?\d/
  if numeric_value == 0 && !parsed
    $log.fatal "Failed attempt to cast #{key.name} to Integer."
  end
  record[key.name] = numeric_value
end

#restart_session(options) ⇒ Object



60
61
62
63
64
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 60

# (Re)creates the AWS objects used by the plugin from +options+: the batch
# writer (+@batch+) and the DynamoDB handle (+@dynamo_db+).
#
# @param options [Hash] AWS options, as assembled by #start
# @return [Object] the new DynamoDB handle
def restart_session(options)
  @batch = AWS::DynamoDB::BatchWrite.new(AWS.config(options))
  @dynamo_db = AWS::DynamoDB.new(options)
end

#start ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 35

# Starts the plugin: assembles the AWS options from the configured settings,
# then, inside the detached process block, starts the parent class, opens the
# AWS session and loads the table's key schema.
#
# Any failure during session setup or table validation is logged as fatal and
# the process is terminated immediately with +exit!+.
def start
  session_options = {}
  # Credentials are only passed when both parts are configured.
  if @aws_key_id && @aws_sec_key
    session_options.update(:access_key_id => @aws_key_id,
                           :secret_access_key => @aws_sec_key)
  end
  session_options[:dynamo_db_endpoint] = @dynamo_db_endpoint
  session_options[:proxy_uri] = @proxy_uri if @proxy_uri

  detach_multi_process do
    super

    begin
      restart_session(session_options)
      valid_table(@dynamo_db_table)
    rescue ConfigError => config_error
      $log.fatal "ConfigError: Please check your configuration, then restart fluentd. '#{config_error}'"
      exit!
    # NOTE(review): rescuing Exception also catches signals and SystemExit;
    # presumably intentional so every startup failure is logged - confirm.
    rescue Exception => unexpected_error
      $log.fatal "UnknownError: '#{unexpected_error}'"
      exit!
    end
  end
end

#valid_table(table_name) ⇒ Object



66
67
68
69
70
71
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 66

# Looks up +table_name+ on the DynamoDB handle, loads its schema and caches
# the key descriptors: +@hash_key+ always, +@range_key+ only when the table
# does not report a simple key.
#
# @param table_name [String] name of the target table
# @return [Object, nil] the range key, or nil for a simple-key table
def valid_table(table_name)
  target_table = @dynamo_db.tables[table_name]
  target_table.load_schema
  @hash_key = target_table.hash_key
  return if target_table.simple_key?

  @range_key = target_table.range_key
end

#write(chunk) ⇒ Object



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/out_dynamo_drc.rb', line 101

# Writes a buffered chunk to DynamoDB in batches.
#
# Each record gets a numeric "timestamp" attribute (epoch seconds of its
# "time" field followed by ten random digits) and has empty-string values
# replaced by the string "nil". Records are grouped so that a batch never
# holds more than BATCHWRITE_ITEM_LIMIT records, and so that the estimated
# size of a multi-record batch stays within BATCHWRITE_CONTENT_SIZE_LIMIT.
# The size check runs before a record is added, so the record that would
# overflow the limit starts the next batch instead of being sent with the
# current one.
#
# @param chunk [#msgpack_each] buffer chunk yielding one record Hash at a time
# @return [Object, nil] result of the final batch put, or nil if nothing was left
def write(chunk)
  pending_records = []
  pending_bytes = 0
  chunk.msgpack_each do |record|
    # Best effort: fall back to the current time when "time" is missing or
    # cannot be parsed.
    time = begin
      Time.parse(record["time"])
    rescue StandardError
      Time.now
    end
    record["timestamp"] = "#{time.to_i}#{rand(1000000000..9999999999)}".to_i
    # Ensuring no empty attributes to avoid Dynamodb errors
    record.each { |k, v| record[k] = "nil" if v == "" }

    record_bytes = record.to_json.bytesize # FIXME: heuristic
    # Flush first if this record would push the pending batch over the limit.
    if !pending_records.empty? && pending_bytes + record_bytes > BATCHWRITE_CONTENT_SIZE_LIMIT
      batch_put_records(pending_records)
      pending_records.clear
      pending_bytes = 0
    end

    pending_records << record
    pending_bytes += record_bytes
    if pending_records.size >= BATCHWRITE_ITEM_LIMIT
      batch_put_records(pending_records)
      pending_records.clear
      pending_bytes = 0
    end
  end
  batch_put_records(pending_records) unless pending_records.empty?
end