Module: Jets::Job::Dsl::KinesisEvent
- Defined in:
- lib/jets/job/dsl/kinesis_event.rb
Instance Method Summary collapse
- #default_kinesis_stream_policy(stream_name_arn = '*') ⇒ Object
-
#full_kinesis_stream_arn(stream_name) ⇒ Object
Expands table name to the full stream arn.
- #kinesis_event(stream_name, options = {}) ⇒ Object
Instance Method Details
#default_kinesis_stream_policy(stream_name_arn = '*') ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/jets/job/dsl/kinesis_event.rb', line 36 def default_kinesis_stream_policy(stream_name_arn='*') { Action: ["kinesis:GetRecords", "kinesis:GetShardIterator", "kinesis:DescribeStream", "kinesis:ListStreams"], Effect: "Allow", Resource: stream_name_arn, } end |
#full_kinesis_stream_arn(stream_name) ⇒ Object
Expands table name to the full stream arn. Example:
test-table
To:
arn:aws:kinesis:us-west-2:112233445566:table/test-table/stream/2019-02-15T21:41:15.217
Note, this does not check if the stream has been disabled.
30 31 32 33 34 |
# File 'lib/jets/job/dsl/kinesis_event.rb', line 30 def full_kinesis_stream_arn(stream_name) return stream_name if stream_name.include?("arn:aws:kinesis") # assume full stream arn "arn:aws:kinesis:#{Jets.aws.region}:#{Jets.aws.account}:stream/#{stream_name}" end |
#kinesis_event(stream_name, options = {}) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/jets/job/dsl/kinesis_event.rb', line 3 def kinesis_event(stream_name, ={}) stream_arn = full_kinesis_stream_arn(stream_name) default_iam_policy = default_kinesis_stream_policy(stream_arn) # Create iam policy allows access to kinesis iam_policy_option = [.delete(:iam_policy)].compact.flatten user_iam_policy = [@iam_policy].compact.flatten iam_policy_props = iam_policy_option + user_iam_policy + [default_iam_policy] iam_policy(iam_policy_props) props = # by this time options only has EventSourceMapping properties default = { EventSourceArn: stream_arn, StartingPosition: "LATEST", } props = default.merge(props) event_source_mapping(props) end |