Class: AMQ::Client::Async::Exchange
- Inherits:
-
Object
- Object
- AMQ::Client::Async::Exchange
- Extended by:
- ProtocolMethodHandlers
- Includes:
- Entity, ServerNamedEntity
- Defined in:
- lib/amq/client/async/exchange.rb
Constant Summary collapse
- BUILTIN_TYPES =
[:fanout, :direct, :topic, :headers].freeze
Constants included from Openable
Instance Attribute Summary collapse
-
#arguments ⇒ Hash
readonly
Additional arguments given on queue declaration.
-
#channel ⇒ Object
readonly
Channel this exchange belongs to.
-
#name ⇒ String
readonly
Exchange name.
-
#type ⇒ Symbol
readonly
One of :direct, :fanout, :topic, :headers.
Attributes included from Entity
Attributes included from Openable
Declaration collapse
- #declare(passive = false, durable = false, auto_delete = false, nowait = false, arguments = nil, &block) ⇒ Object
- #redeclare(&block) ⇒ Object
Publishing Messages collapse
- #on_return(&block) ⇒ Object
- #publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false, immediate = false, frame_size = nil) ⇒ Object
Error Handling and Recovery collapse
-
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
-
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened.
-
#on_connection_interruption(&block) ⇒ Object
(also: #after_connection_interruption)
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
-
#on_recovery(&block) ⇒ Object
(also: #after_recovery)
Defines a callback that will be executed when AMQP connection is recovered after a network failure..
Instance Method Summary collapse
-
#custom_type? ⇒ Boolean
True if this exchange is of a custom type (begins with x-).
- #delete(if_unused = false, nowait = false, &block) ⇒ Object
-
#direct? ⇒ Boolean
True if this exchange is of type ‘direct`.
-
#fanout? ⇒ Boolean
True if this exchange is of type ‘fanout`.
- #handle_declare_ok(method) ⇒ Object
- #handle_delete_ok(method) ⇒ Object
-
#headers? ⇒ Boolean
True if this exchange is of type ‘headers`.
-
#initialize(connection, channel, name, type = :fanout) ⇒ Exchange
constructor
A new instance of Exchange.
-
#predefined? ⇒ Boolean
True if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
-
#topic? ⇒ Boolean
True if this exchange is of type ‘topic`.
Methods included from ProtocolMethodHandlers
Methods included from ServerNamedEntity
Methods included from Callbacks
#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback
Methods included from Openable
#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?
Constructor Details
#initialize(connection, channel, name, type = :fanout) ⇒ Exchange
Returns a new instance of Exchange.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/amq/client/async/exchange.rb', line 40 def initialize(connection, channel, name, type = :fanout) if !(BUILTIN_TYPES.include?(type.to_sym) || type.to_s =~ /^x-.+/i) raise UnknownExchangeTypeError.new(BUILTIN_TYPES, type) end @connection = connection @channel = channel @name = name @type = type # register pre-declared exchanges if @name == AMQ::Protocol::EMPTY_STRING || @name =~ /^amq\.(direct|fanout|topic|match|headers)/ @channel.register_exchange(self) end super(connection) end |
Instance Attribute Details
#arguments ⇒ Hash (readonly)
Returns Additional arguments given on queue declaration. Typically used by AMQP extensions.
36 37 38 |
# File 'lib/amq/client/async/exchange.rb', line 36 def arguments @arguments end |
#channel ⇒ Object (readonly)
Channel this exchange belongs to.
26 27 28 |
# File 'lib/amq/client/async/exchange.rb', line 26 def channel @channel end |
#name ⇒ String (readonly)
Exchange name. May be server-generated or assigned directly.
30 31 32 |
# File 'lib/amq/client/async/exchange.rb', line 30 def name @name end |
#type ⇒ Symbol (readonly)
Returns One of :direct, :fanout, :topic, :headers.
33 34 35 |
# File 'lib/amq/client/async/exchange.rb', line 33 def type @type end |
Instance Method Details
#auto_recover ⇒ Object
Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).
224 225 226 |
# File 'lib/amq/client/async/exchange.rb', line 224 def auto_recover self.redeclare unless predefined? end |
#before_recovery(&block) ⇒ Object
Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).
195 196 197 |
# File 'lib/amq/client/async/exchange.rb', line 195 def before_recovery(&block) self.redefine_callback(:before_recovery, &block) end |
#custom_type? ⇒ Boolean
Returns true if this exchange is of a custom type (begins with x-).
84 85 86 |
# File 'lib/amq/client/async/exchange.rb', line 84 def custom_type? @type.to_s =~ /^x-.+/i end |
#declare(passive = false, durable = false, auto_delete = false, nowait = false, arguments = nil, &block) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/amq/client/async/exchange.rb', line 97 def declare(passive = false, durable = false, auto_delete = false, nowait = false, arguments = nil, &block) # for re-declaration @passive = passive @durable = durable @auto_delete = auto_delete @arguments = arguments @connection.send_frame(Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, passive, durable, auto_delete, false, nowait, arguments)) unless nowait self.define_callback(:declare, &block) @channel.exchanges_awaiting_declare_ok.push(self) end self end |
#delete(if_unused = false, nowait = false, &block) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/amq/client/async/exchange.rb', line 132 def delete(if_unused = false, nowait = false, &block) @connection.send_frame(Protocol::Exchange::Delete.encode(@channel.id, @name, if_unused, nowait)) unless nowait self.define_callback(:delete, &block) # TODO: delete itself from exchanges cache @channel.exchanges_awaiting_delete_ok.push(self) end self end |
#direct? ⇒ Boolean
Returns true if this exchange is of type ‘direct`.
66 67 68 |
# File 'lib/amq/client/async/exchange.rb', line 66 def direct? @type == :direct end |
#fanout? ⇒ Boolean
Returns true if this exchange is of type ‘fanout`.
60 61 62 |
# File 'lib/amq/client/async/exchange.rb', line 60 def fanout? @type == :fanout end |
#handle_declare_ok(method) ⇒ Object
237 238 239 240 241 242 |
# File 'lib/amq/client/async/exchange.rb', line 237 def handle_declare_ok(method) @name = method.exchange if self.anonymous? @channel.register_exchange(self) self.exec_callback_once_yielding_self(:declare, method) end |
#handle_delete_ok(method) ⇒ Object
244 245 246 |
# File 'lib/amq/client/async/exchange.rb', line 244 def handle_delete_ok(method) self.exec_callback_once(:delete, method) end |
#headers? ⇒ Boolean
Returns true if this exchange is of type ‘headers`.
78 79 80 |
# File 'lib/amq/client/async/exchange.rb', line 78 def headers? @type == :headers end |
#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption
Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).
178 179 180 |
# File 'lib/amq/client/async/exchange.rb', line 178 def on_connection_interruption(&block) self.redefine_callback(:after_connection_interruption, &block) end |
#on_recovery(&block) ⇒ Object Also known as: after_recovery
Defines a callback that will be executed when AMQP connection is recovered after a network failure.. Only one callback can be defined (the one defined last replaces previously added ones).
209 210 211 |
# File 'lib/amq/client/async/exchange.rb', line 209 def on_recovery(&block) self.redefine_callback(:after_recovery, &block) end |
#on_return(&block) ⇒ Object
161 162 163 164 165 |
# File 'lib/amq/client/async/exchange.rb', line 161 def on_return(&block) self.redefine_callback(:return, &block) self end |
#predefined? ⇒ Boolean
Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).
89 90 91 |
# File 'lib/amq/client/async/exchange.rb', line 89 def predefined? @name && ((@name == AMQ::Protocol::EMPTY_STRING) || !!(@name =~ /^amq\.(direct|fanout|topic|headers|match)/i)) end |
#publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false, immediate = false, frame_size = nil) ⇒ Object
150 151 152 153 154 155 156 157 |
# File 'lib/amq/client/async/exchange.rb', line 150 def publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false, immediate = false, frame_size = nil) headers = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }.merge(user_headers) @connection.send_frameset(Protocol::Basic::Publish.encode(@channel.id, payload, headers, @name, routing_key, mandatory, immediate, (frame_size || @connection.frame_max)), @channel) # publisher confirms support. MK. @channel.exec_callback(:after_publish) self end |
#redeclare(&block) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/amq/client/async/exchange.rb', line 116 def redeclare(&block) nowait = block.nil? @connection.send_frame(Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, @passive, @durable, @auto_delete, false, nowait, @arguments)) unless nowait self.define_callback(:declare, &block) @channel.exchanges_awaiting_declare_ok.push(self) end self end |
#topic? ⇒ Boolean
Returns true if this exchange is of type ‘topic`.
72 73 74 |
# File 'lib/amq/client/async/exchange.rb', line 72 def topic? @type == :topic end |