Class: DispatchRider::QueueServices::AwsSqs

Inherits:
Base
  • Object
show all
Defined in:
lib/dispatch-rider/queue_services/aws_sqs.rb,
lib/dispatch-rider/queue_services/aws_sqs/sqs_received_message.rb,
lib/dispatch-rider/queue_services/aws_sqs/message_body_extractor.rb

Defined Under Namespace

Classes: AbortExecution, MessageBodyExtractor, SqsReceivedMessage, VisibilityTimeoutExceeded

Instance Attribute Summary collapse

Attributes inherited from Base

#queue

Instance Method Summary collapse

Methods inherited from Base

#empty?, #head, #initialize, #push, #raw_head, #received_message_for

Constructor Details

This class inherits a constructor from DispatchRider::QueueServices::Base

Instance Attribute Details

#visibility_timeout ⇒ Object (readonly)

Returns the value of attribute visibility_timeout.



60
61
62
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 60

# Reader for the +@visibility_timeout+ instance variable.
#
# NOTE(review): presumably the SQS visibility timeout that
# +set_visibility_timeout+ stores during +assign_storage+ — the assignment
# site is not visible here, confirm unit and type there.
#
# @return [Object, nil] the current value of +@visibility_timeout+
#   (nil when it has never been assigned)
def visibility_timeout
  @visibility_timeout
end

Instance Method Details

#assign_storage(attrs) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 12

# Builds the SQS queue object this service operates on.
#
# The queue is located either by name (the first URL returned by
# +list_queues+ for +attrs[:name]+ used as a name *prefix*) or directly by
# URL. +attrs[:name]+ wins when both are given.
#
# NOTE(review): because the lookup is prefix based, a name such as "jobs"
# can resolve to a queue called "jobs-staging" when that one is listed
# first. Left unchanged to stay backward-compatible.
#
# @param attrs [Hash] options; reads +:name+ and +:url+
# @return [Aws::SQS::Queue] queue resource bound to the resolved URL
# @raise [RecordInvalid] when neither +:name+ nor +:url+ is present, or when
#   no queue URL is found for the given name
# @raise [AdapterNotFoundError] when a constant needed from the AWS SDK
#   cannot be resolved
def assign_storage(attrs)
  sqs = Aws::SQS::Client.new(logger: nil)

  url =
    if attrs[:name].present?
      sqs.list_queues(queue_name_prefix: attrs[:name]).queue_urls.first
    elsif attrs[:url].present?
      attrs[:url]
    else
      raise RecordInvalid.new(self, ["Either name or url have to be specified"])
    end

  # Only the name lookup can come back empty; fail with a clear message
  # instead of handing a nil URL to the SDK.
  raise RecordInvalid.new(self, ["No queue found for name #{attrs[:name].inspect}"]) if url.nil?

  set_visibility_timeout(sqs, url)
  Aws::SQS::Queue.new(url: url, client: sqs)
rescue NameError => e
  # NoMethodError is a NameError subclass. It signals an ordinary bug
  # (e.g. nil attrs), not a missing SDK, so let it propagate untouched.
  raise if e.is_a?(NoMethodError)

  raise AdapterNotFoundError.new(self.class.name, 'aws-sdk')
end

#construct_message_from(item) ⇒ Object



48
49
50
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 48

# Turns a raw queue item into a message object.
#
# The body is pulled out of +item+ with MessageBodyExtractor and the result
# is handed to +deserialize+.
#
# @param item [Object] raw item as received from the queue
# @return [Object] whatever +deserialize+ returns for the extracted body
def construct_message_from(item)
  extracted_body = MessageBodyExtractor.new(item).extract
  deserialize(extracted_body)
end

#delete(item) ⇒ Object



52
53
54
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 52

# Deletes the given item by calling +delete+ on it.
#
# NOTE(review): unlike +pop+, this call is not wrapped in a retry — confirm
# whether that difference is intentional.
#
# @param item [#delete] the object to delete
# @return [Object] whatever +item.delete+ returns
def delete(item)
  item.delete
end

#insert(item) ⇒ Object



44
45
46
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 44

# Sends +item+ to the underlying queue via +queue.send_message+.
#
# NOTE(review): +item+ is passed through unchanged; presumably the caller
# has already shaped it the way +send_message+ expects — verify against the
# caller.
#
# @param item [Object] payload handed to +queue.send_message+
# @return [Object] whatever +queue.send_message+ returns
def insert(item)
  queue.send_message(item)
end

#pop ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 28

# Receives at most one message from the queue and hands it to the block.
#
# The raw message is wrapped in an SqsReceivedMessage and the block is run
# inside +visibility_timeout_shield+. When the block returns a truthy value
# the raw message is deleted through Retriable (+tries: 3+). When the block
# returns a falsy value AbortExecution is raised, rescued at method level,
# and the message is left undeleted.
#
# @yieldparam received [SqsReceivedMessage] the wrapped message
# @yieldreturn [Boolean] truthy to acknowledge (delete) the message
# @return [Object, nil] the value of the Retriable delete block, or nil when
#   nothing was received or the handler returned a falsy value
def pop
  raw_item = queue.receive_messages({ max_number_of_messages: 1 }).first
  return unless raw_item.present?

  message = construct_message_from(raw_item)
  received = SqsReceivedMessage.new(message, raw_item, queue, visibility_timeout)

  visibility_timeout_shield(received) do
    handled = yield(received)
    raise AbortExecution, "false received from handler" unless handled

    received
  end

  Retriable.retriable(tries: 3) { raw_item.delete }
rescue AbortExecution
  # Nothing left to do: the abort only serves to break out of pop.
end

#size ⇒ Object



56
57
58
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 56

def size
  queue.approximate_number_of_messages
end