Class: Msgr::Connection
- Inherits:
-
Object
- Object
- Msgr::Connection
- Includes:
- Celluloid, Logging
- Defined in:
- lib/msgr/connection.rb
Instance Attribute Summary collapse
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
-
#dispatcher ⇒ Object
readonly
Returns the value of attribute dispatcher.
-
#opts ⇒ Object
readonly
Returns the value of attribute opts.
-
#routes ⇒ Object
readonly
Returns the value of attribute routes.
Instance Method Summary collapse
- #ack(delivery_tag) ⇒ Object
- #bind ⇒ Object
-
#bindings ⇒ Object
Used to store al bindings.
- #close ⇒ Object
- #delete ⇒ Object
- #exchange ⇒ Object
-
#initialize(conn, routes, dispatcher, opts = {}) ⇒ Connection
constructor
A new instance of Connection.
- #prefix(name = '') ⇒ Object
- #publish(payload, opts = {}) ⇒ Object
- #queue(name) ⇒ Object
- #rebind ⇒ Object
- #reject(delivery_tag, requeue = true) ⇒ Object
-
#release(wait = false) ⇒ Object
Release all bindings but do not close channel.
Methods included from Logging
Constructor Details
#initialize(conn, routes, dispatcher, opts = {}) ⇒ Connection
Returns a new instance of Connection.
10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/msgr/connection.rb', line 10 def initialize(conn, routes, dispatcher, opts = {}) @conn = conn @dispatcher = dispatcher @routes = routes @opts = opts @channel = conn.create_channel @channel.prefetch(10) bind end |
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
7 8 9 |
# File 'lib/msgr/connection.rb', line 7 def conn @conn end |
#dispatcher ⇒ Object (readonly)
Returns the value of attribute dispatcher.
7 8 9 |
# File 'lib/msgr/connection.rb', line 7 def dispatcher @dispatcher end |
#opts ⇒ Object (readonly)
Returns the value of attribute opts.
7 8 9 |
# File 'lib/msgr/connection.rb', line 7 def opts @opts end |
#routes ⇒ Object (readonly)
Returns the value of attribute routes.
7 8 9 |
# File 'lib/msgr/connection.rb', line 7 def routes @routes end |
Instance Method Details
#ack(delivery_tag) ⇒ Object
110 111 112 113 |
# File 'lib/msgr/connection.rb', line 110 def ack(delivery_tag) log(:debug) { "Ack message: #{delivery_tag}" } @channel.ack delivery_tag end |
#bind ⇒ Object
27 28 29 30 31 32 |
# File 'lib/msgr/connection.rb', line 27 def bind # Create new bindings routes.each { |route| bindings << Binding.new(Actor.current, route, dispatcher) } log(:debug) { 'New routes bound.' } end |
#bindings ⇒ Object
Used to store al bindings. Allows use to release bindings when receiver should not longer receive messages but channel need to be open to allow further acknowledgments.
43 44 45 |
# File 'lib/msgr/connection.rb', line 43 def bindings @bindings ||= [] end |
#close ⇒ Object
120 121 122 123 |
# File 'lib/msgr/connection.rb', line 120 def close @channel.close if @channel.open? log(:debug) { 'Connection closed.' } end |
#delete ⇒ Object
85 86 87 88 89 90 91 92 93 94 |
# File 'lib/msgr/connection.rb', line 85 def delete return unless bindings.any? log(:debug) { 'Delete all bindings and exchange.' } bindings.each { |binding| binding.delete } bindings.clear @exchange.delete if @exchange end |
#exchange ⇒ Object
53 54 55 56 57 58 59 60 61 |
# File 'lib/msgr/connection.rb', line 53 def exchange unless @exchange @exchange = @channel.topic prefix('msgr'), durable: true log(:debug) { "Created exchange #{@exchange.name} (type: #{@exchange.type}, durable: #{@exchange.durable?}, auto_delete: #{@exchange.auto_delete?})" } end @exchange end |
#prefix(name = '') ⇒ Object
34 35 36 |
# File 'lib/msgr/connection.rb', line 34 def prefix(name = '') opts[:prefix] ? "#{opts[:prefix]}-#{name}" : name end |
#publish(payload, opts = {}) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/msgr/connection.rb', line 96 def publish(payload, opts = {}) opts[:routing_key] ||= opts[:to] raise ArgumentError, 'Missing routing key.' unless opts[:routing_key] log(:debug) { "Publish message to #{opts[:routing_key]}" } begin payload = JSON.generate(payload) exchange.publish payload, opts.merge(persistent: true, content_type: 'application/json') rescue => error exchange.publish payload.to_s, opts.merge(persistent: true, content_type: 'application/text') end end |
#queue(name) ⇒ Object
47 48 49 50 51 |
# File 'lib/msgr/connection.rb', line 47 def queue(name) @channel.queue(prefix(name), durable: true).tap do |queue| log(:debug) { "Create queue #{queue.name} (durable: #{queue.durable?}, auto_delete: #{queue.auto_delete?})" } end end |
#rebind ⇒ Object
22 23 24 25 |
# File 'lib/msgr/connection.rb', line 22 def rebind release bind end |
#reject(delivery_tag, requeue = true) ⇒ Object
115 116 117 118 |
# File 'lib/msgr/connection.rb', line 115 def reject(delivery_tag, requeue = true) log(:debug) { "Reject message: #{delivery_tag}" } @channel.reject delivery_tag, requeue end |
#release(wait = false) ⇒ Object
Release all bindings but do not close channel. Will not longer receive any message but channel can be used to acknowledge currently processing messages.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/msgr/connection.rb', line 67 def release(wait = false) return unless bindings.any? log(:debug) { "Release all bindings#{wait ? ' after queues are empty': ''}..." } if wait binds = bindings.dup while binds.any? binds.reject! { |b| b.release_if_empty } sleep 1 end else bindings.each &:release end log(:debug) { 'All bindings released.' } end |