Class: RailwayIpc::Rabbitmq::Adapter
- Inherits:
-
Object
- Object
- RailwayIpc::Rabbitmq::Adapter
- Extended by:
- Forwardable
- Defined in:
- lib/railway_ipc/rabbitmq/adapter.rb
Defined Under Namespace
Classes: TimeoutError
Instance Attribute Summary collapse
-
#channel ⇒ Object
readonly
Returns the value of attribute channel.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#exchange ⇒ Object
readonly
Returns the value of attribute exchange.
-
#exchange_name ⇒ Object
readonly
Returns the value of attribute exchange_name.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
Instance Method Summary collapse
- #bind_queue_to_exchange ⇒ Object
- #check_for_message(timeout: 10, time_elapsed: 0, &block) ⇒ Object
- #connect ⇒ Object
- #create_exchange(strategy: :fanout, options: { durable: true }) ⇒ Object
- #create_queue(options = { durable: true }) ⇒ Object
- #delete_exchange ⇒ Object
- #delete_queue ⇒ Object
- #disconnect ⇒ Object
-
#initialize(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {}) ⇒ Adapter
constructor
A new instance of Adapter.
- #publish(message, options = {}) ⇒ Object
- #reply(message, from) ⇒ Object
- #subscribe(&block) ⇒ Object
Constructor Details
#initialize(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {}) ⇒ Adapter
Returns a new instance of Adapter.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 19 def initialize(exchange_name:, amqp_url: ENV['RAILWAY_RABBITMQ_CONNECTION_URL'], queue_name: '', options: {}) @queue_name = queue_name @exchange_name = exchange_name settings = AMQ::Settings.parse_amqp_url(amqp_url) vhost = settings[:vhost] || '/' @connection = Bunny.new({ host: settings[:host], user: settings[:user], pass: settings[:pass], port: settings[:port], vhost: vhost, automatic_recovery: false, logger: RailwayIpc.logger }.merge()) end |
Instance Attribute Details
#channel ⇒ Object (readonly)
Returns the value of attribute channel.
8 9 10 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8 def channel @channel end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
8 9 10 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8 def connection @connection end |
#exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
8 9 10 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8 def exchange @exchange end |
#exchange_name ⇒ Object (readonly)
Returns the value of attribute exchange_name.
8 9 10 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8 def exchange_name @exchange_name end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
8 9 10 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8 def queue @queue end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
8 9 10 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 8 def queue_name @queue_name end |
Instance Method Details
#bind_queue_to_exchange ⇒ Object
90 91 92 93 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 90 def bind_queue_to_exchange queue.bind(exchange) self end |
#check_for_message(timeout: 10, time_elapsed: 0, &block) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 49 def (timeout: 10, time_elapsed: 0, &block) raise TimeoutError.new if time_elapsed >= timeout block ||= ->(result) { result } result = queue.pop return block.call(*result) if result.compact.any? sleep 1 (timeout: timeout, time_elapsed: time_elapsed + 1, &block) end |
#connect ⇒ Object
61 62 63 64 65 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 61 def connect connection.start @channel = connection.channel self end |
#create_exchange(strategy: :fanout, options: { durable: true }) ⇒ Object
73 74 75 76 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 73 def create_exchange(strategy: :fanout, options: { durable: true }) @exchange = Bunny::Exchange.new(connection.channel, strategy, exchange_name, ) self end |
#create_queue(options = { durable: true }) ⇒ Object
85 86 87 88 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 85 def create_queue(={ durable: true }) @queue = @channel.queue(queue_name, ) self end |
#delete_exchange ⇒ Object
78 79 80 81 82 83 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 78 def delete_exchange # rubocop:disable Style/SafeNavigation exchange.delete if exchange # rubocop:enable Style/SafeNavigation self end |
#delete_queue ⇒ Object
95 96 97 98 99 100 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 95 def delete_queue # rubocop:disable Style/SafeNavigation queue.delete if queue # rubocop:enable Style/SafeNavigation self end |
#disconnect ⇒ Object
67 68 69 70 71 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 67 def disconnect channel.close connection.close self end |
#publish(message, options = {}) ⇒ Object
35 36 37 38 39 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 35 def publish(, ={}) # rubocop:disable Style/SafeNavigation exchange.publish(, ) if exchange # rubocop:enable Style/SafeNavigation end |
#reply(message, from) ⇒ Object
41 42 43 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 41 def reply(, from) channel.default_exchange.publish(, routing_key: from) end |
#subscribe(&block) ⇒ Object
45 46 47 |
# File 'lib/railway_ipc/rabbitmq/adapter.rb', line 45 def subscribe(&block) queue.subscribe(&block) end |