Class: Msgr::Connection
- Inherits:
- Object
- Inheritance hierarchy:
  - Object
  - Msgr::Connection
- Includes:
- Logging
- Defined in:
- lib/msgr/connection.rb
Overview
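Wraps a lazily started Bunny connection and a single channel (prefetch 1), and exposes helpers to publish to a durable topic exchange, declare durable queues, manage bindings, and ack/nack/reject deliveries. (Note: the only comment attached to the class in the source is a linter directive, rubocop:disable Metrics/ClassLength, not a description.)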
Constant Summary collapse
- EXCHANGE_NAME =
'msgr'
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Instance Method Summary collapse
- #ack(delivery_tag) ⇒ Object
- #bind(routes) ⇒ Object
- #bindings ⇒ Object
- #channel ⇒ Object
- #close ⇒ Object
- #connect ⇒ Object
- #connection ⇒ Object
- #delete ⇒ Object
- #exchange ⇒ Object
-
#initialize(uri, config, dispatcher) ⇒ Connection
constructor
A new instance of Connection.
- #nack(delivery_tag) ⇒ Object
- #prefix(name) ⇒ Object
- #publish(message, opts = {}) ⇒ Object
- #purge(**kwargs) ⇒ Object
- #queue(name) ⇒ Object
- #reject(delivery_tag, requeue = true) ⇒ Object
- #release ⇒ Object
- #running? ⇒ Boolean
Methods included from Logging
Constructor Details
#initialize(uri, config, dispatcher) ⇒ Connection
Returns a new instance of Connection.
14 15 16 17 18 |
# File 'lib/msgr/connection.rb', line 14 def initialize(uri, config, dispatcher) @uri = uri @config = config @dispatcher = dispatcher end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
12 13 14 |
# File 'lib/msgr/connection.rb', line 12 def config @config end |
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
12 13 14 |
# File 'lib/msgr/connection.rb', line 12 def uri @uri end |
Instance Method Details
#ack(delivery_tag) ⇒ Object
112 113 114 115 |
# File 'lib/msgr/connection.rb', line 112 def ack(delivery_tag) channel.ack delivery_tag log(:debug) { "Acked message: #{delivery_tag}" } end |
#bind(routes) ⇒ Object
101 102 103 104 105 106 107 108 109 110 |
# File 'lib/msgr/connection.rb', line 101 def bind(routes) if routes.empty? log(:warn) do "No routes to bound to. Bind will have no effect:\n" \ " #{routes.inspect}" end else bind_all(routes) end end |
#bindings ⇒ Object
69 70 71 |
# File 'lib/msgr/connection.rb', line 69 def bindings @bindings ||= [] end |
#channel ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/msgr/connection.rb', line 40 def channel @channel ||= begin channel = connection.create_channel channel.prefetch 1 channel end end |
#close ⇒ Object
127 128 129 130 131 |
# File 'lib/msgr/connection.rb', line 127 def close channel.close if @channel && @channel.open? connection.close if @connection log(:debug) { 'Closed.' } end |
#connect ⇒ Object
36 37 38 |
# File 'lib/msgr/connection.rb', line 36 def connect connection end |
#connection ⇒ Object
32 33 34 |
# File 'lib/msgr/connection.rb', line 32 def connection @connection ||= ::Bunny.new(config).tap(&:start) end |
#delete ⇒ Object
55 56 57 58 59 60 |
# File 'lib/msgr/connection.rb', line 55 def delete return if bindings.empty? log(:debug) { "Delete bindings (#{bindings.size})..." } bindings.each(&:delete) end |
#exchange ⇒ Object
81 82 83 84 85 86 87 88 89 90 |
# File 'lib/msgr/connection.rb', line 81 def exchange @exchange ||= begin channel.topic(prefix(EXCHANGE_NAME), durable: true).tap do |ex| log(:debug) do "Created exchange #{ex.name} (type: #{ex.type}, " \ "durable: #{ex.durable?}, auto_delete: #{ex.auto_delete?})" end end end end |
#nack(delivery_tag) ⇒ Object
117 118 119 120 |
# File 'lib/msgr/connection.rb', line 117 def nack(delivery_tag) channel.nack delivery_tag, false, true log(:debug) { "Nacked message: #{delivery_tag}" } end |
#prefix(name) ⇒ Object
73 74 75 76 77 78 79 |
# File 'lib/msgr/connection.rb', line 73 def prefix(name) if config[:prefix].present? "#{config[:prefix]}-#{name}" else name end end |
#publish(message, opts = {}) ⇒ Object
24 25 26 27 28 29 30 |
# File 'lib/msgr/connection.rb', line 24 def publish(message, opts = {}) opts[:routing_key] = opts.delete(:to) if opts[:to] exchange.publish message.to_s, opts.merge(persistent: true) log(:debug) { "Published message to #{opts[:routing_key]}" } end |
#purge(**kwargs) ⇒ Object
62 63 64 65 66 67 |
# File 'lib/msgr/connection.rb', line 62 def purge(**kwargs) return if bindings.empty? log(:debug) { "Purge bindings (#{bindings.size})..." } bindings.each {|b| b.purge(**kwargs) } end |
#queue(name) ⇒ Object
92 93 94 95 96 97 98 99 |
# File 'lib/msgr/connection.rb', line 92 def queue(name) channel.queue(prefix(name), durable: true).tap do |queue| log(:debug) do "Create queue #{queue.name} (durable: #{queue.durable?}, " \ "auto_delete: #{queue.auto_delete?})" end end end |
#reject(delivery_tag, requeue = true) ⇒ Object
122 123 124 125 |
# File 'lib/msgr/connection.rb', line 122 def reject(delivery_tag, requeue = true) channel.reject delivery_tag, requeue log(:debug) { "Rejected message: #{delivery_tag}" } end |
#release ⇒ Object
48 49 50 51 52 53 |
# File 'lib/msgr/connection.rb', line 48 def release return if bindings.empty? log(:debug) { "Release bindings (#{bindings.size})..." } bindings.each(&:release) end |
#running? ⇒ Boolean
20 21 22 |
# File 'lib/msgr/connection.rb', line 20 def running? bindings.any? end |