Class: Smash::CloudPowers::Synapse::Pipe::Stream

Inherits:
Resource
  • Object
show all
Defined in:
lib/cloud_powers/synapse/pipe/stream.rb

Instance Attribute Summary collapse

Attributes inherited from Resource

#call_name, #client, #linked, #name, #remote_id, #tags, #type

Instance Method Summary collapse

Methods included from Zenv

#env_vars, #file_tree_search, #i_vars, #project_root, #project_root=, #system_vars, #zfind

Methods included from Helpers

#create_logger, #log_file, #logger

Methods included from PathHelp

#job_exist?, #job_home, #job_path, #job_require_path

Methods included from LogicHelp

#attr_map, #called_from, #instance_attr_accessor, #smart_retry, #update_message_body

Methods included from LangHelp

#deep_modify_keys_with, #find_and_remove, #format_error_message, #from_json, #modify_keys_with, #to_basic_hash, #to_camel, #to_hyph, #to_i_var, #to_pascal, #to_ruby_file_name, #to_snake, #valid_json?, #valid_url?

Methods included from AwsResources

#ec2, #image, #queue_poller, #region, #s3, #sns, #sqs

Methods included from Auth

creds, region

Methods included from Creatable

included

Constructor Details

#initialize(name:, client: kinesis, **config) ⇒ Stream

Returns a new instance of Stream.



9
10
11
12
13
# File 'lib/cloud_powers/synapse/pipe/stream.rb', line 9

def initialize(name:, client: kinesis, **config)
  super
  @kinesis = client
  @shard_count = config[:shard_count] || 1
end

Instance Attribute Details

#kinesisObject

Returns the value of attribute kinesis.



7
8
9
# File 'lib/cloud_powers/synapse/pipe/stream.rb', line 7

def kinesis
  @kinesis
end

#shard_countObject

Returns the value of attribute shard_count.



7
8
9
# File 'lib/cloud_powers/synapse/pipe/stream.rb', line 7

def shard_count
  @shard_count
end

Instance Method Details

#configObject



34
35
36
# File 'lib/cloud_powers/synapse/pipe/stream.rb', line 34

def config
  { stream_name: @name, shard_count: @shard_count }
end

#create_resourceObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/cloud_powers/synapse/pipe/stream.rb', line 15

def create_resource
  begin
    @response = kinesis.create_stream(config)
    kinesis.wait_until(:stream_exists, stream_name: config[:stream_name])
    @response.successful? # (http request successful && stream created)?
  rescue Exception => e
    if e.kind_of? Aws::Kinesis::Errors::ResourceInUseException
      logger.info "#{name} already created"
      return if stream_status == 'ACTIVE'
      logger.info "Not ready for traffic.  Wait for 30 seconds..."
      sleep 1
      @saved = true # acts like it would if it had to create the stream
      @linked = true
    else
      raise
    end
  end
end