Module: Jets::Job::Dsl::DynamodbEvent
- Defined in:
- lib/jets/job/dsl/dynamodb_event.rb
Instance Method Summary collapse
- #add_dynamodb_table_namespace(table_name_without_namespace) ⇒ Object
- #default_dynamodb_stream_policy(stream_name_arn = '*') ⇒ Object
- #dynamodb_event(table_name_without_namespace, options = {}) ⇒ Object
-
#full_dynamodb_stream_arn(table_name) ⇒ Object
Expands table name to the full stream arn.
Instance Method Details
#add_dynamodb_table_namespace(table_name_without_namespace) ⇒ Object
26 27 28 29 30 31 32 33 34 |
# File 'lib/jets/job/dsl/dynamodb_event.rb', line 26 def add_dynamodb_table_namespace(table_name_without_namespace) ns = if Jets.config.events.dynamodb.table_namespace == true Jets.table_namespace # does not include extra elsif Jets.config.events.dynamodb.table_namespace Jets.config.events.dynamodb.table_namespace # allow user to fully control namespace end ns_separator = Jets.config.events.dynamodb.table_namespace_separator [ns, table_name_without_namespace].compact.join(ns_separator) end |
#default_dynamodb_stream_policy(stream_name_arn = '*') ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/jets/job/dsl/dynamodb_event.rb', line 60 def default_dynamodb_stream_policy(stream_name_arn='*') stream = { Action: ["dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:DescribeStream", "dynamodb:ListStreams"], Effect: "Allow", Resource: stream_name_arn, } table_name_arn = stream_name_arn.gsub(%r{/stream/20.*},'') table = { Action: ["dynamodb:DescribeTable"], Effect: "Allow", Resource: table_name_arn, } [stream, table] end |
#dynamodb_event(table_name_without_namespace, options = {}) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/jets/job/dsl/dynamodb_event.rb', line 3 def dynamodb_event(table_name_without_namespace, ={}) return if ENV['JETS_NO_INTERNET'] # Disable during build since jets build tries to init this table_name = add_dynamodb_table_namespace(table_name_without_namespace) stream_arn = full_dynamodb_stream_arn(table_name) default_iam_policy = default_dynamodb_stream_policy(stream_arn) # Create iam policy allows access to dynamodb iam_policy_option = [.delete(:iam_policy)].compact.flatten user_iam_policy = [@iam_policy].compact.flatten iam_policy_props = iam_policy_option + user_iam_policy + [default_iam_policy] iam_policy(iam_policy_props) props = # by this time options only has EventSourceMapping properties default = { EventSourceArn: stream_arn, StartingPosition: "TRIM_HORIZON", } props = default.merge(props) event_source_mapping(props) end |
#full_dynamodb_stream_arn(table_name) ⇒ Object
Expands table name to the full stream arn. Example:
test-table
To:
arn:aws:dynamodb:us-west-2:112233445566:table/test-table/stream/2019-02-15T21:41:15.217
Note, this does not check if the stream has been disabled.
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/jets/job/dsl/dynamodb_event.rb', line 43 def full_dynamodb_stream_arn(table_name) return table_name if table_name.include?("arn:aws:dynamodb") # assume full stream arn begin resp = dynamodb.describe_table(table_name: table_name) rescue Aws::DynamoDB::Errors::ResourceNotFoundException => e puts e. puts "ERROR: Was not able to find the DynamoDB table: #{table_name}.".color(:red) code_line = caller.grep(%r{/app/jobs}).first puts "Please check: #{code_line}" puts "Exiting" exit 1 end stream_arn = resp.table.latest_stream_arn return stream_arn if stream_arn end |