Module: Smash::CloudPowers::Synapse::Queue
- Includes:
- AwsResources, Helpers
- Included in:
- Smash::CloudPowers::SelfAwareness, Smash::CloudPowers::Synapse, Board
- Defined in:
- lib/cloud_powers/synapse/queue.rb,
lib/cloud_powers/synapse/queue/board.rb,
lib/cloud_powers/synapse/queue/poller.rb
Defined Under Namespace
Instance Method Summary collapse
-
#best_guess_address(board_name = @name) ⇒ Object
Gives a best guess at the URL that points toward this Resource’s Queue.
-
#board_name(arg) ⇒ Object
This method can be used to parse a queue name from its address.
-
#build_board(name:, client: sqs, **config) ⇒ Object
This method builds a Queue::QueueResource object for you to use but doesn’t invoke the
#create!()method, so no API call is made to create the Queue on SQS. -
#build_queue(name:, type: :board, **config) ⇒ Object
Build a Smash::CloudPowers::Synapse::Queue and the getter and setter methods.
-
#create_board(name:, client: sqs, **config) ⇒ Object
This method allows you to create a queue on SQS without explicitly creating a QueueResource object.
-
#create_queue(name:, type: :board, **config) ⇒ Object
Create a
Smash::CloudPowers::Synapse::Queue, the getter and setter methods and the real object, either out in the wild or right here, on disk; e.g. -
#delete_queue_message(queue, opts = {}) ⇒ Object
Deletes a queue message without caring about reading/interacting with the message.
-
#get_queue_message_count(resource_url) ⇒ Object
This method is used to get the approximate count of messages in a given queue.
-
#get_queue_poller(name, client = sqs) ⇒ Object
Get an Aws::SQS::QueuePoller for a a queue to deal with messages.
-
#pluck_queue_message(resource) ⇒ Object
Get a message from a Queue.
-
#poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) ⇒ Object
Polls the given resource with the given options hash and a block that interacts with the message that is retrieved from the queue.
-
#queue_exists?(name) ⇒ Boolean
Checks SQS for the existence of this queue using the
#queue_search()method. -
#queue_name(arg) ⇒ Object
This method can be used to parse a queue name from its address.
-
#queue_poller_name(arg) ⇒ Object
This method can be used to parse a queue poller name from its queue name or
@url. -
#queue_search(name) ⇒ Object
Searches for a queue based on the name.
-
#send_queue_message(address, message, this_sqs = sqs) ⇒ Object
Sends a given message to a given queue.
Methods included from Helpers
#create_logger, #log_file, #logger
Methods included from PathHelp
#common_delimiter, #expand_path, #file_exists?, #file_search, #filename?, #job_exist?, #job_path, #job_require_path, #path_search, #paths_gcd, #paths_lcd, #to_path, #to_pathname, #to_realpath, #touch, #zlib_path
Methods included from LogicHelp
#attr_map, #called_from, #i_var_hash, #instance_attr_accessor, #smart_retry, #update_message_body
Methods included from LangHelp
#deep_modify_keys_with, #extract!, #find_and_remove, #format_error_message, #from_json, #modify_keys_with, #to_basic_hash, #to_camel, #to_hyph, #to_i_var, #to_pascal, #to_ruby_file_name, #to_snake, #valid_json?, #valid_url?
Methods included from AwsResources
#ec2, #image, #kinesis, #queue_poller, #region, #s3, #sns, #sqs
Methods included from Zenv
#env_vars, #i_vars, #lsof_cwd, #pid, #proc_cwd, #process_search, #project_root, #project_root=, #ps_cwd, #system_vars, #zfind, #zselect
Methods included from Auth
Instance Method Details
#best_guess_address(board_name = @name) ⇒ Object
Gives a best guess at the URL that points toward this Resource’s Queue. It uses a couple params to build a standard URL for SQS. The only problem with using this last resort is you may need to use a Queue from a different region, account or name but it can be a handy catch-all for the URLs for most cases.
Returns String
Example
best_guess_address('fooBar')
=> "https://sqs.us-west-2.amazonaws.com/12345678/fooBar"
Notes
-
See
Smash::CloudPowers::Zenv#zfind()to understand how this method finds your region and account number
65 66 67 |
# File 'lib/cloud_powers/synapse/queue.rb', line 65 def best_guess_address(board_name = @name) "https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}" end |
#board_name(arg) ⇒ Object
This method can be used to parse a queue name from its address. It can be handy if you need the name of a queue but you don’t want the overhead of creating a QueueResource object.
Parameters
-
url
String
Returns String
Example
board_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=>
82 83 84 85 |
# File 'lib/cloud_powers/synapse/queue.rb', line 82 def board_name(arg) base_name = to_snake(arg.to_s.split('/').last) %r{_board$} =~ base_name ? base_name : "#{base_name}_board" end |
#build_board(name:, client: sqs, **config) ⇒ Object
This method builds a Queue::QueueResource object for you to use but doesn’t invoke the #create!() method, so no API call is made to create the Queue on SQS. This can be used if the QueueResource and/or Queue already exists.
Parameters
-
name
String- name of the Queue you want to interact with
Returns Queue::QueueResource
Example
queue_object = build_queue('exampleQueue')
queue_object.address
=> https://sqs.us-west-2.amazonaws.com/81234567/exampleQueue
172 173 174 175 176 177 178 179 180 181 182 183 |
# File 'lib/cloud_powers/synapse/queue.rb', line 172 def build_board(name:, client: sqs, **config) board_resource = Smash::CloudPowers::Synapse::Queue::Board.build( name: to_camel(name), client: client, **config ) attr_map(board_resource.call_name => board_resource) do |attribute, resource| instance_attr_accessor attribute resource end board_resource end |
#build_queue(name:, type: :board, **config) ⇒ Object
Build a Smash::CloudPowers::Synapse::Queue and the getter and setter methods. This method uses the Creatable and Resource interface for consistency throughout CloudPowers.
Parameters
-
:nameString- the name of the Queue will be used to create a getter and setter. Please seeCloudPowers::Creatable -
typeString|Symbol- Default isSmash::CloudPowers::Synapse::Queue::Board. As long as the type you request exists, you can use it but only the Classes that useCloudPowers::Creatableand extend fromCloudPowers::Resourceare supported. Please seeCloudPowers::CreatableandCloudPowers::Resource -
config KeywordArguments - any config you want to pass along.
Returns Smash::CloudPowers::Synapse::Queue[::?]
Notes:
-
See
create_queueto create the real object; e.g. anAWS::SQS::Queue. This method just creates a representation of that object in memory. It can be linked up to a real Object or left as is and used later. -
See
CloudPowers::Creatable -
See
CloudPowers::Resource
112 113 114 115 116 117 118 119 |
# File 'lib/cloud_powers/synapse/queue.rb', line 112 def build_queue(name:, type: :board, **config) build_method_name = "build_#{type}" if self.respond_to? build_method_name self.public_send build_method_name, name: name, **config else build_board(name: name, **config) end end |
#create_board(name:, client: sqs, **config) ⇒ Object
This method allows you to create a queue on SQS without explicitly creating a QueueResource object
Parameters
-
name
String- The name of the Queue to be created
Returns Queue::QueueResource
Example
create_queue('exampleQueue')
196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/cloud_powers/synapse/queue.rb', line 196 def create_board(name:, client: sqs, **config) board_resource = Smash::CloudPowers::Synapse::Queue::Board.create!( name: to_camel(name), client: sqs ) attr_map(board_resource.call_name => board_resource) do |attribute, resource| instance_attr_accessor attribute resource end board_resource end |
#create_queue(name:, type: :board, **config) ⇒ Object
Create a Smash::CloudPowers::Synapse::Queue, the getter and setter methods and the real object, either out in the wild or right here, on disk; e.g. create an Aws::SQS::Queue in AWS and the appropriately mapped Smash::CloudPowers::Synapse::Queue[::?] object, locally to use in other methods, etc. This method uses the Creatable and Resource interface for consistency throughout CloudPowers.
Parameters
-
:nameString- the name of the Queue will be used to create a getter and setter. Please seeCloudPowers::Creatable -
typeString|Symbol- Default isSmash::CloudPowers::Synapse::Queue::Board. As long as the type you request exists, you can use it but only the Classes that useCloudPowers::Creatableand extend fromCloudPowers::Resourceare supported. Please seeCloudPowers::CreatableandCloudPowers::Resource -
config KeywordArguments - any config you want to pass along.
Returns Smash::CloudPowers::Synapse::Queue[::?]
Notes:
-
See
build_queueto create the real object; e.g. anAWS::SQS::Queue. This method just creates a representation of that object in memory. It can be linked up to a real Object or left as is and used later. -
See
CloudPowers::Creatable -
See
CloudPowers::Resource
149 150 151 152 153 154 155 156 |
# File 'lib/cloud_powers/synapse/queue.rb', line 149 def create_queue(name:, type: :board, **config) create_method_name = "build_#{type}" if self.respond_to? create_method_name self.public_send create_method_name, name: name, **config else create_queue(name: name, **config) end end |
#delete_queue_message(queue, opts = {}) ⇒ Object
Deletes a queue message without caring about reading/interacting with the message. This is usually used for progress tracking, ie; a Neuron takes a message from the Backlog, moves it to WIP and deletes it from Backlog. Then repeats these steps for the remaining States in the Workflow
Parameters
-
queue
String- queue is the name of theQueueto interact with -
opts
Hash(optional) - a configurationHashfor theSQS::QueuePoller
Notes
-
throws :stop_polling after the message is deleted
Example
('exampleQueue')
# => n
('exampleQueue')
('exampleQueue')
# => n-1
226 227 228 229 230 231 |
# File 'lib/cloud_powers/synapse/queue.rb', line 226 def (queue, opts = {}) poll(queue, opts) do |msg, stats| poller(queue).(msg) throw :stop_polling end end |
#get_queue_message_count(resource_url) ⇒ Object
This method is used to get the approximate count of messages in a given queue
Parameters
-
resource_url
String- the URL for the resource you need to get a count from
Returns Float
Example
('exampleQueue')
# => n
('exampleQueue')
('exampleQueue')
# => n-1
247 248 249 250 251 252 |
# File 'lib/cloud_powers/synapse/queue.rb', line 247 def (resource_url) sqs.get_queue_attributes( queue_url: resource_url, attribute_names: ['ApproximateNumberOfMessages'] ).attributes['ApproximateNumberOfMessages'].to_f end |
#get_queue_poller(name, client = sqs) ⇒ Object
Get an Aws::SQS::QueuePoller for a a queue to deal with messages
Parameters
-
name
String- name of the queue. this also becomes the name of
the Poller object
-
client
Aws::SQS::Client(optional) - good for hitting
different regions or even stubbing for testing
261 262 263 |
# File 'lib/cloud_powers/synapse/queue.rb', line 261 def get_queue_poller(name, client = sqs) queue_poller(url: best_guess_address(name), client: client) end |
#pluck_queue_message(resource) ⇒ Object
Get a message from a Queue
Parameters
-
resource<String|symbol>: The name of the resource
Returns
-
Stringifmsg.bodyis not valid JSON -
Hashifmsg.bodyis valid JSON
Example
# msg.body == 'Hey' # +String+
('exampleQueue')
# => 'Hey' # +String+
# msg.body == "\{"tally":"ho"\}" # +JSON+
('exampleQueue')
# => { 'tally' => 'ho' } # +Hash+
282 283 284 285 286 287 |
# File 'lib/cloud_powers/synapse/queue.rb', line 282 def (resource) poll(resource) do |msg, poller| poller.(msg) return valid_json?(msg.body) ? from_json(msg.body) : msg.body end end |
#poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) ⇒ Object
Polls the given resource with the given options hash and a block that interacts with the message that is retrieved from the queue
Parameters
-
:queue_name
String- the name of the queue that you want to poll -
:sqs
AWS::SQS:QueuePollerpolling configuration option(s) -
blockis the block that is used to interact with the message that was retrieved
Returns the results from the message and the block that interacts with the message(s)
Example
# continuously run jobs from messages in the Queue and leaves the message in the queue
# using the +:skip_delete+ parameter
poll(:backlog, :skip_delete) do |msg|
demo_job = Job.new(msg.body)
demo_job.run
end
307 308 309 310 311 312 313 314 315 |
# File 'lib/cloud_powers/synapse/queue.rb', line 307 def poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) results = nil poller.poll(config) do |msg| results = yield msg, poller if block_given? poller.(msg) throw :stop_polling end results end |
#queue_exists?(name) ⇒ Boolean
Checks SQS for the existence of this queue using the #queue_search() method
Parameters
-
name
String
Returns Boolean
Notes
-
See
#queue_search()
327 328 329 |
# File 'lib/cloud_powers/synapse/queue.rb', line 327 def queue_exists?(name) !queue_search(name).empty? end |
#queue_name(arg) ⇒ Object
This method can be used to parse a queue name from its address. It can be handy if you need the name of a queue but you don’t want the overhead of creating a QueueResource object.
Parameters
-
url
String
Returns String
Example
resource_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> fooBar
344 345 346 347 |
# File 'lib/cloud_powers/synapse/queue.rb', line 344 def queue_name(arg) base_name = to_snake(arg.to_s.split('/').last) %r{_queue$} =~ base_name ? base_name : "#{base_name}_queue" end |
#queue_poller_name(arg) ⇒ Object
This method can be used to parse a queue poller name from its queue name or @url. It can be handy if you need the name of a queue but you don’t want the overhead of creating a QueueResource object.
Parameters
-
url
String
Returns String
Example
resource_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> fooBar
362 363 364 365 |
# File 'lib/cloud_powers/synapse/queue.rb', line 362 def queue_poller_name(arg) base_name = to_snake(arg.to_s.split('/').last) %r{_queue_poller$} =~ base_name ? base_name : "#{base_name}_queue_poller" end |
#queue_search(name) ⇒ Object
Searches for a queue based on the name
Parameters name String
Returns queue_urls String
Example
results = queue_search('exampleQueue') # returns related URLs
results.first =~ /exampleQueue/ # regex match against the URL
378 379 380 381 382 383 384 385 386 387 388 389 390 391 |
# File 'lib/cloud_powers/synapse/queue.rb', line 378 def queue_search(name) urls = sqs.list_queues(queue_name_prefix: name).queue_urls # TODO: allow a collection of blocks to be itterated through. Each one # would be able to further scope down a set of previous results then # pass it to the next. When there are no scoping blocks left, build # the boards and return them. Saves a TON on memory and time urls.map do |url| build_board(name: queue_name(url), client: sqs) do |board| board.instance_attr_accessor :url board.url = url board end end end |
#send_queue_message(address, message, this_sqs = sqs) ⇒ Object
Sends a given message to a given queue
Parameters
-
address
String- address of the Queue you want to interact with -
message
String- message to be sent
Returns Array<String> - Array of URLs
Example
legit_address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
= 'Wowza, this is pretty easy.'
resp = (legit_address, ))
resp.
=> 'some message id'
408 409 410 |
# File 'lib/cloud_powers/synapse/queue.rb', line 408 def (address, , this_sqs = sqs) this_sqs.(queue_url: address, message_body: ) end |