Class: Msgr::Connection
- Inherits:
-
Object
- Object
- Msgr::Connection
- Includes:
- Celluloid, Logging
- Defined in:
- lib/msgr/connection.rb
Instance Attribute Summary collapse
-
#conn ⇒ Object
readonly
Returns the value of attribute conn.
-
#pool ⇒ Object
readonly
Returns the value of attribute pool.
-
#routes ⇒ Object
readonly
Returns the value of attribute routes.
Instance Method Summary collapse
- #ack(delivery_tag) ⇒ Object
-
#bindings ⇒ Object
Used to store all bindings.
- #close ⇒ Object
- #exchange ⇒ Object
-
#initialize(conn, routes, pool) ⇒ Connection
constructor
A new instance of Connection.
- #publish(payload, opts = {}) ⇒ Object
- #queue(name) ⇒ Object
- #rebind(routes = nil) ⇒ Object
-
#release ⇒ Object
Release all bindings but do not close channel.
Methods included from Logging
Constructor Details
#initialize(conn, routes, pool) ⇒ Connection
Returns a new instance of Connection.
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/msgr/connection.rb', line 10 def initialize(conn, routes, pool) @conn = conn @pool = pool @routes = routes @channel = conn.create_channel @channel.prefetch(10) rebind end |
Instance Attribute Details
#conn ⇒ Object (readonly)
Returns the value of attribute conn.
7 8 9 |
# File 'lib/msgr/connection.rb', line 7 def conn @conn end |
#pool ⇒ Object (readonly)
Returns the value of attribute pool.
7 8 9 |
# File 'lib/msgr/connection.rb', line 7 def pool @pool end |
#routes ⇒ Object (readonly)
Returns the value of attribute routes.
7 8 9 |
# File 'lib/msgr/connection.rb', line 7 def routes @routes end |
Instance Method Details
#ack(delivery_tag) ⇒ Object
69 70 71 |
# File 'lib/msgr/connection.rb', line 69 def ack(delivery_tag) @channel.ack delivery_tag end |
#bindings ⇒ Object
Used to store all bindings. Allows us to release bindings when the receiver should no longer receive messages but the channel needs to stay open to allow further acknowledgments.
38 39 40 |
# File 'lib/msgr/connection.rb', line 38 def bindings @bindings ||= [] end |
#close ⇒ Object
73 74 75 76 |
# File 'lib/msgr/connection.rb', line 73 def close @channel.close if @channel.open? log(:debug) { 'Connection closed.' } end |
#exchange ⇒ Object
46 47 48 |
# File 'lib/msgr/connection.rb', line 46 def exchange @exchange ||= @channel.topic 'msgr', durable: true end |
#publish(payload, opts = {}) ⇒ Object
63 64 65 66 67 |
# File 'lib/msgr/connection.rb', line 63 def publish(payload, opts = {}) log(:debug) { "Publish message to #{opts[:routing_key]}" } exchange.publish payload, opts.merge(persistent: true) end |
#queue(name) ⇒ Object
42 43 44 |
# File 'lib/msgr/connection.rb', line 42 def queue(name) @channel.queue name, durable: true end |
#rebind(routes = nil) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/msgr/connection.rb', line 21 def rebind(routes = nil) routes = self.routes unless routes # First release old bindings release # Create new bindings routes.each { |route| bindings << Binding.new(Actor.current, route, pool) } log(:debug) { 'New routes bound.' } end |
#release ⇒ Object
Release all bindings but do not close the channel. The connection will no longer receive any messages, but the channel can still be used to acknowledge messages that are currently being processed.
54 55 56 57 58 59 60 61 |
# File 'lib/msgr/connection.rb', line 54 def release return unless bindings.any? log(:debug) { 'Release all bindings.' } bindings.each { |binding| binding.release } bindings.clear end |