Module: Smash::CloudPowers::Synapse::Pipe

Includes:
AwsResources, Helper, Helpers, Zenv
Included in:
Smash::CloudPowers::SelfAwareness, Smash::CloudPowers::Synapse
Defined in:
lib/cloud_powers/synapse/pipe.rb,
lib/cloud_powers/synapse/pipe/pipe.rb,
lib/cloud_powers/synapse/pipe/stream.rb

Defined Under Namespace

Classes: Stream

Instance Method Summary collapse

Methods included from Zenv

#env_vars, #file_tree_search, #i_vars, #project_root, #project_root=, #system_vars, #zfind

Methods included from Helpers

#create_logger, #log_file, #logger

Methods included from PathHelp

#job_exist?, #job_home, #job_path, #job_require_path

Methods included from LogicHelp

#attr_map, #called_from, #instance_attr_accessor, #smart_retry, #update_message_body

Methods included from LangHelp

#deep_modify_keys_with, #find_and_remove, #format_error_message, #from_json, #modify_keys_with, #to_basic_hash, #to_camel, #to_hyph, #to_i_var, #to_pascal, #to_ruby_file_name, #to_snake, #valid_json?, #valid_url?

Methods included from AwsResources

#ec2, #image, #kinesis, #queue_poller, #region, #s3, #sns, #sqs

Methods included from Auth

creds, region

Instance Method Details

#build_pipe(name:, type: :stream, **config) ⇒ Object



11
12
13
14
15
16
17
18
# File 'lib/cloud_powers/synapse/pipe.rb', line 11

def build_pipe(name:, type: :stream, **config)
  build_method_name = "build_#{type}"
  if self.respond_to? build_method_name
    self.public_send build_method_name, name: name, **config
  else
    build_stream(name: name, **config)
  end
end

#build_stream(name:, client: kinesis, **config) ⇒ Object

Create a Kinesis stream or wait until the stream with the given name is through being created.

Parameters

  • name String

Returns Boolean or nil

  • returns true or false if the request was successful or not

  • returns true if the stream has already been created

  • returns false if the stream was not created



39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/cloud_powers/synapse/pipe.rb', line 39

def build_stream(name:, client: kinesis, **config)
  stream_resource = Smash::CloudPowers::Synapse::Pipe::Stream.build(
    name: name, client: client, **config
  )

  self.attr_map(stream_resource.call_name => stream_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  stream_resource
end

#create_pipe(name:, type: :stream, **config) ⇒ Object



20
21
22
23
24
25
26
27
# File 'lib/cloud_powers/synapse/pipe.rb', line 20

def create_pipe(name:, type: :stream, **config)
  create_method_name = "create_#{type}"
  if self.respond_to? create_method_name
    self.public_send create_method_name, name: name, **config
  else
    create_stream(name: name, **config)
  end
end

#create_stream(name) ⇒ Object

Create a Kinesis stream or wait until the stream with the given name is through being created.

Parameters

  • name String

Returns Boolean or nil

  • returns true or false if the request was successful or not

  • returns true if the stream has already been created

  • returns false if the stream was not created



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/cloud_powers/synapse/pipe/pipe.rb', line 19

def create_stream(name:, client: kinesis, **config)
  stream_resource =
    Smash::CloudPowers::Synapse::Pipe::Stream.create!(name: name, client: client, **config)

  self.attr_map(stream_resource.call_name => stream_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  stream_resource
end

#flow_from_pipe(stream) ⇒ Object

Use the KCL and LangDaemon to read from a stream

Parameters stream String

Notes

  • This method is not implemented yet (V 0.2.7)



69
70
71
# File 'lib/cloud_powers/synapse/pipe.rb', line 69

def flow_from_pipe(stream)
  throw NotImplementedError
end

#flow_to_pipe(stream) ⇒ Object

Sends data through a Pipe. This method is used for lower throughput applications, e.g. logging, status updates

Parameters

  • stream String

Returns Notes

  • This method is not implemented yet (V 0.2.7)



84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/cloud_powers/synapse/pipe.rb', line 84

def flow_to_pipe(stream)
  throw NotImplementedError
  create_stream(stream) unless stream_exists? stream
  records = yield if block_given?
  body = message_body_collection(records)
  puts body
  # TODO: this isn't working yet.  figure out retry logic
  # resp = kinesis.put_records(body)
  # retry(lambda { stream_exists? stream }) flow_to(stream)
  @last_sequence_number = resp.records.map(&:sequence_number).sort.last
  # TODO: what to return? true?
end

#from_pipe(stream) ⇒ Object

Read messages from the Pipe without using the KCL

Parameters stream String

Notes

  • This method is not implemented yet (V 0.2.7)



103
104
105
106
# File 'lib/cloud_powers/synapse/pipe.rb', line 103

def from_pipe(stream)
  # implement get_records and/or other consuming app stuff
  throw NotImplementedError
end

#message_body_collection(records) ⇒ Object

This message will prepare a set of collections to be sent through the Pipe

Parameters

  • records

Notes

  • This method is not implemented yet (V 0.2.7)



115
116
117
# File 'lib/cloud_powers/synapse/pipe.rb', line 115

def message_body_collection(records)
  throw NotImplementedError
end

#pipe_message_body(opts = {}) ⇒ Object

Default message package. This method yields the basic configuration and message body for a stream and all options can be changed.

Parameters opts Hash (optional)

  • stream_name: String name of the stream to pipe to

  • data: String message to send

  • partition_key: String defaults to @instance_id

Returns Hash

Notes

  • See #zfind()

  • See #instance_id()

  • See #update_message_body()



134
135
136
137
138
139
140
# File 'lib/cloud_powers/synapse/pipe.rb', line 134

def pipe_message_body(opts = {})
  {
    stream_name:      zfind(opts[:stream_name]) || zfind('status_stream'),
    data:             opts[:data] || update_message_body(opts),
    partition_key:    opts[:partition_key] || @instance_id || 'unk'
  }
end

#pipe_status(name) ⇒ Object

Get the status name for this stream

Parameters *name String

Returns String - stream status, one of: CREATING, DELETING, ACTIVE or UPDATING



205
206
207
# File 'lib/cloud_powers/synapse/pipe.rb', line 205

def pipe_status(name)
  kinesis.describe_stream(stream_name: name).stream_description.stream_status
end

#pipe_to(stream) ⇒ Object

Use Kinesis streams to send a message. The message is given to the method through a block that gets passed to the method.

Parameters

  • stream String

  • block - a block that generates a string that will be used in the message body

Returns the sequence_number from the sent message.

Example

pipe_to(:status_stream) do
  # the return from the inner method is what is sent
  do_some_stuff_to_generate_a_message()
end


157
158
159
160
161
162
163
164
165
# File 'lib/cloud_powers/synapse/pipe.rb', line 157

def pipe_to(stream)
  message = ''
  create_stream() unless stream_exists?(stream)
  message = yield if block_given?
  body = update_message_body(message)
  resp = kinesis.put_record pipe_message_body(stream_name: stream, data: body.to_json)
  # TODO: implement retry logic for failed request
  @last_sequence_number = resp.sequence_number
end

#stream_config(opts = {}) ⇒ Object

New stream config with sensible defaults

Parameters

  • opts Hash (optional)

    • stream_name - the name to give the stream

    • shard_count - the number of shards to create



173
174
175
176
177
178
# File 'lib/cloud_powers/synapse/pipe.rb', line 173

def stream_config(opts = {})
  {
    stream_name: opts[:stream_name] || zfind(:status_stream),
    shard_count: opts[:shard_count] || 1
  }
end

#stream_exists?(name) ⇒ Boolean

Find out if the stream already exists.

Parameters

  • name String

Returns Boolean

Returns:

  • (Boolean)


187
188
189
190
191
192
193
194
195
196
# File 'lib/cloud_powers/synapse/pipe.rb', line 187

def stream_exists?(name)
  return true unless zfind(name).nil?

  begin
    kinesis.describe_stream(stream_name: name)
    true
  rescue Aws::Kinesis::Errors::ResourceNotFoundException
    false
  end
end

#stream_status(name) ⇒ Object

Get the status name for this stream

Parameters *name String

Returns String - stream status, one of: CREATING, DELETING, ACTIVE or UPDATING



180
181
182
# File 'lib/cloud_powers/synapse/pipe/pipe.rb', line 180

def stream_status(name)
  kinesis.describe_stream(stream_name: name).stream_description.stream_status
end