Module: Smash::CloudPowers::Synapse::Queue

Includes:
AwsResources, Helpers
Included in:
Smash::CloudPowers::SelfAwareness, Smash::CloudPowers::Synapse, Board
Defined in:
lib/cloud_powers/synapse/queue.rb,
lib/cloud_powers/synapse/queue/board.rb,
lib/cloud_powers/synapse/queue/poller.rb

Defined Under Namespace

Classes: Board, NUMap, Poller

Instance Method Summary

Methods included from Helpers

#create_logger, #log_file, #logger

Methods included from PathHelp

#job_exist?, #job_home, #job_path, #job_require_path

Methods included from LogicHelp

#attr_map, #called_from, #instance_attr_accessor, #smart_retry, #update_message_body

Methods included from LangHelp

#deep_modify_keys_with, #find_and_remove, #format_error_message, #from_json, #modify_keys_with, #to_basic_hash, #to_camel, #to_hyph, #to_i_var, #to_pascal, #to_ruby_file_name, #to_snake, #valid_json?, #valid_url?

Methods included from AwsResources

#ec2, #image, #kinesis, #queue_poller, #region, #s3, #sns, #sqs

Methods included from Zenv

#env_vars, #file_tree_search, #i_vars, #project_root, #project_root=, #system_vars, #zfind

Methods included from Auth

creds, region

Instance Method Details

#best_guess_address(board_name = @name) ⇒ Object

Gives a best guess at the URL of this Resource’s Queue by building a standard SQS URL from the region, the account number and the board name. Treat it as a last resort: the guess is wrong if the Queue lives in a different region or account, or under a different name, but it is a handy catch-all for most cases.

Returns String

Example

best_guess_address('fooBar')
=> "https://sqs.us-west-2.amazonaws.com/12345678/fooBar"

Notes

  • See Smash::CloudPowers::Zenv#zfind() to understand how this method finds your region and account number



# File 'lib/cloud_powers/synapse/queue.rb', line 65

def best_guess_address(board_name = @name)
  "https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}"
end
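
A minimal sketch of the same URL construction, runnable without AWS. FakeZenv and AddressDemo are hypothetical names used only for this illustration; FakeZenv#zfind reads from a fixed Hash, whereas the real Zenv#zfind searches your environment and project.

```ruby
# Hypothetical stand-in for Zenv#zfind: looks keys up in a fixed Hash
# instead of searching the environment or project files.
module FakeZenv
  SETTINGS = { aws_region: 'us-west-2', account_number: '12345678' }.freeze

  def zfind(key)
    SETTINGS.fetch(key)
  end
end

# Hypothetical host class; it only holds a @name for the default argument.
class AddressDemo
  include FakeZenv

  def initialize(name)
    @name = name
  end

  # Same interpolation as the method above; board_name defaults to @name.
  def best_guess_address(board_name = @name)
    "https://sqs.#{zfind(:aws_region)}.amazonaws.com/#{zfind(:account_number)}/#{board_name}"
  end
end

demo = AddressDemo.new('fooBar')
puts demo.best_guess_address
# prints https://sqs.us-west-2.amazonaws.com/12345678/fooBar
puts demo.best_guess_address('otherQueue')
# prints https://sqs.us-west-2.amazonaws.com/12345678/otherQueue
```

Because the region and account number come from one place, a Queue in another region or account needs its URL passed explicitly rather than guessed.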

#board_name(arg) ⇒ Object

This method can be used to parse a board name from a queue’s address. It can be handy if you need the name of a board but you don’t want the overhead of creating a Board object.

Parameters

  • arg String - the URL (or name) of the queue

Returns String

Example

board_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_board


# File 'lib/cloud_powers/synapse/queue.rb', line 82

def board_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_board$} =~ base_name ? base_name : "#{base_name}_board"
end
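
The naming rule above can be tried in isolation. This sketch assumes a simplified to_snake; the real LangHelp#to_snake is not shown on this page and may handle more cases.

```ruby
# Hypothetical simplification of LangHelp#to_snake: inserts an underscore
# at each lower-to-upper case boundary, then downcases.
def to_snake(value)
  value.to_s.gsub(/([a-z\d])([A-Z])/, '\1_\2').tr('-', '_').downcase
end

# Takes the last path segment, snake-cases it and appends "_board"
# unless the suffix is already there.
def board_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  base_name.end_with?('_board') ? base_name : "#{base_name}_board"
end

puts board_name('https://sqs.us-west-2.amazonaws.com/12345678/fooBar')
# prints foo_bar_board
puts board_name(:foo_bar_board)
# prints foo_bar_board
```

The suffix check keeps the method idempotent, so passing an existing board name back in does not produce foo_bar_board_board.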

#build_board(name:, client: sqs, **config) ⇒ Object

This method builds a Queue::Board object for you to use but doesn’t invoke the #create!() method, so no API call is made to create the Queue on SQS. This can be used if the Board and/or Queue already exists.

Parameters

  • name String - name of the Queue you want to interact with

  • client Aws::SQS::Client (optional) - defaults to #sqs

Returns Queue::Board

Example

queue_object = build_board(name: 'exampleQueue')
queue_object.address
=> https://sqs.us-west-2.amazonaws.com/81234567/exampleQueue


# File 'lib/cloud_powers/synapse/queue.rb', line 119

def build_board(name:, client: sqs, **config)
  board_resource = Smash::CloudPowers::Synapse::Queue::Board.build(
    name: to_camel(name), client: client
  )

  attr_map(board_resource.call_name => board_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  board_resource
end

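#build_queue(name:, type: :board, **config) ⇒ Object

Builds a queue resource of the given type by calling the matching build_<type> method, for example #build_board for type: :board. If no such method exists, it falls back to #build_board.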



# File 'lib/cloud_powers/synapse/queue.rb', line 87

def build_queue(name:, type: :board, **config)
  build_method_name = "build_#{type}"
  if self.respond_to? build_method_name
    self.public_send build_method_name, name: name, **config
  else
    build_board(name: name, **config)
  end
end
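
The dispatch above can be sketched on its own. BuilderDemo and its build_stream method are hypothetical and exist only to show the lookup; they are not part of the library, and the build methods return plain Arrays so the chosen path is visible.

```ruby
# Hypothetical builder with two build_* methods.
class BuilderDemo
  def build_board(name:, **config)
    [:board, name, config]
  end

  def build_stream(name:, **config)
    [:stream, name, config]
  end

  # Calls build_<type> when it exists, otherwise falls back to build_board.
  def build_queue(name:, type: :board, **config)
    build_method_name = "build_#{type}"
    if respond_to?(build_method_name)
      public_send(build_method_name, name: name, **config)
    else
      build_board(name: name, **config)
    end
  end
end

demo = BuilderDemo.new
p demo.build_queue(name: 'jobs')
# prints [:board, "jobs", {}]
p demo.build_queue(name: 'jobs', type: :unknown)
# prints [:board, "jobs", {}]
```

Note that respond_to? only sees public methods by default, so a private build_<type> method would silently take the fallback path.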

#create_board(name:, client: sqs, **config) ⇒ Object

This method allows you to create a queue on SQS without explicitly creating a Board object yourself.

Parameters

  • name String - The name of the Queue to be created

  • client Aws::SQS::Client (optional) - defaults to #sqs

Returns Queue::Board

Example

create_board(name: 'exampleQueue')
get_queue_message_count(best_guess_address('exampleQueue'))


# File 'lib/cloud_powers/synapse/queue.rb', line 143

def create_board(name:, client: sqs, **config)
  board_resource = Smash::CloudPowers::Synapse::Queue::Board.create!(
    name: to_camel(name), client: client
  )

  attr_map(board_resource.call_name => board_resource) do |attribute, resource|
    instance_attr_accessor attribute
    resource
  end

  board_resource
end

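#create_queue(name:, type: :board, **config) ⇒ Object

Creates a queue resource of the given type by calling the matching create_<type> method, for example #create_board for type: :board. If no such method exists, it falls back to #create_board.

# File 'lib/cloud_powers/synapse/queue.rb', line 96

def create_queue(name:, type: :board, **config)
  # look up create_<type>, not build_<type>, so the queue is really created
  create_method_name = "create_#{type}"
  if self.respond_to? create_method_name
    self.public_send create_method_name, name: name, **config
  else
    # fall back to a board instead of calling create_queue again
    create_board(name: name, **config)
  end
end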

#delete_queue_message(queue, opts = {}) ⇒ Object

Deletes a queue message without reading or otherwise interacting with it. This is usually used for progress tracking, i.e., a Neuron takes a message from the Backlog, moves it to WIP and deletes it from Backlog, then repeats these steps for the remaining States in the Workflow.

Parameters

  • queue String - queue is the name of the Queue to interact with

  • opts Hash (optional) - a configuration Hash for the SQS::QueuePoller

Notes

  • throws :stop_polling after the message is deleted

Example

get_queue_message_count('exampleQueue')
# => n
delete_queue_message('exampleQueue')
get_queue_message_count('exampleQueue')
# => n-1


# File 'lib/cloud_powers/synapse/queue.rb', line 173

def delete_queue_message(queue, opts = {})
  # poll yields the message and the poller that received it
  poll(queue, **opts) do |msg, poller|
    poller.delete_message(msg)
    throw :stop_polling
  end
end

#get_queue_message_count(resource_url) ⇒ Object

This method is used to get the approximate count of messages in a given queue

Parameters

  • resource_url String - the URL for the resource you need to get a count from

Returns Float

Example

get_queue_message_count('exampleQueue')
# => n
delete_queue_message('exampleQueue')
get_queue_message_count('exampleQueue')
# => n-1


# File 'lib/cloud_powers/synapse/queue.rb', line 194

def get_queue_message_count(resource_url)
  sqs.get_queue_attributes(
    queue_url: resource_url,
    attribute_names: ['ApproximateNumberOfMessages']
  ).attributes['ApproximateNumberOfMessages'].to_f
end
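
A minimal sketch of the count lookup, runnable without AWS. FakeSqsClient is a hypothetical stand-in for Aws::SQS::Client, and the client is passed in as a second argument here (unlike the method above) so it can be swapped for the fake. SQS returns attribute values as Strings, which is why the method ends with to_f.

```ruby
# Hypothetical stand-in for Aws::SQS::Client. It records the request and
# returns an object that responds to #attributes, like the real response.
class FakeSqsClient
  Response = Struct.new(:attributes)

  attr_reader :last_request

  def get_queue_attributes(queue_url:, attribute_names:)
    @last_request = { queue_url: queue_url, attribute_names: attribute_names }
    Response.new({ 'ApproximateNumberOfMessages' => '7' })
  end
end

# Same call chain as the method above, with the client injected.
def get_queue_message_count(resource_url, client)
  client.get_queue_attributes(
    queue_url: resource_url,
    attribute_names: ['ApproximateNumberOfMessages']
  ).attributes['ApproximateNumberOfMessages'].to_f
end

client = FakeSqsClient.new
url = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
p get_queue_message_count(url, client)
# prints 7.0
```

The count is approximate on the SQS side, so two calls in quick succession may not differ by exactly the number of messages deleted in between.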

#get_queue_poller(name, client = sqs) ⇒ Object

Get an Aws::SQS::QueuePoller for a queue to deal with messages

Parameters

  • name String - name of the queue. This also becomes the name of the Poller object

  • client Aws::SQS::Client (optional) - good for hitting different regions or even stubbing for testing



# File 'lib/cloud_powers/synapse/queue.rb', line 208

def get_queue_poller(name, client = sqs)
  queue_poller(url: best_guess_address(name), client: client)
end

#pluck_queue_message(resource) ⇒ Object

Get a message from a Queue

Parameters

  • resource String or Symbol - the name of the resource

Returns

  • String if msg.body is not valid JSON

  • Hash if msg.body is valid JSON

Example

# msg.body == 'Hey' # +String+
pluck_queue_message('exampleQueue')
# => 'Hey' # +String+

# msg.body == '{"tally":"ho"}' # +JSON+
pluck_queue_message('exampleQueue')
# => { 'tally' => 'ho' } # +Hash+


# File 'lib/cloud_powers/synapse/queue.rb', line 229

def pluck_queue_message(resource)
  poll(resource) do |msg, poller|
    poller.delete_message(msg)
    return valid_json?(msg.body) ? from_json(msg.body) : msg.body
  end
end
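
The String-or-Hash return rule can be sketched with the standard json library. The valid_json? and from_json helpers below are hypothetical stand-ins for the LangHelp methods of the same name, whose real implementations are not shown on this page; parse_body is a name invented for this example.

```ruby
require 'json'

# Hypothetical stand-in for LangHelp#valid_json?.
def valid_json?(text)
  JSON.parse(text)
  true
rescue JSON::ParserError, TypeError
  false
end

# Hypothetical stand-in for LangHelp#from_json.
def from_json(text)
  JSON.parse(text)
end

# Same decision as the return line above: parse when possible,
# otherwise hand the raw body back.
def parse_body(body)
  valid_json?(body) ? from_json(body) : body
end

p parse_body('Hey')
# prints "Hey"
p parse_body('{"tally":"ho"}')
# prints {"tally"=>"ho"} (newer Rubies print {"tally" => "ho"})
```

Callers therefore need to check the type of the result before indexing into it, since a plain-text body comes back unchanged.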

#poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config) ⇒ Object

Polls the given queue with the given options and passes the retrieved message to a block. As the source below is written, each call handles a single message: the block runs, the message is deleted, and polling stops.

Parameters

  • queue_name String - the name of the queue that you want to poll

  • client Aws::SQS::Client (optional) - defaults to #sqs

  • poller Aws::SQS::QueuePoller (optional) - defaults to #get_queue_poller(queue_name)

  • config Hash (optional) - Aws::SQS::QueuePoller polling configuration option(s)

  • block is the block that is used to interact with the message that was retrieved; it receives the message and the poller

Returns the result of the block

Example

# run a job from the next message in the Queue, passing the
# +:skip_delete+ option through to the poller
poll(:backlog, skip_delete: true) do |msg|
  demo_job = Job.new(msg.body)
  demo_job.run
end


# File 'lib/cloud_powers/synapse/queue.rb', line 254

def poll(queue_name, client: sqs, poller: get_queue_poller(queue_name), **config)
  results = nil
  poller.poll(config) do |msg|
    results = yield msg, poller if block_given?
    poller.delete_message(msg)
    throw :stop_polling
  end
  results
end
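
The control flow above relies on throw :stop_polling being caught inside the poller. This sketch shows that flow with a hypothetical FakePoller that mimics the relevant part of Aws::SQS::QueuePoller; poll_once is a name invented here and takes the poller directly instead of a queue name.

```ruby
# Hypothetical stand-in for Aws::SQS::QueuePoller: it yields queued
# messages inside catch(:stop_polling) and records deletions.
class FakePoller
  Message = Struct.new(:body)

  attr_reader :deleted

  def initialize(bodies)
    @messages = bodies.map { |body| Message.new(body) }
    @deleted = []
  end

  def poll(_options = {})
    catch(:stop_polling) do
      @messages.each { |msg| yield msg }
    end
  end

  def delete_message(msg)
    @deleted << msg.body
  end
end

# Same shape as the method above: yield one message, delete it, stop.
def poll_once(poller, **config)
  results = nil
  poller.poll(config) do |msg|
    results = yield msg, poller if block_given?
    poller.delete_message(msg)
    throw :stop_polling
  end
  results
end

poller = FakePoller.new(%w[first second])
result = poll_once(poller, skip_delete: true) { |msg, _poller| msg.body.upcase }
p result
# prints "FIRST"
p poller.deleted
# prints ["first"]
```

Only the first message is handled; the second stays queued because the throw unwinds out of the poller's loop before it is yielded.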

#queue_exists?(name) ⇒ Boolean

Checks SQS for the existence of this queue using the #queue_search() method

Parameters

  • name String

Returns Boolean

Notes

  • See #queue_search()



# File 'lib/cloud_powers/synapse/queue.rb', line 274

def queue_exists?(name)
  !queue_search(name).empty?
end

#queue_name(arg) ⇒ Object

This method can be used to parse a queue name from its address. It can be handy if you need the name of a queue but you don’t want the overhead of creating a Board object.

Parameters

  • arg String - the URL (or name) of the queue

Returns String

Example

queue_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_queue


# File 'lib/cloud_powers/synapse/queue.rb', line 291

def queue_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_queue$} =~ base_name ? base_name : "#{base_name}_queue"
end

#queue_poller_name(arg) ⇒ Object

This method can be used to parse a queue poller name from its queue name or @url. It can be handy if you need the name of a queue poller but you don’t want the overhead of creating a Board object.

Parameters

  • arg String - the URL (or name) of the queue

Returns String

Example

queue_poller_name('https://sqs.us-west-53.amazonaws.com/001101010010/fooBar')
=> foo_bar_queue_poller


# File 'lib/cloud_powers/synapse/queue.rb', line 309

def queue_poller_name(arg)
  base_name = to_snake(arg.to_s.split('/').last)
  %r{_queue_poller$} =~ base_name ? base_name : "#{base_name}_queue_poller"
end

#queue_search(name) ⇒ Object

Searches for a queue based on the name

Parameters

  • name String - prefix of the queue name to search for

Returns Array - one result per queue whose name starts with name

Example

results = queue_search('exampleQueue') # returns related URLs
results.first =~ /exampleQueue/ # regex match against the URL


# File 'lib/cloud_powers/synapse/queue.rb', line 325

def queue_search(name)
  urls = sqs.list_queues(queue_name_prefix: name).queue_urls
  urls.map do |url|
    build_board(name: queue_name(url), client: sqs) do |board|
      board.instance_attr_accessor :url
      board.url = url
      board
    end
  end
end

#send_queue_message(address, message, this_sqs = sqs) ⇒ Object

Sends a given message to a given queue

Parameters

  • address String - address of the Queue you want to interact with

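  • message String - message to be sent

  • this_sqs Aws::SQS::Client (optional) - defaults to #sqs

Returns the response from Aws::SQS::Client#send_message, which responds to #message_id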

Example

legit_address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
random_message = 'Wowza, this is pretty easy.'
resp = send_queue_message(legit_address, random_message)
resp.message_id
=> 'some message id'


# File 'lib/cloud_powers/synapse/queue.rb', line 351

def send_queue_message(address, message, this_sqs = sqs)
  this_sqs.send_message(queue_url: address, message_body: message)
end
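
Because the client is an argument, the method can be exercised without AWS. RecordingSqsClient below is a hypothetical stand-in for Aws::SQS::Client that records what was sent; the third argument is required in this sketch since there is no #sqs default to fall back on.

```ruby
# Hypothetical stand-in for Aws::SQS::Client that records what was sent
# and returns an object responding to #message_id.
class RecordingSqsClient
  Response = Struct.new(:message_id)

  attr_reader :sent

  def initialize
    @sent = []
  end

  def send_message(queue_url:, message_body:)
    @sent << { queue_url: queue_url, message_body: message_body }
    Response.new("fake-id-#{@sent.size}")
  end
end

# Same call as the method above, with the client always passed in.
def send_queue_message(address, message, this_sqs)
  this_sqs.send_message(queue_url: address, message_body: message)
end

client = RecordingSqsClient.new
address = 'https://sqs.us-west-2.amazonaws.com/12345678/exampleQueue'
resp = send_queue_message(address, 'Wowza, this is pretty easy.', client)
p resp.message_id
# prints "fake-id-1"
p client.sent.size
# prints 1
```

With the real SDK, a similar effect is usually achieved with a stubbed Aws::SQS::Client rather than a hand-written fake.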