Class: RabbitMQ::Client

Inherits:
Object
Defined in:
lib/rabbitmq/client.rb,
lib/rabbitmq/client/connection.rb

Overview

A Client holds a connection to a RabbitMQ server and has facilities for sending events to and handling received events from that server.

A Client is not thread-safe; neither the Client nor any Channels linked to it should be shared between threads. If they are shared without appropriate locking, the behavior is undefined and may result in catastrophic process failures such as segmentation faults in the underlying C library. A Client can be used safely in a multithreaded application as long as only control and message data, not the Client or its Channels, pass between threads.
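
The threading rule above can be sketched as follows: one thread owns the client, and other threads hand it plain data through a Queue. StubClient and its publish method are hypothetical stand-ins invented so the sketch runs without a server; they are not part of RabbitMQ::Client.

```ruby
# Hypothetical stand-in for a client; NOT part of the rabbitmq gem.
# It records which thread touched it so the ownership rule can be checked.
class StubClient
  attr_reader :published, :thread_ids

  def initialize
    @published  = []
    @thread_ids = []
  end

  def publish(body)
    @thread_ids << Thread.current.object_id
    @published  << body
  end
end

outbox = Queue.new # thread-safe queue carrying plain message data only

# The owner thread creates the client and is the only thread that uses it.
owner = Thread.new do
  client = StubClient.new
  while (body = outbox.pop) != :stop
    client.publish(body)
  end
  client
end

# Producer threads never see the client; they only enqueue strings.
producers = 3.times.map do |i|
  Thread.new { outbox << "message #{i}" }
end
producers.each(&:join)
outbox << :stop

client = owner.value # joins the owner thread and returns its block result
```

The same shape should apply to a real Client: create it, start it, and run its loop inside the owner thread, and let every other thread communicate through queues.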

To use a Client effectively, it is necessary to understand the methods available in the underlying AMQP protocol. Please refer to the protocol documentation for more information about specific methods: www.rabbitmq.com/amqp-0-9-1-reference.html

Defined Under Namespace

Classes: Connection, DestroyedError

Constant Summary

DEFAULT_PROTOCOL_TIMEOUT = 30 # seconds

Constructor Details

#initialize(*args) ⇒ Client

Create a new RabbitMQ::Client instance with the given properties. Connection info can be conveyed as a URL string, as explicit options, or as both, as in the examples below.

Options parsed from a URL are applied first; any options given explicitly then override them. Any option left unspecified takes its default value:

{
  user:           "guest",
  password:       "guest",
  host:           "localhost",
  vhost:          "/",
  port:           5672,
  ssl:            false,
  max_channels:   RabbitMQ::FFI::CHANNEL_MAX_ID, # absolute maximum
  max_frame_size: 131072,
}

Examples:

with a URL string

RabbitMQ::Client.new("amqp://user:password@host:1234/vhost")

with explicit options

RabbitMQ::Client.new(user: "user", password: "password", port: 1234)

with both URL string and explicit options

RabbitMQ::Client.new("amqp://host:1234", user: "user", password: "password")
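
The precedence described above (defaults, then URL, then explicit options) can be illustrated with a small sketch. The helper merge_connection_options is hypothetical and is not the library's parser; in particular, the way the vhost is read from the URL path is an assumption made for this illustration.

```ruby
require "uri"

# Defaults copied from the documented list (max_channels is omitted because
# its value is a constant defined inside the library).
DEFAULTS = {
  user: "guest", password: "guest", host: "localhost",
  vhost: "/", port: 5672, ssl: false, max_frame_size: 131072,
}.freeze

# Hypothetical helper, NOT the library's parser: it only illustrates the
# documented precedence of defaults < URL < explicit options.
def merge_connection_options(url = nil, explicit = {})
  from_url = {}
  if url
    uri = URI.parse(url)
    from_url = {
      user:     uri.user,
      password: uri.password,
      host:     uri.host,
      port:     uri.port,
      # Assumption: the vhost is the URL path without its leading slash.
      vhost:    uri.path.to_s.sub(%r{\A/}, ""),
    }.reject { |_, value| value.nil? || value == "" }
  end
  DEFAULTS.merge(from_url).merge(explicit)
end

opts = merge_connection_options("amqp://host:1234", user: "user", password: "password")
# opts[:host] and opts[:port] come from the URL, opts[:user] and
# opts[:password] from the explicit options, and the rest from DEFAULTS.
```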


# File 'lib/rabbitmq/client.rb', line 49

def initialize(*args)
  @conn = Connection.new(*args)
  
  @open_channels     = {}
  @released_channels = {}
  @event_handlers    = Hash.new { |h,k| h[k] = {} }
  @incoming_events   = Hash.new { |h,k| h[k] = {} }
  
  @protocol_timeout  = DEFAULT_PROTOCOL_TIMEOUT
end

Instance Attribute Details

#protocol_timeout ⇒ Object

The timeout to use when waiting for protocol events, in seconds. By default, this has the value of DEFAULT_PROTOCOL_TIMEOUT. When set, it affects operations like #fetch_response and #run_loop!.



# File 'lib/rabbitmq/client.rb', line 86

def protocol_timeout
  @protocol_timeout
end

Instance Method Details

#break! ⇒ nil

Stop iterating from within an execution of the #run_loop! method. Call this method only from within an event handler. It will take effect only after the handler finishes running.

Returns:

  • (nil)


# File 'lib/rabbitmq/client.rb', line 186

def break!
  @breaking = true
  nil
end
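
The flag pattern in the listing above explains why #break! takes effect only after the current handler returns. The MiniLoop class below is a hypothetical miniature written for illustration; it is not the library's loop, and it assumes the flag is checked once per event after the handler has run.

```ruby
# Hypothetical miniature event loop, NOT the library's implementation.
# It mirrors only the flag pattern visible in #break! and #run_loop!.
class MiniLoop
  attr_reader :log

  def initialize(events)
    @events   = events
    @breaking = false
    @log      = []
  end

  def break!
    @breaking = true
    nil
  end

  def run_loop!
    @breaking = false
    @events.each do |event|
      yield event         # the handler runs to completion...
      break if @breaking  # ...and only then is the flag checked
    end
    nil
  end
end

mini = MiniLoop.new([:first, :second, :third])
mini.run_loop! do |event|
  mini.break! if event == :second
  mini.log << event # still runs: break! does not abort the handler itself
end
# mini.log is now [:first, :second]; :third was never dispatched.
```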

#channel(id = nil) ⇒ Channel

Open a new channel of communication and return a new RabbitMQ::Channel object with convenience methods for communicating on that channel. The channel will be automatically released if the RabbitMQ::Channel instance is garbage collected, or if the RabbitMQ::Client connection is closed (see #close).

Parameters:

  • id (Integer, nil) (defaults to: nil)

    The channel id number to use. If nil or not given, a unique channel number will be chosen automatically.

Returns:

  • (Channel)

    The new channel handle.

Raises:

  • (ArgumentError)

    If the given channel id number is not unique or if the given channel id number is greater than #max_channels.



# File 'lib/rabbitmq/client.rb', line 202

def channel(id=nil)
  id = allocate_channel(id)
  finalizer = Proc.new { release_channel(id) }
  Channel.new(self, @conn, id, finalizer)
end
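
The id rules documented for #channel can be sketched with a stand-alone allocator. ChannelIds is hypothetical and is not the library's private allocate_channel; the starting id of 1 and the lowest-free-id policy are assumptions made for this illustration.

```ruby
# Hypothetical allocator, NOT the library's private allocate_channel.
# It illustrates the documented contract: nil picks an unused id, while an
# explicit id must be unused and no greater than max_channels.
class ChannelIds
  def initialize(max_channels)
    @max_channels = max_channels
    @open         = {}
  end

  def allocate(id = nil)
    if id.nil?
      # Assumption: ids start at 1 and the lowest free id is chosen.
      id = (1..@max_channels).find { |n| !@open.key?(n) }
      raise ArgumentError, "no free channel ids" unless id
    else
      id = Integer(id)
      raise ArgumentError, "channel id #{id} is already in use" if @open.key?(id)
      raise ArgumentError, "channel id #{id} exceeds max_channels" if id > @max_channels
    end
    @open[id] = true
    id
  end

  def release(id)
    @open.delete(id)
  end
end

ids    = ChannelIds.new(3)
first  = ids.allocate    # automatic id
second = ids.allocate(3) # explicit id
```

Asking for id 3 again, or for id 4, raises ArgumentError in this sketch, matching the two failure cases listed under Raises.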

#clear_event_handler(channel_id, method) ⇒ Proc?

Unregister the event handler associated with the given channel and method.

Parameters:

  • channel_id (Integer)

    The channel number to watch for.

  • method (Symbol)

    The type of protocol method to watch for.

Returns:

  • (Proc, nil)

    The removed handler, if any.



# File 'lib/rabbitmq/client.rb', line 157

def clear_event_handler(channel_id, method)
  @event_handlers[Integer(channel_id)].delete(method.to_sym)
end

#close ⇒ Object

Gracefully close the connection with the server. This will be done automatically on garbage collection if not called explicitly.



# File 'lib/rabbitmq/client.rb', line 70

def close
  @conn.close
  release_all_channels
  self
end

#destroy ⇒ Object

Free the native resources associated with this object. This will be done automatically on garbage collection if not called explicitly.



# File 'lib/rabbitmq/client.rb', line 78

def destroy
  @conn.destroy
  self
end

#fetch_response(channel_id, method, timeout: protocol_timeout) ⇒ Hash

Wait for a specific response on the given channel of the given type and return the event data for the response when it is received. Any other events received will be processed or stored internally.

Parameters:

  • channel_id (Integer)

    The channel number to watch for.

  • method (Symbol, Array<Symbol>)

    The protocol method(s) to watch for.

  • timeout (Float) (defaults to: protocol_timeout)

    The maximum time to wait for a response in seconds; uses the value of #protocol_timeout by default.

Returns:

  • (Hash)

    The response data received.

Raises:



# File 'lib/rabbitmq/client.rb', line 125

def fetch_response(channel_id, method, timeout: protocol_timeout)
  methods = Array(method).map(&:to_sym)
  timeout = Float(timeout) if timeout
  fetch_response_internal(Integer(channel_id), methods, timeout)
end
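
The coercions in the listing above mean that callers may pass a single method name or several, and strings where integers or symbols are expected. The helper normalize_fetch_args below is hypothetical; it only repeats those coercions so their effect can be inspected in isolation, and the method names are placeholders.

```ruby
# Hypothetical helper that repeats the coercions visible in the method body
# of #fetch_response; it does not talk to a server.
def normalize_fetch_args(channel_id, method, timeout)
  methods = Array(method).map(&:to_sym) # one name or many, always an Array
  timeout = Float(timeout) if timeout   # nil is passed through unchanged
  [Integer(channel_id), methods, timeout]
end

single = normalize_fetch_args("1", :queue_declare_ok, 5)
# => [1, [:queue_declare_ok], 5.0]
many = normalize_fetch_args(2, [:basic_ack, "basic_nack"], nil)
# => [2, [:basic_ack, :basic_nack], nil]
```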

#host ⇒ Object

The :host connection option.

# File 'lib/rabbitmq/client.rb', line 91

def host;           @conn.options.fetch(:host);           end

#max_channels ⇒ Object

The :max_channels connection option.

# File 'lib/rabbitmq/client.rb', line 95

def max_channels;   @conn.options.fetch(:max_channels);   end

#max_frame_size ⇒ Object

The :max_frame_size connection option.

# File 'lib/rabbitmq/client.rb', line 96

def max_frame_size; @conn.options.fetch(:max_frame_size); end

#on_event(channel_id, method, callable = nil, &block) {|event| ... } ⇒ Proc, ...

Register a handler for events on the given channel of the given type. Only one handler for each event type may be registered per channel at a time; registering another replaces it. If neither a callable nor a block is given, ArgumentError is raised; use #clear_event_handler to remove a handler.

Parameters:

  • channel_id (Integer)

    The channel number to watch for.

  • method (Symbol)

    The type of protocol method to watch for.

  • callable (#call, nil) (defaults to: nil)

    The callable handler if no block is given.

  • block (Proc, nil)

    The handler block to register.

Yield Parameters:

  • event (Hash)

    The event passed to the handler.

Returns:

  • (Proc, #call)

    The given block or callable.

Raises:

  • (ArgumentError)

    If neither a block nor a callable responding to #call is given.


# File 'lib/rabbitmq/client.rb', line 142

def on_event(channel_id, method, callable=nil, &block)
  handler = block || callable
  raise ArgumentError, "expected block or callable as the event handler" \
    unless handler.respond_to?(:call)
  
  @event_handlers[Integer(channel_id)][method.to_sym] = handler
  handler
end
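
The registry behavior of #on_event and #clear_event_handler can be tried without a server. HandlerRegistry below is a hypothetical wrapper that copies the two method bodies shown in this page around the same nested-hash shape used by the constructor; the method name :basic_deliver is only a placeholder.

```ruby
# Hypothetical wrapper around the registry logic shown in the listings;
# it holds no connection and dispatches nothing.
class HandlerRegistry
  def initialize
    # channel id => { method name => handler }
    @event_handlers = Hash.new { |h, k| h[k] = {} }
  end

  def on_event(channel_id, method, callable = nil, &block)
    handler = block || callable
    raise ArgumentError, "expected block or callable as the event handler" \
      unless handler.respond_to?(:call)
    @event_handlers[Integer(channel_id)][method.to_sym] = handler
  end

  def clear_event_handler(channel_id, method)
    @event_handlers[Integer(channel_id)].delete(method.to_sym)
  end
end

registry = HandlerRegistry.new
first  = registry.on_event(1, :basic_deliver) { |event| event }
# "1" and "basic_deliver" are coerced to the same keys, so this replaces first.
second = registry.on_event("1", "basic_deliver", ->(event) { event })

removed = registry.clear_event_handler(1, :basic_deliver) # the second handler
missing = registry.clear_event_handler(1, :basic_deliver) # nil, nothing left
```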

#password ⇒ Object

The :password connection option.

# File 'lib/rabbitmq/client.rb', line 90

def password;       @conn.options.fetch(:password);       end

#port ⇒ Object

The :port connection option.

# File 'lib/rabbitmq/client.rb', line 93

def port;           @conn.options.fetch(:port);           end

#run_loop!(timeout: protocol_timeout, &block) ⇒ undefined

Fetch and handle events in a loop that blocks the calling thread. The loop will continue until the #break! method is called from within an event handler, or until the given timeout duration has elapsed.

Parameters:

  • timeout (Float) (defaults to: protocol_timeout)

    The maximum time to run the loop, in seconds; if none is given, the value of #protocol_timeout is used. The loop ends earlier if #break! is called.

  • block (Proc, nil)

    If given, the block is yielded each non-exception event received on any channel. Handlers and response fetches that match the event are still processed, because the block neither consumes the event nor replaces the handlers.

Returns:

  • (undefined)

    Assume no value; reserved for future use.



# File 'lib/rabbitmq/client.rb', line 173

def run_loop!(timeout: protocol_timeout, &block)
  timeout = Float(timeout) if timeout
  @breaking = false
  fetch_events(timeout, &block)
  nil
end
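
Two documented properties of #run_loop! are that the loop is bounded by a timeout and that the optional block observes events without displacing registered handlers. The dispatch_until function below is a hypothetical sketch of those two properties; it is not the library's fetch_events, and the event hash shape and the order of block versus handler are assumptions.

```ruby
# Hypothetical dispatcher, NOT the library's fetch_events. Events come from
# an in-memory array instead of a socket.
def dispatch_until(timeout_seconds, source, handlers)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds
  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    event = source.shift
    break if event.nil?            # stub source exhausted
    yield event if block_given?    # observer block sees every event
    handler = handlers[event[:method]]
    handler.call(event) if handler # the registered handler still runs
  end
  nil
end

seen_by_block   = []
seen_by_handler = []
handlers = { basic_deliver: ->(event) { seen_by_handler << event[:body] } }
events   = [
  { method: :basic_deliver, body: "a" },
  { method: :basic_ack },
  { method: :basic_deliver, body: "b" },
]

dispatch_until(1.0, events, handlers) { |event| seen_by_block << event[:method] }
# The block saw all three events; the handler saw only the two deliveries.
```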

#send_request(channel_id, method, properties = {}) ⇒ Object

Send a request on the given channel with the given type and properties.

Parameters:

  • channel_id (Integer)

    The channel number to send on.

  • method (Symbol)

    The type of protocol method to send.

  • properties (Hash) (defaults to: {})

    The properties to apply to the method.

Raises:



# File 'lib/rabbitmq/client.rb', line 105

def send_request(channel_id, method, properties={})
  Util.error_check :"sending a request",
    @conn.send_method(Integer(channel_id), method.to_sym, properties)
  
  nil
end

#ssl? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/rabbitmq/client.rb', line 94

def ssl?;           @conn.options.fetch(:ssl);            end

#start ⇒ Object

Initiate the connection with the server. It is necessary to call this before any other communication, including creating a #channel.



# File 'lib/rabbitmq/client.rb', line 62

def start
  close # Close if already open
  @conn.start
  self
end

#user ⇒ Object

The :user connection option.

# File 'lib/rabbitmq/client.rb', line 89

def user;           @conn.options.fetch(:user);           end

#vhost ⇒ Object

The :vhost connection option.

# File 'lib/rabbitmq/client.rb', line 92

def vhost;          @conn.options.fetch(:vhost);          end