Class: RServiceBus::MQ_RabbitMq

Inherits:
MQ
  • Object
show all
Defined in:
lib/rservicebus/MQ/RabbitMq.rb

Overview

RabbitMQ client implementation (built on the Bunny gem).

Instance Attribute Summary

Attributes inherited from MQ

#localQueueName

Instance Method Summary collapse

Methods inherited from MQ

get, #initialize

Constructor Details

This class inherits a constructor from RServiceBus::MQ

Instance Method Details

#ackObject

“Commit” the queue: acknowledge the message most recently popped.



63
64
65
66
67
68
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 63

# Acknowledge, on the channel, the delivery currently held in @delivery_info,
# then clear the stored delivery state (@delivery_info, @properties and
# @payload) so the finished message is no longer referenced.
#
# Returns nil.
def ack
    tag = @delivery_info.delivery_tag
    @ch.ack( tag )
    @delivery_info = @properties = @payload = nil
end

#connect(host, port) ⇒ Object

Connect to the broker



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 12

# Connect to the RabbitMQ broker through Bunny and open a channel.
#
# On success @conn holds the started Bunny session, @ch a channel created on
# it, and @x that channel's default exchange. On any StandardError the error
# is printed and the process is terminated via abort.
#
# @param host [String, nil] broker host; left to Bunny's default when nil
# @param port [Integer, nil] broker port; 5672 (the AMQP default) when nil
# @return [Object] the channel's default exchange
def connect( host, port )
    # 11300 is the Beanstalk port; RabbitMQ listens for AMQP on 5672.
    port ||= 5672
    string = "#{host}:#{port}"

    # Hand the requested endpoint to Bunny instead of silently connecting to
    # its built-in defaults.
    options = { :port => port }
    options[:host] = host unless host.nil?

    begin
        @conn = Bunny.new( options )
        @conn.start

        @ch = @conn.create_channel
        @x = @ch.default_exchange
    rescue StandardError => e
        # StandardError rather than Exception, so that signals and SystemExit
        # are not swallowed here.
        puts 'Error connecting to RabbitMQ'
        puts "Host string, #{string}"
        puts '***Check that RabbitMQ is running and reachable at the host string above.'
        puts e.message
        puts e.backtrace
        abort()
    end
end

#popObject

Get next msg from queue



45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 45

# Fetch the next message from @q, passing :ack => true to the queue's pop.
#
# The delivery info, properties and payload are kept in instance variables
# so the delivery can be acknowledged or rejected afterwards. When the queue
# yields no delivery info, sleeps for @timeout and raises NoMsgToProcess.
#
# Returns the message payload.
def pop
    @delivery_info, @properties, @payload = @q.pop( :ack => true )
    return @payload unless @delivery_info.nil?

    sleep @timeout
    raise NoMsgToProcess.new
end

#returnToQueueObject



58
59
60
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 58

# Reject the delivery currently held in @delivery_info on the channel.
# The second argument to reject is true; presumably this is Bunny's requeue
# flag, putting the message back on the queue (confirm against the Bunny API).
#
# Returns whatever the channel's reject returns.
def returnToQueue
    tag = @delivery_info.delivery_tag
    @ch.reject( tag, true )
end

#send(queueName, msg) ⇒ Object

Publish a message to the named queue. Called, at least, from the Host rescue block, to ensure all network links are healthy.



71
72
73
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 71

# Publish msg through the exchange held in @x, using queueName as the
# routing key.
#
# Returns whatever the exchange's publish returns.
def send( queueName, msg )
    @x.publish( msg, routing_key: queueName )
end

#subscribe(queuename) ⇒ Object

Connect to the queue



40
41
42
# File 'lib/rservicebus/MQ/RabbitMq.rb', line 40

# Obtain the named queue from the channel and keep it in @q for later pops.
# The queue is requested with :durable => true and :auto_delete => false.
#
# Returns the queue object.
def subscribe( queuename )
    @q = @ch.queue( queuename, durable: true, auto_delete: false )
end