Module: Cumulus::Kinesis

Defined in:
lib/kinesis/Kinesis.rb,
lib/kinesis/Commands.rb,
lib/kinesis/loader/Loader.rb,
lib/kinesis/manager/Manager.rb,
lib/kinesis/models/StreamDiff.rb,
lib/kinesis/models/StreamConfig.rb

Defined Under Namespace

Modules: Loader, StreamChange

Classes: Commands, Manager, StreamConfig, StreamDiff

Constant Summary collapse

@@client =
Aws::Kinesis::Client.new(Configuration.instance.client)

Class Method Summary collapse

Class Method Details

.describe_stream(stream_name) ⇒ Object

Public - Load the entire stream description with all shards

Returns an Aws::Kinesis::Types::StreamDescription with all shards loaded



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/kinesis/Kinesis.rb', line 30

# Public - Load the entire stream description with all shards
#
# stream_name - the name of the stream to describe
#
# Returns an Aws::Kinesis::Types::StreamDescription with all shards loaded
def describe_stream(stream_name)
  request = { stream_name: stream_name }
  description = @@client.describe_stream(request).stream_description

  # Keep requesting further shards, starting after the last shard collected
  # so far, until the service reports that no more remain.
  while description.has_more_shards
    last_shard_id = description.shards.last.shard_id
    next_request = request.merge(exclusive_start_shard_id: last_shard_id)
    next_page = @@client.describe_stream(next_request).stream_description

    description.shards.concat(next_page.shards)
    description.has_more_shards = next_page.has_more_shards
  end

  description
end

.named_streams ⇒ Object

Public - Returns a Hash of stream name to Aws::Kinesis::Types::StreamDescription with all shards loaded



13
14
15
# File 'lib/kinesis/Kinesis.rb', line 13

# Public - Returns a Hash of stream name to the result of describe_stream for
# that name. The Hash is built on the first call and reused afterwards.
def named_streams
  @named_streams ||= stream_names.each_with_object({}) do |name, streams|
    streams[name] = describe_stream(name)
  end
end

.stream_names ⇒ Object

Public - Returns an array of all the stream names



18
19
20
# File 'lib/kinesis/Kinesis.rb', line 18

# Public - Returns the result of init_stream_names, computed on first use and
# cached in @stream_names for later calls.
def stream_names
  return @stream_names if @stream_names

  @stream_names = init_stream_names
end

.stream_tags ⇒ Object

Public - Returns a Hash of stream name to tags



23
24
25
# File 'lib/kinesis/Kinesis.rb', line 23

# Public - Returns a Hash of stream name to the result of init_tags for that
# name. The Hash is built on the first call and reused afterwards.
def stream_tags
  @stream_tags ||= stream_names.each_with_object({}) do |name, tags|
    tags[name] = init_tags(name)
  end
end