Module: Jets::Job::Dsl::KinesisEvent

Defined in:
lib/jets/job/dsl/kinesis_event.rb

Instance Method Summary collapse

Instance Method Details

#default_kinesis_stream_policy(stream_name_arn = '*') ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/jets/job/dsl/kinesis_event.rb', line 36

# Builds the IAM policy statement that is attached by default so a function
# can read from a Kinesis stream.
#
# @param stream_name_arn [String] value placed in the statement's Resource;
#   defaults to '*'.
# @return [Hash] a statement hash with the keys :Action, :Effect and :Resource.
def default_kinesis_stream_policy(stream_name_arn='*')
  read_actions = %w[
    kinesis:GetRecords
    kinesis:GetShardIterator
    kinesis:DescribeStream
    kinesis:ListStreams
  ]

  { Action: read_actions, Effect: "Allow", Resource: stream_name_arn }
end

#full_kinesis_stream_arn(stream_name) ⇒ Object

Expands a stream name to the full Kinesis stream ARN. Example:

test-stream

To:

arn:aws:kinesis:us-west-2:112233445566:stream/test-stream

Note, this does not check if the stream has been disabled.



30
31
32
33
34
# File 'lib/jets/job/dsl/kinesis_event.rb', line 30

# Expands a bare Kinesis stream name into a full stream ARN.
#
# A value that already contains "arn:aws:kinesis" is assumed to be a full ARN
# and is returned unchanged. Otherwise the ARN is assembled from the region
# and account reported by Jets.aws.
#
# @param stream_name [String] a stream name or a full Kinesis stream ARN.
# @return [String] the stream ARN.
def full_kinesis_stream_arn(stream_name)
  return stream_name if stream_name.include?("arn:aws:kinesis") # assume full stream arn

  # The account id segment is required: arn:aws:kinesis:REGION:ACCOUNT:stream/NAME
  "arn:aws:kinesis:#{Jets.aws.region}:#{Jets.aws.account}:stream/#{stream_name}"
end

#kinesis_event(stream_name, options = {}) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/jets/job/dsl/kinesis_event.rb', line 3

# Declares a Kinesis stream as an event source.
#
# Registers an IAM policy (via iam_policy) made up of, in order: the
# :iam_policy option, any policy already held in @iam_policy, and the default
# Kinesis read policy for the stream. Then declares the event source mapping
# (via event_source_mapping).
#
# @param stream_name [String] stream name or full stream ARN; expanded with
#   full_kinesis_stream_arn.
# @param options [Hash] may contain :iam_policy (a single policy or an array
#   of policies). Every other key is passed through as an event source mapping
#   property and overrides the defaults below. The hash is not modified.
# @return [Object] whatever event_source_mapping returns.
def kinesis_event(stream_name, options={})
  stream_arn = full_kinesis_stream_arn(stream_name)
  default_iam_policy = default_kinesis_stream_policy(stream_arn)

  # Work on a copy so the caller's hash keeps its :iam_policy entry and can be
  # reused for another declaration.
  props = options.dup

  # Build the iam policy that allows access to kinesis.
  # [x].compact.flatten (rather than Array(x)) keeps a single Hash policy
  # intact while still accepting nil or an array of policies.
  iam_policy_option = [props.delete(:iam_policy)].compact.flatten
  user_iam_policy = [@iam_policy].compact.flatten
  iam_policy(iam_policy_option + user_iam_policy + [default_iam_policy])

  # By this point props only has the event source mapping properties.
  defaults = {
    EventSourceArn: stream_arn,
    StartingPosition: "LATEST",
  }

  event_source_mapping(defaults.merge(props))
end