Class: RServiceBus2::MQ

Inherits:
Object
  • Object
show all
Defined in:
lib/rservicebus2/mq.rb

Overview

Wrapper base class for Queue implementations available to the applications, allowing rservicebus to instatiate and configure queue implementations at startup - dependency injection.

Direct Known Subclasses

MQBeanstalk

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri) ⇒ MQ

Resources are attached, and are be specified using the URI syntax

Parameters:

  • uri (URI)

    the type and location of queue, eg beanstalk://127.0.0.1/foo

  • timeout (Integer)

    the amount of time to wait for a msg to arrive



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/rservicebus2/mq.rb', line 33

def initialize(uri)
  if uri.is_a? URI
    @uri = uri
  else
    puts 'uri must be a valid URI'
    abort
  end

  if uri.path == '' || uri.path == '/'
    @local_queue_name = RServiceBus2.get_value('APPNAME', 'RServiceBus')
  else
    @local_queue_name = uri.path
    @local_queue_name[0] = ''
  end

  if @local_queue_name == ''
    puts "@local_queue_name: #{@local_queue_name}"
    puts 'Queue name must be supplied '
    puts "*** uri, #{uri}, needs to contain a queue name"
    puts '*** the structure is scheme://host[:port]/queuename'
    abort
  end

  @timeout = RServiceBus2.get_value('QUEUE_TIMEOUT', '5').to_i
  connect(uri.host, uri.port)
  subscribe(@local_queue_name)
end

Instance Attribute Details

#local_queue_nameObject (readonly)

Returns the value of attribute local_queue_name.



11
12
13
# File 'lib/rservicebus2/mq.rb', line 11

def local_queue_name
  @local_queue_name
end

Class Method Details

.getObject



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/rservicebus2/mq.rb', line 13

def self.get
  mq_string = RServiceBus2.get_value('RSBMQ', 'beanstalk://localhost')
  uri = URI.parse(mq_string)

  case uri.scheme
  when 'beanstalk'
    require 'rservicebus2/mq/beanstalk'
    mq = MQBeanstalk.new(uri)
  else
    abort("Scheme, #{uri.scheme}, not recognised when configuring mq,
      #{string}")
  end

  mq
end

Instance Method Details

#ackObject

“Commit” the pop



80
81
82
# File 'lib/rservicebus2/mq.rb', line 80

def ack
  fail 'Method, ack, needs to be implemented'
end

#connect(_host, _port) ⇒ Object

Connect to the broker

Parameters:

  • host (String)

    machine runnig the mq

  • port (String)

    port the mq is running on



64
65
66
# File 'lib/rservicebus2/mq.rb', line 64

def connect(_host, _port)
  fail 'Method, connect, needs to be implemented'
end

#popObject

Get next msg from queue



75
76
77
# File 'lib/rservicebus2/mq.rb', line 75

def pop
  fail 'Method, pop, needs to be implemented'
end

#send(queue_name, msg) ⇒ Object

At least called in the Host rescue block, to ensure all network links are

healthy

Parameters:

  • queue_name (String)

    name of the queue to which the msg should be sent

  • msg (String)

    msg to be sent



88
89
90
91
92
93
94
95
# File 'lib/rservicebus2/mq.rb', line 88

def send(queue_name, msg)
      begin
        @connection.close
      rescue
        puts 'AppResource. An error was raised while closing connection to, ' + @uri.to_s
      end

end

#subscribe(_queuename) ⇒ Object

Connect to the receiving queue

Parameters:

  • queuename (String)

    name of the receiving queue



70
71
72
# File 'lib/rservicebus2/mq.rb', line 70

def subscribe(_queuename)
  fail 'Method, subscribe, needs to be implemented'
end