Class: AmqpNotifier

Inherits:
Object
  • Object
show all
Defined in:
lib/amqp_notifier.rb

Constant Summary collapse

# Exchange name the constructor uses when no exchange is passed.
DEFAULT_EXCHANGE = 'labs_tests_1'

# Options the constructor uses when none are passed; #publish and #subscribe
# hand them to the channel's topic-exchange call.
DEFAULT_OPTIONS = { :durable => true, :type => :topic }

# Routing-key prefix the constructor uses when none is passed (empty, so keys
# are published unchanged).
DEFAULT_PREFIX = ''

# Queue name the constructor uses when no queue is passed.
DEFAULT_QUEUE = 'default'

# Upper bound of the attempt counter in #subscribe, which counts from 0: a
# failing message is attempted RETRIES + 1 times before being given up on.
RETRIES = 10

Instance Method Summary collapse

Constructor Details

#initialize(exchange = DEFAULT_EXCHANGE, queue = DEFAULT_QUEUE, key_prefix = DEFAULT_PREFIX, options = DEFAULT_OPTIONS) ⇒ AmqpNotifier

Returns a new instance of AmqpNotifier.



8
9
10
11
12
13
# File 'lib/amqp_notifier.rb', line 8

# Stores the connection settings used later by #publish and #subscribe.
# Nothing is declared or connected here; the arguments are only remembered.
#
# @param exchange [String] name of the topic exchange
# @param queue [String] name of the queue consumed by #subscribe
# @param key_prefix [String] text prepended to every routing key in #publish
# @param options [Hash] options passed along when the topic exchange is requested
def initialize(exchange = DEFAULT_EXCHANGE,
               queue = DEFAULT_QUEUE,
               key_prefix = DEFAULT_PREFIX,
               options = DEFAULT_OPTIONS)
  # Read by #publish (the exchange and its options are also read by #subscribe).
  @key_prefix, @exchange, @options = key_prefix, exchange, options
  # Read only by #subscribe.
  @queue = queue
end

Instance Method Details

#publish(key = '', message = '') ⇒ Object



15
16
17
18
19
# File 'lib/amqp_notifier.rb', line 15

# Logs the key and the message to standard error, then publishes the message
# on the topic exchange named by @exchange (requested with @options).
#
# The message is sent with two properties: :key, which is @key_prefix followed
# by +key+, and :timestamp, the current time in whole milliseconds since the
# epoch.
#
# @param key [String] routing-key suffix appended to @key_prefix
# @param message [Object] payload handed to the exchange unchanged
# @return [Object] whatever the exchange's publish call returns
def publish(key = '', message = '')
  $stderr.puts "#{key}:"
  $stderr.puts "#{message}"

  # The exchange is requested before the properties are built, so a bad key
  # still fails only after the exchange lookup, as it always has.
  topic_exchange = Qusion.channel.topic(@exchange, @options)
  properties = {
    :key => @key_prefix + key,
    :timestamp => (Time.now.to_f * 1000).truncate
  }
  topic_exchange.publish(message, properties)
end

#subscribe(key = '', &block) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/amqp_notifier.rb', line 21

# Announces the subscription on standard output, binds the durable queue
# @queue to the topic exchange @exchange under +key+, and consumes it with
# explicit acknowledgements (prefetch of 1), passing every delivery to the
# given block.
#
# Each delivery is attempted up to RETRIES + 1 times. A successful attempt
# acks the message. An attempt that raises a StandardError is logged to
# standard error; the method then sleeps one second and tries again. When the
# final attempt fails too, the message is republished through a new
# AmqpNotifier on the "<exchange>_errors" exchange, using the subscription
# +key+ as its key, and is then rejected without requeue.
#
# @param key [String] binding key for the queue
# @yield [info, message] the two values the subscription supplies per delivery
# @return [Object] whatever the underlying subscribe call returns
def subscribe(key = '', &block)
  # The log text is kept verbatim, spelling included.
  $stdout.puts "Subscriction to Exchange: #{@exchange} Queue: #{@queue} Key: #{key}"

  # Queue first, exchange second: same call order as the chained form.
  durable_queue = Qusion.channel.prefetch(1).queue(@queue, :durable => true)
  topic_exchange = Qusion.channel.topic(@exchange, @options)

  durable_queue.bind(topic_exchange, :key => key).subscribe(:ack => true) do |info, message|
    (0..RETRIES).each do |attempt|
      begin
        yield info, message
        info.ack
        break
      rescue => e
        $stderr.puts "Error executing block amqp"
        $stderr.puts "#{e.inspect}:"
        e.backtrace.each { |frame| $stderr.puts frame }

        # Attempts remain: pause, then go round again.
        if attempt != RETRIES
          sleep(1)
          next
        end

        # Out of attempts: forward the message to the errors exchange and
        # drop it from this queue.
        AmqpNotifier.new("#{@exchange}_errors").publish(key, message)
        info.reject(:requeue => false)
      end
    end
  end
end