Module: Smash::CloudPowers::Synapse::Queue

Includes:
AwsResources, Helpers
Included in:
Smash::CloudPowers::Synapse, Board
Defined in:
lib/cloud_powers/synapse/queue.rb,
lib/cloud_powers/synapse/queue/board.rb,
lib/cloud_powers/synapse/queue/poller.rb

Defined Under Namespace

Classes: Board, NUMap, Poller

Instance Method Summary

Methods included from Helpers

#create_logger, #log_file, #logger

Methods included from PathHelp

#common_delimiter, #expand_path, #file_exists?, #file_search, #filename?, #job_exist?, #job_path, #job_require_path, #path_search, #paths_gcd, #paths_lcd, #to_path, #to_pathname, #to_realpath, #touch, #zlib_path

Methods included from LogicHelp

#attr_map, #called_from, #i_var_hash, #instance_attr_accessor, #smart_retry, #update_message_body, #wait_until

Methods included from LangHelp

#deep_modify_keys_with, #extract!, #find_and_remove, #format_error_message, #from_json, #modify_keys_with, #to_basic_hash, #to_camel, #to_hyph, #to_i_var, #to_pascal, #to_ruby_file_name, #to_snake, #valid_json?, #valid_url?

Methods included from AwsResources

#ec2, #image, #kinesis, #queue_poller, #region, #s3, #sns, #sqs

Methods included from Zenv

#env_vars, #i_vars, #lsof_cwd, #pid, #proc_cwd, #process_search, #project_root, #project_root=, #ps_cwd, #system_vars, #zfind, #zselect

Methods included from Auth

creds, region

Instance Method Details

#best_guess_address(board_name = @name) ⇒ Object

Gives a best guess at the URL that points toward this Resource’s Queue. It builds a standard SQS URL from the region, the account number and the board name. Treat it as a last resort: the guess will be wrong if the Queue lives in a different region or account, or under a different name, but it is a handy catch-all that covers most cases.

Returns String

Example

best_guess_address('fooBar')
=> "https://sqs.us-west-2.amazonaws.com/12345678/fooBar"

Notes

  • See Smash::CloudPowers::Zenv#zfind() to understand how this method finds your region and account number



# File 'lib/cloud_powers/synapse/queue.rb', line 65

def best_guess_address(board_name = @name)
  "https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}"
end
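The interpolation above can be tried outside CloudPowers with a small sketch. Here `SETTINGS` and this version of `zfind` are hypothetical stand-ins (the real `Zenv#zfind` searches the environment), and the region and account number are made-up values taken from the example above.

```ruby
# Hypothetical stand-in for Zenv#zfind: looks values up in a plain Hash
# instead of searching the environment and project files.
SETTINGS = { aws_region: 'us-west-2', account_number: '12345678' }.freeze

def zfind(key)
  SETTINGS.fetch(key)
end

# Same interpolation as best_guess_address, without the @name default.
def best_guess_address(board_name)
  "https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}"
end

puts best_guess_address('fooBar')
# prints https://sqs.us-west-2.amazonaws.com/12345678/fooBar
```

Because the URL is assembled purely from local settings, no API call confirms that the Queue exists at that address.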

#board_name(arg) ⇒ Object

This method can be used to parse a board name from a Queue address. It can be handy if you need the name of a board but you don’t want the overhead of creating a QueueResource object.

Parameters

  • arg String - a Queue URL or name

Returns String

Example

board_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_board


# File 'lib/cloud_powers/synapse/queue.rb', line 82

def board_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_board$} =~ base_name ? base_name : "#{base_name}_board"
end
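A minimal sketch of the naming rule, runnable on its own. The `to_snake` below is an assumed approximation of `LangHelp#to_snake` (camelCase to snake_case only); the real helper may handle more cases.

```ruby
# Assumed stand-in for LangHelp#to_snake: converts camelCase or PascalCase
# to snake_case and nothing else.
def to_snake(str)
  str.to_s.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Takes the last path segment, snake-cases it and appends "_board"
# unless the suffix is already present.
def board_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  base_name.end_with?('_board') ? base_name : "#{base_name}_board"
end

puts board_name('https://sqs.us-west-2.amazonaws.com/12345678/fooBar') # foo_bar_board
puts board_name('foo_bar_board')                                       # foo_bar_board
puts board_name(:backlog)                                              # backlog_board
```

The second call shows that the suffix is not added twice, so the result can safely be fed back into the method.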

#build_board(name:, client: sqs, **config) ⇒ Object

This method builds a Queue::QueueResource object for you to use but doesn’t invoke the #create!() method, so no API call is made to create the Queue on SQS. This can be used if the QueueResource and/or Queue already exists.

Parameters

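  • name String - name of the Queue you want to interact with

  • client Aws::SQS::Client (optional) - defaults to sqs

  • config KeywordArguments - any config you want to pass along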

Returns Queue::QueueResource

Example

queue_object = build_board(name: 'exampleQueue')
queue_object.address
=> https://sqs.us-west-2.amazonaws.com/81234567/exampleQueue


# File 'lib/cloud_powers/synapse/queue.rb', line 180

def build_board(name:, client: sqs, **config)
  board_resource = Smash::CloudPowers::Synapse::Queue::Board.build(
    name: to_camel(name),
    client: client,
    project_root: config[:project_root] || project_root,
    **config
  )

  attr_map(board_resource.call_name => board_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  board_resource
end

#build_queue(name:, type: :board, **config) ⇒ Object

Build a Smash::CloudPowers::Synapse::Queue and the getter and setter methods. This method uses the Creatable and Resource interface for consistency throughout CloudPowers.

Parameters

  • :name String - the name of the Queue will be used to create a getter and setter. Please see CloudPowers::Creatable

  • type String|Symbol - Default is Smash::CloudPowers::Synapse::Queue::Board. As long as the type you request exists, you can use it but only the Classes that use CloudPowers::Creatable and extend from CloudPowers::Resource are supported. Please see CloudPowers::Creatable and CloudPowers::Resource

  • config KeywordArguments - any config you want to pass along.

Returns Smash::CloudPowers::Synapse::Queue[::?]

Notes:

  • See create_queue to create the real object; e.g. an AWS::SQS::Queue. This method just creates a representation of that object in memory. It can be linked up to a real Object or left as is and used later.

  • See CloudPowers::Creatable

  • See CloudPowers::Resource



# File 'lib/cloud_powers/synapse/queue.rb', line 112

def build_queue(name:, type: :board, **config)
  build_method_name = "build_#{type}"
  if self.respond_to? build_method_name
    self.public_send(build_method_name,
      name: name,
      project_root: config[:project_root] || project_root,
      **config
    )
  else
    build_board(name: name, **config)
  end
end
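The dispatch in build_queue can be sketched in isolation as follows. `QueueFactory` and the Hash it returns are hypothetical stand-ins; the real builders return Board objects and also pass along project_root.

```ruby
# Hypothetical stand-in class; the real builders return Board objects.
class QueueFactory
  def build_board(name:, **config)
    { type: :board, name: name, config: config }
  end

  # Looks for a build_<type> method and falls back to build_board
  # when no such method exists.
  def build_queue(name:, type: :board, **config)
    build_method_name = "build_#{type}"
    if respond_to?(build_method_name)
      public_send(build_method_name, name: name, **config)
    else
      build_board(name: name, **config)
    end
  end
end

factory = QueueFactory.new
default_result  = factory.build_queue(name: 'backlog')
fallback_result = factory.build_queue(name: 'backlog', type: :mystery, visibility_timeout: 30)

puts default_result[:type]  # board
puts fallback_result[:type] # board (no build_mystery method, so the fallback ran)
```

An unknown type does not raise in this sketch; it quietly produces a board, which mirrors the else branch in the source above.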

#create_board(name:, client: sqs, **config) ⇒ Object

This method allows you to create a queue on SQS without explicitly creating a QueueResource object.

Parameters

  • name String - The name of the Queue to be created

Returns Queue::QueueResource

Example

board = create_board(name: 'exampleQueue')
get_queue_message_count(board.address)


# File 'lib/cloud_powers/synapse/queue.rb', line 207

def create_board(name:, client: sqs, **config)
  board_resource = Smash::CloudPowers::Synapse::Queue::Board.create!(
    name: to_camel(name),
    client: client, # use the client that was passed in instead of always using sqs
    project_root: config[:project_root] || project_root,
    **config
  )

  attr_map(board_resource.call_name => board_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  board_resource
end

#create_queue(name:, type: :board, **config) ⇒ Object

Create a Smash::CloudPowers::Synapse::Queue, its getter and setter methods, and the real object it represents, either out in the wild or right here on disk. For example, create an Aws::SQS::Queue in AWS and the appropriately mapped Smash::CloudPowers::Synapse::Queue[::?] object locally, to use in other methods. This method uses the Creatable and Resource interface for consistency throughout CloudPowers.

Parameters

  • :name String - the name of the Queue will be used to create a getter and setter. Please see CloudPowers::Creatable

  • type String|Symbol - Default is Smash::CloudPowers::Synapse::Queue::Board. As long as the type you request exists, you can use it but only the Classes that use CloudPowers::Creatable and extend from CloudPowers::Resource are supported. Please see CloudPowers::Creatable and CloudPowers::Resource

  • config KeywordArguments - any config you want to pass along.

Returns Smash::CloudPowers::Synapse::Queue[::?]

Notes:

  • See build_queue to create only the in-memory representation of the object, without creating the real one; e.g. without an Aws::SQS::Queue in AWS. This method creates both the real object and its local representation.

  • See CloudPowers::Creatable

  • See CloudPowers::Resource



# File 'lib/cloud_powers/synapse/queue.rb', line 153

def create_queue(name:, type: :board, **config)
  # dispatch to the matching create_* method, e.g. create_board
  create_method_name = "create_#{type}"
  if self.respond_to? create_method_name
    self.public_send(create_method_name,
      name: name,
      project_root: config[:project_root] || project_root,
      **config
    )
  else
    # fall back to a Board instead of calling create_queue again
    create_board(name: name, **config)
  end
end

#delete_queue_message(queue, opts = {}) ⇒ Object

Deletes a queue message without caring about reading/interacting with the message. This is usually used for progress tracking, i.e. a Neuron takes a message from the Backlog, moves it to WIP and deletes it from Backlog, then repeats these steps for the remaining States in the Workflow.

Parameters

  • queue String - queue is the name of the Queue to interact with

  • opts Hash (optional) - a configuration Hash for the SQS::QueuePoller

Notes

  • throws :stop_polling after the message is deleted

Example

get_queue_message_count('exampleQueue')
# => n
delete_queue_message('exampleQueue')
get_queue_message_count('exampleQueue')
# => n-1


# File 'lib/cloud_powers/synapse/queue.rb', line 240

def delete_queue_message(queue, opts = {})
  # poll yields the message and the poller; opts is passed on as keyword config
  poll(queue, **opts) do |msg, poller|
    poller.delete_message(msg)
    throw :stop_polling
  end
end

#get_queue_message_count(resource_url) ⇒ Object

This method is used to get the approximate count of messages in a given queue

Parameters

  • resource_url String - the URL for the resource you need to get a count from

Returns Float

Example

get_queue_message_count('exampleQueue')
# => n
delete_queue_message('exampleQueue')
get_queue_message_count('exampleQueue')
# => n-1


# File 'lib/cloud_powers/synapse/queue.rb', line 261

def get_queue_message_count(resource_url)
  sqs.get_queue_attributes(
    queue_url: resource_url,
    attribute_names: ['ApproximateNumberOfMessages']
  ).attributes['ApproximateNumberOfMessages'].to_f
end
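The following sketch shows why the count comes back as a Float. `FakeSqs` and `FakeResponse` are hypothetical stand-ins for the SQS client and its response, and the optional `client` argument is added here only so the sketch runs without AWS credentials.

```ruby
# Hypothetical stand-ins for the SQS client and its response object.
FakeResponse = Struct.new(:attributes)

class FakeSqs
  def get_queue_attributes(queue_url:, attribute_names:)
    # Attribute values are returned as Strings, hence the conversion below.
    FakeResponse.new({ 'ApproximateNumberOfMessages' => '3' })
  end
end

# Same call chain as get_queue_message_count, with an injectable client.
def get_queue_message_count(resource_url, client = FakeSqs.new)
  client.get_queue_attributes(
    queue_url: resource_url,
    attribute_names: ['ApproximateNumberOfMessages']
  ).attributes['ApproximateNumberOfMessages'].to_f
end

count = get_queue_message_count('https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue')
puts count # 3.0
```

As the attribute name says, the number is approximate, so comparisons such as the n and n-1 in the example above may not hold exactly on a busy Queue.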

#get_queue_poller(name, client = sqs) ⇒ Object

Get an Aws::SQS::QueuePoller for a queue to deal with messages

Parameters

  • name String - name of the queue. This also becomes the name of the Poller object

  • client Aws::SQS::Client (optional) - good for hitting different regions or even stubbing for testing



# File 'lib/cloud_powers/synapse/queue.rb', line 275

def get_queue_poller(name, client = sqs)
  queue_poller(url: best_guess_address(name), client: client)
end

#pluck_queue_message(resource) ⇒ Object

Get a message from a Queue

Parameters

  • resource String|Symbol - the name of the resource

Returns

  • String if msg.body is not valid JSON

  • Hash if msg.body is valid JSON

Example

# msg.body == 'Hey' # +String+
pluck_queue_message('exampleQueue')
# => 'Hey' # +String+

# msg.body == '{"tally":"ho"}' # +JSON+
pluck_queue_message('exampleQueue')
# => { 'tally' => 'ho' } # +Hash+


# File 'lib/cloud_powers/synapse/queue.rb', line 296

def pluck_queue_message(resource)
  poll(resource) do |msg, poller|
    poller.delete_message(msg)
    return valid_json?(msg.body) ? from_json(msg.body) : msg.body
  end
end
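The String-or-Hash return can be sketched with the standard json library. `valid_json?` below is an assumed approximation of the LangHelp helper, and `decode_body` is a hypothetical name for the last line of the method above.

```ruby
require 'json'

# Assumed behavior of LangHelp#valid_json?: true when the String parses.
def valid_json?(string)
  JSON.parse(string)
  true
rescue JSON::ParserError, TypeError
  false
end

# Hypothetical helper mirroring the return expression in pluck_queue_message:
# parsed JSON when the body is valid JSON, the raw body otherwise.
def decode_body(body)
  valid_json?(body) ? JSON.parse(body) : body
end

plain  = decode_body('Hey')
parsed = decode_body('{"tally":"ho"}')

puts plain.class      # String
puts parsed.class     # Hash
puts parsed['tally']  # ho
```

Callers therefore need to check the type of the result before indexing into it.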

#poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) ⇒ Object

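Polls the given Queue and yields the retrieved message, along with the poller, to the given block. Any extra keyword arguments are passed to the poller as polling configuration.

Parameters

  • queue_name String|Symbol - the name of the queue that you want to poll

  • client Aws::SQS::Client (optional) - defaults to sqs

  • poller Aws::SQS::QueuePoller (optional) - defaults to get_queue_poller(queue_name)

  • config KeywordArguments - Aws::SQS::QueuePoller polling configuration option(s)

  • block is the block that is used to interact with the message that was retrieved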

Returns the results from the message and the block that interacts with the message(s)

Example

# runs a job from the next message in the Queue; extra keyword arguments
# such as +skip_delete+ are passed to the poller as polling configuration
poll(:backlog, skip_delete: true) do |msg|
  demo_job = Job.new(msg.body)
  demo_job.run
end


# File 'lib/cloud_powers/synapse/queue.rb', line 321

def poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config)
  results = nil
  poller.poll(config) do |msg|
    results = yield msg, poller if block_given?
    poller.delete_message(msg)
    throw :stop_polling
  end
  results
end
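The control flow of poll can be followed with a fake poller. `FakePoller` is a hypothetical, Array-backed stand-in for Aws::SQS::QueuePoller that only imitates the catch(:stop_polling) contract, and this `poll` takes the poller directly instead of a queue name.

```ruby
# Hypothetical stand-in for Aws::SQS::QueuePoller, backed by an Array.
class FakePoller
  attr_reader :deleted

  def initialize(messages)
    @messages = messages
    @deleted = []
  end

  # Yields messages until the block throws :stop_polling or none remain.
  def poll(_config = {})
    catch(:stop_polling) do
      yield @messages.shift until @messages.empty?
    end
  end

  def delete_message(msg)
    @deleted << msg
  end
end

# Same body as the poll method above, with the poller passed in directly.
def poll(poller, **config)
  results = nil
  poller.poll(config) do |msg|
    results = yield msg, poller if block_given?
    poller.delete_message(msg)
    throw :stop_polling
  end
  results
end

poller = FakePoller.new(%w[first second])
result = poll(poller) { |msg, _poller| msg.upcase }

puts result            # FIRST
p poller.deleted       # ["first"]
```

Only one message is handled per call: the throw ends polling after the first message, and the block's return value becomes the method's return value.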

#queue_exists?(name) ⇒ Boolean

Checks SQS for the existence of this queue using the #queue_search() method

Parameters

  • name String

Returns Boolean

Notes

  • See #queue_search()



# File 'lib/cloud_powers/synapse/queue.rb', line 341

def queue_exists?(name)
  !queue_search(name).empty?
end

#queue_name(arg) ⇒ Object

This method can be used to parse a queue name from its address. It can be handy if you need the name of a queue but you don’t want the overhead of creating a QueueResource object.

Parameters

  • arg String - a Queue URL or name

Returns String

Example

queue_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_queue


# File 'lib/cloud_powers/synapse/queue.rb', line 358

def queue_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_queue$} =~ base_name ? base_name : "#{base_name}_queue"
end

#queue_poller_name(arg) ⇒ Object

This method can be used to parse a queue poller name from its queue name or @url. It can be handy if you need the name of a queue poller but you don’t want the overhead of creating a QueueResource object.

Parameters

  • arg String - a Queue URL or name

Returns String

Example

queue_poller_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_queue_poller


# File 'lib/cloud_powers/synapse/queue.rb', line 376

def queue_poller_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_queue_poller$} =~ base_name ? base_name : "#{base_name}_queue_poller"
end

#queue_search(name) ⇒ Object

Searches for a queue based on the name

Parameters

  • name String - the queue name prefix to search for

Returns Array - one Board for each queue URL that matches

Example

results = queue_search('exampleQueue') # one Board per matching queue URL
results.empty? # false when at least one Queue matched


# File 'lib/cloud_powers/synapse/queue.rb', line 392

def queue_search(name)
  urls = sqs.list_queues(queue_name_prefix: name).queue_urls
  # TODO: allow a collection of blocks to be iterated through. Each one
  # would be able to further scope down a set of previous results then
  # pass it to the next.  When there are no scoping blocks left, build
  # the boards and return them.  Saves a TON on memory and time
  urls.map do |url|
    build_board(name: queue_name(url), client: sqs) do |board|
      board.instance_attr_accessor :url
      board.url = url
      board
    end
  end
end

#send_queue_message(address, message, this_sqs = sqs) ⇒ Object

Sends a given message to a given queue

Parameters

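  • address String - address of the Queue you want to interact with

  • message String - message to be sent

  • this_sqs Aws::SQS::Client (optional) - the client used to send the message; defaults to sqs

Returns the response of the send_message call on the client, which includes the message_id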

Example

legit_address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
random_message = 'Wowza, this is pretty easy.'
resp = send_queue_message(legit_address, random_message)
resp.message_id
=> 'some message id'


# File 'lib/cloud_powers/synapse/queue.rb', line 422

def send_queue_message(address, message, this_sqs = sqs)
  this_sqs.send_message(queue_url: address, message_body: message)
end
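Because the client is the third argument, a test double can be passed in place of a real SQS client. In this sketch `RecordingSqs` and `FakeSendResult` are hypothetical stand-ins, and the generated message ids are made up.

```ruby
# Hypothetical stand-in for the response object; only message_id is modelled.
FakeSendResult = Struct.new(:message_id)

# Hypothetical stand-in client that records what it was asked to send.
class RecordingSqs
  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_message(queue_url:, message_body:)
    @sent << { queue_url: queue_url, message_body: message_body }
    FakeSendResult.new("fake-id-#{@sent.size}")
  end
end

# Same body as send_queue_message, without the sqs default.
def send_queue_message(address, message, this_sqs)
  this_sqs.send_message(queue_url: address, message_body: message)
end

client = RecordingSqs.new
address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
resp = send_queue_message(address, 'Wowza, this is pretty easy.', client)

puts resp.message_id   # fake-id-1
puts client.sent.size  # 1
```

The same injection point can be used to send through a client configured for another region.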