Class: RServiceBus2::MQ
- Inherits:
-
Object
- Object
- RServiceBus2::MQ
- Defined in:
- lib/rservicebus2/mq.rb
Overview
Wrapper base class for Queue implementations available to the applications, allowing rservicebus to instatiate and configure queue implementations at startup - dependency injection.
Direct Known Subclasses
Instance Attribute Summary collapse
-
#local_queue_name ⇒ Object
readonly
Returns the value of attribute local_queue_name.
Class Method Summary collapse
-
.get ⇒ Object
rubocop:disable Metrics/MethodLength.
Instance Method Summary collapse
-
#ack ⇒ Object
“Commit” the pop.
-
#connect(_host, _port) ⇒ Object
Connect to the broker.
-
#initialize(uri) ⇒ MQ
constructor
rubocop:disable Metrics/MethodLength.
-
#pop ⇒ Object
Get next msg from queue.
-
#send(_queue_name, _msg) ⇒ Object
At least called in the Host rescue block, to ensure all network links are healthy.
-
#subscribe(_queuename) ⇒ Object
Connect to the receiving queue.
Constructor Details
#initialize(uri) ⇒ MQ
rubocop:disable Metrics/MethodLength
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/rservicebus2/mq.rb', line 48 def initialize(uri) abort 'Paramter to mq must be a valid URI' unless uri.is_a? URI @uri = uri @local_queue_name = uri.path @local_queue_name[0] = '' if @local_queue_name[0] == '/' @local_queue_name = RServiceBus2.get_value('APPNAME', 'RServiceBus') if @local_queue_name == '' if @local_queue_name == '' puts 'Queue name must be supplied ' \ "*** uri, #{uri}, needs to contain a queue name" \ '*** the structure is scheme://host[:port]/queuename' abort end @timeout = RServiceBus2.get_value('QUEUE_TIMEOUT', '5').to_i connect(uri.host, uri.port) subscribe(@local_queue_name) end |
Instance Attribute Details
#local_queue_name ⇒ Object (readonly)
Returns the value of attribute local_queue_name.
13 14 15 |
# File 'lib/rservicebus2/mq.rb', line 13 def local_queue_name @local_queue_name end |
Class Method Details
.get ⇒ Object
rubocop:disable Metrics/MethodLength
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/rservicebus2/mq.rb', line 16 def self.get mq_string = RServiceBus2.get_value('RSBMQ', 'beanstalk://localhost') uri = URI.parse(mq_string) case uri.scheme when 'beanstalk' require 'rservicebus2/mq/beanstalk' mq = MQBeanstalk.new(uri) when 'file' require 'rservicebus2/mq/file' mq = MQFile.new(uri) when 'redis' require 'rservicebus2/mq/redis' mq = MQRedis.new(uri) when 'aws' require 'rservicebus2/mq/aws' mq = MQAWS.new(uri) else abort("Scheme, #{uri.scheme}, not recognised when configuring mq, #{string}") end mq end |
Instance Method Details
#ack ⇒ Object
“Commit” the pop
88 89 90 |
# File 'lib/rservicebus2/mq.rb', line 88 def ack raise 'Method, ack, needs to be implemented' end |
#connect(_host, _port) ⇒ Object
Connect to the broker
72 73 74 |
# File 'lib/rservicebus2/mq.rb', line 72 def connect(_host, _port) raise 'Method, connect, needs to be implemented' end |
#pop ⇒ Object
Get next msg from queue
83 84 85 |
# File 'lib/rservicebus2/mq.rb', line 83 def pop raise 'Method, pop, needs to be implemented' end |
#send(_queue_name, _msg) ⇒ Object
At least called in the Host rescue block, to ensure all network links are
healthy
96 97 98 99 100 101 102 |
# File 'lib/rservicebus2/mq.rb', line 96 def send(_queue_name, _msg) @connection.close rescue StandardError => e puts "AppResource. An error was raised while closing connection to, #{@uri}" puts "Error: #{e.}" puts "Backtrace: #{e.backtrace}" end |
#subscribe(_queuename) ⇒ Object
Connect to the receiving queue
78 79 80 |
# File 'lib/rservicebus2/mq.rb', line 78 def subscribe(_queuename) raise 'Method, subscribe, needs to be implemented' end |