Class: MarchHare::Exchange

Inherits:
Object
Defined in:
lib/march_hare/exchange.rb

Overview

Represents AMQP 0.9.1 exchanges.

Constructor Details

#initialize(channel, name, options = {}) ⇒ Exchange

Instantiates a new exchange.

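Parameters:

  • channel (Channel)

    Channel to declare exchange on

  • name (String)

    Exchange name

  • options (Hash) (defaults to: {})

    Exchange options; :type is required

Raises:

  • (ArgumentError)

    if channel, name, or options[:type] is nil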

# File 'lib/march_hare/exchange.rb', line 39

def initialize(channel, name, options = {})
  raise ArgumentError, "exchange channel cannot be nil" if channel.nil?
  raise ArgumentError, "exchange name cannot be nil" if name.nil?
  raise ArgumentError, "exchange :type must be specified as an option" if options[:type].nil?

  @channel = channel
  @name    = name
  @type    = options[:type]
  @options = {:type => :fanout, :durable => false, :auto_delete => false, :internal => false, :passive => false}.merge(options)
end
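
The validation and option merging above can be tried without a broker. The following is a minimal sketch, assuming a stand-in class (`ConstructorSketch`, a hypothetical name that is not part of March Hare) that copies only the constructor logic from the listing; the channel is a plain placeholder object because the constructor only checks it for nil.

```ruby
# ConstructorSketch is a hypothetical stand-in, not a March Hare class.
# It reproduces only the constructor logic shown in the listing above.
class ConstructorSketch
  DEFAULTS = {:type => :fanout, :durable => false, :auto_delete => false,
              :internal => false, :passive => false}.freeze

  attr_reader :channel, :name, :type, :options

  def initialize(channel, name, options = {})
    raise ArgumentError, "exchange channel cannot be nil" if channel.nil?
    raise ArgumentError, "exchange name cannot be nil" if name.nil?
    raise ArgumentError, "exchange :type must be specified as an option" if options[:type].nil?

    @channel = channel
    @name    = name
    @type    = options[:type]
    @options = DEFAULTS.merge(options)  # caller-supplied values win over defaults
  end
end

channel = Object.new  # placeholder; only its non-nil-ness matters here

x = ConstructorSketch.new(channel, "events", :type => :topic, :durable => true)
p x.type                   # type supplied by the caller
p x.options[:durable]      # overridden by the caller
p x.options[:auto_delete]  # filled in from the defaults

# Omitting :type is rejected even though the defaults hash contains one.
begin
  ConstructorSketch.new(channel, "events")
rescue ArgumentError => e
  puts e.message
end
```

Because the `:type` check runs before the merge, the `:fanout` entry in the defaults never takes effect for a successfully constructed exchange.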

Instance Attribute Details

#channel ⇒ MarchHare::Channel (readonly)

Returns the channel this exchange object uses.

Returns:

  • (MarchHare::Channel)

# File 'lib/march_hare/exchange.rb', line 15

def channel
  @channel
end

#name ⇒ String (readonly)

Returns the exchange name.

Returns:

  • (String)

    Exchange name

# File 'lib/march_hare/exchange.rb', line 13

def name
  @name
end

#type ⇒ Symbol (readonly)

Type of this exchange (one of: :direct, :fanout, :topic, :headers).

Returns:

  • (Symbol)

# File 'lib/march_hare/exchange.rb', line 19

def type
  @type
end

Instance Method Details

#auto_delete? ⇒ Boolean

Returns true if this exchange was declared as automatically deleted (deleted as soon as the last queue or exchange is unbound from it).

Returns:

  • (Boolean)

    true if this exchange was declared as automatically deleted (deleted as soon as the last queue or exchange is unbound from it).

# File 'lib/march_hare/exchange.rb', line 148

def auto_delete?
  !!@options[:auto_delete]
end

#bind(exchange, options = {}) ⇒ MarchHare::Exchange

Binds this exchange to another (source) exchange using the exchange.bind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • exchange (MarchHare::Exchange, String)

    Source exchange, or its name

  • options (Hash) (defaults to: {})

    Options

Options Hash (options):

  • :routing_key (String) (defaults to: '')

    Routing key used for the binding

Returns:

  • (MarchHare::Exchange)

# File 'lib/march_hare/exchange.rb', line 111

def bind(exchange, options={})
  exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
  @channel.exchange_bind(@name, exchange_name, options.fetch(:routing_key, ''))
end
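
To illustrate how the source exchange is resolved and in which order the names reach the channel, here is a minimal sketch. `RecordingChannel`, `NamedSource`, and `BindSketch` are hypothetical stand-ins rather than March Hare classes, and the parameter names `destination` and `source` on the fake channel are labels chosen for this example. Only the body of `bind` is taken from the listing above.

```ruby
# Records calls instead of talking to a broker (hypothetical stand-in).
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def exchange_bind(destination, source, routing_key)
    @calls << [:exchange_bind, destination, source, routing_key]
  end
end

# Any object that responds to #name is treated as a source exchange.
NamedSource = Struct.new(:name)

class BindSketch
  def initialize(channel, name)
    @channel = channel
    @name    = name
  end

  # Same body as the listing above.
  def bind(exchange, options = {})
    exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
    @channel.exchange_bind(@name, exchange_name, options.fetch(:routing_key, ''))
  end
end

channel = RecordingChannel.new
audit   = BindSketch.new(channel, "audit")

audit.bind("events")                                              # source given by name
audit.bind(NamedSource.new("events"), :routing_key => 'user.#')   # source given as an object

channel.calls.each { |call| p call }
```

In both calls the receiver's own name is passed first and the source name second, and the routing key falls back to an empty string when none is given.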

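#declare! ⇒ Object

Declares the exchange on its channel unless it is predefined. A passive declare is used when the :passive option is set.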

# File 'lib/march_hare/exchange.rb', line 174

def declare!
  unless predefined?
    if @options[:passive]
    then @channel.exchange_declare_passive(@name)
    else @channel.exchange_declare(@name, @options[:type].to_s,
        @options[:durable],
        @options[:auto_delete],
        @options[:internal],
        @options[:arguments])
    end
  end
end
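
The three paths through `declare!` (regular declare, passive declare, and no call at all for predefined names) can be observed with a fake channel. This is a sketch under stated assumptions: `RecordingChannel` and `DeclareSketch` are hypothetical names, and the fake channel only records the arguments it receives. The branch logic mirrors the listing above.

```ruby
# Records declare calls instead of sending them to a broker (hypothetical stand-in).
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def exchange_declare_passive(name)
    @calls << [:exchange_declare_passive, name]
  end

  def exchange_declare(name, type, durable, auto_delete, internal, arguments)
    @calls << [:exchange_declare, name, type, durable, auto_delete, internal, arguments]
  end
end

class DeclareSketch
  DEFAULTS = {:type => :fanout, :durable => false, :auto_delete => false,
              :internal => false, :passive => false}.freeze

  def initialize(channel, name, options)
    @channel = channel
    @name    = name
    @options = DEFAULTS.merge(options)
  end

  def predefined?
    @name.empty? || @name.start_with?("amq.")
  end

  # Same branches as the listing above.
  def declare!
    unless predefined?
      if @options[:passive]
        @channel.exchange_declare_passive(@name)
      else
        @channel.exchange_declare(@name, @options[:type].to_s,
                                  @options[:durable], @options[:auto_delete],
                                  @options[:internal], @options[:arguments])
      end
    end
  end
end

channel = RecordingChannel.new
DeclareSketch.new(channel, "events", :type => :topic, :durable => true).declare!
DeclareSketch.new(channel, "events", :type => :topic, :passive => true).declare!
DeclareSketch.new(channel, "amq.topic", :type => :topic).declare!  # predefined: nothing recorded

channel.calls.each { |call| p call }
```

Note that the type is converted with `to_s` before it reaches the channel, and that `:arguments` is nil unless the caller supplied it.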

#delete(options = {}) ⇒ Object

Deletes the exchange unless it is predefined.

Parameters:

  • options (Hash) (defaults to: {})

    Options

Options Hash (options):

  • :if_unused (Boolean) (defaults to: false)

    Should this exchange be deleted only if it is no longer used?

# File 'lib/march_hare/exchange.rb', line 92

def delete(options={})
  @channel.deregister_exchange(self)
  @channel.exchange_delete(@name, options.fetch(:if_unused, false)) unless predefined?
end
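
As the listing reads, deregistration happens for every exchange, while the delete call is skipped for predefined ones. A minimal sketch of that ordering follows, assuming a recording fake channel; `RecordingChannel` and `DeleteSketch` are hypothetical names and not part of March Hare.

```ruby
# Records calls instead of talking to a broker (hypothetical stand-in).
class RecordingChannel
  attr_reader :calls

  def initialize
    @calls = []
  end

  def deregister_exchange(exchange)
    @calls << [:deregister_exchange, exchange.name]
  end

  def exchange_delete(name, if_unused)
    @calls << [:exchange_delete, name, if_unused]
  end
end

class DeleteSketch
  attr_reader :name

  def initialize(channel, name)
    @channel = channel
    @name    = name
  end

  def predefined?
    @name.empty? || @name.start_with?("amq.")
  end

  # Same body as the listing above.
  def delete(options = {})
    @channel.deregister_exchange(self)
    @channel.exchange_delete(@name, options.fetch(:if_unused, false)) unless predefined?
  end
end

channel = RecordingChannel.new
DeleteSketch.new(channel, "events").delete(:if_unused => true)
DeleteSketch.new(channel, "amq.fanout").delete  # deregistered locally, never deleted on the broker

channel.calls.each { |call| p call }
```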

#durable? ⇒ Boolean

Returns true if this exchange was declared as durable (will survive broker restart).

Returns:

  • (Boolean)

    true if this exchange was declared as durable (will survive broker restart).

# File 'lib/march_hare/exchange.rb', line 142

def durable?
  !!@options[:durable]
end

#internal? ⇒ Boolean

Returns true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients).

Returns:

  • (Boolean)

    true if this exchange is internal (used solely for exchange-to-exchange bindings and cannot be published to by clients)

# File 'lib/march_hare/exchange.rb', line 154

def internal?
  !!@options[:internal]
end

#predefined? ⇒ Boolean

Returns true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on).

Returns:

  • (Boolean)

    true if this exchange is a pre-defined one (amq.direct, amq.fanout, amq.match and so on)

# File 'lib/march_hare/exchange.rb', line 136

def predefined?
  @name.empty? || @name.start_with?("amq.")
end
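
The check is a plain string test, so it can be exercised on its own. The sketch below lifts the expression into a standalone helper (`predefined_name?` is a hypothetical name used only here) and applies it to a few sample names.

```ruby
# Standalone copy of the predicate from the listing above.
# predefined_name? is a hypothetical helper, not part of March Hare.
def predefined_name?(name)
  name.empty? || name.start_with?("amq.")
end

# The empty string is the default exchange; "amq." names are the broker's built-ins.
["", "amq.direct", "amq.topic", "orders", "amqp-bridge"].each do |name|
  puts "#{name.inspect}: #{predefined_name?(name)}"
end
```

Only the exact `amq.` prefix (including the dot) counts, so a name such as `amqp-bridge` is treated as a regular exchange.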

#publish(body, opts = {}) ⇒ MarchHare::Exchange

Publishes a message.

Parameters:

  • body (String)

    Message payload. It will never be modified by MarchHare or RabbitMQ in any way.

  • opts (Hash) (defaults to: {})

    Message properties (metadata) and delivery settings

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :properties (Hash)

    Message and delivery properties

    • :timestamp (Time) A timestamp associated with this message

    • :expiration (Integer) Expiration time after which the message will be deleted

    • :type (String) Message type, e.g. what type of event or command this message represents. Can be any string

    • :reply_to (String) Queue name other apps should send the response to

    • :content_type (String) Message content type (e.g. application/json)

    • :content_encoding (String) Message content encoding (e.g. gzip)

    • :correlation_id (String) Message correlated to this one, e.g. what request this message is a reply for

    • :priority (Integer) Message priority, 0 to 9. Not used by RabbitMQ, only applications

    • :message_id (String) Any message identifier

    • :user_id (String) Optional user ID. Verified by RabbitMQ against the actual connection username

    • :app_id (String) Optional application ID

Returns:

  • (MarchHare::Exchange)

# File 'lib/march_hare/exchange.rb', line 75

def publish(body, opts = {})
  options = {:routing_key => '', :mandatory => false}.merge(opts)
  @channel.basic_publish(@name,
                         options.delete(:routing_key),
                         options.delete(:mandatory),
                         options.fetch(:properties, options),
                         body.to_java_bytes)
end
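
How `opts` is split into routing key, mandatory flag, and properties is easy to misread, so here is a minimal sketch of just that step. `split_publish_options` is a hypothetical helper written for this example; it performs the same merge, deletes, and fetch in the same order as the listing, and leaves out the JRuby-specific `to_java_bytes` conversion of the body.

```ruby
# Hypothetical helper that mirrors the option handling in #publish above.
# Returns [routing_key, mandatory, properties].
def split_publish_options(opts = {})
  options     = {:routing_key => '', :mandatory => false}.merge(opts)
  routing_key = options.delete(:routing_key)
  mandatory   = options.delete(:mandatory)
  # With no :properties key, whatever remains in the hash is used as properties.
  properties  = options.fetch(:properties, options)
  [routing_key, mandatory, properties]
end

# Flat style: message properties sit next to :routing_key.
flat = split_publish_options(:routing_key => "user.created",
                             :persistent => true,
                             :content_type => "application/json")
p flat

# Nested style: an explicit :properties hash is used as-is.
nested = split_publish_options(:routing_key => "user.created",
                               :persistent => true,
                               :properties => {:app_id => "billing"})
p nested
```

As the listing is written, an explicit `:properties` hash replaces the remaining top-level keys rather than being merged with them, so `:persistent` in the second call does not appear in the properties argument.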

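#recover_from_network_failure ⇒ Object

Redeclares the exchange and re-registers it with its channel after a network failure. Predefined exchanges are skipped.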

# File 'lib/march_hare/exchange.rb', line 188

def recover_from_network_failure
  # puts "Recovering exchange #{@name} from network failure"
  unless predefined?
    begin
      declare!

      @channel.register_exchange(self)
    rescue Exception => e
      # TODO: use a logger
      puts "Caught #{e.inspect} while redeclaring and registering exchange #{@name}!"
    end
  end
end

#unbind(exchange, opts = {}) ⇒ MarchHare::Exchange

Unbinds this exchange from another (source) exchange using the exchange.unbind AMQP 0.9.1 extension that RabbitMQ provides.

Parameters:

  • exchange (MarchHare::Exchange, String)

    Source exchange, or its name

  • opts (Hash) (defaults to: {})

    Options

Options Hash (opts):

  • :routing_key (String) (defaults to: '')

    Routing key used for binding

  • :arguments (Hash) (defaults to: nil)

    Optional arguments

Returns:

  • (MarchHare::Exchange)

# File 'lib/march_hare/exchange.rb', line 130

def unbind(exchange, opts = {})
  exchange_name = if exchange.respond_to?(:name) then exchange.name else exchange.to_s end
  @channel.exchange_unbind(@name, exchange_name, opts.fetch(:routing_key, ''), opts[:arguments])
end

#wait_for_confirms ⇒ Object

Waits until all outstanding publisher confirms on the channel arrive.

This is a convenience method that delegates to Channel#wait_for_confirms.

# File 'lib/march_hare/exchange.rb', line 164

def wait_for_confirms
  @channel.wait_for_confirms
end