Class: RabbitMQ::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/rabbitmq/channel.rb

Overview

A Channel holds a connection to a RabbitMQ server and is associated with a specific channel id number for categorizing message flow. It also provides convenient wrapper methods for common uses of the underlying Client.

A Channel is not threadsafe; both the Channel and its associated Client should not be shared between threads. If they are shared without appropriate locking mechanisms, the behavior is undefined and might result in catastrophic process failures like segmentation faults in the underlying C library. A Channel can be safely used in a multithreaded application by only passing control and message data between threads.

To use a Channel effectively, it is necessary to understand the methods available in the underlying AMQP protocol. Please refer to the protocol documentation for more information about specific methods: www.rabbitmq.com/amqp-0-9-1-reference.html

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, conn, id, finalizer) ⇒ Channel

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Don’t create a RabbitMQ::Channel directly; call RabbitMQ::Client#channel instead.



28
29
30
31
32
33
34
# File 'lib/rabbitmq/channel.rb', line 28

def initialize(client, conn, id, finalizer)
  @client    = client
  @conn      = conn
  @id        = id
  @finalizer = finalizer
  ObjectSpace.define_finalizer self, @finalizer
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



23
24
25
# File 'lib/rabbitmq/channel.rb', line 23

def client
  @client
end

#idObject (readonly)

Returns the value of attribute id.



24
25
26
# File 'lib/rabbitmq/channel.rb', line 24

def id
  @id
end

Instance Method Details

#basic_ack(delivery_tag, **opts) ⇒ Object



248
249
250
251
252
253
254
# File 'lib/rabbitmq/channel.rb', line 248

def basic_ack(delivery_tag, **opts)
  send_request :basic_ack, {
    delivery_tag: delivery_tag,
    multiple:     opts.fetch(:multiple, false)
  }
  true
end

#basic_cancel(consumer_tag) ⇒ Object



202
203
204
205
# File 'lib/rabbitmq/channel.rb', line 202

def basic_cancel(consumer_tag)
  send_request :basic_cancel, { consumer_tag: consumer_tag }
  fetch_response :basic_cancel_ok
end

#basic_consume(queue, consumer_tag = "", **opts) ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
# File 'lib/rabbitmq/channel.rb', line 190

def basic_consume(queue, consumer_tag="", **opts)
  send_request :basic_consume, {
    queue:        queue,
    consumer_tag: consumer_tag,
    no_local:     opts.fetch(:no_local,  false),
    no_ack:       opts.fetch(:no_ack,    false),
    exclusive:    opts.fetch(:exclusive, false),
    arguments:    opts.fetch(:arguments, {})
  }
  fetch_response :basic_consume_ok
end

#basic_get(queue, **opts) ⇒ Object

Message operations



240
241
242
243
244
245
246
# File 'lib/rabbitmq/channel.rb', line 240

def basic_get(queue, **opts)
  send_request :basic_get, {
    queue:  queue,
    no_ack: opts.fetch(:no_ack, false)
  }
  fetch_response [:basic_get_ok, :basic_get_empty]
end

#basic_nack(delivery_tag, **opts) ⇒ Object



256
257
258
259
260
261
262
263
# File 'lib/rabbitmq/channel.rb', line 256

def basic_nack(delivery_tag, **opts)
  send_request :basic_nack, {
    delivery_tag: delivery_tag,
    multiple:     opts.fetch(:multiple, false),
    requeue:      opts.fetch(:requeue, true)
  }
  true
end

#basic_publish(body, exchange, routing_key, **opts) ⇒ Object



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/rabbitmq/channel.rb', line 273

def basic_publish(body, exchange, routing_key, **opts)
  body        = FFI::Bytes.from_s(body.to_s)
  exchange    = FFI::Bytes.from_s(exchange.to_s)
  routing_key = FFI::Bytes.from_s(routing_key.to_s)
  properties  = FFI::BasicProperties.new.apply(
    content_type:       opts.fetch(:content_type,     nil),
    content_encoding:   opts.fetch(:content_encoding, nil),
    headers:            opts.fetch(:headers,           {}),
    delivery_mode:     (opts.fetch(:persistent,     false) ? :persistent : :nonpersistent),
    priority:           opts.fetch(:priority,           0),
    correlation_id:     opts.fetch(:correlation_id,   nil),
    reply_to:           opts.fetch(:reply_to,         nil),
    expiration:         opts.fetch(:expiration,       nil),
    message_id:         opts.fetch(:message_id,       nil),
    timestamp:          opts.fetch(:timestamp,          0),
    type:               opts.fetch(:type,             nil),
    app_id:             opts.fetch(:app_id,           nil),
    cluster_id:         opts.fetch(:cluster_id,       nil)
  )
  
  Util.error_check :"publishing a message",
    FFI.amqp_basic_publish(@conn.ptr, @id,
      exchange,
      routing_key,
      opts.fetch(:mandatory, false),
      opts.fetch(:immediate, false),
      properties,
      body
    )
  
  body.free!
  exchange.free!
  routing_key.free!
  properties.free!
  true
end

#basic_qos(**opts) ⇒ Object

Consumer operations



181
182
183
184
185
186
187
188
# File 'lib/rabbitmq/channel.rb', line 181

def basic_qos(**opts)
  send_request :basic_qos, {
    prefetch_count: opts.fetch(:prefetch_count, 0),
    prefetch_size:  opts.fetch(:prefetch_size,  0),
    global:         opts.fetch(:global,         false)
  }
  fetch_response :basic_qos_ok
end

#basic_reject(delivery_tag, **opts) ⇒ Object



265
266
267
268
269
270
271
# File 'lib/rabbitmq/channel.rb', line 265

def basic_reject(delivery_tag, **opts)
  send_request :basic_reject, {
    delivery_tag: delivery_tag,
    requeue:      opts.fetch(:requeue, true)
  }
  true
end

#break!Object

See Also:

  • RabbitMQ::Channel.{Client{Client#break!}


82
83
84
# File 'lib/rabbitmq/channel.rb', line 82

def break!
  @client.break!
end

#clear_event_handler(*args) ⇒ Object

See Also:

  • RabbitMQ::Channel.{Client{Client#clear_event_handler}


71
72
73
# File 'lib/rabbitmq/channel.rb', line 71

def clear_event_handler(*args)
  @client.clear_event_handler(@id, *args)
end

#confirm_selectObject

Confirm mode operations



228
229
230
231
# File 'lib/rabbitmq/channel.rb', line 228

def confirm_select
  send_request :confirm_select
  fetch_response :confirm_select_ok
end

#exchange_bind(source, destination, **opts) ⇒ Object



109
110
111
112
113
114
115
116
117
# File 'lib/rabbitmq/channel.rb', line 109

def exchange_bind(source, destination, **opts)
  send_request :exchange_bind, {
    source:      source,
    destination: destination,
    routing_key: opts.fetch(:routing_key, ""),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :exchange_bind_ok
end

#exchange_declare(name, type, **opts) ⇒ Object

Exchange operations



89
90
91
92
93
94
95
96
97
98
99
# File 'lib/rabbitmq/channel.rb', line 89

def exchange_declare(name, type, **opts)
  send_request :exchange_declare, {
    exchange:    name,
    type:        type,
    passive:     opts.fetch(:passive,     false),
    durable:     opts.fetch(:durable,     false),
    auto_delete: opts.fetch(:auto_delete, false),
    internal:    opts.fetch(:internal,    false),
  }
  fetch_response :exchange_declare_ok
end

#exchange_delete(name, **opts) ⇒ Object



101
102
103
104
105
106
107
# File 'lib/rabbitmq/channel.rb', line 101

def exchange_delete(name, **opts)
  send_request :exchange_delete, {
    exchange:  name,
    if_unused: opts.fetch(:if_unused, false)
  }
  fetch_response :exchange_delete_ok
end

#exchange_unbind(source, destination, **opts) ⇒ Object



119
120
121
122
123
124
125
126
127
# File 'lib/rabbitmq/channel.rb', line 119

def exchange_unbind(source, destination, **opts)
  send_request :exchange_unbind, {
    source:      source,
    destination: destination,
    routing_key: opts.fetch(:routing_key, ""),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :exchange_unbind_ok
end

#fetch_confirmObject



233
234
235
# File 'lib/rabbitmq/channel.rb', line 233

def fetch_confirm
  fetch_response [:basic_ack, :basic_nack, :basic_reject]
end

#fetch_response(*args) ⇒ Object

See Also:

  • RabbitMQ::Channel.{Client{Client#fetch_response}


60
61
62
# File 'lib/rabbitmq/channel.rb', line 60

def fetch_response(*args)
  @client.fetch_response(@id, *args)
end

#on_event(*args, &block) ⇒ Object Also known as: on

See Also:

  • RabbitMQ::Channel.{Client{Client#on_event}


65
66
67
# File 'lib/rabbitmq/channel.rb', line 65

def on_event(*args, &block)
  @client.on_event(@id, *args, &block)
end

#queue_bind(name, exchange, **opts) ⇒ Object



144
145
146
147
148
149
150
151
152
# File 'lib/rabbitmq/channel.rb', line 144

def queue_bind(name, exchange, **opts)
  send_request :queue_bind, {
    queue:       name,
    exchange:    exchange,
    routing_key: opts.fetch(:routing_key, ""),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :queue_bind_ok
end

#queue_declare(name, **opts) ⇒ Object

Queue operations



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/rabbitmq/channel.rb', line 132

def queue_declare(name, **opts)
  send_request :queue_declare, {
    queue:       name,
    passive:     opts.fetch(:passive,     false),
    durable:     opts.fetch(:durable,     false),
    exclusive:   opts.fetch(:exclusive,   false),
    auto_delete: opts.fetch(:auto_delete, false),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :queue_declare_ok
end

#queue_delete(name, **opts) ⇒ Object



169
170
171
172
173
174
175
176
# File 'lib/rabbitmq/channel.rb', line 169

def queue_delete(name, **opts)
  send_request :queue_delete, {
    queue:     name,
    if_unused: opts.fetch(:if_unused, false),
    if_empty:  opts.fetch(:if_empty,  false)
  }
  fetch_response :queue_delete_ok
end

#queue_purge(name) ⇒ Object



164
165
166
167
# File 'lib/rabbitmq/channel.rb', line 164

def queue_purge(name)
  send_request :queue_purge, { queue: name }
  fetch_response :queue_purge_ok
end

#queue_unbind(name, exchange, **opts) ⇒ Object



154
155
156
157
158
159
160
161
162
# File 'lib/rabbitmq/channel.rb', line 154

def queue_unbind(name, exchange, **opts)
  send_request :queue_unbind, {
    queue:       name,
    exchange:    exchange,
    routing_key: opts.fetch(:routing_key, ""),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :queue_unbind_ok
end

#releaseChannel

Release the channel id to be reallocated to another RabbitMQ::Channel instance. This will be called automatically by the object finalizer after the object becomes unreachable by the VM and is garbage collected, but you may want to call it explicitly if you plan to reuse the same channel id in another RabbitMQ::Channel instance explicitly.

Returns:



44
45
46
47
48
49
50
51
52
# File 'lib/rabbitmq/channel.rb', line 44

def release
  if @finalizer
    @finalizer.call
    ObjectSpace.undefine_finalizer self
  end
  @finalizer = nil
  
  self
end

#run_loop!(*args, &block) ⇒ Object

The block will be yielded all non-exception events *for any channel*.

See Also:

  • RabbitMQ::Channel.{Client{Client#run_loop!}


77
78
79
# File 'lib/rabbitmq/channel.rb', line 77

def run_loop!(*args, &block)
  @client.run_loop!(*args, &block)
end

#send_request(*args) ⇒ Object

See Also:

  • RabbitMQ::Channel.{Client{Client#send_request}


55
56
57
# File 'lib/rabbitmq/channel.rb', line 55

def send_request(*args)
  @client.send_request(@id, *args)
end

#tx_commitObject



215
216
217
218
# File 'lib/rabbitmq/channel.rb', line 215

def tx_commit
  send_request :tx_commit
  fetch_response :tx_commit_ok
end

#tx_rollbackObject



220
221
222
223
# File 'lib/rabbitmq/channel.rb', line 220

def tx_rollback
  send_request :tx_rollback
  fetch_response :tx_rollback_ok
end

#tx_selectObject

Transaction operations



210
211
212
213
# File 'lib/rabbitmq/channel.rb', line 210

def tx_select
  send_request :tx_select
  fetch_response :tx_select_ok
end