Class: DispatchRider::QueueServices::AwsSqs

Inherits:
Base
  • Object
show all
Defined in:
lib/dispatch-rider/queue_services/aws_sqs.rb,
lib/dispatch-rider/queue_services/aws_sqs/sqs_received_message.rb,
lib/dispatch-rider/queue_services/aws_sqs/message_body_extractor.rb

Defined Under Namespace

Classes: AbortExecution, MessageBodyExtractor, SqsReceivedMessage, VisibilityTimeoutExceeded

Instance Attribute Summary

Attributes inherited from Base

#queue

Instance Method Summary collapse

Methods inherited from Base

#empty?, #head, #initialize, #push, #raw_head, #received_message_for

Constructor Details

This class inherits a constructor from DispatchRider::QueueServices::Base

Instance Method Details

#assign_storage(attrs) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 12

# Looks up the SQS queue described by +attrs+.
#
# A queue name takes precedence over a queue URL when both are given.
#
# @param attrs [Hash] queue settings; reads +:region+, +:name+ and +:url+
# @return [Object] whatever the SQS queue collection returns for the name/URL
# @raise [AdapterNotFoundError] when the AWS::SQS constant cannot be resolved
# @raise [RecordInvalid] when neither +:name+ nor +:url+ is present
def assign_storage(attrs)
  # Only the constant lookup is guarded. NoMethodError is a subclass of
  # NameError, so rescuing around the whole body would report unrelated
  # failures (e.g. a nil +attrs+) as a missing aws-sdk adapter.
  begin
    sqs_class = AWS::SQS
  rescue NameError
    raise AdapterNotFoundError.new(self.class.name, 'aws-sdk')
  end

  sqs = sqs_class.new(logger: nil, region: attrs[:region])
  if attrs[:name]
    sqs.queues.named(attrs[:name])
  elsif attrs[:url]
    sqs.queues[attrs[:url]]
  else
    raise RecordInvalid.new(self, ["Either name or url have to be specified"])
  end
end

#construct_message_from(item) ⇒ Object



47
48
49
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 47

# Turns a raw queue item into a message object.
#
# The body is pulled out of +item+ by MessageBodyExtractor and the result is
# handed to +deserialize+.
#
# @param item [Object] raw item as received from the queue
# @return [Object] the value returned by +deserialize+
def construct_message_from(item)
  extractor = MessageBodyExtractor.new(item)
  serialized_body = extractor.extract
  deserialize(serialized_body)
end

#delete(item) ⇒ Object



51
52
53
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 51

# Removes +item+ by delegating to the item's own +delete+ method.
#
# @param item [#delete] the received item to remove
# @return [Object] whatever +item.delete+ returns
def delete(item)
  item.delete
end

#insert(item) ⇒ Object



43
44
45
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 43

# Sends +item+ to the underlying queue via +send_message+.
#
# @param item [Object] the payload handed to +queue.send_message+
# @return [Object] whatever +queue.send_message+ returns
def insert(item)
  queue.send_message(item)
end

#pop(&block) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 27

# Receives one message from the queue and passes it to the given handler.
#
# The raw item is wrapped in an SqsReceivedMessage and the handler is invoked
# inside +visibility_timeout_shield+. A falsy handler result raises
# AbortExecution, which is rescued here so the method returns nil.
#
# @yieldparam received [SqsReceivedMessage] the wrapped message
# @yieldreturn [Boolean] falsy aborts processing of this message
# @return [Object, nil] the result of +queue.receive_message+, or nil on abort
def pop(&block)
  queue.receive_message do |raw_message|
    message = construct_message_from(raw_message)
    received = SqsReceivedMessage.new(message, raw_message, queue)

    visibility_timeout_shield(received) do
      handled = block.call(received)
      # Raising leaves the receive_message block via an exception instead of
      # a normal return — presumably so the message is not acknowledged;
      # confirm against the SQS client in use.
      raise AbortExecution, "false received from handler" unless handled

      received
    end
  end
rescue AbortExecution
  # Nothing to do: the abort only exists to unwind out of receive_message.
end

#size ⇒ Object



55
56
57
# File 'lib/dispatch-rider/queue_services/aws_sqs.rb', line 55

# Reports the queue's message count as given by
# +queue.approximate_number_of_messages+.
#
# @return [Object] whatever +queue.approximate_number_of_messages+ returns
def size
  queue.approximate_number_of_messages
end