Class: LambdaQueuer

Inherits:
Object
  • Object
show all
Defined in:
lib/lambda-queuer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ LambdaQueuer

Returns a new instance of LambdaQueuer.



7
8
9
10
11
12
13
# File 'lib/lambda-queuer.rb', line 7

# Builds a queuer from an options hash.
#
# Recognised keys (each optional):
#   :host                 - falls back to '127.0.0.1'
#   :port                 - falls back to 5672
#   :exchange             - falls back to 'default_exchange'
#   :request_routing_key  - falls back to 'default_routing_key'
#   :response_routing_key - stored as given; no fallback is applied
#
# A fallback is used whenever the supplied value is nil or false.
def initialize(options = {})
  fallbacks = {
    :host => '127.0.0.1',
    :port => 5672,
    :exchange => 'default_exchange',
    :request_routing_key => 'default_routing_key'
  }

  fallbacks.each do |name, fallback|
    instance_variable_set("@#{name}", options[name] || fallback)
  end

  # Deliberately not defaulted: the value is stored exactly as supplied.
  @response_routing_key = options[:response_routing_key]
end

Instance Attribute Details

#queue ⇒ Object

Returns the value of attribute queue.



6
7
8
# File 'lib/lambda-queuer.rb', line 6

# Reader for the +queue+ attribute: returns whatever is currently held in
# @queue (nil until something assigns it).
def queue; @queue; end

Instance Method Details

#post(&block) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/lambda-queuer.rb', line 15

# Serializes the given block to source text and publishes it to the configured
# exchange under the request routing key, running an EventMachine reactor for
# the duration of the call.
#
# When a response routing key is configured, a queue of that name is bound to
# the same exchange and subscribed to; the first message delivered is printed
# as "Received: ..." and the connection is closed, which stops the reactor.
# Without a response routing key the connection is closed (and the reactor
# stopped) by a 2-second timer.
#
# Any StandardError raised during setup is printed and the reactor is stopped,
# so the call returns instead of leaving the event loop running forever.
#
# NOTE(review): Proc#to_source is not core Ruby; presumably supplied by a gem
# loaded elsewhere in the file (e.g. sourcify) - confirm.
# NOTE(review): when a response routing key is set there is no timeout; the
# call blocks until a message arrives.
def post(&block)
  EventMachine.run do
    begin
      # Serialize first so a missing or unserializable block fails before any
      # broker resources are created.
      payload = block.to_source

      connection = AMQP.connect(:host => @host, :port => @port)
      channel = AMQP::Channel.new(connection)
      exchange = channel.direct(@exchange, :auto_delete => true)

      @queue = channel.queue(@request_routing_key, :auto_delete => true)
      @queue.bind(exchange, :routing_key => @request_routing_key)
      exchange.publish(payload, :routing_key => @request_routing_key)

      if @response_routing_key
        @answer_queue = channel.queue(@response_routing_key, :auto_delete => true)
        @answer_queue.bind(exchange, :routing_key => @response_routing_key)
        @answer_queue.subscribe do |message|
          puts "Received: #{message}"
          connection.close { EventMachine.stop }
        end
      else
        # No reply expected: shut down after a fixed delay (presumably to let
        # the publish go out before closing - confirm).
        EventMachine.add_timer(2) do
          connection.close { EventMachine.stop }
        end
      end
    rescue => e
      puts e
      # Without this the reactor keeps running and EventMachine.run never
      # returns after a setup failure.
      EventMachine.stop
    end
  end
end