Module: Jets::Job::Dsl::DynamodbEvent

Defined in:
lib/jets/job/dsl/dynamodb_event.rb

Instance Method Summary collapse

Instance Method Details

#add_dynamodb_table_namespace(table_name_without_namespace) ⇒ Object



26
27
28
29
30
31
32
33
34
# File 'lib/jets/job/dsl/dynamodb_event.rb', line 26

def add_dynamodb_table_namespace(table_name_without_namespace)
  ns = if Jets.config.events.dynamodb.table_namespace == true
         Jets.table_namespace # does not include extra
       elsif Jets.config.events.dynamodb.table_namespace
         Jets.config.events.dynamodb.table_namespace # allow user to fully control namespace
       end
  ns_separator = Jets.config.events.dynamodb.table_namespace_separator
  [ns, table_name_without_namespace].compact.join(ns_separator)
end

#default_dynamodb_stream_policy(stream_name_arn = '*') ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/jets/job/dsl/dynamodb_event.rb', line 60

def default_dynamodb_stream_policy(stream_name_arn='*')
  stream = {
    Action: ["dynamodb:GetRecords",
             "dynamodb:GetShardIterator",
             "dynamodb:DescribeStream",
             "dynamodb:ListStreams"],
    Effect: "Allow",
    Resource: stream_name_arn,
  }
  table_name_arn = stream_name_arn.gsub(%r{/stream/20.*},'')
  table = {
    Action: ["dynamodb:DescribeTable"],
    Effect: "Allow",
    Resource: table_name_arn,
  }
  [stream, table]
end

#dynamodb_event(table_name_without_namespace, options = {}) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/jets/job/dsl/dynamodb_event.rb', line 3

def dynamodb_event(table_name_without_namespace, options={})
  return if ENV['JETS_NO_INTERNET'] # Disable during build since jets build tries to init this

  table_name = add_dynamodb_table_namespace(table_name_without_namespace)
  stream_arn = full_dynamodb_stream_arn(table_name)
  default_iam_policy = default_dynamodb_stream_policy(stream_arn)

  # Create iam policy allows access to dynamodb
  iam_policy_option = [options.delete(:iam_policy)].compact.flatten
  user_iam_policy = [@iam_policy].compact.flatten
  iam_policy_props = iam_policy_option + user_iam_policy + [default_iam_policy]
  iam_policy(iam_policy_props)

  props = options # by this time options only has EventSourceMapping properties
  default = {
    EventSourceArn: stream_arn,
    StartingPosition: "TRIM_HORIZON",
  }
  props = default.merge(props)

  event_source_mapping(props)
end

#full_dynamodb_stream_arn(table_name) ⇒ Object

Expands table name to the full stream arn. Example:

test-table

To:

arn:aws:dynamodb:us-west-2:112233445566:table/test-table/stream/2019-02-15T21:41:15.217

Note, this does not check if the stream has been disabled.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/jets/job/dsl/dynamodb_event.rb', line 43

def full_dynamodb_stream_arn(table_name)
  return table_name if table_name.include?("arn:aws:dynamodb") # assume full stream arn

  begin
    resp = dynamodb.describe_table(table_name: table_name)
  rescue Aws::DynamoDB::Errors::ResourceNotFoundException => e
    puts e.message
    puts "ERROR: Was not able to find the DynamoDB table: #{table_name}.".color(:red)
    code_line = caller.grep(%r{/app/jobs}).first
    puts "Please check: #{code_line}"
    puts "Exiting"
    exit 1
  end
  stream_arn = resp.table.latest_stream_arn
  return stream_arn if stream_arn
end