Class: DispatchRider::QueueServices::AwsSqs

Inherits:
Base
  • Object
show all
Defined in:
lib/dispatch-rider/queue_services/aws_sqs.rb,
lib/dispatch-rider/queue_services/aws_sqs/sqs_received_message.rb,
lib/dispatch-rider/queue_services/aws_sqs/message_body_extractor.rb

Defined Under Namespace

Classes: AbortExecution, MessageBodyExtractor, SqsReceivedMessage, VisibilityTimeoutExceeded

Instance Attribute Summary

Attributes inherited from Base

#queue

Instance Method Summary collapse

Methods inherited from Base

#empty?, #head, #initialize, #push, #raw_head, #received_message_for

Constructor Details

This class inherits a constructor from DispatchRider::QueueServices::Base

Instance Method Details

#assign_storage(attrs) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 12

def assign_storage(attrs)
  begin
    sqs = AWS::SQS.new(logger: nil, region: attrs[:region])
    if attrs[:name]
      sqs.queues.named(attrs[:name])
    elsif attrs[:url]
      sqs.queues[attrs[:url]]
    else
      raise RecordInvalid.new(self, ["Either name or url have to be specified"])
    end
  rescue NameError
    raise AdapterNotFoundError.new(self.class.name, 'aws-sdk')
  end
end

#construct_message_from(item) ⇒ Object



49
50
51
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 49

def construct_message_from(item)
  deserialize(MessageBodyExtractor.new(item).extract)
end

#delete(item) ⇒ Object



53
54
55
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 53

def delete(item)
  item.delete
end

#insert(item) ⇒ Object



45
46
47
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 45

def insert(item)
  queue.send_message(item)
end

#popObject



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 27

def pop
  raw_item = queue.receive_message
  if raw_item.present?
    obj = SqsReceivedMessage.new(construct_message_from(raw_item), raw_item, queue)

    visibility_timeout_shield(obj) do
      raise AbortExecution, "false received from handler" unless yield(obj)
      obj
    end

    with_retries(max_tries: 3) do
      raw_item.delete
    end
  end
rescue AbortExecution
  # ignore, it was already handled, just need to break out if pop
end

#sizeObject



57
58
59
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 57

def size
  queue.approximate_number_of_messages
end