Class: RServiceBus::MQ

Inherits:
Object
Defined in:
lib/rservicebus/MQ.rb

Overview

Wrapper base class for the queue implementations available to applications. It allows rservicebus to instantiate and configure queue implementations at startup (dependency injection).

Direct Known Subclasses

MQ_Beanstalk, MQ_RabbitMq, MQ_Redis


Constructor Details

#initialize(uri) ⇒ MQ

Queues are treated as attached resources and are specified using URI syntax.

Parameters:

  • uri (URI)

    the type and location of the queue, e.g. beanstalk://127.0.0.1/foo

The timeout (the amount of time to wait for a msg to arrive) is not a parameter. The constructor reads it from the QUEUE_TIMEOUT setting, defaulting to 5.



# File 'lib/rservicebus/MQ.rb', line 47

def initialize( uri )
    # The queue location must already be parsed into a URI object.
    if uri.is_a? URI then
        @uri = uri
    else
        puts 'uri must be a valid URI'
        abort()
    end

    # An empty path falls back to the application name; otherwise the
    # path, minus its leading slash, is used as the queue name.
    if uri.path == '' || uri.path == '/' then
        @localQueueName = RServiceBus.getValue( 'APPNAME', 'RServiceBus')
    else
        @localQueueName = uri.path
        @localQueueName[0] = ''
    end

    if @localQueueName == '' then
        puts "@localQueueName: #{@localQueueName}"
        puts 'Queue name must be supplied '
        puts "*** uri, #{uri}, needs to contain a queue name"
        puts '*** the structure is scheme://host[:port]/queuename'
        abort()
    end

    # connect and subscribe are supplied by the concrete subclass.
    @timeout = RServiceBus.getValue( 'QUEUE_TIMEOUT', '5').to_i
    self.connect(uri.host, uri.port)
    self.subscribe( @localQueueName )
end
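
The queue-name rule used by the constructor can be tried in isolation. The following is a minimal sketch, not part of rservicebus: `queue_name_for` is a hypothetical helper, and its `default_name` argument stands in for the value that `RServiceBus.getValue( 'APPNAME', 'RServiceBus')` would return. Unlike the constructor, it does not modify the URI's own path string.

```ruby
require 'uri'

# Hypothetical helper mirroring the constructor's queue-name rule.
# default_name stands in for RServiceBus.getValue('APPNAME', 'RServiceBus').
def queue_name_for(uri, default_name = 'RServiceBus')
  path = uri.path
  return default_name if path.nil? || path == '' || path == '/'

  # Drop the leading slash and return a new string.
  path.sub(%r{\A/}, '')
end

puts queue_name_for(URI.parse('beanstalk://127.0.0.1/foo'))    # foo
puts queue_name_for(URI.parse('beanstalk://127.0.0.1'))        # RServiceBus
puts queue_name_for(URI.parse('beanstalk://127.0.0.1:11300/')) # RServiceBus
```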

Instance Attribute Details

#localQueueName ⇒ Object (readonly)

Returns the value of attribute localQueueName.



# File 'lib/rservicebus/MQ.rb', line 14

def localQueueName
  @localQueueName
end

Class Method Details

.get ⇒ Object



# File 'lib/rservicebus/MQ.rb', line 19

def MQ.get
    # Connection string for the broker, e.g. beanstalk://localhost
    mqString = RServiceBus.getValue( 'RSBMQ', 'beanstalk://localhost')
    uri = URI.parse( mqString )

    # Load and instantiate the implementation that matches the URI scheme.
    case uri.scheme
    when 'beanstalk'
        require 'rservicebus/MQ/Beanstalk'
        mq = MQ_Beanstalk.new( uri )

    when 'redis'
        require 'rservicebus/MQ/Redis'
        mq = MQ_Redis.new( uri )

    when 'rabbitmq'
        require 'rservicebus/MQ/RabbitMq'
        mq = MQ_RabbitMq.new( uri )

    else
        abort("Scheme, #{uri.scheme}, not recognised when configuring mq, #{mqString}")
    end

    return mq
end
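
The scheme dispatch above can also be expressed as a lookup table. This is only a sketch of that alternative, not the library's implementation: `MQ_SCHEMES` and `mq_implementation_for` are hypothetical names, the table entries are copied from the case statement, and the sketch raises `ArgumentError` where `MQ.get` calls `abort`. It returns the file and class names rather than loading them, so it runs without rservicebus installed.

```ruby
require 'uri'

# Hypothetical lookup table: scheme => [file to require, class name].
# The three entries mirror the case statement in MQ.get.
MQ_SCHEMES = {
  'beanstalk' => ['rservicebus/MQ/Beanstalk', 'MQ_Beanstalk'],
  'redis'     => ['rservicebus/MQ/Redis',     'MQ_Redis'],
  'rabbitmq'  => ['rservicebus/MQ/RabbitMq',  'MQ_RabbitMq']
}.freeze

# Returns the [require path, class name] pair for a connection string.
# Raises ArgumentError on an unknown scheme instead of aborting.
def mq_implementation_for(mq_string)
  uri = URI.parse(mq_string)
  MQ_SCHEMES.fetch(uri.scheme) do
    raise ArgumentError,
          "Scheme, #{uri.scheme}, not recognised when configuring mq, #{mq_string}"
  end
end

p mq_implementation_for('redis://localhost/myqueue')
```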

Instance Method Details

#ack ⇒ Object

“Commit” the pop



# File 'lib/rservicebus/MQ.rb', line 97

def ack
    raise 'Method, ack, needs to be implemented'
end

#connect(host, port) ⇒ Object

Connect to the broker

Parameters:

  • host (String)

    machine running the mq

  • port (Integer, nil)

    port the mq is running on, as returned by uri.port (nil if the URI does not specify one)



# File 'lib/rservicebus/MQ.rb', line 80

def connect( host, port )
    raise 'Method, connect, needs to be implemented'
end

#pop ⇒ Object

Get next msg from queue



# File 'lib/rservicebus/MQ.rb', line 92

def pop
    raise 'Method, pop, needs to be implemented'
end

#send(queueName, msg) ⇒ Object

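Called at least in the Host rescue block, to ensure all network links are healthy. Note that the base implementation shown here does not send anything: it only closes the current connection and reports any error raised while doing so.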

Parameters:

  • queueName (String)

    name of the queue to which the msg should be sent

  • msg (String)

    msg to be sent



# File 'lib/rservicebus/MQ.rb', line 105

def send( queueName, msg )
    begin
        @connection.close
    rescue
        puts 'AppResource. An error was raised while closing connection to, ' + @uri.to_s
    end

end

#subscribe(queuename) ⇒ Object

Connect to the receiving queue

Parameters:

  • queuename (String)

    name of the receiving queue



# File 'lib/rservicebus/MQ.rb', line 87

def subscribe( queuename )
    raise 'Method, subscribe, needs to be implemented'
end
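
To show how the five instance methods fit together, here is a minimal in-memory sketch. It is not one of the shipped implementations: `InMemoryMQ` is a hypothetical class that deliberately does not inherit from MQ, so that it runs without rservicebus installed. It also assumes that pop leaves the message queued until ack removes it, and that pop returns nil on an empty queue; neither behaviour is confirmed by this page.

```ruby
# Hypothetical in-memory queue exposing the same five methods as MQ.
# It does not inherit from RServiceBus::MQ so it can run standalone.
class InMemoryMQ
  attr_reader :localQueueName

  def initialize
    # One array of messages per queue name, created on first use.
    @queues = Hash.new { |hash, name| hash[name] = [] }
    @pending = nil
  end

  # Nothing to connect to; arguments are kept for interface parity.
  def connect(host, port)
    @host = host
    @port = port
  end

  def subscribe(queuename)
    @localQueueName = queuename
  end

  # Return the next message but leave it queued until ack is called.
  def pop
    @pending = @queues[@localQueueName].first
  end

  # "Commit" the pop by removing the message that pop returned.
  def ack
    @queues[@localQueueName].shift unless @pending.nil?
    @pending = nil
  end

  def send(queueName, msg)
    @queues[queueName] << msg
  end
end

mq = InMemoryMQ.new
mq.connect('localhost', nil)
mq.subscribe('foo')
mq.send('foo', 'hello')
puts mq.pop   # hello
mq.ack
p mq.pop      # nil
```

A real subclass would inherit from MQ and let the base constructor call connect and subscribe with the values taken from the URI.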