Class: Qyu::Queue::SQS::Adapter
- Inherits:
-
Object
- Object
- Qyu::Queue::SQS::Adapter
- Defined in:
- lib/qyu/queue/sqs/adapter.rb
Constant Summary collapse
- TYPE =
:sqs
Class Method Summary collapse
- .valid_config?(config) ⇒ Boolean
Instance Method Summary collapse
- #acknowledge_message(queue_name, message_id) ⇒ Object
- #enqueue_task(queue_name, task_id) ⇒ Object
- #enqueue_task_to_failed_queue(queue_name, task_id) ⇒ Object
- #fetch_next_message(queue_name) ⇒ Object
-
#initialize(config) ⇒ Adapter
constructor
A new instance of Adapter.
Constructor Details
#initialize(config) ⇒ Adapter
Returns a new instance of Adapter.
11 12 13 |
# File 'lib/qyu/queue/sqs/adapter.rb', line 11

# Builds a new adapter by handing the given configuration to +init_client+.
#
# @param config [Object] adapter configuration, forwarded untouched to
#   +init_client+ (defined outside this snippet).
#   NOTE(review): presumably +init_client+ is what assigns @sqs — confirm.
# @return [Adapter] a new instance of Adapter
def initialize(config)
  init_client(config)
end
Class Method Details
.valid_config?(config) ⇒ Boolean
15 16 17 |
# File 'lib/qyu/queue/sqs/adapter.rb', line 15

# Reports whether +config+ passes ConfigurationValidator's check.
#
# @param config [Object] configuration to validate; passed straight to
#   ConfigurationValidator.new
# @return [Boolean] the result of ConfigurationValidator#valid?
def self.valid_config?(config)
  validator = ConfigurationValidator.new(config)
  validator.valid?
end
Instance Method Details
#acknowledge_message(queue_name, message_id) ⇒ Object
57 58 59 60 61 62 |
# File 'lib/qyu/queue/sqs/adapter.rb', line 57

# Acknowledges a fetched message by asking the SQS client to delete it.
#
# NOTE(review): the identifiers `acknowledge_message`, `message_id` and
# `delete_message` were missing from the extracted snippet and have been
# restored from the method heading and the SQS client API — confirm against
# the repository.
#
# @param queue_name [String] queue the message was fetched from; resolved to
#   a URL through +get_or_create_queue_url+
# @param message_id [String] sent to SQS as the receipt handle
# @return [Object] whatever the SQS client's delete call returns
def acknowledge_message(queue_name, message_id)
  @sqs.delete_message({
    queue_url: get_or_create_queue_url(queue_name),
    receipt_handle: message_id
  })
end
#enqueue_task(queue_name, task_id) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/qyu/queue/sqs/adapter.rb', line 19

# Sends a task to the named queue as a JSON message of the form
# {"task_id": <task_id>} and logs the outcome.
#
# NOTE(review): the client method name was missing from the extracted snippet
# (`@sqs.({ ... })`); `send_message` was restored because it is the SQS client
# call that accepts `message_body` — confirm against the repository.
#
# @param queue_name [String] target queue; resolved to a URL through
#   +get_or_create_queue_url+
# @param task_id [Object] identifier serialised into the message body
# @return [Object] the SQS client's response, unchanged
def enqueue_task(queue_name, task_id)
  # #to_json already yields a String, so no further conversion is needed.
  response = @sqs.send_message({
    queue_url: get_or_create_queue_url(queue_name),
    message_body: { task_id: task_id }.to_json
  })
  Qyu.logger.debug "SQS response: #{response}"
  Qyu.logger.info "Task enqueued with ID #{task_id} in queue #{queue_name}"
  response
end
#enqueue_task_to_failed_queue(queue_name, task_id) ⇒ Object
31 32 33 34 |
# File 'lib/qyu/queue/sqs/adapter.rb', line 31

# Enqueues a task on the companion "failed" queue, whose name is the given
# queue name with "-failed" appended.
#
# @param queue_name [String, Symbol] name of the original queue
# @param task_id [Object] identifier of the task, passed on to +enqueue_task+
# @return [Object] whatever +enqueue_task+ returns
def enqueue_task_to_failed_queue(queue_name, task_id)
  # Interpolation rather than `+` so a Symbol queue name does not raise.
  enqueue_task("#{queue_name}-failed", task_id)
end
#fetch_next_message(queue_name) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/qyu/queue/sqs/adapter.rb', line 36

# Polls the named queue until a message arrives, then returns its receipt
# handle and the task id parsed from its JSON body.
#
# Polling asks for one message at a time and sleeps one second after every
# empty response, so this call does not return while the queue stays empty.
#
# NOTE(review): the identifiers `fetch_next_message`, `receive_message`,
# `messages` and `message` were missing from the extracted snippet and have
# been restored from the method heading and the SQS client API — confirm
# against the repository.
#
# @param queue_name [String] queue to read from; resolved to a URL through
#   +get_or_create_queue_url+ on every poll
# @return [Hash{String => Object}] 'id' holds the message's receipt handle,
#   'task_id' holds the "task_id" field of the JSON body
# @raise [JSON::ParserError] if the message body is not valid JSON
def fetch_next_message(queue_name)
  Qyu.logger.debug "Listening on `#{queue_name}`"

  response = nil
  loop do
    response = @sqs.receive_message({
      queue_url: get_or_create_queue_url(queue_name),
      max_number_of_messages: 1
    })
    break unless response.messages.empty?

    sleep 1
  end

  message = response.messages.first
  Qyu.logger.debug "Fetched message #{message}"
  {
    'id' => message.receipt_handle,
    'task_id' => JSON.parse(message.body)['task_id']
  }
end