Class: RabbitMQ::Channel
- Inherits:
-
Object
- Object
- RabbitMQ::Channel
- Defined in:
- lib/rabbitmq/channel.rb
Overview
A Channel holds a connection to a RabbitMQ server and is associated with a specific channel id number for categorizing message flow. It also provides convenient wrapper methods for common uses of the underlying Client.
A Channel is not threadsafe; both the Channel and its associated Client should not be shared between threads. If they are shared without appropriate locking mechanisms, the behavior is undefined and might result in catastrophic process failures like segmentation faults in the underlying C library. A Channel can be safely used in a multithreaded application by only passing control and message data between threads.
To use a Channel effectively, it is necessary to understand the methods available in the underlying AMQP protocol. Please refer to the protocol documentation for more information about specific methods: www.rabbitmq.com/amqp-0-9-1-reference.html
Instance Attribute Summary collapse
-
#client ⇒ Object
readonly
Returns the value of attribute client.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Instance Method Summary collapse
- #basic_ack(delivery_tag, **opts) ⇒ Object
- #basic_cancel(consumer_tag) ⇒ Object
- #basic_consume(queue, consumer_tag = "", **opts) ⇒ Object
-
#basic_get(queue, **opts) ⇒ Object
Message operations.
- #basic_nack(delivery_tag, **opts) ⇒ Object
- #basic_publish(body, exchange, routing_key, **opts) ⇒ Object
-
#basic_qos(**opts) ⇒ Object
Consumer operations.
- #basic_reject(delivery_tag, **opts) ⇒ Object
- #break! ⇒ Object
- #clear_event_handler(*args) ⇒ Object
-
#confirm_select ⇒ Object
Confirm mode operations.
- #exchange_bind(source, destination, **opts) ⇒ Object
-
#exchange_declare(name, type, **opts) ⇒ Object
Exchange operations.
- #exchange_delete(name, **opts) ⇒ Object
- #exchange_unbind(source, destination, **opts) ⇒ Object
- #fetch_confirm ⇒ Object
- #fetch_response(*args) ⇒ Object
-
#initialize(client, conn, id, finalizer) ⇒ Channel
constructor
private
Don’t create a Channel directly; call RabbitMQ::Client#channel instead.
- #on_event(*args, &block) ⇒ Object (also: #on)
- #queue_bind(name, exchange, **opts) ⇒ Object
-
#queue_declare(name, **opts) ⇒ Object
Queue operations.
- #queue_delete(name, **opts) ⇒ Object
- #queue_purge(name) ⇒ Object
- #queue_unbind(name, exchange, **opts) ⇒ Object
-
#release ⇒ Channel
Release the channel id to be reallocated to another Channel instance.
-
#run_loop!(*args, &block) ⇒ Object
The block will be yielded all non-exception events *for any channel*.
- #send_request(*args) ⇒ Object
- #tx_commit ⇒ Object
- #tx_rollback ⇒ Object
-
#tx_select ⇒ Object
Transaction operations.
Constructor Details
#initialize(client, conn, id, finalizer) ⇒ Channel
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Don’t create a RabbitMQ::Channel directly; call RabbitMQ::Client#channel instead.
28 29 30 31 32 33 34 |
# File 'lib/rabbitmq/channel.rb', line 28 def initialize(client, conn, id, finalizer) @client = client @conn = conn @id = id @finalizer = finalizer ObjectSpace.define_finalizer self, @finalizer end |
Instance Attribute Details
#client ⇒ Object (readonly)
Returns the value of attribute client.
23 24 25 |
# File 'lib/rabbitmq/channel.rb', line 23 def client @client end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
24 25 26 |
# File 'lib/rabbitmq/channel.rb', line 24 def id @id end |
Instance Method Details
#basic_ack(delivery_tag, **opts) ⇒ Object
249 250 251 252 253 254 255 |
# File 'lib/rabbitmq/channel.rb', line 249 def basic_ack(delivery_tag, **opts) send_request :basic_ack, { delivery_tag: delivery_tag, multiple: opts.fetch(:multiple, false) } true end |
#basic_cancel(consumer_tag) ⇒ Object
203 204 205 206 |
# File 'lib/rabbitmq/channel.rb', line 203 def basic_cancel(consumer_tag) send_request :basic_cancel, { consumer_tag: consumer_tag } fetch_response :basic_cancel_ok end |
#basic_consume(queue, consumer_tag = "", **opts) ⇒ Object
191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/rabbitmq/channel.rb', line 191 def basic_consume(queue, consumer_tag="", **opts) send_request :basic_consume, { queue: queue, consumer_tag: consumer_tag, no_local: opts.fetch(:no_local, false), no_ack: opts.fetch(:no_ack, false), exclusive: opts.fetch(:exclusive, false), arguments: opts.fetch(:arguments, {}) } fetch_response :basic_consume_ok end |
#basic_get(queue, **opts) ⇒ Object
Message operations
241 242 243 244 245 246 247 |
# File 'lib/rabbitmq/channel.rb', line 241 def basic_get(queue, **opts) send_request :basic_get, { queue: queue, no_ack: opts.fetch(:no_ack, false) } fetch_response [:basic_get_ok, :basic_get_empty] end |
#basic_nack(delivery_tag, **opts) ⇒ Object
257 258 259 260 261 262 263 264 |
# File 'lib/rabbitmq/channel.rb', line 257 def basic_nack(delivery_tag, **opts) send_request :basic_nack, { delivery_tag: delivery_tag, multiple: opts.fetch(:multiple, false), requeue: opts.fetch(:requeue, true) } true end |
#basic_publish(body, exchange, routing_key, **opts) ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/rabbitmq/channel.rb', line 274 def basic_publish(body, exchange, routing_key, **opts) body = FFI::Bytes.from_s(body.to_s) exchange = FFI::Bytes.from_s(exchange.to_s) routing_key = FFI::Bytes.from_s(routing_key.to_s) properties = FFI::BasicProperties.new.apply( content_type: opts.fetch(:content_type, nil), content_encoding: opts.fetch(:content_encoding, nil), headers: opts.fetch(:headers, {}), delivery_mode: (opts.fetch(:persistent, false) ? :persistent : :nonpersistent), priority: opts.fetch(:priority, 0), correlation_id: opts.fetch(:correlation_id, nil), reply_to: opts.fetch(:reply_to, nil), expiration: opts.fetch(:expiration, nil), message_id: opts.fetch(:message_id, nil), timestamp: opts.fetch(:timestamp, 0), type: opts.fetch(:type, nil), app_id: opts.fetch(:app_id, nil), cluster_id: opts.fetch(:cluster_id, nil) ) Util.error_check :"publishing a message", FFI.amqp_basic_publish(@conn.ptr, @id, exchange, routing_key, opts.fetch(:mandatory, false), opts.fetch(:immediate, false), properties, body ) body.free! exchange.free! routing_key.free! properties.free! true end |
#basic_qos(**opts) ⇒ Object
Consumer operations
182 183 184 185 186 187 188 189 |
# File 'lib/rabbitmq/channel.rb', line 182 def basic_qos(**opts) send_request :basic_qos, { prefetch_count: opts.fetch(:prefetch_count, 0), prefetch_size: opts.fetch(:prefetch_size, 0), global: opts.fetch(:global, false) } fetch_response :basic_qos_ok end |
#basic_reject(delivery_tag, **opts) ⇒ Object
266 267 268 269 270 271 272 |
# File 'lib/rabbitmq/channel.rb', line 266 def basic_reject(delivery_tag, **opts) send_request :basic_reject, { delivery_tag: delivery_tag, requeue: opts.fetch(:requeue, true) } true end |
#break! ⇒ Object
82 83 84 |
# File 'lib/rabbitmq/channel.rb', line 82 def break! @client.break! end |
#clear_event_handler(*args) ⇒ Object
71 72 73 |
# File 'lib/rabbitmq/channel.rb', line 71 def clear_event_handler(*args) @client.clear_event_handler(@id, *args) end |
#confirm_select ⇒ Object
Confirm mode operations
229 230 231 232 |
# File 'lib/rabbitmq/channel.rb', line 229 def confirm_select send_request :confirm_select fetch_response :confirm_select_ok end |
#exchange_bind(source, destination, **opts) ⇒ Object
110 111 112 113 114 115 116 117 118 |
# File 'lib/rabbitmq/channel.rb', line 110 def exchange_bind(source, destination, **opts) send_request :exchange_bind, { source: source, destination: destination, routing_key: opts.fetch(:routing_key, ""), arguments: opts.fetch(:arguments, {}) } fetch_response :exchange_bind_ok end |
#exchange_declare(name, type, **opts) ⇒ Object
Exchange operations
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/rabbitmq/channel.rb', line 89 def exchange_declare(name, type, **opts) send_request :exchange_declare, { exchange: name, type: type, passive: opts.fetch(:passive, false), durable: opts.fetch(:durable, false), auto_delete: opts.fetch(:auto_delete, false), internal: opts.fetch(:internal, false), arguments: opts.fetch(:arguments, {}) } fetch_response :exchange_declare_ok end |
#exchange_delete(name, **opts) ⇒ Object
102 103 104 105 106 107 108 |
# File 'lib/rabbitmq/channel.rb', line 102 def exchange_delete(name, **opts) send_request :exchange_delete, { exchange: name, if_unused: opts.fetch(:if_unused, false) } fetch_response :exchange_delete_ok end |
#exchange_unbind(source, destination, **opts) ⇒ Object
120 121 122 123 124 125 126 127 128 |
# File 'lib/rabbitmq/channel.rb', line 120 def exchange_unbind(source, destination, **opts) send_request :exchange_unbind, { source: source, destination: destination, routing_key: opts.fetch(:routing_key, ""), arguments: opts.fetch(:arguments, {}) } fetch_response :exchange_unbind_ok end |
#fetch_confirm ⇒ Object
234 235 236 |
# File 'lib/rabbitmq/channel.rb', line 234 def fetch_confirm fetch_response [:basic_ack, :basic_nack, :basic_reject] end |
#fetch_response(*args) ⇒ Object
60 61 62 |
# File 'lib/rabbitmq/channel.rb', line 60 def fetch_response(*args) @client.fetch_response(@id, *args) end |
#on_event(*args, &block) ⇒ Object Also known as: on
65 66 67 |
# File 'lib/rabbitmq/channel.rb', line 65 def on_event(*args, &block) @client.on_event(@id, *args, &block) end |
#queue_bind(name, exchange, **opts) ⇒ Object
145 146 147 148 149 150 151 152 153 |
# File 'lib/rabbitmq/channel.rb', line 145 def queue_bind(name, exchange, **opts) send_request :queue_bind, { queue: name, exchange: exchange, routing_key: opts.fetch(:routing_key, ""), arguments: opts.fetch(:arguments, {}) } fetch_response :queue_bind_ok end |
#queue_declare(name, **opts) ⇒ Object
Queue operations
133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/rabbitmq/channel.rb', line 133 def queue_declare(name, **opts) send_request :queue_declare, { queue: name, passive: opts.fetch(:passive, false), durable: opts.fetch(:durable, false), exclusive: opts.fetch(:exclusive, false), auto_delete: opts.fetch(:auto_delete, false), arguments: opts.fetch(:arguments, {}) } fetch_response :queue_declare_ok end |
#queue_delete(name, **opts) ⇒ Object
170 171 172 173 174 175 176 177 |
# File 'lib/rabbitmq/channel.rb', line 170 def queue_delete(name, **opts) send_request :queue_delete, { queue: name, if_unused: opts.fetch(:if_unused, false), if_empty: opts.fetch(:if_empty, false) } fetch_response :queue_delete_ok end |
#queue_purge(name) ⇒ Object
165 166 167 168 |
# File 'lib/rabbitmq/channel.rb', line 165 def queue_purge(name) send_request :queue_purge, { queue: name } fetch_response :queue_purge_ok end |
#queue_unbind(name, exchange, **opts) ⇒ Object
155 156 157 158 159 160 161 162 163 |
# File 'lib/rabbitmq/channel.rb', line 155 def queue_unbind(name, exchange, **opts) send_request :queue_unbind, { queue: name, exchange: exchange, routing_key: opts.fetch(:routing_key, ""), arguments: opts.fetch(:arguments, {}) } fetch_response :queue_unbind_ok end |
#release ⇒ Channel
Release the channel id to be reallocated to another RabbitMQ::Channel instance. This will be called automatically by the object finalizer after the object becomes unreachable by the VM and is garbage collected, but you may want to call it explicitly if you plan to reuse the same channel id in another RabbitMQ::Channel instance explicitly.
44 45 46 47 48 49 50 51 52 |
# File 'lib/rabbitmq/channel.rb', line 44 def release if @finalizer @finalizer.call ObjectSpace.undefine_finalizer self end @finalizer = nil self end |
#run_loop!(*args, &block) ⇒ Object
The block will be yielded all non-exception events *for any channel*.
77 78 79 |
# File 'lib/rabbitmq/channel.rb', line 77 def run_loop!(*args, &block) @client.run_loop!(*args, &block) end |
#send_request(*args) ⇒ Object
55 56 57 |
# File 'lib/rabbitmq/channel.rb', line 55 def send_request(*args) @client.send_request(@id, *args) end |
#tx_commit ⇒ Object
216 217 218 219 |
# File 'lib/rabbitmq/channel.rb', line 216 def tx_commit send_request :tx_commit fetch_response :tx_commit_ok end |
#tx_rollback ⇒ Object
221 222 223 224 |
# File 'lib/rabbitmq/channel.rb', line 221 def tx_rollback send_request :tx_rollback fetch_response :tx_rollback_ok end |
#tx_select ⇒ Object
Transaction operations
211 212 213 214 |
# File 'lib/rabbitmq/channel.rb', line 211 def tx_select send_request :tx_select fetch_response :tx_select_ok end |