Class: AmqpNotifier
- Inherits:
-
Object
- Object
- AmqpNotifier
- Defined in:
- lib/amqp_notifier.rb
Constant Summary collapse
- DEFAULT_EXCHANGE =
'labs_tests_1'- DEFAULT_OPTIONS =
{:durable => true, :type => :topic}
- DEFAULT_PREFIX =
''- DEFAULT_QUEUE =
'default'- RETRIES =
10
Instance Method Summary collapse
-
#initialize(exchange = DEFAULT_EXCHANGE, queue = DEFAULT_QUEUE, key_prefix = DEFAULT_PREFIX, options = DEFAULT_OPTIONS) ⇒ AmqpNotifier
constructor
A new instance of AmqpNotifier.
- #publish(key = '', message = '') ⇒ Object
- #subscribe(key = '', &block) ⇒ Object
Constructor Details
#initialize(exchange = DEFAULT_EXCHANGE, queue = DEFAULT_QUEUE, key_prefix = DEFAULT_PREFIX, options = DEFAULT_OPTIONS) ⇒ AmqpNotifier
Returns a new instance of AmqpNotifier.
8 9 10 11 12 13 |
# File 'lib/amqp_notifier.rb', line 8 def initialize(exchange = DEFAULT_EXCHANGE, queue = DEFAULT_QUEUE, key_prefix = DEFAULT_PREFIX, = DEFAULT_OPTIONS) @key_prefix = key_prefix @exchange = exchange = @queue = queue end |
Instance Method Details
#publish(key = '', message = '') ⇒ Object
15 16 17 18 19 |
# File 'lib/amqp_notifier.rb', line 15 def publish(key = '', = '') $stderr.puts "#{key}:" $stderr.puts "#{message}" Qusion.channel.topic(@exchange, ).publish(, {:key => @key_prefix + key, :timestamp => (Time.now.to_f*1000).truncate}) end |
#subscribe(key = '', &block) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/amqp_notifier.rb', line 21 def subscribe(key = '', &block) $stdout.puts "Subscriction to Exchange: #{@exchange} Queue: #{@queue} Key: #{key}" Qusion.channel.prefetch(1).queue(@queue, :durable => true).bind(Qusion.channel.topic(@exchange, ), :key => key).subscribe(:ack => true) do |info, | for i in (0..RETRIES) begin yield info, info.ack break rescue => e $stderr.puts "Error executing block amqp" $stderr.puts "#{e.inspect}:" e.backtrace.each {|line| $stderr.puts line} if i == RETRIES AmqpNotifier.new("#{@exchange}_errors").publish(key, ) info.reject(:requeue => false) else sleep(1) next end end end end end |