Class: Bunny::Exchange09

Inherits:
Object
Defined in:
lib/bunny/exchange09.rb

Overview

Exchanges are the routing and distribution hub of AMQP. Every message that Bunny sends to an AMQP broker/server has to pass through an exchange in order to be routed to a destination queue. The AMQP specification defines the types of exchange that you can create.

At the time of writing, four types of exchange are defined:

  • :direct
  • :fanout
  • :topic
  • :headers
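To make the difference between the exchange types concrete, here is a toy model of routing, written only for illustration. It is not Bunny code and not a broker implementation: the helper names `topic_match?` and `route`, the queue names, and the representation of a binding as a `[queue_name, binding_key]` pair are all assumptions of this sketch. The `:headers` type is left out because it matches on message headers rather than on the routing key.

```ruby
# Toy model of exchange routing; hypothetical helpers, not part of Bunny.

# Topic matching on dot-separated words: '*' matches exactly one word,
# '#' matches zero or more words.
def topic_match?(pattern_words, key_words)
  return key_words.empty? if pattern_words.empty?

  head, *rest = pattern_words
  case head
  when '#'
    # try letting '#' absorb 0, 1, 2, ... words of the routing key
    (0..key_words.length).any? { |n| topic_match?(rest, key_words.drop(n)) }
  when '*'
    !key_words.empty? && topic_match?(rest, key_words.drop(1))
  else
    key_words.first == head && topic_match?(rest, key_words.drop(1))
  end
end

# Returns the names of the queues that would receive a message.
def route(type, bindings, routing_key)
  matched = bindings.select do |_queue, binding_key|
    case type
    when :fanout then true                        # every bound queue
    when :direct then binding_key == routing_key  # exact key match
    when :topic  then topic_match?(binding_key.split('.'), routing_key.split('.'))
    else raise ArgumentError, "exchange type not covered by this sketch: #{type}"
    end
  end
  matched.map(&:first).uniq
end

# Made-up bindings used only to exercise the model.
bindings = [['q.errors', 'log.error'], ['q.all_logs', 'log.#'], ['q.one_level', 'log.*']]

direct_targets = route(:direct, bindings, 'log.error')
fanout_targets = route(:fanout, bindings, 'ignored')
topic_targets  = route(:topic,  bindings, 'log.app.warn')
```

In this model a fanout exchange ignores the routing key entirely, a direct exchange needs an exact match, and a topic exchange treats the binding key as a pattern.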

AMQP-compliant brokers/servers are required to provide default exchanges for the direct and fanout exchange types. All default exchanges are prefixed with 'amq.', for example:

  • amq.direct
  • amq.fanout
  • amq.topic
  • amq.match or amq.headers

If you want more information about exchanges, please consult the documentation for your target broker/server or visit the AMQP website to find the version of the specification that applies to your target broker/server.

Constructor Details

#initialize(client, name, opts = {}) ⇒ Exchange09

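Returns a new instance of Exchange09. Raises Bunny::ConnectionError if the client is not connected. Unless the name is empty or starts with 'amq.', the exchange is declared on the broker/server.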



# File 'lib/bunny/exchange09.rb', line 31

def initialize(client, name, opts = {})
  # check connection to server
  raise Bunny::ConnectionError, 'Not connected to server' if client.status == :not_connected

  @client, @name, @opts = client, name, opts

  # set up the exchange type catering for default names
  if name =~ /^amq\.(.+)$/
    predeclared = true
    new_type = $1
    # handle 'amq.match' default
    new_type = 'headers' if new_type == 'match'
    @type = new_type.to_sym
  else
    @type = opts[:type] || :direct
  end

  @key = opts[:key]
  @client.exchanges[@name] ||= self

  # ignore the :nowait option if passed, otherwise program will hang waiting for a
  # response that will not be sent by the server
  opts.delete(:nowait)

  unless predeclared or name == ''
    opts = {
      :exchange => name, :type => type, :nowait => false,
      :deprecated_ticket => 0, :deprecated_auto_delete => false, :deprecated_internal => false
    }.merge(opts)

    client.send_frame(Qrack::Protocol09::Exchange::Declare.new(opts))

    method = client.next_method

    client.check_response(method, Qrack::Protocol09::Exchange::DeclareOk, "Error declaring exchange #{name}: type = #{type}")
  end
end
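The type selection in the constructor can be isolated into a small function, which makes the naming rule easier to see. The sketch below is a standalone re-statement of that branch, not code from Bunny; the helper name `infer_exchange_type` and the exchange name 'orders' are hypothetical.

```ruby
# Hypothetical helper mirroring the type selection in Exchange09#initialize.
def infer_exchange_type(name, opts = {})
  if (match = /^amq\.(.+)$/.match(name))
    suffix = match[1]
    # 'amq.match' is the alternative name of the default headers exchange
    suffix = 'headers' if suffix == 'match'
    suffix.to_sym
  else
    # for any other name the caller chooses the type, with :direct as fallback
    opts[:type] || :direct
  end
end

from_name     = infer_exchange_type('amq.fanout')
from_match    = infer_exchange_type('amq.match')
from_opts     = infer_exchange_type('orders', :type => :topic)
from_fallback = infer_exchange_type('orders')
```

Note that for names starting with 'amq.' the name wins: a :type option passed alongside such a name has no effect on the resulting type.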

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/bunny/exchange09.rb', line 29

def client
  @client
end

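#key ⇒ Object (readonly)

Returns the value of attribute key. This is the default routing key (taken from opts[:key] at construction) that #publish uses when no :key option is given.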



# File 'lib/bunny/exchange09.rb', line 29

def key
  @key
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



# File 'lib/bunny/exchange09.rb', line 29

def name
  @name
end

#opts ⇒ Object (readonly)

Returns the value of attribute opts.



# File 'lib/bunny/exchange09.rb', line 29

def opts
  @opts
end

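#type ⇒ Object (readonly)

Returns the value of attribute type. For names starting with 'amq.' the type is derived from the name; otherwise it is opts[:type], defaulting to :direct.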



# File 'lib/bunny/exchange09.rb', line 29

def type
  @type
end

Instance Method Details

#delete(opts = {}) ⇒ Symbol

Requests that the broker/server delete this exchange. If successful, removes the reference to it from the client's exchanges. Raises ProtocolError if an error occurs.

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :if_unused (Boolean) — default: false

    If set to true, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.

  • :nowait (Boolean) — default: false

    Ignored by Bunny, always false.

Returns:

  • (Symbol)

    :delete_ok if successful.



# File 'lib/bunny/exchange09.rb', line 79

def delete(opts = {})
  # ignore the :nowait option if passed, otherwise program will hang waiting for a
  # response that will not be sent by the server
  opts.delete(:nowait)

  opts = { :exchange => name, :nowait => false, :deprecated_ticket => 0 }.merge(opts)

  client.send_frame(Qrack::Protocol09::Exchange::Delete.new(opts))

  method = client.next_method

  client.check_response(method, Qrack::Protocol09::Exchange::DeleteOk, "Error deleting exchange #{name}")

  client.exchanges.delete(name)

  # return confirmation
  :delete_ok
end
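The option handling in #delete can be read as "discard :nowait, then let the remaining caller options override the defaults". The sketch below restates that step on its own; the helper name `delete_frame_options` and the exchange name 'orders' are hypothetical, and no frame is built or sent. One deliberate difference from the listing: the sketch copies the hash first, whereas the method above removes :nowait from the hash the caller passed in.

```ruby
# Hypothetical helper mirroring how Exchange09#delete prepares the
# arguments for the Exchange::Delete frame.
def delete_frame_options(exchange_name, opts = {})
  opts = opts.dup       # assumption of this sketch: leave the caller's hash untouched
  opts.delete(:nowait)  # a caller-supplied :nowait is always discarded
  { :exchange => exchange_name, :nowait => false, :deprecated_ticket => 0 }.merge(opts)
end

caller_opts = { :if_unused => true, :nowait => true }
frame_opts  = delete_frame_options('orders', caller_opts)
```

Because the defaults are the receiver of `merge`, any other key the caller supplies (such as :if_unused) takes precedence over a default with the same name.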

#publish(data, opts = {}) ⇒ NilClass

Publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.

Parameters:

  • data (Object)

    the message payload; it is converted with #to_s before being sent

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :key (String)

    Specifies the routing key for the message. If omitted, the exchange's key attribute is used. How the routing key affects routing depends on the exchange configuration.

  • :content_type (String)

    Specifies the content type for the message. Defaults to 'application/octet-stream'.

  • :mandatory (Boolean) — default: false

    Tells the server how to react if the message cannot be routed to a queue. If set to true, the server will return an unroutable message with a Return method. If set to false, the server silently drops the message.

  • :immediate (Boolean) — default: false

    Tells the server how to react if the message cannot be routed to a queue consumer immediately. If set to true, the server will return an undeliverable message with a Return method. If set to false, the server will queue the message, but with no guarantee that it will ever be consumed.

  • :persistent (Boolean) — default: false

    Tells the server whether to persist the message. If set to true, the message will be persisted to disk and not lost if the server restarts. If set to false, the message will not survive a server restart. Setting this to true incurs a performance penalty because of the extra cost of disk access.

Returns:

  • (NilClass)

    nil



# File 'lib/bunny/exchange09.rb', line 127

def publish(data, opts = {})
  opts = opts.dup
  out = []

  # Set up options
  routing_key = opts.delete(:key) || key
  mandatory = opts.delete(:mandatory)
  immediate = opts.delete(:immediate)
  delivery_mode = opts.delete(:persistent) ? 2 : 1
  content_type = opts.delete(:content_type) || 'application/octet-stream'

  out << Qrack::Protocol09::Basic::Publish.new({ :exchange => name,
                                                 :routing_key => routing_key,
                                                 :mandatory => mandatory,
                                                 :immediate => immediate,
                                                 :deprecated_ticket => 0 })
  data = data.to_s
  out << Qrack::Protocol09::Header.new(
                                       Qrack::Protocol09::Basic,
                                       data.bytesize, {
                                         :content_type  => content_type,
                                         :delivery_mode => delivery_mode,
                                         :priority      => 0
                                       }.merge(opts)
                                       )
  out << Qrack::Transport09::Body.new(data)

  client.send_frame(*out)
end