Class: AMQ::Client::Async::Exchange

Inherits:
Object
  • Object
show all
Extended by:
ProtocolMethodHandlers
Includes:
Entity, ServerNamedEntity
Defined in:
lib/amq/client/async/exchange.rb

Constant Summary collapse

# Exchange types that #initialize accepts as-is; any other type must match
# the custom "x-" pattern checked there.
BUILTIN_TYPES =
[:fanout, :direct, :topic, :headers].freeze

Constants included from Openable

Openable::VALUES

Instance Attribute Summary collapse

Attributes included from Entity

#callbacks

Attributes included from Openable

#status

Declaration collapse

Publishing Messages collapse

Error Handling and Recovery collapse

Instance Method Summary collapse

Methods included from ProtocolMethodHandlers

handle, handlers

Methods included from ServerNamedEntity

#server_named?

Methods included from Callbacks

#clear_callbacks, #define_callback, #exec_callback, #exec_callback_once, #exec_callback_once_yielding_self, #exec_callback_yielding_self, #has_callback?, #prepend_callback, #redefine_callback

Methods included from Openable

#closed!, #closed?, #closing!, #closing?, #opened!, #opened?, #opening!, #opening?

Constructor Details

#initialize(connection, channel, name, type = :fanout) ⇒ Exchange

Returns a new instance of Exchange.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/amq/client/async/exchange.rb', line 40

# Builds an exchange bound to the given connection and channel.
#
# @param connection connection used to send frames; also forwarded to super
# @param channel    channel this exchange belongs to
# @param name       exchange name
# @param type [Symbol, String] one of BUILTIN_TYPES, or a custom type whose
#   name begins with "x-" (case-insensitive)
# @raise [UnknownExchangeTypeError] if the type is neither built-in nor custom
def initialize(connection, channel, name, type = :fanout)
  # Normalize once: validation accepts both "direct" and :direct, so the
  # stored value must be a Symbol for Symbol comparisons on @type to agree.
  normalized_type = type.to_sym

  # \A anchors at the start of the string; ^ would also match after a newline.
  supported = BUILTIN_TYPES.include?(normalized_type) || (normalized_type.to_s =~ /\Ax-.+/i)
  raise UnknownExchangeTypeError.new(BUILTIN_TYPES, type) unless supported

  @connection = connection
  @channel    = channel
  @name       = name
  @type       = normalized_type

  # register pre-declared exchanges
  if @name == AMQ::Protocol::EMPTY_STRING || @name =~ /^amq\.(direct|fanout|topic|match|headers)/
    @channel.register_exchange(self)
  end

  super(connection)
end

Instance Attribute Details

#argumentsHash (readonly)

Returns additional arguments given on exchange declaration. Typically used by AMQP extensions.

Returns:

  • (Hash)

    Additional arguments given on exchange declaration. Typically used by AMQP extensions.



36
37
38
# File 'lib/amq/client/async/exchange.rb', line 36

# Reader for the additional arguments most recently passed to #declare
# (nil until #declare has stored a value).
#
# @return [Hash, nil]
def arguments
  @arguments
end

#channelObject (readonly)

Channel this exchange belongs to.



26
27
28
# File 'lib/amq/client/async/exchange.rb', line 26

# Reader for the channel this exchange was constructed with.
def channel
  @channel
end

#nameString (readonly)

Exchange name. May be server-generated or assigned directly.

Returns:

  • (String)


30
31
32
# File 'lib/amq/client/async/exchange.rb', line 30

# Reader for the exchange name. Set by the constructor; #handle_declare_ok
# may overwrite it when the exchange is anonymous.
#
# @return [String]
def name
  @name
end

#typeSymbol (readonly)

Returns the exchange type: one of :direct, :fanout, :topic, :headers, or a custom type whose name begins with "x-".

Returns:

  • (Symbol)

    One of :direct, :fanout, :topic, :headers, or a custom type whose name begins with "x-"



33
34
35
# File 'lib/amq/client/async/exchange.rb', line 33

# Reader for the exchange type given to the constructor: one of
# BUILTIN_TYPES or a custom type whose name begins with "x-".
def type
  @type
end

Instance Method Details

#auto_recoverObject

Called by associated connection object when AMQP connection has been re-established (for example, after a network failure).



224
225
226
# File 'lib/amq/client/async/exchange.rb', line 224

# Re-declares this exchange unless it is a pre-defined one.
#
# @return [Object, nil] whatever #redeclare returns, or nil when skipped
def auto_recover
  # Pre-defined exchanges are never re-declared.
  return if predefined?

  redeclare
end

#before_recovery(&block) ⇒ Object

Defines a callback that will be executed after TCP connection is recovered after a network failure but before AMQP connection is re-opened. Only one callback can be defined (the one defined last replaces previously added ones).



195
196
197
# File 'lib/amq/client/async/exchange.rb', line 195

# Replaces the :before_recovery callback with the given block.
#
# @return [Object] whatever redefine_callback returns
def before_recovery(&block)
  redefine_callback(:before_recovery, &block)
end

#custom_type?Boolean

Returns true if this exchange is of a custom type (begins with x-).

Returns:

  • (Boolean)

    true if this exchange is of a custom type (begins with x-)



84
85
86
# File 'lib/amq/client/async/exchange.rb', line 84

# @return [Boolean] true if this exchange is of a custom type, i.e. its
#   type name begins with "x-" (case-insensitive) followed by at least one
#   more character
def custom_type?
  # =~ yields a match index or nil, so coerce to a real Boolean; \A anchors
  # at the start of the string instead of at any line start.
  !!(@type.to_s =~ /\Ax-.+/i)
end

#declare(passive = false, durable = false, auto_delete = false, nowait = false, arguments = nil, &block) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/amq/client/async/exchange.rb', line 97

# Encodes an Exchange::Declare frame for this exchange and sends it over the
# connection. Unless nowait is set, the block is registered as the :declare
# callback and the exchange is queued on the channel to await declare-ok.
#
# @param passive     forwarded to the Declare frame
# @param durable     forwarded to the Declare frame
# @param auto_delete forwarded to the Declare frame
# @param nowait      when truthy, no callback is registered and the exchange
#   is not queued for declare-ok
# @param arguments   forwarded to the Declare frame
# @return [self]
def declare(passive = false, durable = false, auto_delete = false, nowait = false, arguments = nil, &block)
  # Remembered so that #redeclare can repeat the same declaration.
  @passive, @durable, @auto_delete, @arguments = passive, durable, auto_delete, arguments

  # NOTE(review): the literal false is presumably the `internal` flag — confirm
  # against Protocol::Exchange::Declare.encode.
  frame = Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, passive, durable, auto_delete, false, nowait, arguments)
  @connection.send_frame(frame)

  return self if nowait

  define_callback(:declare, &block)
  @channel.exchanges_awaiting_declare_ok.push(self)
  self
end

#delete(if_unused = false, nowait = false, &block) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/amq/client/async/exchange.rb', line 132

# Encodes an Exchange::Delete frame for this exchange and sends it over the
# connection. Unless nowait is set, the block is registered as the :delete
# callback and the exchange is queued on the channel to await delete-ok.
#
# @param if_unused forwarded to the Delete frame
# @param nowait    when truthy, no callback is registered and the exchange
#   is not queued for delete-ok
# @return [self]
def delete(if_unused = false, nowait = false, &block)
  frame = Protocol::Exchange::Delete.encode(@channel.id, @name, if_unused, nowait)
  @connection.send_frame(frame)

  return self if nowait

  define_callback(:delete, &block)

  # TODO: delete itself from exchanges cache
  @channel.exchanges_awaiting_delete_ok.push(self)
  self
end

#direct?Boolean

Returns true if this exchange is of type `direct`.

Returns:

  • (Boolean)

    true if this exchange is of type `direct`



66
67
68
# File 'lib/amq/client/async/exchange.rb', line 66

# @return [Boolean] true if this exchange is of type `direct`
def direct?
  # Compare by name: the constructor accepts the type as either a Symbol or
  # a String, and a Symbol-only comparison would miss "direct".
  @type.to_s == "direct"
end

#fanout?Boolean

Returns true if this exchange is of type `fanout`.

Returns:

  • (Boolean)

    true if this exchange is of type `fanout`



60
61
62
# File 'lib/amq/client/async/exchange.rb', line 60

# @return [Boolean] true if this exchange is of type `fanout`
def fanout?
  # Compare by name: the constructor accepts the type as either a Symbol or
  # a String, and a Symbol-only comparison would miss "fanout".
  @type.to_s == "fanout"
end

#handle_declare_ok(method) ⇒ Object



237
238
239
240
241
242
# File 'lib/amq/client/async/exchange.rb', line 237

# Handles a declare-ok response: adopts the name carried by the response
# when this exchange is anonymous, registers the exchange with its channel,
# then runs the :declare callback once.
#
# @param method the declare-ok method object; #exchange is read from it
#   only when the exchange is anonymous
# @return [Object] whatever exec_callback_once_yielding_self returns
def handle_declare_ok(method)
  if anonymous?
    @name = method.exchange
  end

  @channel.register_exchange(self)
  exec_callback_once_yielding_self(:declare, method)
end

#handle_delete_ok(method) ⇒ Object



244
245
246
# File 'lib/amq/client/async/exchange.rb', line 244

# Handles a delete-ok response by running the :delete callback once with
# the received method object.
#
# @return [Object] whatever exec_callback_once returns
def handle_delete_ok(method)
  exec_callback_once(:delete, method)
end

#headers?Boolean

Returns true if this exchange is of type `headers`.

Returns:

  • (Boolean)

    true if this exchange is of type `headers`



78
79
80
# File 'lib/amq/client/async/exchange.rb', line 78

# @return [Boolean] true if this exchange is of type `headers`
def headers?
  # Compare by name: the constructor accepts the type as either a Symbol or
  # a String, and a Symbol-only comparison would miss "headers".
  @type.to_s == "headers"
end

#on_connection_interruption(&block) ⇒ Object Also known as: after_connection_interruption

Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure). Only one callback can be defined (the one defined last replaces previously added ones).



178
179
180
# File 'lib/amq/client/async/exchange.rb', line 178

# Replaces the :after_connection_interruption callback with the given block.
#
# @return [Object] whatever redefine_callback returns
def on_connection_interruption(&block)
  redefine_callback(:after_connection_interruption, &block)
end

#on_recovery(&block) ⇒ Object Also known as: after_recovery

Defines a callback that will be executed when AMQP connection is recovered after a network failure. Only one callback can be defined (the one defined last replaces previously added ones).



209
210
211
# File 'lib/amq/client/async/exchange.rb', line 209

# Replaces the :after_recovery callback with the given block.
#
# @return [Object] whatever redefine_callback returns
def on_recovery(&block)
  redefine_callback(:after_recovery, &block)
end

#on_return(&block) ⇒ Object



161
162
163
164
165
# File 'lib/amq/client/async/exchange.rb', line 161

# Replaces the :return callback with the given block.
#
# @return [self] so that calls can be chained
def on_return(&block)
  redefine_callback(:return, &block)
  self
end

#predefined?Boolean

Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).

Returns:

  • (Boolean)

    true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)



89
90
91
# File 'lib/amq/client/async/exchange.rb', line 89

# @return [Boolean] true if this exchange is a pre-defined one: its name is
#   the empty string or begins with amq.direct, amq.fanout, amq.topic,
#   amq.headers or amq.match (case-insensitive)
def predefined?
  # Always answer with a real Boolean, including when no name is set.
  return false unless @name
  return true if @name == AMQ::Protocol::EMPTY_STRING

  # \A anchors at the start of the name; ^ would also match after a newline.
  !!(@name =~ /\Aamq\.(direct|fanout|topic|headers|match)/i)
end

#publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false, immediate = false, frame_size = nil) ⇒ Object



150
151
152
153
154
155
156
157
# File 'lib/amq/client/async/exchange.rb', line 150

# Encodes a Basic::Publish frameset for the payload and sends it over the
# connection, then fires the channel's :after_publish callback.
#
# @param payload      message body handed to the encoder
# @param routing_key  routing key handed to the encoder
# @param user_headers [Hash] merged over the default headers (priority 0,
#   delivery_mode 2, content_type "application/octet-stream")
# @param mandatory    forwarded to the encoder
# @param immediate    forwarded to the encoder
# @param frame_size   frame size limit; falls back to the connection's
#   frame_max when nil
# @return [self]
def publish(payload, routing_key = AMQ::Protocol::EMPTY_STRING, user_headers = {}, mandatory = false, immediate = false, frame_size = nil)
  default_headers = { :priority => 0, :delivery_mode => 2, :content_type => "application/octet-stream" }
  headers         = default_headers.merge(user_headers)

  frameset = Protocol::Basic::Publish.encode(@channel.id, payload, headers, @name, routing_key, mandatory, immediate, (frame_size || @connection.frame_max))
  @connection.send_frameset(frameset, @channel)

  # publisher confirms support. MK.
  @channel.exec_callback(:after_publish)
  self
end

#redeclare(&block) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
# File 'lib/amq/client/async/exchange.rb', line 116

# Re-sends the Exchange::Declare frame using the attributes remembered by
# #declare. With a block, the block is registered as the :declare callback
# and the exchange is queued on the channel to await declare-ok; without
# one, the frame is sent with nowait set.
#
# @return [self]
def redeclare(&block)
  # nowait is set exactly when no block was given.
  nowait = !block

  frame = Protocol::Exchange::Declare.encode(@channel.id, @name, @type.to_s, @passive, @durable, @auto_delete, false, nowait, @arguments)
  @connection.send_frame(frame)

  if block
    define_callback(:declare, &block)
    @channel.exchanges_awaiting_declare_ok.push(self)
  end

  self
end

#topic?Boolean

Returns true if this exchange is of type `topic`.

Returns:

  • (Boolean)

    true if this exchange is of type `topic`



72
73
74
# File 'lib/amq/client/async/exchange.rb', line 72

# @return [Boolean] true if this exchange is of type `topic`
def topic?
  # Compare by name: the constructor accepts the type as either a Symbol or
  # a String, and a Symbol-only comparison would miss "topic".
  @type.to_s == "topic"
end