Class: MicroQ::Fetcher::Sqs

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/micro_q/fetchers/sqs.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name, manager) ⇒ Sqs

Returns a new instance of Sqs.



7
8
9
10
# File 'lib/micro_q/fetchers/sqs.rb', line 7

# Set up the fetcher's state.
#
# @param name [#to_s] queue name; it is converted with +to_s+ before storing
# @param manager [Object] kept as-is in @manager (the +start+ method in this
#   file sends +receive_messages!+ to it)
def initialize(name, manager)
  queue_name = name.to_s
  @name = queue_name
  @manager = manager
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/micro_q/fetchers/sqs.rb', line 5

# Reader for the @name instance variable.
#
# @return [Object] whatever @name currently holds; the constructor shown in
#   this file stores +name.to_s+ there
def name
  @name
end

Instance Method Details

#add_message(message, time = nil) ⇒ Object

Add the message to the SQS queue. Respect the maximum amount of time that a message can be delayed (900 seconds).



31
32
33
34
35
36
37
# File 'lib/micro_q/fetchers/sqs.rb', line 31

# Hand a message to the client for creation, optionally scheduling it.
#
# When +time+ is given, message['run_at'] is set (the hash is modified in
# place) to the earlier of +time.to_f+ and 900 seconds from now, so a message
# is never scheduled further out than that limit.
#
# @param message [Hash] message payload; gains a 'run_at' key when +time+ is set
# @param time [#to_f, nil] requested run time, or nil to leave 'run_at' alone
# @return [Object] whatever +defer+ returns
def add_message(message, time=nil)
  if time
    requested_run_at = time.to_f
    latest_run_at = (Time.now + 900).to_i
    message['run_at'] = [requested_run_at, latest_run_at].min
  end

  # NOTE(review): +defer+ and +client+ are defined outside this snippet;
  # presumably +defer+ is Celluloid's blocking-call helper -- confirm.
  defer { client.messages_create(message) }
end

#remove_message(message) ⇒ Object



39
40
41
42
43
# File 'lib/micro_q/fetchers/sqs.rb', line 39

# Ask the client to delete a message.
#
# @param message [Object] passed unchanged to +client.messages_delete+
# @return [Object] whatever +defer+ returns
def remove_message(message)
  # NOTE(review): +defer+ and +client+ come from outside this snippet.
  defer { client.messages_delete(message) }
end

#start ⇒ Object

Long poll the SQS messages API and, if there are messages, then fetch more right away. Send messages to the manager when they return from the API.



16
17
18
19
20
21
22
23
24
# File 'lib/micro_q/fetchers/sqs.rb', line 16

# Fetch messages once, pass any to the manager, then schedule the next run.
#
# Inside +defer+, +client.messages+ is read and, when the result is not
# empty, handed to +@manager.receive_messages!+. Afterwards +start+ is
# re-armed through +after(SHORT_DELAY)+.
#
# @return [Object] whatever +after+ returns
def start
  defer do
    fetched = client.messages
    @manager.receive_messages!(fetched) if fetched.any?
    fetched # keep the deferred block's value equal to the fetched messages
  end

  # NOTE(review): +after+ and SHORT_DELAY are defined outside this snippet;
  # presumably +after+ is Celluloid's timer helper -- confirm.
  after(SHORT_DELAY) { start }
end