Class: Fluent::DynamodbAltOutput

Inherits:
BufferedOutput
  • Object
show all
Includes:
SetTagKeyMixin, SetTimeKeyMixin
Defined in:
lib/fluent/plugin/out_dynamodb_alt.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ DynamodbAltOutput

Returns a new instance of DynamodbAltOutput.



25
26
27
28
29
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 25

# Creates the plugin instance and loads the libraries it depends on.
#
# The libraries are required from the constructor, so they are loaded when an
# instance is created. Both are needed later: `Aws` is referenced in
# #configure and `Parallel` in #write.
def initialize
  super
  %w[aws-sdk-core parallel].each { |library| require library }
end

Instance Method Details

#configure(conf) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 31

# Applies the plugin configuration, sets up AWS options, and discovers the
# key schema of the target DynamoDB table.
#
# Steps, in order:
# 1. Delegate to the parent implementation (presumably this is what populates
#    @profile, @region, @table_name, etc. from +conf+ -- confirm).
# 2. Collect AWS options: shared credentials when a profile is set, plus any
#    explicitly set access key, secret key and region.
# 3. Hand those options to +configure_aws+.
# 4. Describe the table and remember its HASH / RANGE key attribute names in
#    @hash_key / @range_key.
# 5. When @expected is set, replace it with its parsed form and log it.
#
# @param conf the configuration object, forwarded to the parent via +super+
# @raise [RuntimeError] if the table reports a key type other than
#   'HASH' or 'RANGE'
def configure(conf)
  super

  options = {}

  if @profile
    shared_opts = {:profile_name => @profile}
    shared_opts[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(shared_opts)
  end

  # Copy over only the settings that were actually provided.
  {
    :access_key_id => @aws_key_id,
    :secret_access_key => @aws_sec_key,
    :region => @region
  }.each do |name, value|
    options[name] = value if value
  end

  configure_aws(options)

  # A throwaway client is enough here; #start creates the one kept in @client.
  description = create_client.describe_table(:table_name => @table_name)

  description.table.key_schema.each do |element|
    case element.key_type
    when 'HASH' then @hash_key = element.attribute_name
    when 'RANGE' then @range_key = element.attribute_name
    else raise 'must not happen'
    end
  end

  return unless @expected

  @expected = parse_expected(@expected)
  log.info("dynamodb_alt expected: #{@expected.inspect}")
end

#format(tag, time, record) ⇒ Object



75
76
77
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 75

# Serializes a single event for buffering.
#
# The event is packed as a three-element array, in the order
# [tag, time, record], by calling +to_msgpack+ on it (presumably provided by
# the msgpack library -- confirm).
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on the [tag, time, record] array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end

#start ⇒ Object



69
70
71
72
73
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 69

# Starts the plugin: runs the parent start-up logic, then creates the client
# that is stored in @client for the lifetime of the instance.
#
# @return the newly created client
def start
  super
  @client = create_client
end

#write(chunk) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/fluent/plugin/out_dynamodb_alt.rb', line 79

# Writes a buffered chunk by calling +put_record+ once per aggregated item.
#
# The chunk is first passed through +aggregate_records+. Each resulting item
# is then handed to +put_record+, either sequentially or, when @concurrency is
# greater than 1, across that many threads via Parallel.
#
# @param chunk the buffered chunk to flush
# @return the result of the underlying +each+ call (+Parallel.each+ when
#   running concurrently, otherwise +each+ on the aggregated records)
def write(chunk)
  records = aggregate_records(chunk)

  # Deliberately a proc, not a lambda: a proc destructures a single array
  # argument into its parameters (presumably each item is a
  # [tag, time, record] triple -- confirm against aggregate_records).
  writer = proc { |_tag, _time, record| put_record(record) }

  return records.each(&writer) unless @concurrency > 1

  Parallel.each(records, :in_threads => @concurrency, &writer)
end