Module: Smash::CloudPowers::Synapse::Queue

Includes:
AwsResources, Helpers
Included in:
Smash::CloudPowers::SelfAwareness, Smash::CloudPowers::Synapse, Board
Defined in:
lib/cloud_powers/synapse/queue.rb,
lib/cloud_powers/synapse/queue/board.rb,
lib/cloud_powers/synapse/queue/poller.rb

Defined Under Namespace

Classes: Board, NUMap, Poller

Instance Method Summary

Methods included from Helpers

#create_logger, #log_file, #logger

Methods included from PathHelp

#common_delimiter, #expand_path, #file_exists?, #file_search, #filename?, #job_exist?, #job_path, #job_require_path, #path_search, #paths_gcd, #paths_lcd, #to_path, #to_pathname, #to_realpath, #touch, #zlib_path

Methods included from LogicHelp

#attr_map, #called_from, #i_var_hash, #instance_attr_accessor, #smart_retry, #update_message_body

Methods included from LangHelp

#deep_modify_keys_with, #extract!, #find_and_remove, #format_error_message, #from_json, #modify_keys_with, #to_basic_hash, #to_camel, #to_hyph, #to_i_var, #to_pascal, #to_ruby_file_name, #to_snake, #valid_json?, #valid_url?

Methods included from AwsResources

#ec2, #image, #kinesis, #queue_poller, #region, #s3, #sns, #sqs

Methods included from Zenv

#env_vars, #i_vars, #lsof_cwd, #pid, #proc_cwd, #process_search, #project_root, #project_root=, #ps_cwd, #system_vars, #zfind, #zselect

Methods included from Auth

creds, region

Instance Method Details

#best_guess_address(board_name = @name) ⇒ Object

Gives a best guess at the URL of this Resource’s Queue by interpolating the region, account number and board name into the standard SQS URL format. Treat it as a last resort: the guess is wrong if the Queue lives in a different region or account, or under a different name, but it is a handy catch-all for most cases.

Returns String

Example

best_guess_address('fooBar')
=> "https://sqs.us-west-2.amazonaws.com/12345678/fooBar"

Notes

  • See Smash::CloudPowers::Zenv#zfind() to understand how this method finds your region and account number



# File 'lib/cloud_powers/synapse/queue.rb', line 65

def best_guess_address(board_name = @name)
  "https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}"
end
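
A minimal sketch of how the address is assembled. FakeZenv and AddressDemo are hypothetical names used only for illustration: FakeZenv stands in for Zenv#zfind and reads from a plain Hash, whereas the real lookup may consult other sources.

```ruby
# Hypothetical stand-in for Smash::CloudPowers::Zenv; the real #zfind
# is not reproduced here and may look in other places.
module FakeZenv
  SETTINGS = { aws_region: 'us-west-2', account_number: '12345678' }.freeze

  def zfind(key)
    SETTINGS.fetch(key)
  end
end

# Hypothetical host class that carries a @name, as the method expects.
class AddressDemo
  include FakeZenv

  def initialize(name)
    @name = name
  end

  # Same string interpolation as the method listed above.
  def best_guess_address(board_name = @name)
    "https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}"
  end
end

demo = AddressDemo.new('fooBar')
puts demo.best_guess_address               # falls back to @name
puts demo.best_guess_address('otherQueue') # explicit name wins
```

Because the region and account number come from the environment rather than from the queue itself, the result is only as accurate as those settings.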

#board_name(arg) ⇒ Object

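Derives a board name from a queue name or address: the last path segment is converted to snake case and given a _board suffix unless it already has one. It can be handy if you need the name but you don’t want the overhead of creating a QueueResource object.

Parameters

  • arg String - the name or URL of the queue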

Returns String

Example

board_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_board


# File 'lib/cloud_powers/synapse/queue.rb', line 82

def board_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_board$} =~ base_name ? base_name : "#{base_name}_board"
end
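
The naming rule can be tried in isolation with the sketch below. The to_snake defined here is a hypothetical stand-in for LangHelp#to_snake, which may handle more cases than this simple camelCase conversion.

```ruby
# Hypothetical stand-in for LangHelp#to_snake (assumption: it turns
# camelCase into snake_case; the library's version is not shown here).
def to_snake(text)
  text.to_s.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Takes the last path segment, snake-cases it and appends "_board"
# unless the suffix is already present.
def board_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  base_name.end_with?('_board') ? base_name : "#{base_name}_board"
end

puts board_name('https://sqs.us-west-2.amazonaws.com/12345678/fooBar')
puts board_name('foo_bar_board') # already suffixed, returned unchanged
```

The sketch uses end_with? where the listing uses a regular expression; for single-line names the two checks behave the same.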

#build_board(name:, client: sqs, **config) ⇒ Object

This method builds a Queue::QueueResource object for you to use but doesn’t invoke the #create!() method, so no API call is made to create the Queue on SQS. This can be used if the QueueResource and/or Queue already exists.

Parameters

  • name String - name of the Queue you want to interact with

Returns Queue::QueueResource

Example

queue_object = build_board(name: 'exampleQueue')
queue_object.address
=> https://sqs.us-west-2.amazonaws.com/81234567/exampleQueue


# File 'lib/cloud_powers/synapse/queue.rb', line 172

def build_board(name:, client: sqs, **config)
  board_resource = Smash::CloudPowers::Synapse::Queue::Board.build(
    name: to_camel(name), client: client, **config
  )

  attr_map(board_resource.call_name => board_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  board_resource
end

#build_queue(name:, type: :board, **config) ⇒ Object

Build a Smash::CloudPowers::Synapse::Queue and the getter and setter methods. This method uses the Creatable and Resource interface for consistency throughout CloudPowers.

Parameters

  • :name String - the name of the Queue will be used to create a getter and setter. Please see CloudPowers::Creatable

  • type String|Symbol - Default is Smash::CloudPowers::Synapse::Queue::Board. As long as the type you request exists, you can use it but only the Classes that use CloudPowers::Creatable and extend from CloudPowers::Resource are supported. Please see CloudPowers::Creatable and CloudPowers::Resource

  • config KeywordArguments - any config you want to pass along.

Returns Smash::CloudPowers::Synapse::Queue[::?]

Notes:

  • See create_queue to create the real object, e.g. an Aws::SQS::Queue. This method only creates an in-memory representation of that object, which can be linked to a real object later or used as is.

  • See CloudPowers::Creatable

  • See CloudPowers::Resource



# File 'lib/cloud_powers/synapse/queue.rb', line 112

def build_queue(name:, type: :board, **config)
  build_method_name = "build_#{type}"
  if self.respond_to? build_method_name
    self.public_send build_method_name, name: name, **config
  else
    build_board(name: name, **config)
  end
end
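
The type-based dispatch above can be sketched with a toy class. QueueFactoryDemo and its build_poller method are hypothetical; only the respond_to?/public_send logic mirrors build_queue.

```ruby
# Hypothetical toy class: the builders return plain Hashes so the
# dispatch can be observed without any AWS resources.
class QueueFactoryDemo
  def build_board(name:, **config)
    { kind: :board, name: name, config: config }
  end

  def build_poller(name:, **config)
    { kind: :poller, name: name, config: config }
  end

  def build_queue(name:, type: :board, **config)
    build_method_name = "build_#{type}"
    if respond_to?(build_method_name)
      # A matching public builder exists, so delegate to it.
      public_send(build_method_name, name: name, **config)
    else
      # Unknown types fall back to a board.
      build_board(name: name, **config)
    end
  end
end

factory = QueueFactoryDemo.new
p factory.build_queue(name: 'jobs')
p factory.build_queue(name: 'jobs', type: :poller, wait: 5)
p factory.build_queue(name: 'jobs', type: :unknown)
```

Dispatching on a method name keeps build_queue unchanged when a new type is added: defining another public build_<type> method is enough.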

#create_board(name:, client: sqs, **config) ⇒ Object

This method allows you to create a queue on SQS without explicitly creating a QueueResource object

Parameters

  • name String - The name of the Queue to be created

Returns Queue::QueueResource

Example

create_board(name: 'exampleQueue')
get_queue_message_count(best_guess_address('exampleQueue'))


# File 'lib/cloud_powers/synapse/queue.rb', line 196

def create_board(name:, client: sqs, **config)
  # pass along the caller's client instead of always using the default
  board_resource = Smash::CloudPowers::Synapse::Queue::Board.create!(
    name: to_camel(name), client: client
  )

  attr_map(board_resource.call_name => board_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  board_resource
end

#create_queue(name:, type: :board, **config) ⇒ Object

Create a Smash::CloudPowers::Synapse::Queue, its getter and setter methods, and the real object it represents, whether that lives out in the wild or right here on disk. For example, create an Aws::SQS::Queue in AWS together with the appropriately mapped local Smash::CloudPowers::Synapse::Queue[::?] object to use in other methods. This method uses the Creatable and Resource interface for consistency throughout CloudPowers.

Parameters

  • :name String - the name of the Queue will be used to create a getter and setter. Please see CloudPowers::Creatable

  • type String|Symbol - Default is Smash::CloudPowers::Synapse::Queue::Board. As long as the type you request exists, you can use it but only the Classes that use CloudPowers::Creatable and extend from CloudPowers::Resource are supported. Please see CloudPowers::Creatable and CloudPowers::Resource

  • config KeywordArguments - any config you want to pass along.

Returns Smash::CloudPowers::Synapse::Queue[::?]

Notes:

  • See build_queue to create only the in-memory representation without creating the real object, e.g. an Aws::SQS::Queue. This method creates both the representation and the real object.

  • See CloudPowers::Creatable

  • See CloudPowers::Resource



# File 'lib/cloud_powers/synapse/queue.rb', line 149

def create_queue(name:, type: :board, **config)
  # dispatch to create_<type>, e.g. create_board
  create_method_name = "create_#{type}"
  if self.respond_to? create_method_name
    self.public_send create_method_name, name: name, **config
  else
    # fall back to a board rather than recursing into create_queue
    create_board(name: name, **config)
  end
end

#delete_queue_message(queue, opts = {}) ⇒ Object

Deletes a queue message without reading or otherwise interacting with it. This is usually used for progress tracking, e.g. a Neuron takes a message from the Backlog, moves it to WIP and deletes it from the Backlog, then repeats these steps for the remaining States in the Workflow.

Parameters

  • queue String - queue is the name of the Queue to interact with

  • opts Hash (optional) - a configuration Hash for the SQS::QueuePoller

Notes

  • throws :stop_polling after the message is deleted

Example

queue_url = best_guess_address('exampleQueue')
get_queue_message_count(queue_url)
# => n
delete_queue_message('exampleQueue')
get_queue_message_count(queue_url)
# => n-1


# File 'lib/cloud_powers/synapse/queue.rb', line 226

def delete_queue_message(queue, opts = {})
  # poll yields the message and the poller that received it
  poll(queue, **opts) do |msg, poller|
    poller.delete_message(msg)
    throw :stop_polling
  end
end

#get_queue_message_count(resource_url) ⇒ Object

This method is used to get the approximate count of messages in a given queue

Parameters

  • resource_url String - the URL for the resource you need to get a count from

Returns Float

Example

queue_url = best_guess_address('exampleQueue')
get_queue_message_count(queue_url)
# => n
delete_queue_message('exampleQueue')
get_queue_message_count(queue_url)
# => n-1


# File 'lib/cloud_powers/synapse/queue.rb', line 247

def get_queue_message_count(resource_url)
  sqs.get_queue_attributes(
    queue_url: resource_url,
    attribute_names: ['ApproximateNumberOfMessages']
  ).attributes['ApproximateNumberOfMessages'].to_f
end
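
The count comes back from SQS as a String inside an attributes Hash, which is why the method converts it with to_f. The sketch below shows that conversion against a hypothetical FakeSqsClient; it stands in for Aws::SQS::Client, and the helper name message_count is likewise made up for illustration.

```ruby
# Hypothetical stand-in for Aws::SQS::Client. It returns an object with
# an #attributes Hash of String values, the shape the method above reads.
FakeAttributesResponse = Struct.new(:attributes)

class FakeSqsClient
  def get_queue_attributes(queue_url:, attribute_names:)
    # queue_url is accepted but ignored by this fake.
    all = { 'ApproximateNumberOfMessages' => '7' }
    FakeAttributesResponse.new(all.slice(*attribute_names))
  end
end

# Same call chain as get_queue_message_count, with the client passed in.
def message_count(client, resource_url)
  client.get_queue_attributes(
    queue_url: resource_url,
    attribute_names: ['ApproximateNumberOfMessages']
  ).attributes['ApproximateNumberOfMessages'].to_f
end

url = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
p message_count(FakeSqsClient.new, url)
```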

#get_queue_poller(name, client = sqs) ⇒ Object

Get an Aws::SQS::QueuePoller for a queue to deal with messages

Parameters

  • name String - name of the queue. This also becomes the name of the Poller object

  • client Aws::SQS::Client (optional) - good for hitting different regions or even stubbing for testing



# File 'lib/cloud_powers/synapse/queue.rb', line 261

def get_queue_poller(name, client = sqs)
  queue_poller(url: best_guess_address(name), client: client)
end

#pluck_queue_message(resource) ⇒ Object

Get a message from a Queue

Parameters

  • resource String|Symbol - the name of the resource

Returns

  • String if msg.body is not valid JSON

  • Hash if msg.body is valid JSON

Example

# msg.body == 'Hey' # +String+
pluck_queue_message('exampleQueue')
# => 'Hey' # +String+

# msg.body == '{"tally":"ho"}' # +JSON+
pluck_queue_message('exampleQueue')
# => { 'tally' => 'ho' } # +Hash+


# File 'lib/cloud_powers/synapse/queue.rb', line 282

def pluck_queue_message(resource)
  poll(resource) do |msg, poller|
    poller.delete_message(msg)
    return valid_json?(msg.body) ? from_json(msg.body) : msg.body
  end
end
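
The String-or-Hash return rule can be sketched with Ruby's standard json library. The valid_json? below is a hypothetical stand-in for LangHelp#valid_json?, and decode_body is a made-up helper name; the library's own helpers may differ in detail.

```ruby
require 'json'

# Hypothetical stand-in for LangHelp#valid_json?: a body counts as JSON
# when the standard parser accepts it.
def valid_json?(text)
  JSON.parse(text)
  true
rescue JSON::ParserError, TypeError
  false
end

# Mirrors the return expression above: parsed JSON when possible,
# otherwise the raw body.
def decode_body(body)
  valid_json?(body) ? JSON.parse(body) : body
end

p decode_body('Hey')
p decode_body('{"tally":"ho"}')
```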

#poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) ⇒ Object

Polls the given queue and passes the first message retrieved to the block. After the block returns, the message is deleted and polling stops.

Parameters

  • queue_name String - the name of the queue that you want to poll

  • :client Aws::SQS::Client (optional) - the SQS client; defaults to sqs

  • :poller Aws::SQS::QueuePoller (optional) - the poller to use; defaults to get_queue_poller(queue_name)

  • config KeywordArguments - Aws::SQS::QueuePoller polling configuration option(s)

  • block is the block that is used to interact with the message that was retrieved; it receives the message and the poller

Returns the result of the block

Example

# run a job from the next message in the Queue, passing the
# +:skip_delete+ option through to the poller
poll(:backlog, skip_delete: true) do |msg|
  demo_job = Job.new(msg.body)
  demo_job.run
end


# File 'lib/cloud_powers/synapse/queue.rb', line 307

def poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config)
  results = nil
  poller.poll(config) do |msg|
    results = yield msg, poller if block_given?
    poller.delete_message(msg)
    throw :stop_polling
  end
  results
end
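
The control flow of poll can be followed without AWS using the sketch below. FakePoller, FakeMessage and poll_once are hypothetical names; FakePoller assumes, as Aws::SQS::QueuePoller documents, that throwing :stop_polling ends the polling loop.

```ruby
# Hypothetical stand-ins for an SQS message and Aws::SQS::QueuePoller.
FakeMessage = Struct.new(:body)

class FakePoller
  attr_reader :deleted

  def initialize(bodies)
    @pending = bodies.map { |body| FakeMessage.new(body) }
    @deleted = []
  end

  # Yields messages until none are left or :stop_polling is thrown.
  def poll(_options = {})
    catch(:stop_polling) do
      until @pending.empty?
        yield @pending.shift
      end
    end
  end

  def delete_message(msg)
    @deleted << msg.body
  end
end

# Same flow as the method above: handle one message, delete it, stop.
def poll_once(poller, **config)
  results = nil
  poller.poll(config) do |msg|
    results = yield msg, poller if block_given?
    poller.delete_message(msg)
    throw :stop_polling
  end
  results
end

poller = FakePoller.new(%w[first second])
p poll_once(poller) { |msg, _poller| msg.body.upcase }
p poller.deleted
```

Only the first message is handled per call; the second stays queued until poll_once is called again.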

#queue_exists?(name) ⇒ Boolean

Checks SQS for the existence of this queue using the #queue_search() method

Parameters

  • name String

Returns Boolean

Notes

  • See #queue_search()



# File 'lib/cloud_powers/synapse/queue.rb', line 327

def queue_exists?(name)
  !queue_search(name).empty?
end

#queue_name(arg) ⇒ Object

Derives a queue name from a queue name or address: the last path segment is converted to snake case and given a _queue suffix unless it already has one. It can be handy if you need the name of a queue but you don’t want the overhead of creating a QueueResource object.

Parameters

  • arg String - the name or URL of the queue

Returns String

Example

queue_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_queue


# File 'lib/cloud_powers/synapse/queue.rb', line 344

def queue_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_queue$} =~ base_name ? base_name : "#{base_name}_queue"
end

#queue_poller_name(arg) ⇒ Object

Derives a queue poller name from a queue name or URL: the last path segment is converted to snake case and given a _queue_poller suffix unless it already has one. It can be handy if you need the name of a queue poller but you don’t want the overhead of creating a QueueResource object.

Parameters

  • arg String - the name or URL of the queue

Returns String

Example

queue_poller_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_queue_poller


# File 'lib/cloud_powers/synapse/queue.rb', line 362

def queue_poller_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_queue_poller$} =~ base_name ? base_name : "#{base_name}_queue_poller"
end

#queue_search(name) ⇒ Object

Searches for a queue based on the name

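Parameters

  • name String - prefix of the queue name to search for

Returns Array - one Board built for each queue URL whose queue name begins with name

Example

results = queue_search('exampleQueue') # one Board per matching queue URL
results.empty? # false when at least one queue matched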


# File 'lib/cloud_powers/synapse/queue.rb', line 378

def queue_search(name)
  urls = sqs.list_queues(queue_name_prefix: name).queue_urls
  # TODO: allow a collection of blocks to be iterated through. Each one
  # would be able to further scope down a set of previous results then
  # pass it to the next.  When there are no scoping blocks left, build
  # the boards and return them.  Saves a TON on memory and time
  urls.map do |url|
    build_board(name: queue_name(url), client: sqs) do |board|
      board.instance_attr_accessor :url
      board.url = url
      board
    end
  end
end

#send_queue_message(address, message, this_sqs = sqs) ⇒ Object

Sends a given message to a given queue

Parameters

  • address String - address of the Queue you want to interact with

  • message String - message to be sent

  • this_sqs Aws::SQS::Client (optional) - the client used to send the message; defaults to sqs

Returns the response from Aws::SQS::Client#send_message, which includes the message_id

Example

legit_address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
random_message = 'Wowza, this is pretty easy.'
resp = send_queue_message(legit_address, random_message)
resp.message_id
=> 'some message id'


# File 'lib/cloud_powers/synapse/queue.rb', line 408

def send_queue_message(address, message, this_sqs = sqs)
  this_sqs.send_message(queue_url: address, message_body: message)
end
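
Because the client is an ordinary argument, a stub can be passed in for testing. The sketch below assumes a hypothetical RecordingSqsClient that accepts the same keyword arguments as Aws::SQS::Client#send_message and returns an object responding to message_id; the ids it produces are made up.

```ruby
# Hypothetical stand-in for the send_message response.
FakeSendResult = Struct.new(:message_id)

# Hypothetical stub client that records what it was asked to send.
class RecordingSqsClient
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_message(queue_url:, message_body:)
    @sent << { queue_url: queue_url, message_body: message_body }
    FakeSendResult.new("fake-id-#{@sent.size}")
  end
end

# Same body as the method above, with the client required explicitly.
def send_queue_message(address, message, this_sqs)
  this_sqs.send_message(queue_url: address, message_body: message)
end

client = RecordingSqsClient.new
address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
resp = send_queue_message(address, 'Wowza, this is pretty easy.', client)
puts resp.message_id
p client.sent
```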