Class: RServiceBus2::MQ

Inherits:
Object
Defined in:
lib/rservicebus2/mq.rb

Overview

Wrapper base class for the queue implementations available to applications. It lets rservicebus instantiate and configure a queue implementation at startup (dependency injection).

Direct Known Subclasses

MQAWS, MQBeanstalk, MQFile, MQRedis
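To make the subclass contract concrete, here is a minimal sketch of an in-memory implementation. Both `SketchMQ` (a cut-down stand-in for the base class) and `MemoryMQ` are hypothetical names used only for illustration; the real subclasses listed above talk to actual brokers and may behave differently, in particular in how `pop` reports an empty queue.

```ruby
# Hypothetical stand-in for the base-class contract; not the real RServiceBus2::MQ.
class SketchMQ
  attr_reader :local_queue_name

  def initialize(queue_name)
    @local_queue_name = queue_name
    connect('localhost', nil)
    subscribe(@local_queue_name)
  end

  def connect(_host, _port)
    raise 'Method, connect, needs to be implemented'
  end

  def subscribe(_queuename)
    raise 'Method, subscribe, needs to be implemented'
  end
end

# Hypothetical in-memory implementation: each queue is an Array held in a Hash.
class MemoryMQ < SketchMQ
  def connect(_host, _port)
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  def subscribe(queuename)
    @inbox = @queues[queuename]
  end

  # Return the next message without removing it (nil when empty, an assumption).
  def pop
    @inbox.first
  end

  # "Commit" the pop by removing the message that pop returned.
  def ack
    @inbox.shift
  end

  def send(queue_name, msg)
    @queues[queue_name] << msg
  end
end

mq = MemoryMQ.new('orders')
mq.send('orders', 'hello')
puts mq.pop # prints "hello"
mq.ack
```

Because the constructor calls `connect` and `subscribe` itself, a subclass only has to override the five queue methods; it does not need its own `initialize`.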

Constructor Details

#initialize(uri) ⇒ MQ

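Stores the URI, derives the local queue name from its path (falling back to the APPNAME setting, default RServiceBus), reads QUEUE_TIMEOUT (default 5), then connects to the broker and subscribes to the local queue. Aborts if the argument is not a URI.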



# File 'lib/rservicebus2/mq.rb', line 48

def initialize(uri)
  abort 'Parameter to mq must be a valid URI' unless uri.is_a? URI

  @uri = uri
  # Take the path without its leading slash; delete_prefix returns a copy, so uri is not mutated
  @local_queue_name = uri.path.delete_prefix('/')
  @local_queue_name = RServiceBus2.get_value('APPNAME', 'RServiceBus') if @local_queue_name == ''

  if @local_queue_name == ''
    puts 'Queue name must be supplied ' \
         "*** uri, #{uri}, needs to contain a queue name" \
         '*** the structure is scheme://host[:port]/queuename'
    abort
  end

  @timeout = RServiceBus2.get_value('QUEUE_TIMEOUT', '5').to_i
  connect(uri.host, uri.port)
  subscribe(@local_queue_name)
end
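The queue-name rule in the constructor can be tried in isolation with only the standard `uri` library. This is a sketch, not library code: `queue_name_from` is a hypothetical helper, and the default name is passed in directly rather than read through `RServiceBus2.get_value`.

```ruby
require 'uri'

# Hypothetical helper mirroring the constructor's queue-name rule:
# use the URI path without its leading slash, or a default when the path is empty.
def queue_name_from(uri, default_name)
  name = uri.path.delete_prefix('/')
  name.empty? ? default_name : name
end

puts queue_name_from(URI.parse('beanstalk://localhost/orders'), 'RServiceBus') # prints "orders"
puts queue_name_from(URI.parse('beanstalk://localhost'), 'RServiceBus')        # prints "RServiceBus"
```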

Instance Attribute Details

#local_queue_name ⇒ Object (readonly)

Returns the value of attribute local_queue_name.



# File 'lib/rservicebus2/mq.rb', line 13

def local_queue_name
  @local_queue_name
end

Class Method Details

.get ⇒ Object

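Builds the queue implementation selected by the scheme of the RSBMQ setting (default beanstalk://localhost). Recognised schemes are beanstalk, file, redis and aws; any other scheme aborts.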



# File 'lib/rservicebus2/mq.rb', line 16

def self.get
  mq_string = RServiceBus2.get_value('RSBMQ', 'beanstalk://localhost')
  uri = URI.parse(mq_string)

  case uri.scheme
  when 'beanstalk'
    require 'rservicebus2/mq/beanstalk'
    mq = MQBeanstalk.new(uri)
  when 'file'
    require 'rservicebus2/mq/file'
    mq = MQFile.new(uri)
  when 'redis'
    require 'rservicebus2/mq/redis'
    mq = MQRedis.new(uri)
  when 'aws'
    require 'rservicebus2/mq/aws'
    mq = MQAWS.new(uri)
  else
    # Report the full configuration string (mq_string) alongside the unknown scheme
    abort("Scheme, #{uri.scheme}, not recognised when configuring mq, #{mq_string}")
  end

  mq
end
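The scheme-based selection can be explored without loading any broker library. The sketch below uses a lookup table in place of the `case` statement and returns class names as strings; `MQ_CLASS_NAMES` and `mq_class_name_for` are hypothetical names, and raising `ArgumentError` stands in for the `abort` call so the example can be run safely.

```ruby
require 'uri'

# Scheme-to-class-name pairs copied from the case statement in MQ.get.
MQ_CLASS_NAMES = {
  'beanstalk' => 'MQBeanstalk',
  'file' => 'MQFile',
  'redis' => 'MQRedis',
  'aws' => 'MQAWS'
}.freeze

# Hypothetical helper: name the implementation a configuration string selects.
def mq_class_name_for(mq_string)
  scheme = URI.parse(mq_string).scheme
  MQ_CLASS_NAMES.fetch(scheme) do
    raise ArgumentError, "Scheme, #{scheme}, not recognised when configuring mq, #{mq_string}"
  end
end

puts mq_class_name_for('beanstalk://localhost')    # prints "MQBeanstalk"
puts mq_class_name_for('redis://localhost/orders') # prints "MQRedis"
```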

Instance Method Details

#ack ⇒ Object

“Commit” the pop



# File 'lib/rservicebus2/mq.rb', line 88

def ack
  raise 'Method, ack, needs to be implemented'
end

#connect(_host, _port) ⇒ Object

Connect to the broker

Parameters:

  • host (String)

    machine running the mq

  • port (String)

    port the mq is running on



# File 'lib/rservicebus2/mq.rb', line 72

def connect(_host, _port)
  raise 'Method, connect, needs to be implemented'
end

#pop ⇒ Object

Get next msg from queue



# File 'lib/rservicebus2/mq.rb', line 83

def pop
  raise 'Method, pop, needs to be implemented'
end
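Since `ack` "commits" a `pop`, a consumer would typically acknowledge a message only after handling it. The loop below is a sketch of that pattern under stated assumptions: `FakeQueue` and `drain` are hypothetical, and the stand-in signals an empty queue by returning nil, which the base class does not specify.

```ruby
# Hypothetical stand-in queue; returns nil when empty (an assumption).
class FakeQueue
  def initialize(messages)
    @messages = messages.dup
  end

  # Return the next message without removing it.
  def pop
    @messages.first
  end

  # Remove the message that pop returned.
  def ack
    @messages.shift
  end
end

# Handle each message, acknowledging it only after the handler returns.
def drain(queue)
  handled = []
  while (msg = queue.pop)
    handled << yield(msg)
    queue.ack
  end
  handled
end

results = drain(FakeQueue.new(%w[a b])) { |msg| msg.upcase }
p results # prints ["A", "B"]
```

If the handler raises, `ack` is never reached, so the message is still at the head of the queue on the next `pop`.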

#send(_queue_name, _msg) ⇒ Object

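Called at least from the Host rescue block, to ensure all network links are healthy. Note that this base implementation does not send anything: it closes @connection and prints any error raised while closing.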

Parameters:

  • queue_name (String)

    name of the queue to which the msg should be sent

  • msg (String)

    msg to be sent



# File 'lib/rservicebus2/mq.rb', line 96

def send(_queue_name, _msg)
  @connection.close
rescue StandardError => e
  puts "AppResource. An error was raised while closing connection to, #{@uri}"
  puts "Error: #{e.message}"
  puts "Backtrace: #{e.backtrace}"
end

#subscribe(_queuename) ⇒ Object

Connect to the receiving queue

Parameters:

  • queuename (String)

    name of the receiving queue



# File 'lib/rservicebus2/mq.rb', line 78

def subscribe(_queuename)
  raise 'Method, subscribe, needs to be implemented'
end