Class: LambdaQueuer

Inherits:
Object
  • Object
show all
Defined in:
lib/lambda-queuer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ LambdaQueuer

Returns a new instance of LambdaQueuer.



7
8
9
10
11
12
13
# File 'lib/lambda-queuer.rb', line 7

# Stores the connection settings later used by #post.
#
# @param options [Hash] settings keyed by symbol; a missing or falsy value
#   falls back to the default listed below
# @option options [String] :host ('127.0.0.1') host passed to AMQP.connect
# @option options [Integer] :port (5672) port passed to AMQP.connect
# @option options [String] :exchange ('default_exchange') name of the direct exchange
# @option options [String] :request_routing_key ('default_routing_key')
#   used both as the request queue name and as its routing key
# @option options [String] :response_routing_key (nil) no default; when set,
#   #post subscribes to a queue of this name
def initialize(options = {})
  # Settings that always end up with a value, paired with their fallback.
  fallbacks = {
    :host => '127.0.0.1',
    :port => 5672,
    :exchange => 'default_exchange',
    :request_routing_key => 'default_routing_key'
  }

  fallbacks.each do |setting, fallback|
    instance_variable_set("@#{setting}", options[setting] || fallback)
  end

  # Deliberately stored as given, with no fallback.
  @response_routing_key = options[:response_routing_key]
end

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



6
7
8
# File 'lib/lambda-queuer.rb', line 6

# Reader for the +@queue+ instance variable, which #post assigns when it
# declares the request queue; nil if nothing has assigned it yet.
#
# @return [Object, nil] the current value of +@queue+
def queue; @queue; end

Instance Method Details

#post(&block) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/lambda-queuer.rb', line 15

# Publishes the source text of the given block to the request queue.
#
# Runs an EventMachine reactor for the duration of the call. Inside it the
# method connects to the broker, declares the direct exchange and the request
# queue (both auto-delete), and publishes +block.to_source+ using the request
# routing key. The block itself is never called here.
#
# The reactor is stopped in one of three ways:
# * a response routing key is configured: after the first message received on
#   the answer queue has been printed;
# * no response routing key: by a 2-second timer after publishing;
# * an error is raised during setup: immediately, after printing the error.
#
# @yield block whose source is published; must respond to +to_source+
#   (NOTE(review): presumably provided by a Proc#to_source extension loaded
#   elsewhere in the file — confirm)
# @raise [ArgumentError] if no block is given
def post(&block)
  # Fail fast: without a block there is nothing to publish, and discovering
  # that inside the reactor would happen only after a connection was opened.
  raise ArgumentError, 'post requires a block to publish' unless block

  EventMachine.run do
    # Explicit begin/rescue keeps the method valid on Rubies that do not
    # accept a bare rescue inside a do...end block.
    begin
      connection = AMQP.connect(:host => @host, :port => @port)
      channel = AMQP::Channel.new(connection)
      exchange = channel.direct(@exchange, :auto_delete => true)

      # The request queue is named after its own routing key.
      @queue = channel.queue(@request_routing_key, :auto_delete => true)
      @queue.bind(exchange, :routing_key => @request_routing_key)

      exchange.publish(block.to_source, :routing_key => @request_routing_key)

      if @response_routing_key
        @answer_queue = channel.queue(@response_routing_key, :auto_delete => true)
        @answer_queue.bind(exchange, :routing_key => @response_routing_key)
        @answer_queue.subscribe do |message|
          puts "Received: #{message}"
          connection.close { EventMachine.stop }
        end
      else
        # No reply expected: leave the reactor running for 2 seconds before
        # closing the connection and stopping.
        EventMachine.add_timer(2) do
          connection.close { EventMachine.stop }
        end
      end
    rescue => e
      puts e
      # Nothing else will ever stop the reactor once setup has failed, so
      # stop it here; otherwise this method would block forever.
      EventMachine.stop
    end
  end
end