Class: RabbitMQ::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/rabbitmq/channel.rb

Overview

A Channel holds a connection to a RabbitMQ server and is associated with a specific channel id number for categorizing message flow. It also provides convenient wrapper methods for common uses of the underlying Client.

A Channel is not threadsafe; both the Channel and its associated Client should not be shared between threads. If they are shared without appropriate locking mechanisms, the behavior is undefined and might result in catastrophic process failures like segmentation faults in the underlying C library. A Channel can be safely used in a multithreaded application by only passing control and message data between threads.

To use a Channel effectively, it is necessary to understand the methods available in the underlying AMQP protocol. Please refer to the protocol documentation for more information about specific methods: www.rabbitmq.com/amqp-0-9-1-reference.html

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, conn, id, finalizer) ⇒ Channel

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Don’t create a RabbitMQ::Channel directly; call RabbitMQ::Client#channel instead.



28
29
30
31
32
33
34
# File 'lib/rabbitmq/channel.rb', line 28

def initialize(client, conn, id, finalizer)
  @client    = client
  @conn      = conn
  @id        = id
  @finalizer = finalizer
  ObjectSpace.define_finalizer self, @finalizer
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



23
24
25
# File 'lib/rabbitmq/channel.rb', line 23

def client
  @client
end

#idObject (readonly)

Returns the value of attribute id.



24
25
26
# File 'lib/rabbitmq/channel.rb', line 24

def id
  @id
end

Instance Method Details

#basic_ack(delivery_tag, **opts) ⇒ Object



249
250
251
252
253
254
255
# File 'lib/rabbitmq/channel.rb', line 249

def basic_ack(delivery_tag, **opts)
  send_request :basic_ack, {
    delivery_tag: delivery_tag,
    multiple:     opts.fetch(:multiple, false)
  }
  true
end

#basic_cancel(consumer_tag) ⇒ Object



203
204
205
206
# File 'lib/rabbitmq/channel.rb', line 203

def basic_cancel(consumer_tag)
  send_request :basic_cancel, { consumer_tag: consumer_tag }
  fetch_response :basic_cancel_ok
end

#basic_consume(queue, consumer_tag = "", **opts) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
# File 'lib/rabbitmq/channel.rb', line 191

def basic_consume(queue, consumer_tag="", **opts)
  send_request :basic_consume, {
    queue:        queue,
    consumer_tag: consumer_tag,
    no_local:     opts.fetch(:no_local,  false),
    no_ack:       opts.fetch(:no_ack,    false),
    exclusive:    opts.fetch(:exclusive, false),
    arguments:    opts.fetch(:arguments, {})
  }
  fetch_response :basic_consume_ok
end

#basic_get(queue, **opts) ⇒ Object

Message operations



241
242
243
244
245
246
247
# File 'lib/rabbitmq/channel.rb', line 241

def basic_get(queue, **opts)
  send_request :basic_get, {
    queue:  queue,
    no_ack: opts.fetch(:no_ack, false)
  }
  fetch_response [:basic_get_ok, :basic_get_empty]
end

#basic_nack(delivery_tag, **opts) ⇒ Object



257
258
259
260
261
262
263
264
# File 'lib/rabbitmq/channel.rb', line 257

def basic_nack(delivery_tag, **opts)
  send_request :basic_nack, {
    delivery_tag: delivery_tag,
    multiple:     opts.fetch(:multiple, false),
    requeue:      opts.fetch(:requeue, true)
  }
  true
end

#basic_publish(body, exchange, routing_key, **opts) ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/rabbitmq/channel.rb', line 274

def basic_publish(body, exchange, routing_key, **opts)
  body        = FFI::Bytes.from_s(body.to_s)
  exchange    = FFI::Bytes.from_s(exchange.to_s)
  routing_key = FFI::Bytes.from_s(routing_key.to_s)
  properties  = FFI::BasicProperties.new.apply(
    content_type:       opts.fetch(:content_type,     nil),
    content_encoding:   opts.fetch(:content_encoding, nil),
    headers:            opts.fetch(:headers,           {}),
    delivery_mode:     (opts.fetch(:persistent,     false) ? :persistent : :nonpersistent),
    priority:           opts.fetch(:priority,           0),
    correlation_id:     opts.fetch(:correlation_id,   nil),
    reply_to:           opts.fetch(:reply_to,         nil),
    expiration:         opts.fetch(:expiration,       nil),
    message_id:         opts.fetch(:message_id,       nil),
    timestamp:          opts.fetch(:timestamp,          0),
    type:               opts.fetch(:type,             nil),
    app_id:             opts.fetch(:app_id,           nil),
    cluster_id:         opts.fetch(:cluster_id,       nil)
  )
  
  Util.error_check :"publishing a message",
    FFI.amqp_basic_publish(@conn.ptr, @id,
      exchange,
      routing_key,
      opts.fetch(:mandatory, false),
      opts.fetch(:immediate, false),
      properties,
      body
    )
  
  body.free!
  exchange.free!
  routing_key.free!
  properties.free!
  true
end

#basic_qos(**opts) ⇒ Object

Consumer operations



182
183
184
185
186
187
188
189
# File 'lib/rabbitmq/channel.rb', line 182

def basic_qos(**opts)
  send_request :basic_qos, {
    prefetch_count: opts.fetch(:prefetch_count, 0),
    prefetch_size:  opts.fetch(:prefetch_size,  0),
    global:         opts.fetch(:global,         false)
  }
  fetch_response :basic_qos_ok
end

#basic_reject(delivery_tag, **opts) ⇒ Object



266
267
268
269
270
271
272
# File 'lib/rabbitmq/channel.rb', line 266

def basic_reject(delivery_tag, **opts)
  send_request :basic_reject, {
    delivery_tag: delivery_tag,
    requeue:      opts.fetch(:requeue, true)
  }
  true
end

#break!Object

See Also:

  • RabbitMQ::Channel.{Client{Client#break!}


82
83
84
# File 'lib/rabbitmq/channel.rb', line 82

def break!
  @client.break!
end

#clear_event_handler(*args) ⇒ Object

See Also:

  • RabbitMQ::Channel.{Client{Client#clear_event_handler}


71
72
73
# File 'lib/rabbitmq/channel.rb', line 71

def clear_event_handler(*args)
  @client.clear_event_handler(@id, *args)
end

#confirm_selectObject

Confirm mode operations



229
230
231
232
# File 'lib/rabbitmq/channel.rb', line 229

def confirm_select
  send_request :confirm_select
  fetch_response :confirm_select_ok
end

#exchange_bind(source, destination, **opts) ⇒ Object



110
111
112
113
114
115
116
117
118
# File 'lib/rabbitmq/channel.rb', line 110

def exchange_bind(source, destination, **opts)
  send_request :exchange_bind, {
    source:      source,
    destination: destination,
    routing_key: opts.fetch(:routing_key, ""),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :exchange_bind_ok
end

#exchange_declare(name, type, **opts) ⇒ Object

Exchange operations



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rabbitmq/channel.rb', line 89

def exchange_declare(name, type, **opts)
  send_request :exchange_declare, {
    exchange:    name,
    type:        type,
    passive:     opts.fetch(:passive,     false),
    durable:     opts.fetch(:durable,     false),
    auto_delete: opts.fetch(:auto_delete, false),
    internal:    opts.fetch(:internal,    false),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :exchange_declare_ok
end

#exchange_delete(name, **opts) ⇒ Object



102
103
104
105
106
107
108
# File 'lib/rabbitmq/channel.rb', line 102

def exchange_delete(name, **opts)
  send_request :exchange_delete, {
    exchange:  name,
    if_unused: opts.fetch(:if_unused, false)
  }
  fetch_response :exchange_delete_ok
end

#exchange_unbind(source, destination, **opts) ⇒ Object



120
121
122
123
124
125
126
127
128
# File 'lib/rabbitmq/channel.rb', line 120

def exchange_unbind(source, destination, **opts)
  send_request :exchange_unbind, {
    source:      source,
    destination: destination,
    routing_key: opts.fetch(:routing_key, ""),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :exchange_unbind_ok
end

#fetch_confirmObject



234
235
236
# File 'lib/rabbitmq/channel.rb', line 234

def fetch_confirm
  fetch_response [:basic_ack, :basic_nack, :basic_reject]
end

#fetch_response(*args) ⇒ Object

See Also:

  • RabbitMQ::Channel.{Client{Client#fetch_response}


60
61
62
# File 'lib/rabbitmq/channel.rb', line 60

def fetch_response(*args)
  @client.fetch_response(@id, *args)
end

#on_event(*args, &block) ⇒ Object Also known as: on

See Also:

  • RabbitMQ::Channel.{Client{Client#on_event}


65
66
67
# File 'lib/rabbitmq/channel.rb', line 65

def on_event(*args, &block)
  @client.on_event(@id, *args, &block)
end

#queue_bind(name, exchange, **opts) ⇒ Object



145
146
147
148
149
150
151
152
153
# File 'lib/rabbitmq/channel.rb', line 145

def queue_bind(name, exchange, **opts)
  send_request :queue_bind, {
    queue:       name,
    exchange:    exchange,
    routing_key: opts.fetch(:routing_key, ""),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :queue_bind_ok
end

#queue_declare(name, **opts) ⇒ Object

Queue operations



133
134
135
136
137
138
139
140
141
142
143
# File 'lib/rabbitmq/channel.rb', line 133

def queue_declare(name, **opts)
  send_request :queue_declare, {
    queue:       name,
    passive:     opts.fetch(:passive,     false),
    durable:     opts.fetch(:durable,     false),
    exclusive:   opts.fetch(:exclusive,   false),
    auto_delete: opts.fetch(:auto_delete, false),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :queue_declare_ok
end

#queue_delete(name, **opts) ⇒ Object



170
171
172
173
174
175
176
177
# File 'lib/rabbitmq/channel.rb', line 170

def queue_delete(name, **opts)
  send_request :queue_delete, {
    queue:     name,
    if_unused: opts.fetch(:if_unused, false),
    if_empty:  opts.fetch(:if_empty,  false)
  }
  fetch_response :queue_delete_ok
end

#queue_purge(name) ⇒ Object



165
166
167
168
# File 'lib/rabbitmq/channel.rb', line 165

def queue_purge(name)
  send_request :queue_purge, { queue: name }
  fetch_response :queue_purge_ok
end

#queue_unbind(name, exchange, **opts) ⇒ Object



155
156
157
158
159
160
161
162
163
# File 'lib/rabbitmq/channel.rb', line 155

def queue_unbind(name, exchange, **opts)
  send_request :queue_unbind, {
    queue:       name,
    exchange:    exchange,
    routing_key: opts.fetch(:routing_key, ""),
    arguments:   opts.fetch(:arguments,   {})
  }
  fetch_response :queue_unbind_ok
end

#releaseChannel

Release the channel id to be reallocated to another RabbitMQ::Channel instance. This will be called automatically by the object finalizer after the object becomes unreachable by the VM and is garbage collected, but you may want to call it explicitly if you plan to reuse the same channel id in another RabbitMQ::Channel instance explicitly.

Returns:



44
45
46
47
48
49
50
51
52
# File 'lib/rabbitmq/channel.rb', line 44

def release
  if @finalizer
    @finalizer.call
    ObjectSpace.undefine_finalizer self
  end
  @finalizer = nil
  
  self
end

#run_loop!(*args, &block) ⇒ Object

The block will be yielded all non-exception events *for any channel*.

See Also:

  • RabbitMQ::Channel.{Client{Client#run_loop!}


77
78
79
# File 'lib/rabbitmq/channel.rb', line 77

def run_loop!(*args, &block)
  @client.run_loop!(*args, &block)
end

#send_request(*args) ⇒ Object

See Also:

  • RabbitMQ::Channel.{Client{Client#send_request}


55
56
57
# File 'lib/rabbitmq/channel.rb', line 55

def send_request(*args)
  @client.send_request(@id, *args)
end

#tx_commitObject



216
217
218
219
# File 'lib/rabbitmq/channel.rb', line 216

def tx_commit
  send_request :tx_commit
  fetch_response :tx_commit_ok
end

#tx_rollbackObject



221
222
223
224
# File 'lib/rabbitmq/channel.rb', line 221

def tx_rollback
  send_request :tx_rollback
  fetch_response :tx_rollback_ok
end

#tx_selectObject

Transaction operations



211
212
213
214
# File 'lib/rabbitmq/channel.rb', line 211

def tx_select
  send_request :tx_select
  fetch_response :tx_select_ok
end