Class: Fluent::DynamodbAltOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_dynamodb_alt.rb

Instance Method Summary collapse

Constructor Details

#initializeDynamodbAltOutput

Returns a new instance of DynamodbAltOutput.



28
29
30
31
32
33
34
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 28

def initialize
  super
  require 'aws-sdk-core'
  require 'parallel'
  require 'set'
  require 'stringio'
end

Instance Method Details

#configure(conf) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 36

def configure(conf)
  super

  aws_opts = {}

  if @profile
    credentials_opts = {:profile_name => @profile}
    credentials_opts[:path] = @credentials_path if @credentials_path
    credentials = Aws::SharedCredentials.new(credentials_opts)
    aws_opts[:credentials] = credentials
  end

  aws_opts[:access_key_id] = @aws_key_id if @aws_key_id
  aws_opts[:secret_access_key] = @aws_sec_key if @aws_sec_key
  aws_opts[:region] = @region if @region
  aws_opts[:endpoint] = @endpoint if @endpoint

  configure_aws(aws_opts)

  client = create_client
  table = client.describe_table(:table_name => @table_name)

  table.table.key_schema.each do |attribute|
    case attribute.key_type
    when 'HASH'
      @hash_key = attribute.attribute_name
    when 'RANGE'
      @range_key = attribute.attribute_name
    else
      raise 'must not happen'
    end
  end

  if @expected
    @expected = parse_expected(@expected)
    log.info("dynamodb_alt expected: #{@expected.inspect}")
  end

  if @binary_keys
    @binary_keys = @binary_keys.strip.split(/\s*,\s*/)
  else
    @binary_keys = []
  end
end

#format(tag, time, record) ⇒ Object



87
88
89
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 87

def format(tag, time, record)
  [tag, time, record].to_msgpack
end

#startObject



81
82
83
84
85
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 81

def start
  super

  @client = create_client
end

#write(chunk) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 91

def write(chunk)
  chunk = aggregate_records(chunk)

  block = proc do |tag, time, record|
    if @delete_key and record[@delete_key]
      delete_record(record)
    else
      put_record(record)
    end
  end

  if @concurrency > 1
    Parallel.each(chunk, :in_threads => @concurrency, &block)
  else
    chunk.each(&block)
  end
end