Class: BunnyMock::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny_mock/queue.rb

Instance Attribute Summary collapse

Bunny API collapse

Instance Method Summary collapse

Constructor Details

#initialize(channel, name = '', opts = {}) ⇒ Queue

Create a new [BunnyMock::Queue] instance

Parameters:

  • channel (BunnyMock::Channel)

    Channel this queue will use

  • name (String) (defaults to: '')

    Name of queue

  • opts (Hash) (defaults to: {})

    Creation options

See Also:



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/bunny_mock/queue.rb', line 26

def initialize(channel, name = '', opts = {})
  # Store creation information
  @channel = channel
  @name    = name
  @opts    = opts

  # Store messages
  @messages = []

  # Store consumers subsribed to that queue
  @consumers = []

  # marks if this queue is deleted
  @deleted = false
end

Instance Attribute Details

#channelBunnyMock::Channel (readonly)

Returns Channel used by queue.

Returns:



9
10
11
# File 'lib/bunny_mock/queue.rb', line 9

def channel
  @channel
end

#nameString (readonly)

Returns Queue name.

Returns:

  • (String)

    Queue name



12
13
14
# File 'lib/bunny_mock/queue.rb', line 12

def name
  @name
end

#optsHash (readonly)

Returns Creation options.

Returns:

  • (Hash)

    Creation options



15
16
17
# File 'lib/bunny_mock/queue.rb', line 15

def opts
  @opts
end

Instance Method Details

#allArray

Get all messages in queue

Returns:

  • (Array)

    All messages



225
226
227
# File 'lib/bunny_mock/queue.rb', line 225

def all
  @messages
end

#bind(exchange, opts = {}) ⇒ Object

Bind this queue to an exchange

Parameters:

  • exchange (BunnyMock::Exchange, String)

    Exchange to bind to

  • opts (Hash) (defaults to: {})

    Binding properties

Options Hash (opts):

  • :routing_key (String)

    Custom routing key



118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/bunny_mock/queue.rb', line 118

def bind(exchange, opts = {})
  check_queue_deleted!

  if exchange.respond_to?(:add_route)

    # we can do the binding ourselves
    exchange.add_route opts.fetch(:routing_key, @name), self
  else

    # we need the channel to lookup the exchange
    @channel.queue_bind self, opts.fetch(:routing_key, @name), exchange
  end
  self
end

#bound_to?(exchange, opts = {}) ⇒ Boolean

Check if this queue is bound to the exchange

Parameters:

  • exchange (BunnyMock::Exchange, String)

    Exchange to test

  • opts (Hash) (defaults to: {})

    Binding properties

Options Hash (opts):

  • :routing_key (String)

    Routing key from binding

Returns:

  • (Boolean)

    true if this queue is bound to the given exchange, false otherwise



170
171
172
173
174
175
176
177
178
179
180
# File 'lib/bunny_mock/queue.rb', line 170

def bound_to?(exchange, opts = {})
  check_queue_deleted!

  if exchange.respond_to?(:routes_to?)
    # we can do the check ourselves
    exchange.routes_to? self, opts
  else
    # we need the channel to lookup the exchange
    @channel.xchg_routes_to? self, opts.fetch(:routing_key, @name), exchange
  end
end

#deleteObject

Deletes this queue



234
235
236
237
# File 'lib/bunny_mock/queue.rb', line 234

def delete
  @channel.deregister_queue self
  @deleted = true
end

#message_countInteger

Count of messages in queue

Returns:

  • (Integer)

    Number of messages in queue



188
189
190
# File 'lib/bunny_mock/queue.rb', line 188

def message_count
  @messages.count
end

#pop(opts = { manual_ack: false }, &block) ⇒ Hash Also known as: get

Get oldest message in queue

Returns:

  • (Hash)

    Message data



198
199
200
201
202
203
204
205
# File 'lib/bunny_mock/queue.rb', line 198

def pop(opts = { manual_ack: false }, &block)
  if BunnyMock.use_bunny_queue_pop_api
    bunny_pop(opts, &block)
  else
    warn '[DEPRECATED] This behavior is deprecated - please set `BunnyMock::use_bunny_queue_pop_api` to true to use Bunny Queue#pop behavior'
    @messages.shift
  end
end

#publish(payload, opts = {}) ⇒ BunnyMock::Queue

Publish a message

Parameters:

  • payload (Object)

    Message payload

  • opts (Hash) (defaults to: {})

    Message properties

Options Hash (opts):

  • :routing_key (String)

    Routing key

  • :persistent (Boolean)

    Should the message be persisted to disk?

  • :mandatory (Boolean)

    Should the message be returned if it cannot be routed to any queue?

  • :timestamp (Integer)

    A timestamp associated with this message

  • :expiration (Integer)

    Expiration time after which the message will be deleted

  • :type (String)

    Message type, e.g. what type of event or command this message represents. Can be any string

  • :reply_to (String)

    Queue name other apps should send the response to

  • :content_type (String)

    Message content type (e.g. application/json)

  • :content_encoding (String)

    Message content encoding (e.g. gzip)

  • :correlation_id (String)

    Message correlated to this one, e.g. what request this message is a reply for

  • :priority (Integer)

    Message priority, 0 to 9. Not used by RabbitMQ, only applications

  • :message_id (String)

    Any message identifier

  • :user_id (String)

    Optional user ID. Verified by RabbitMQ against the actual connection username

  • :app_id (String)

    Optional application ID

Returns:

See Also:

  • BunnyMock::Queue.{BunnyMock{BunnyMock::Exchange{BunnyMock::Exchange#publish}


69
70
71
72
73
74
75
76
# File 'lib/bunny_mock/queue.rb', line 69

def publish(payload, opts = {})
  check_queue_deleted!

  # add to messages
  @messages << { message: payload, options: opts }
  yield_consumers
  self
end

#purgeObject

Clear all messages in queue



213
214
215
216
217
# File 'lib/bunny_mock/queue.rb', line 213

def purge
  @messages = []

  self
end

#subscribe(*args, &block) ⇒ Object

Adds a consumer to the queue (subscribes for message deliveries).

Params are so they can be used when the message is processed. Takes a block which is called when a message is delivered to the queue



86
87
88
89
90
91
# File 'lib/bunny_mock/queue.rb', line 86

def subscribe(*args, &block)
  @consumers << [block, args]
  yield_consumers

  self
end

#subscribe_with(consumer, *args) ⇒ Object

Adds a specific consumer object to the queue (subscribes for message deliveries).

Secondary params are so they can be used when the message is processed.

Parameters:

  • consumer (#call)

    A subclass of Bunny::Consumer or any callable object



101
102
103
104
105
106
# File 'lib/bunny_mock/queue.rb', line 101

def subscribe_with(consumer, *args)
  @consumers << [consumer, args]
  yield_consumers

  self
end

#unbind(exchange, opts = {}) ⇒ Object

Unbind this queue from an exchange

Parameters:

  • exchange (BunnyMock::Exchange, String)

    Exchange to unbind from

  • opts (Hash) (defaults to: {})

    Binding properties

Options Hash (opts):

  • :routing_key (String)

    Custom routing key



143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/bunny_mock/queue.rb', line 143

def unbind(exchange, opts = {})
  check_queue_deleted!

  if exchange.respond_to?(:remove_route)

    # we can do the unbinding ourselves
    exchange.remove_route opts.fetch(:routing_key, @name), self
  else

    # we need the channel to lookup the exchange
    @channel.queue_unbind self, opts.fetch(:routing_key, @name), exchange
  end
end