Class: RabbitMQ::Client
- Inherits: Object
  - Object
    - RabbitMQ::Client
- Defined in: lib/rabbitmq/client.rb, lib/rabbitmq/client/connection.rb
Overview
A Client holds a connection to a RabbitMQ server and has facilities for sending events to and handling received events from that server.
A Client is not thread-safe: neither the Client nor any Channels linked to it should be shared between threads. If they are shared without appropriate locking, the behavior is undefined and may result in catastrophic process failures such as segmentation faults in the underlying C library. A Client can be used safely in a multithreaded application by confining it to one thread and passing only control and message data between threads.
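One way to follow the threading advice above is to let a single thread own the Client and have every other thread hand over plain message data through a queue. The sketch below uses only Ruby's core Queue and Thread; the `publish` lambda is a stand-in (an assumption, not part of this library) for whatever code would actually talk to the Client and its Channels.

```ruby
# Stand-in for the code that would use the Client; in a real application this
# is the only place that touches the Client and its Channels.
published = []
publish = ->(payload) { published << payload }

outbox = Queue.new

# The owner thread is the only thread that ever calls publish.
owner = Thread.new do
  while (payload = outbox.pop) != :stop
    publish.call(payload)
  end
end

# Other threads pass plain message data, never the Client itself.
producers = 3.times.map do |i|
  Thread.new { outbox << "message #{i}" }
end
producers.each(&:join)

# A control value tells the owner thread to finish.
outbox << :stop
owner.join
```

Because only the owner thread touches the stand-in, no locking around the client objects is needed; the queue is the only shared structure.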
To use a Client effectively, it is necessary to understand the methods available in the underlying AMQP protocol. Please refer to the protocol documentation for more information about specific methods: www.rabbitmq.com/amqp-0-9-1-reference.html
Defined Under Namespace
Classes: Connection, DestroyedError
Constant Summary
- DEFAULT_PROTOCOL_TIMEOUT = 30 (seconds)
Instance Attribute Summary
- #protocol_timeout ⇒ Object
  The timeout to use when waiting for protocol events, in seconds.
Instance Method Summary
- #break! ⇒ nil
  Stop iterating from within an execution of the #run_loop! method.
- #channel(id = nil) ⇒ Channel
  Open a new channel of communication and return a new Channel object with convenience methods for communicating on that channel.
- #clear_event_handler(channel_id, method) ⇒ Proc?
  Unregister the event handler associated with the given channel and method.
- #close ⇒ Object
  Gracefully close the connection with the server.
- #destroy ⇒ Object
  Free the native resources associated with this object.
- #fetch_response(channel_id, method, timeout: protocol_timeout) ⇒ Hash
  Wait for a specific response on the given channel of the given type and return the event data for the response when it is received.
- #host ⇒ Object
- #initialize(*args) ⇒ Client (constructor)
  Create a new Client instance with the given properties.
- #max_channels ⇒ Object
- #max_frame_size ⇒ Object
- #on_event(channel_id, method, callable = nil, &block) {|event| ... } ⇒ Proc, ...
  Register a handler for events on the given channel of the given type.
- #password ⇒ Object
- #port ⇒ Object
- #run_loop!(timeout: protocol_timeout, &block) ⇒ undefined
  Fetch and handle events in a loop that blocks the calling thread.
- #send_request(channel_id, method, properties = {}) ⇒ Object
  Send a request on the given channel with the given type and properties.
- #ssl? ⇒ Boolean
- #start ⇒ Object
  Initiate the connection with the server.
- #user ⇒ Object
- #vhost ⇒ Object
Constructor Details
#initialize(*args) ⇒ Client
Create a new RabbitMQ::Client instance with the given properties. Connection info can be conveyed as a URL, as explicit options, or as both.
Options parsed from a URL are applied first, then any options given explicitly override those parsed. Any options that are omitted or ambiguous take the default values:
{
user: "guest",
password: "guest",
host: "localhost",
vhost: "/",
port: 5672,
ssl: false,
max_channels: RabbitMQ::FFI::CHANNEL_MAX_ID, # absolute maximum
max_frame_size: 131072,
}
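The precedence described above (defaults, then URL, then explicit options) can be sketched with Ruby's standard `uri` library. This is an illustration only, not the gem's own parsing code: `resolve_options` and `DEFAULT_OPTIONS` are hypothetical names, the mapping of the `amqps` scheme to `ssl: true` and of the URL path to `vhost` are assumptions, and `max_channels` is left out because its default comes from a constant inside the gem.

```ruby
require "uri"

# Defaults copied from the list above (max_channels omitted, see lead-in).
DEFAULT_OPTIONS = {
  user: "guest", password: "guest", host: "localhost",
  vhost: "/", port: 5672, ssl: false, max_frame_size: 131072,
}.freeze

# Hypothetical helper, not part of RabbitMQ::Client.
def resolve_options(url = nil, **explicit)
  parsed = {}
  if url
    uri = URI.parse(url)
    vhost = uri.path.to_s.delete_prefix("/")
    parsed[:ssl]      = true if uri.scheme == "amqps" # assumption
    parsed[:user]     = uri.user if uri.user
    parsed[:password] = uri.password if uri.password
    parsed[:host]     = uri.host if uri.host
    parsed[:port]     = uri.port if uri.port
    parsed[:vhost]    = vhost unless vhost.empty?     # assumption
  end
  # Later merges win: defaults < URL-derived options < explicit options.
  DEFAULT_OPTIONS.merge(parsed).merge(explicit)
end

options = resolve_options("amqp://alice:secret@example.test:1234/staging", user: "bob")
```

Here `options[:user]` comes from the explicit argument, `options[:password]` and `options[:port]` come from the URL, and `options[:max_frame_size]` falls back to the default.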
# File 'lib/rabbitmq/client.rb', line 49
def initialize(*args)
  @conn = Connection.new(*args)
  @open_channels = {}
  @released_channels = {}
  @event_handlers = Hash.new { |h,k| h[k] = {} }
  @incoming_events = Hash.new { |h,k| h[k] = {} }
  @protocol_timeout = DEFAULT_PROTOCOL_TIMEOUT
end
Instance Attribute Details
#protocol_timeout ⇒ Object
The timeout to use when waiting for protocol events, in seconds. By default, this has the value of DEFAULT_PROTOCOL_TIMEOUT. When set, it affects operations like #fetch_response and #run_loop!.
# File 'lib/rabbitmq/client.rb', line 86
def protocol_timeout
  @protocol_timeout
end
Instance Method Details
#break! ⇒ nil
Stop iterating from within an execution of the #run_loop! method. Call this method only from within an event handler. It will take effect only after the handler finishes running.
# File 'lib/rabbitmq/client.rb', line 186
def break!
  @breaking = true
  nil
end
#channel(id = nil) ⇒ Channel
Open a new channel of communication and return a new RabbitMQ::Channel object with convenience methods for communicating on that channel. The channel will be automatically released if the RabbitMQ::Channel instance is garbage collected, or if the RabbitMQ::Client connection is closed with #close.
# File 'lib/rabbitmq/client.rb', line 202
def channel(id=nil)
  id = allocate_channel(id)
  finalizer = Proc.new { release_channel(id) }
  Channel.new(self, @conn, id, finalizer)
end
#clear_event_handler(channel_id, method) ⇒ Proc?
Unregister the event handler associated with the given channel and method.
# File 'lib/rabbitmq/client.rb', line 157
def clear_event_handler(channel_id, method)
  @event_handlers[Integer(channel_id)].delete(method.to_sym)
end
#close ⇒ Object
Gracefully close the connection with the server. This will be done automatically on garbage collection if not called explicitly.
# File 'lib/rabbitmq/client.rb', line 70
def close
  @conn.close
  release_all_channels
  self
end
#destroy ⇒ Object
Free the native resources associated with this object. This will be done automatically on garbage collection if not called explicitly.
# File 'lib/rabbitmq/client.rb', line 78
def destroy
  @conn.destroy
  self
end
#fetch_response(channel_id, method, timeout: protocol_timeout) ⇒ Hash
Wait for a specific response on the given channel of the given type and return the event data for the response when it is received. Any other events received will be processed or stored internally.
# File 'lib/rabbitmq/client.rb', line 125
def fetch_response(channel_id, method, timeout: protocol_timeout)
  methods = Array(method).map(&:to_sym)
  timeout = Float(timeout) if timeout
  fetch_response_internal(Integer(channel_id), methods, timeout)
end
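The coercions in the #fetch_response listing mean the method argument may be a single name or a list of names, given as Strings or Symbols, and that a timeout is converted to a Float only when one is supplied. The sketch below repeats just those coercions in a hypothetical helper, `normalize_fetch_args` (not part of the library), so they can be tried without a server; the method names passed in are only sample values.

```ruby
# Hypothetical helper that mirrors the argument coercions of #fetch_response.
def normalize_fetch_args(channel_id, method, timeout)
  methods = Array(method).map(&:to_sym) # one name or many, String or Symbol
  timeout = Float(timeout) if timeout   # nil is passed through untouched
  [Integer(channel_id), methods, timeout]
end

single = normalize_fetch_args("1", "queue_declare_ok", 5)
several = normalize_fetch_args(1, [:basic_get_ok, :basic_get_empty], nil)
```

With these inputs, `single` holds an Integer channel id, a one-element Symbol list, and a Float timeout, while `several` keeps both method names and a nil timeout.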
#host ⇒ Object
# File 'lib/rabbitmq/client.rb', line 91
def host; @conn..fetch(:host); end
#max_channels ⇒ Object
# File 'lib/rabbitmq/client.rb', line 95
def max_channels; @conn..fetch(:max_channels); end
#max_frame_size ⇒ Object
# File 'lib/rabbitmq/client.rb', line 96
def max_frame_size; @conn..fetch(:max_frame_size); end
#on_event(channel_id, method, callable = nil, &block) {|event| ... } ⇒ Proc, ...
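Register a handler for events on the given channel of the given type. Only one handler for each event type may be registered at a time, so registering a new handler replaces the previous one. If neither a callable nor a block is given, an ArgumentError is raised; use #clear_event_handler to remove a handler.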
# File 'lib/rabbitmq/client.rb', line 142
def on_event(channel_id, method, callable=nil, &block)
  handler = block || callable
  raise ArgumentError, "expected block or callable as the event handler" \
    unless handler.respond_to?(:call)
  @event_handlers[Integer(channel_id)][method.to_sym] = handler
  handler
end
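The registration rules for #on_event and #clear_event_handler can be tried without a server by copying the handler table into a small stand-alone class. `HandlerRegistry` and `handler_for` below are hypothetical names used only for this sketch; the table layout and coercions follow the listings on this page, and `:basic_deliver` is just a sample event name.

```ruby
# Toy registry modelled on the @event_handlers table shown in the listings.
class HandlerRegistry
  def initialize
    # channel id => { method name => handler }
    @event_handlers = Hash.new { |h, k| h[k] = {} }
  end

  def on_event(channel_id, method, callable = nil, &block)
    handler = block || callable
    unless handler.respond_to?(:call)
      raise ArgumentError, "expected block or callable as the event handler"
    end
    @event_handlers[Integer(channel_id)][method.to_sym] = handler
  end

  def clear_event_handler(channel_id, method)
    @event_handlers[Integer(channel_id)].delete(method.to_sym)
  end

  # Hypothetical lookup helper, added only so the sketch can be inspected.
  def handler_for(channel_id, method)
    @event_handlers[Integer(channel_id)][method.to_sym]
  end
end

registry = HandlerRegistry.new
first  = registry.on_event(1, :basic_deliver) { |event| :first }
# Same channel and event type (String form): this replaces the first handler.
second = registry.on_event(1, "basic_deliver", ->(event) { :second })
```

After the second call only `second` is registered for channel 1, and clearing it returns that handler, which matches the `Proc?` return type in the summary.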
#password ⇒ Object
# File 'lib/rabbitmq/client.rb', line 90
def password; @conn..fetch(:password); end
#port ⇒ Object
# File 'lib/rabbitmq/client.rb', line 93
def port; @conn..fetch(:port); end
#run_loop!(timeout: protocol_timeout, &block) ⇒ undefined
Fetch and handle events in a loop that blocks the calling thread. The loop will continue until the #break! method is called from within an event handler, or until the given timeout duration has elapsed.
# File 'lib/rabbitmq/client.rb', line 173
def run_loop!(timeout: protocol_timeout, &block)
  timeout = Float(timeout) if timeout
  @breaking = false
  fetch_events(timeout, &block)
  nil
end
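The interaction between #run_loop! and #break! (a flag set inside a handler, checked only after that handler returns) can be illustrated with a toy loop. `ToyLoop` below is not the library's implementation: `fetch_events` is not shown on this page, so the event source is a plain in-memory array, and unlike the real loop this one also stops when the array runs out.

```ruby
# Toy event loop; an illustration of the break-flag pattern only.
class ToyLoop
  def initialize(events)
    @events = events.dup
    @handlers = {}
    @breaking = false
  end

  def on_event(method, &block)
    @handlers[method] = block
  end

  def break!
    @breaking = true
    nil
  end

  def run_loop!(timeout: 1.0)
    @breaking = false
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    until @breaking || @events.empty? ||
          Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
      event = @events.shift
      handler = @handlers[event[:method]]
      handler.call(event) if handler # the handler always runs to completion
    end
    nil
  end
end

log = []
toy = ToyLoop.new([{ method: :data }, { method: :stop }, { method: :data }])
toy.on_event(:data) { |event| log << :data }
toy.on_event(:stop) do |event|
  toy.break!
  log << :after_break # still runs: break! only sets a flag
end
toy.run_loop!
```

The code after `break!` inside the handler still executes, and the third event is never dispatched because the flag is checked before the next iteration.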
#send_request(channel_id, method, properties = {}) ⇒ Object
Send a request on the given channel with the given type and properties.
# File 'lib/rabbitmq/client.rb', line 105
def send_request(channel_id, method, properties={})
  Util.error_check :"sending a request",
    @conn.send_method(Integer(channel_id), method.to_sym, properties)
  nil
end
#ssl? ⇒ Boolean
# File 'lib/rabbitmq/client.rb', line 94
def ssl?; @conn..fetch(:ssl); end
#start ⇒ Object
Initiate the connection with the server. It is necessary to call this before any other communication, including creating a #channel.
# File 'lib/rabbitmq/client.rb', line 62
def start
  close # Close if already open
  @conn.start
  self
end
#user ⇒ Object
# File 'lib/rabbitmq/client.rb', line 89
def user; @conn..fetch(:user); end
#vhost ⇒ Object
# File 'lib/rabbitmq/client.rb', line 92
def vhost; @conn..fetch(:vhost); end