Module: Smash::CloudPowers::Synapse::Queue
- Includes:
- AwsResources, Helpers
- Included in:
- Smash::CloudPowers::SelfAwareness, Smash::CloudPowers::Synapse, Board
- Defined in:
- lib/cloud_powers/synapse/queue.rb,
lib/cloud_powers/synapse/queue/board.rb,
lib/cloud_powers/synapse/queue/poller.rb
Defined Under Namespace
Instance Method Summary collapse
-
#best_guess_address(board_name = @name) ⇒ Object
Gives a best guess at the URL that points toward this Resource’s Queue.
-
#board_name(arg) ⇒ Object
This method can be used to parse a queue name from its address.
-
#build_board(name:, client: sqs, **config) ⇒ Object
This method builds a Queue::QueueResource object for you to use but doesn’t invoke the
#create!()method, so no API call is made to create the Queue on SQS. - #build_queue(name:, type: :board, **config) ⇒ Object
-
#create_board(name:, client: sqs, **config) ⇒ Object
This method allows you to create a queue on SQS without explicitly creating a QueueResource object.
- #create_queue(name:, type: :board, **config) ⇒ Object
-
#delete_queue_message(queue, opts = {}) ⇒ Object
Deletes a queue message without caring about reading/interacting with the message.
-
#get_queue_message_count(resource_url) ⇒ Object
This method is used to get the approximate count of messages in a given queue.
-
#get_queue_poller(name, client = sqs) ⇒ Object
Get an Aws::SQS::QueuePoller for a a queue to deal with messages.
-
#pluck_queue_message(resource) ⇒ Object
Get a message from a Queue.
-
#poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) ⇒ Object
Polls the given resource with the given options hash and a block that interacts with the message that is retrieved from the queue.
-
#queue_exists?(name) ⇒ Boolean
Checks SQS for the existence of this queue using the
#queue_search()method. -
#queue_name(arg) ⇒ Object
This method can be used to parse a queue name from its address.
-
#queue_poller_name(arg) ⇒ Object
This method can be used to parse a queue poller name from its queue name or
@url. -
#queue_search(name) ⇒ Object
Searches for a queue based on the name.
-
#send_queue_message(address, message, this_sqs = sqs) ⇒ Object
Sends a given message to a given queue.
Methods included from Helpers
#create_logger, #log_file, #logger
Methods included from PathHelp
#job_exist?, #job_home, #job_path, #job_require_path
Methods included from LogicHelp
#attr_map, #called_from, #instance_attr_accessor, #smart_retry, #update_message_body
Methods included from LangHelp
#deep_modify_keys_with, #find_and_remove, #format_error_message, #from_json, #modify_keys_with, #to_basic_hash, #to_camel, #to_hyph, #to_i_var, #to_pascal, #to_ruby_file_name, #to_snake, #valid_json?, #valid_url?
Methods included from AwsResources
#ec2, #image, #kinesis, #queue_poller, #region, #s3, #sns, #sqs
Methods included from Zenv
#env_vars, #file_tree_search, #i_vars, #project_root, #project_root=, #system_vars, #zfind
Methods included from Auth
Instance Method Details
#best_guess_address(board_name = @name) ⇒ Object
Gives a best guess at the URL that points toward this Resource’s Queue. It uses a couple params to build a standard URL for SQS. The only problem with using this last resort is you may need to use a Queue from a different region, account or name but it can be a handy catch-all for the URLs for most cases.
Returns String
Example
best_guess_address('fooBar')
=> "https://sqs.us-west-2.amazonaws.com/12345678/fooBar"
Notes
-
See
Smash::CloudPowers::Zenv#zfind()to understand how this method finds your region and account number
65 66 67 |
# File 'lib/cloud_powers/synapse/queue.rb', line 65 def best_guess_address(board_name = @name) "https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}" end |
#board_name(arg) ⇒ Object
This method can be used to parse a queue name from its address. It can be handy if you need the name of a queue but you don’t want the overhead of creating a QueueResource object.
Parameters
-
url
String
Returns String
Example
board_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_board
82 83 84 85 |
# File 'lib/cloud_powers/synapse/queue.rb', line 82 def board_name(arg) base_name = to_snake(arg.to_s.split('/').last) %r{_board$} =~ base_name ? base_name : "#{base_name}_board" end |
#build_board(name:, client: sqs, **config) ⇒ Object
This method builds a Queue::QueueResource object for you to use but doesn’t invoke the #create!() method, so no API call is made to create the Queue on SQS. This can be used if the QueueResource and/or Queue already exists.
Parameters
-
name
String- name of the Queue you want to interact with
Returns Queue::QueueResource
Example
queue_object = build_queue('exampleQueue')
queue_object.address
=> https://sqs.us-west-2.amazonaws.com/81234567/exampleQueue
119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/cloud_powers/synapse/queue.rb', line 119 def build_board(name:, client: sqs, **config) board_resource = Smash::CloudPowers::Synapse::Queue::Board.build( name: to_camel(name), client: sqs ) attr_map(board_resource.call_name => board_resource) do |attribute, resource| instance_attr_accessor attribute resource end board_resource end |
#build_queue(name:, type: :board, **config) ⇒ Object
87 88 89 90 91 92 93 94 |
# File 'lib/cloud_powers/synapse/queue.rb', line 87 def build_queue(name:, type: :board, **config) build_method_name = "build_#{type}" if self.respond_to? build_method_name self.public_send build_method_name, name: name, **config else build_board(name: name, **config) end end |
#create_board(name:, client: sqs, **config) ⇒ Object
This method allows you to create a queue on SQS without explicitly creating a QueueResource object
Parameters
-
name
String- The name of the Queue to be created
Returns Queue::QueueResource
Example
create_queue('exampleQueue')
143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/cloud_powers/synapse/queue.rb', line 143 def create_board(name:, client: sqs, **config) board_resource = Smash::CloudPowers::Synapse::Queue::Board.create!( name: to_camel(name), client: sqs ) attr_map(board_resource.call_name => board_resource) do |attribute, resource| instance_attr_accessor attribute resource end board_resource end |
#create_queue(name:, type: :board, **config) ⇒ Object
96 97 98 99 100 101 102 103 |
# File 'lib/cloud_powers/synapse/queue.rb', line 96 def create_queue(name:, type: :board, **config) create_method_name = "build_#{type}" if self.respond_to? create_method_name self.public_send create_method_name, name: name, **config else create_queue(name: name, **config) end end |
#delete_queue_message(queue, opts = {}) ⇒ Object
Deletes a queue message without caring about reading/interacting with the message. This is usually used for progress tracking, ie; a Neuron takes a message from the Backlog, moves it to WIP and deletes it from Backlog. Then repeats these steps for the remaining States in the Workflow
Parameters
-
queue
String- queue is the name of theQueueto interact with -
opts
Hash(optional) - a configurationHashfor theSQS::QueuePoller
Notes
-
throws :stop_polling after the message is deleted
Example
('exampleQueue')
# => n
('exampleQueue')
('exampleQueue')
# => n-1
173 174 175 176 177 178 |
# File 'lib/cloud_powers/synapse/queue.rb', line 173 def (queue, opts = {}) poll(queue, opts) do |msg, stats| poller(queue).(msg) throw :stop_polling end end |
#get_queue_message_count(resource_url) ⇒ Object
This method is used to get the approximate count of messages in a given queue
Parameters
-
resource_url
String- the URL for the resource you need to get a count from
Returns Float
Example
('exampleQueue')
# => n
('exampleQueue')
('exampleQueue')
# => n-1
194 195 196 197 198 199 |
# File 'lib/cloud_powers/synapse/queue.rb', line 194 def (resource_url) sqs.get_queue_attributes( queue_url: resource_url, attribute_names: ['ApproximateNumberOfMessages'] ).attributes['ApproximateNumberOfMessages'].to_f end |
#get_queue_poller(name, client = sqs) ⇒ Object
Get an Aws::SQS::QueuePoller for a a queue to deal with messages
Parameters
-
name
String- name of the queue. this also becomes the name of
the Poller object
-
client
Aws::SQS::Client(optional) - good for hitting
different regions or even stubbing for testing
208 209 210 |
# File 'lib/cloud_powers/synapse/queue.rb', line 208 def get_queue_poller(name, client = sqs) queue_poller(url: best_guess_address(name), client: client) end |
#pluck_queue_message(resource) ⇒ Object
Get a message from a Queue
Parameters
-
resource<String|symbol>: The name of the resource
Returns
-
Stringifmsg.bodyis not valid JSON -
Hashifmsg.bodyis valid JSON
Example
# msg.body == 'Hey' # +String+
('exampleQueue')
# => 'Hey' # +String+
# msg.body == "\{"tally":"ho"\}" # +JSON+
('exampleQueue')
# => { 'tally' => 'ho' } # +Hash+
229 230 231 232 233 234 |
# File 'lib/cloud_powers/synapse/queue.rb', line 229 def (resource) poll(resource) do |msg, poller| poller.(msg) return valid_json?(msg.body) ? from_json(msg.body) : msg.body end end |
#poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) ⇒ Object
Polls the given resource with the given options hash and a block that interacts with the message that is retrieved from the queue
Parameters
-
:queue_name
String- the name of the queue that you want to poll -
:sqs
AWS::SQS:QueuePollerpolling configuration option(s) -
blockis the block that is used to interact with the message that was retrieved
Returns the results from the message and the block that interacts with the message(s)
Example
# continuously run jobs from messages in the Queue and leaves the message in the queue
# using the +:skip_delete+ parameter
poll(:backlog, :skip_delete) do |msg|
demo_job = Job.new(msg.body)
demo_job.run
end
254 255 256 257 258 259 260 261 262 |
# File 'lib/cloud_powers/synapse/queue.rb', line 254 def poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) results = nil poller.poll(config) do |msg| results = yield msg, poller if block_given? poller.(msg) throw :stop_polling end results end |
#queue_exists?(name) ⇒ Boolean
Checks SQS for the existence of this queue using the #queue_search() method
Parameters
-
name
String
Returns Boolean
Notes
-
See
#queue_search()
274 275 276 |
# File 'lib/cloud_powers/synapse/queue.rb', line 274 def queue_exists?(name) !queue_search(name).empty? end |
#queue_name(arg) ⇒ Object
This method can be used to parse a queue name from its address. It can be handy if you need the name of a queue but you don’t want the overhead of creating a QueueResource object.
Parameters
-
url
String
Returns String
Example
resource_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> fooBar
291 292 293 294 |
# File 'lib/cloud_powers/synapse/queue.rb', line 291 def queue_name(arg) base_name = to_snake(arg.to_s.split('/').last) %r{_queue$} =~ base_name ? base_name : "#{base_name}_queue" end |
#queue_poller_name(arg) ⇒ Object
This method can be used to parse a queue poller name from its queue name or @url. It can be handy if you need the name of a queue but you don’t want the overhead of creating a QueueResource object.
Parameters
-
url
String
Returns String
Example
resource_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> fooBar
309 310 311 312 |
# File 'lib/cloud_powers/synapse/queue.rb', line 309 def queue_poller_name(arg) base_name = to_snake(arg.to_s.split('/').last) %r{_queue_poller$} =~ base_name ? base_name : "#{base_name}_queue_poller" end |
#queue_search(name) ⇒ Object
Searches for a queue based on the name
Parameters name String
Returns queue_urls String
Example
results = queue_search('exampleQueue') # returns related URLs
results.first =~ /exampleQueue/ # regex match against the URL
325 326 327 328 329 330 331 332 333 334 |
# File 'lib/cloud_powers/synapse/queue.rb', line 325 def queue_search(name) urls = sqs.list_queues(queue_name_prefix: name).queue_urls urls.map do |url| build_board(name: queue_name(url), client: sqs) do |board| board.instance_attr_accessor :url board.url = url board end end end |
#send_queue_message(address, message, this_sqs = sqs) ⇒ Object
Sends a given message to a given queue
Parameters
-
address
String- address of the Queue you want to interact with -
message
String- message to be sent
Returns Array<String> - Array of URLs
Example
legit_address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
random_message = 'Wowza, this is pretty easy.'
resp = send_queue_message(legit_address, random_message))
resp.message_id
=> 'some message id'
351 352 353 |
# File 'lib/cloud_powers/synapse/queue.rb', line 351 def (address, , this_sqs = sqs) this_sqs.(queue_url: address, message_body: ) end |