Module: Smash::CloudPowers::Synapse::Pipe
- Includes:
- AwsResources, Helper, Helpers, Zenv
- Included in:
- Smash::CloudPowers::SelfAwareness, Smash::CloudPowers::Synapse
- Defined in:
- lib/cloud_powers/synapse/pipe.rb,
lib/cloud_powers/synapse/pipe/pipe.rb,
lib/cloud_powers/synapse/pipe/stream.rb
Defined Under Namespace
Classes: Stream
Instance Method Summary
- #build_pipe(name:, type: :stream, **config) ⇒ Object
- #build_stream(name:, client: kinesis, **config) ⇒ Object
  Create a Kinesis stream, or wait until the stream with the given name has finished being created.
- #create_pipe(name:, type: :stream, **config) ⇒ Object
- #create_stream(name:, client: kinesis, **config) ⇒ Object
  Create a Kinesis stream, or wait until the stream with the given name has finished being created.
- #flow_from_pipe(stream) ⇒ Object
  Use the KCL and LangDaemon to read from a stream.
- #flow_to_pipe(stream) ⇒ Object
  Sends data through a Pipe.
- #from_pipe(stream) ⇒ Object
  Read messages from the Pipe without using the KCL.
- #message_body_collection(records) ⇒ Object
  This method prepares a set of collections to be sent through the Pipe.
- #pipe_message_body(opts = {}) ⇒ Object
  Default message package.
- #pipe_status(name) ⇒ Object
  Get the status name for this stream.
- #pipe_to(stream) ⇒ Object
  Use Kinesis streams to send a message.
- #stream_config(opts = {}) ⇒ Object
  New stream config with sensible defaults.
- #stream_exists?(name) ⇒ Boolean
  Find out if the stream already exists.
- #stream_status(name) ⇒ Object
  Get the status name for this stream.
Methods included from Zenv
#env_vars, #file_tree_search, #i_vars, #project_root, #project_root=, #system_vars, #zfind
Methods included from Helpers
#create_logger, #log_file, #logger
Methods included from PathHelp
#job_exist?, #job_home, #job_path, #job_require_path
Methods included from LogicHelp
#attr_map, #called_from, #instance_attr_accessor, #smart_retry, #update_message_body
Methods included from LangHelp
#deep_modify_keys_with, #find_and_remove, #format_error_message, #from_json, #modify_keys_with, #to_basic_hash, #to_camel, #to_hyph, #to_i_var, #to_pascal, #to_ruby_file_name, #to_snake, #valid_json?, #valid_url?
Methods included from AwsResources
#ec2, #image, #kinesis, #queue_poller, #region, #s3, #sns, #sqs
Methods included from Auth
Instance Method Details
#build_pipe(name:, type: :stream, **config) ⇒ Object
# File 'lib/cloud_powers/synapse/pipe.rb', line 11
def build_pipe(name:, type: :stream, **config)
  build_method_name = "build_#{type}"
  if self.respond_to? build_method_name
    self.public_send build_method_name, name: name, **config
  else
    build_stream(name: name, **config)
  end
end
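The dispatch in build_pipe can be tried in isolation. The following is a minimal sketch, not the gem's code: PipeBuilderSketch and build_firehose are hypothetical names invented for the illustration, and the builders return plain Hashes instead of Stream resources.

```ruby
# Hypothetical stand-in for a class that mixes in Pipe.
class PipeBuilderSketch
  # Call build_<type> when such a public method exists; otherwise fall
  # back to build_stream, as the listing above does.
  def build_pipe(name:, type: :stream, **config)
    build_method_name = "build_#{type}"
    if respond_to?(build_method_name)
      public_send(build_method_name, name: name, **config)
    else
      build_stream(name: name, **config)
    end
  end

  # Returns a plain Hash here; the real method builds a Stream resource.
  def build_stream(name:, **config)
    { kind: :stream, name: name, config: config }
  end

  # Hypothetical second pipe type, present only to show the dispatch.
  def build_firehose(name:, **config)
    { kind: :firehose, name: name, config: config }
  end
end

builder = PipeBuilderSketch.new
default_pipe  = builder.build_pipe(name: 'events')
firehose_pipe = builder.build_pipe(name: 'events', type: :firehose, shard_count: 2)
fallback_pipe = builder.build_pipe(name: 'events', type: :unknown)
```

An unrecognised type silently falls back to a stream rather than raising, so a typo in type: goes unnoticed unless the caller checks what came back.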
#build_stream(name:, client: kinesis, **config) ⇒ Object
Create a Kinesis stream, or wait until the stream with the given name has finished being created.
Parameters
- name String
Returns Boolean or nil (per the original doc comment; the listing below returns the stream resource)
- returns true or false if the request was successful or not
- returns true if the stream has already been created
- returns false if the stream was not created
# File 'lib/cloud_powers/synapse/pipe.rb', line 39
def build_stream(name:, client: kinesis, **config)
  stream_resource = Smash::CloudPowers::Synapse::Pipe::Stream.build(
    name: name, client: client, **config
  )

  self.attr_map(stream_resource.call_name => stream_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  stream_resource
end
#create_pipe(name:, type: :stream, **config) ⇒ Object
# File 'lib/cloud_powers/synapse/pipe.rb', line 20
def create_pipe(name:, type: :stream, **config)
  create_method_name = "create_#{type}"
  if self.respond_to? create_method_name
    self.public_send create_method_name, name: name, **config
  else
    create_stream(name: name, **config)
  end
end
#create_stream(name:, client: kinesis, **config) ⇒ Object
Create a Kinesis stream, or wait until the stream with the given name has finished being created.
Parameters
- name String
Returns Boolean or nil (per the original doc comment; the listing below returns the stream resource)
- returns true or false if the request was successful or not
- returns true if the stream has already been created
- returns false if the stream was not created
# File 'lib/cloud_powers/synapse/pipe/pipe.rb', line 19
def create_stream(name:, client: kinesis, **config)
  stream_resource =
    Smash::CloudPowers::Synapse::Pipe::Stream.create!(name: name, client: client, **config)

  self.attr_map(stream_resource.call_name => stream_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  stream_resource
end
#flow_from_pipe(stream) ⇒ Object
Use the KCL and LangDaemon to read from a stream
Parameters
- stream String
Notes
- This method is not implemented yet (V 0.2.7)
# File 'lib/cloud_powers/synapse/pipe.rb', line 69
def flow_from_pipe(stream)
  throw NotImplementedError
end
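The unimplemented methods in this module signal their status with throw rather than raise. In Ruby, throw is the counterpart of catch, so a throw with no matching catch surfaces as UncaughtThrowError instead of the NotImplementedError that was thrown. The sketch below shows the difference; thrown_stub, raised_stub and error_class_of are hypothetical helpers written for this illustration.

```ruby
# Mirrors the stubs in this module: throw with no enclosing catch.
def thrown_stub
  throw NotImplementedError
end

# The conventional way to mark a method as unimplemented.
def raised_stub
  raise NotImplementedError, 'flow_from_pipe is not implemented yet'
end

# Runs the block and returns the class of whatever exception escapes it.
# Exception is rescued on purpose: NotImplementedError descends from
# ScriptError, not StandardError, so a bare rescue would miss it.
def error_class_of
  yield
  nil
rescue Exception => e
  e.class
end

thrown_error = error_class_of { thrown_stub }
raised_error = error_class_of { raised_stub }
```

Callers that want to detect the stubs therefore need to rescue UncaughtThrowError for the current code, or NotImplementedError explicitly if the stubs are ever switched to raise.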
#flow_to_pipe(stream) ⇒ Object
Sends data through a Pipe. This method is used for lower-throughput applications, e.g. logging and status updates.
Parameters
- stream String
Notes
- This method is not implemented yet (V 0.2.7)
# File 'lib/cloud_powers/synapse/pipe.rb', line 84
def flow_to_pipe(stream)
  throw NotImplementedError
  create_stream(stream) unless stream_exists? stream
  records = yield if block_given?
  body = message_body_collection(records)
  puts body
  # TODO: this isn't working yet. figure out retry logic
  # resp = kinesis.put_records(body)
  # retry(lambda { stream_exists? stream }) flow_to(stream)
  @last_sequence_number = resp.records.map(&:sequence_number).sort.last
  # TODO: what to return? true?
end
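The listing picks the last sequence number with .sort.last, which compares the values as strings. Kinesis returns sequence numbers as strings of digits, so a numeric comparison may be the safer choice once this method is implemented. The sketch below illustrates the difference with short made-up values; ResultEntry and last_sequence_number are hypothetical names, and the nil entry stands for a record that failed and therefore has no sequence number.

```ruby
# Stand-in for the per-record entries of a put_records response; the
# real SDK entries also carry shard_id, error_code and error_message.
ResultEntry = Struct.new(:sequence_number)

# Returns the numerically largest sequence number, still as a String.
def last_sequence_number(entries)
  entries
    .map(&:sequence_number)
    .compact                            # skip failed records
    .max_by { |seq| Integer(seq, 10) }  # compare as integers, not text
end

entries = [
  ResultEntry.new('99'),
  ResultEntry.new('100'),
  ResultEntry.new(nil)
]

string_sorted = entries.map(&:sequence_number).compact.sort.last
numeric_max   = last_sequence_number(entries)
```

With these values the string sort picks '99' while the numeric comparison picks '100'. Without the compact step, sort would raise as soon as a failed record contributed a nil.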
#from_pipe(stream) ⇒ Object
Read messages from the Pipe without using the KCL
Parameters
- stream String
Notes
- This method is not implemented yet (V 0.2.7)
# File 'lib/cloud_powers/synapse/pipe.rb', line 103
def from_pipe(stream)
  # implement get_records and/or other consuming app stuff
  throw NotImplementedError
end
#message_body_collection(records) ⇒ Object
This method prepares a set of collections to be sent through the Pipe
Parameters
- records
Notes
- This method is not implemented yet (V 0.2.7)
# File 'lib/cloud_powers/synapse/pipe.rb', line 115
def message_body_collection(records)
  throw NotImplementedError
end
#pipe_message_body(opts = {}) ⇒ Object
Default message package. This method returns the basic configuration and message body for a stream; every option can be overridden.
Parameters
- opts Hash (optional)
  - stream_name: String name of the stream to pipe to
  - data: String message to send
  - partition_key: String defaults to @instance_id
Returns Hash
Notes
- See #zfind()
- See #instance_id()
- See #update_message_body()
# File 'lib/cloud_powers/synapse/pipe.rb', line 134
def pipe_message_body(opts = {})
  {
    stream_name: zfind(opts[:stream_name]) || zfind('status_stream'),
    data: opts[:data] || update_message_body(opts),
    partition_key: opts[:partition_key] || @instance_id || 'unk'
  }
end
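The fallback chain in pipe_message_body can be exercised without AWS or the rest of the gem. This is a sketch under stated assumptions: MessageBodySketch is a hypothetical class, its zfind only looks keys up in a Hash (the real helper searches more places), and the '{}' placeholder stands in for the default message body.

```ruby
class MessageBodySketch
  def initialize(instance_id: nil, settings: {})
    @instance_id = instance_id
    @settings = settings
  end

  # Hypothetical zfind: a plain Hash lookup that returns nil when the
  # key is unknown.
  def zfind(key)
    key.nil? ? nil : @settings[key.to_s]
  end

  def pipe_message_body(opts = {})
    {
      stream_name:   zfind(opts[:stream_name]) || zfind('status_stream'),
      data:          opts[:data] || '{}', # placeholder default body
      partition_key: opts[:partition_key] || @instance_id || 'unk'
    }
  end
end

settings = { 'status_stream' => 'status-prod', 'audit_stream' => 'audit-prod' }
sketch   = MessageBodySketch.new(settings: settings)

defaults   = sketch.pipe_message_body
custom     = sketch.pipe_message_body(stream_name: 'audit_stream',
                                      data: 'hello', partition_key: 'node-7')
unresolved = sketch.pipe_message_body(stream_name: 'missing_stream')
with_id    = MessageBodySketch.new(instance_id: 'i-123', settings: settings)
                              .pipe_message_body
```

Because stream_name always passes through zfind, a name that zfind cannot resolve falls back to the status stream instead of being used literally; in this sketch that is what happens to 'missing_stream'.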
#pipe_status(name) ⇒ Object
Get the status name for this stream
Parameters
- name String
Returns String - stream status, one of: CREATING, DELETING, ACTIVE or UPDATING
# File 'lib/cloud_powers/synapse/pipe.rb', line 205
def pipe_status(name)
  kinesis.describe_stream(stream_name: name).stream_description.stream_status
end
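A status string such as the one pipe_status returns is typically polled until the stream becomes ACTIVE. The following sketch shows one way to do that with a bounded number of attempts. FakeStatusSource and wait_until_active are hypothetical names that are not part of the gem, and the fake replays a fixed list of statuses instead of calling Kinesis.

```ruby
# Replays a fixed list of statuses, repeating the last one forever.
class FakeStatusSource
  def initialize(statuses)
    @statuses = statuses.dup
  end

  def pipe_status(_name)
    @statuses.length > 1 ? @statuses.shift : @statuses.first
  end
end

# Polls until the status is ACTIVE or the attempts run out.
# Returns true when the stream became active, false otherwise.
def wait_until_active(source, name, attempts: 5, delay: 0)
  attempts.times do
    return true if source.pipe_status(name) == 'ACTIVE'

    sleep(delay)
  end
  false
end

creating_then_active = FakeStatusSource.new(%w[CREATING CREATING ACTIVE])
stuck_creating       = FakeStatusSource.new(%w[CREATING])

became_active = wait_until_active(creating_then_active, 'events')
timed_out     = wait_until_active(stuck_creating, 'events', attempts: 3)
```

The delay defaults to 0 only so the sketch runs instantly; against a real stream a pause of a few seconds between calls would be more appropriate.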
#pipe_to(stream) ⇒ Object
Use Kinesis streams to send a message. The message is supplied by a block passed to the method.
Parameters
- stream String
- block - a block that generates a string that will be used in the message body
Returns the sequence_number from the sent message.
Example
pipe_to(:status_stream) do
# the return from the inner method is what is sent
()
end
# File 'lib/cloud_powers/synapse/pipe.rb', line 157
def pipe_to(stream)
  message = ''
  create_stream() unless stream_exists?(stream)
  message = yield if block_given?
  body = update_message_body(message)
  resp = kinesis.put_record pipe_message_body(stream_name: stream, data: body.to_json)
  # TODO: implement retry logic for failed request
  @last_sequence_number = resp.sequence_number
end
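The block-based flow of pipe_to can be followed end to end with a fake client. This is a sketch, not the gem's implementation: FakeKinesis, PutRecordResponse and PipeToSketch are hypothetical. The fake accepts the same stream_name:, data: and partition_key: keywords as the SDK's put_record, and the message wrapping is simplified to a one-key Hash.

```ruby
require 'json'

# Minimal response object exposing the one field pipe_to reads.
PutRecordResponse = Struct.new(:sequence_number, :shard_id)

# Records every request and hands out increasing sequence numbers.
class FakeKinesis
  attr_reader :requests

  def initialize
    @requests = []
  end

  def put_record(stream_name:, data:, partition_key:)
    @requests << { stream_name: stream_name, data: data, partition_key: partition_key }
    PutRecordResponse.new(@requests.length.to_s, 'shardId-000000000000')
  end
end

class PipeToSketch
  attr_reader :last_sequence_number

  def initialize(client)
    @kinesis = client
  end

  # The block's return value becomes the message; an absent block
  # sends an empty string.
  def pipe_to(stream)
    message = block_given? ? yield : ''
    response = @kinesis.put_record(
      stream_name: stream.to_s,
      data: { message: message }.to_json,
      partition_key: 'unk'
    )
    @last_sequence_number = response.sequence_number
  end
end

client = FakeKinesis.new
sender = PipeToSketch.new(client)
sequence_number = sender.pipe_to(:status_stream) { 'workflow started' }
```

The assignment on the last line of pipe_to doubles as the return value, which is how the method both remembers and returns the sequence number.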
#stream_config(opts = {}) ⇒ Object
New stream config with sensible defaults
Parameters
- opts Hash (optional)
  - stream_name - the name to give the stream (defaults to zfind(:status_stream))
  - shard_count - the number of shards to create (defaults to 1)
# File 'lib/cloud_powers/synapse/pipe.rb', line 173
def stream_config(opts = {})
  {
    stream_name: opts[:stream_name] || zfind(:status_stream),
    shard_count: opts[:shard_count] || 1
  }
end
#stream_exists?(name) ⇒ Boolean
Find out if the stream already exists.
Parameters
- name String
Returns Boolean
# File 'lib/cloud_powers/synapse/pipe.rb', line 187
def stream_exists?(name)
  return true unless zfind(name).nil?

  begin
    kinesis.describe_stream(stream_name: name)
    true
  rescue Aws::Kinesis::Errors::ResourceNotFoundException
    false
  end
end
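The existence check works by treating a "not found" error from describe_stream as a negative answer. A minimal sketch of that pattern follows, assuming a fake client: StreamNotFound stands in for Aws::Kinesis::Errors::ResourceNotFoundException, FakeDescribeClient is hypothetical, and the zfind shortcut from the listing is left out.

```ruby
# Stand-in for Aws::Kinesis::Errors::ResourceNotFoundException.
class StreamNotFound < StandardError; end

# Knows a fixed set of stream names and raises for anything else.
class FakeDescribeClient
  def initialize(known_streams)
    @known_streams = known_streams
  end

  def describe_stream(stream_name:)
    raise StreamNotFound, stream_name unless @known_streams.include?(stream_name)

    { stream_name: stream_name, stream_status: 'ACTIVE' }
  end
end

# true when describe_stream succeeds, false when it reports "not found".
# Any other error is left to propagate to the caller.
def stream_exists?(client, name)
  client.describe_stream(stream_name: name)
  true
rescue StreamNotFound
  false
end

client  = FakeDescribeClient.new(['status-prod'])
present = stream_exists?(client, 'status-prod')
absent  = stream_exists?(client, 'no-such-stream')
```

Rescuing only the specific error class keeps permission or network failures from being misread as a missing stream.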
#stream_status(name) ⇒ Object
Get the status name for this stream
Parameters
- name String
Returns String - stream status, one of: CREATING, DELETING, ACTIVE or UPDATING
# File 'lib/cloud_powers/synapse/pipe/pipe.rb', line 180
def stream_status(name)
  kinesis.describe_stream(stream_name: name).stream_description.stream_status
end