Class: Qyu::Queue::SQS::Adapter

Inherits:
Object
  • Object
show all
Defined in:
lib/qyu/queue/sqs/adapter.rb

Constant Summary collapse

TYPE =
:sqs

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Adapter

Returns a new instance of Adapter.



11
12
13
# File 'lib/qyu/queue/sqs/adapter.rb', line 11

def initialize(config)
  init_client(config)
end

Class Method Details

.valid_config?(config) ⇒ Boolean

Returns:

  • (Boolean)


15
16
17
# File 'lib/qyu/queue/sqs/adapter.rb', line 15

def self.valid_config?(config)
 ConfigurationValidator.new(config).valid?
end

Instance Method Details

#acknowledge_message(queue_name, message_id) ⇒ Object



57
58
59
60
61
62
# File 'lib/qyu/queue/sqs/adapter.rb', line 57

def acknowledge_message(queue_name, message_id)
  @sqs.delete_message({
    queue_url: get_or_create_queue_url(queue_name),
    receipt_handle: message_id
  })
end

#enqueue_task(queue_name, task_id) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/qyu/queue/sqs/adapter.rb', line 19

def enqueue_task(queue_name, task_id)
  response = @sqs.send_message({
    queue_url: get_or_create_queue_url(queue_name),
    message_body: { task_id: task_id }.to_json.to_s
  })

  Qyu.logger.debug "SQS response: #{response}"
  Qyu.logger.info "Task enqueued with ID #{task_id} in queue #{queue_name}"

  response
end

#enqueue_task_to_failed_queue(queue_name, task_id) ⇒ Object



31
32
33
34
# File 'lib/qyu/queue/sqs/adapter.rb', line 31

def enqueue_task_to_failed_queue(queue_name, task_id)
  failed_queue_name = queue_name + '-failed'
  enqueue_task(failed_queue_name, task_id)
end

#fetch_next_message(queue_name) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/qyu/queue/sqs/adapter.rb', line 36

def fetch_next_message(queue_name)
  Qyu.logger.debug "Listening on `#{queue_name}`"

  while (response = @sqs.receive_message({
    queue_url: get_or_create_queue_url(queue_name),
    max_number_of_messages: 1
  })).messages.count == 0

  sleep 1
  end

  message = response.messages[0]

  Qyu.logger.debug "Fetched message #{message}"

  {
    'id' => message.receipt_handle,
    'task_id' => JSON.parse(message.body)['task_id']
  }
end