Class: RabbitMQ::Client

Inherits:
Object
Defined in:
lib/rabbitmq/client.rb,
lib/rabbitmq/client/connection.rb

Overview

A Client holds a connection to a RabbitMQ server and has facilities for sending events to and handling received events from that server.

A Client is not thread-safe; neither the Client nor any Channels linked to it should be shared between threads. If they are shared without appropriate locking, the behavior is undefined and may result in catastrophic process failures such as segmentation faults in the underlying C library. A Client can be used safely in a multithreaded application by keeping it on a single thread and passing only control and message data between threads.
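The pattern of passing only data between threads can be sketched as follows. This is a minimal illustration rather than the library's own code: StandInClient is a hypothetical stand-in for RabbitMQ::Client so the sketch runs without a broker, and the :basic_publish request and its properties are placeholders.

```ruby
# Hypothetical stand-in for RabbitMQ::Client so the sketch runs without a
# broker; it only records requests and checks which thread is calling.
class StandInClient
  attr_reader :requests

  def initialize
    @owner    = Thread.current
    @requests = []
  end

  def send_request(channel_id, method, properties = {})
    raise "client used outside its owning thread" unless Thread.current.equal?(@owner)
    @requests << [channel_id, method, properties]
    nil
  end
end

outbox  = Queue.new # carries plain message data between threads
results = Queue.new

# The only thread that ever touches the client.
client_thread = Thread.new do
  client = StandInClient.new
  while (job = outbox.pop) != :stop
    client.send_request(1, :basic_publish, job)
  end
  results << client.requests
end

# Producer threads hand over data, never the client itself.
producers = 3.times.map do |i|
  Thread.new { outbox << { body: "message #{i}" } }
end
producers.each(&:join)

outbox << :stop
client_thread.join
requests = results.pop
```

Because the client is created and used on one thread only, no locking around it is needed; the Queue is the sole synchronization point.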

To use a Client effectively, it is necessary to understand the methods available in the underlying AMQP protocol. Please refer to the protocol documentation for more information about specific methods: www.rabbitmq.com/amqp-0-9-1-reference.html

Defined Under Namespace

Classes: Connection, DestroyedError

Constant Summary

DEFAULT_PROTOCOL_TIMEOUT = 30 # seconds

Constructor Details

#initialize(*args) ⇒ Client

Create a new RabbitMQ::Client instance with the given properties. Connection info can be conveyed as a URL string, as explicit options, or as both.

Options parsed from a URL are applied first, then any options given explicitly override them. Any option that is left unspecified takes its default value:

{
  user:           "guest",
  password:       "guest",
  host:           "localhost",
  vhost:          "/",
  port:           5672,
  ssl:            false,
  max_channels:   RabbitMQ::FFI::CHANNEL_MAX_ID, # absolute maximum
  max_frame_size: 131072,
}

Examples:

with a URL string

RabbitMQ::Client.new("amqp://user:password@host:1234/vhost")

with explicit options

RabbitMQ::Client.new(user: "user", password: "password", port: 1234)

with both URL string and explicit options

RabbitMQ::Client.new("amqp://host:1234", user: "user", password: "password")
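The precedence described above (defaults, then URL, then explicit options) can be sketched with a pair of hash merges. This is an illustration under stated assumptions, not the library's parsing code: options_from_url and resolve_options are hypothetical helpers, the vhost is assumed to be the URL path without its leading slash, and max_channels and max_frame_size are left out for brevity.

```ruby
require "uri"

# Subset of the documented defaults.
DEFAULTS = {
  user: "guest", password: "guest", host: "localhost",
  vhost: "/", port: 5672, ssl: false,
}.freeze

# Hypothetical helper: return only the options actually present in the URL.
def options_from_url(url)
  uri   = URI.parse(url)
  vhost = uri.path.to_s.sub(%r{\A/}, "") # assumption: path minus leading "/"
  {
    user:     uri.user,
    password: uri.password,
    host:     uri.host,
    port:     uri.port,
    vhost:    (vhost.empty? ? nil : vhost),
  }.compact # drop anything the URL did not specify
end

# Hypothetical helper: defaults first, then URL options, then explicit ones.
def resolve_options(url = nil, **explicit)
  parsed = url ? options_from_url(url) : {}
  DEFAULTS.merge(parsed).merge(explicit)
end

both     = resolve_options("amqp://host:1234", user: "user", password: "password")
url_only = resolve_options("amqp://user:password@host:1234/vhost")
override = resolve_options("amqp://host:1234", port: 5673)
```

In this sketch, override[:port] is 5673 because the explicit option is merged last, while both[:vhost] falls back to the default because neither the URL nor the explicit options set it.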


# File 'lib/rabbitmq/client.rb', line 49

def initialize(*args)
  @conn = Connection.new(*args)
  
  @open_channels     = {}
  @released_channels = {}
  @event_handlers    = Hash.new { |h,k| h[k] = {} }
  @incoming_events   = Hash.new { |h,k| h[k] = {} }
  
  @protocol_timeout  = DEFAULT_PROTOCOL_TIMEOUT
end

Instance Attribute Details

#protocol_timeout ⇒ Object

The timeout to use when waiting for protocol events, in seconds. By default, this has the value of DEFAULT_PROTOCOL_TIMEOUT. When set, it affects operations like #fetch_response and #run_loop!.



# File 'lib/rabbitmq/client.rb', line 86

def protocol_timeout
  @protocol_timeout
end

Instance Method Details

#break! ⇒ nil

Stop iterating from within an execution of the #run_loop! method. Call this method only from within an event handler. It will take effect only after the handler finishes running.



# File 'lib/rabbitmq/client.rb', line 186

def break!
  @breaking = true
  nil
end

#channel(id = nil) ⇒ Channel

Open a new channel of communication and return a new RabbitMQ::Channel object with convenience methods for communicating on that channel. The channel will be released automatically if the RabbitMQ::Channel instance is garbage collected, or if the RabbitMQ::Client connection is closed with #close.

Raises:

  • (ArgumentError)

    If the given channel id number is not unique or if the given channel id number is greater than #max_channels.



# File 'lib/rabbitmq/client.rb', line 202

def channel(id=nil)
  id = allocate_channel(id)
  finalizer = Proc.new { release_channel(id) }
  Channel.new(self, @conn, id, finalizer)
end

#clear_event_handler(channel_id, method) ⇒ Proc?

Unregister the event handler associated with the given channel and method.



# File 'lib/rabbitmq/client.rb', line 157

def clear_event_handler(channel_id, method)
  @event_handlers[Integer(channel_id)].delete(method.to_sym)
end

#close ⇒ Object

Gracefully close the connection with the server. This will be done automatically on garbage collection if not called explicitly.



# File 'lib/rabbitmq/client.rb', line 70

def close
  @conn.close
  release_all_channels
  self
end

#destroy ⇒ Object

Free the native resources associated with this object. This will be done automatically on garbage collection if not called explicitly.



# File 'lib/rabbitmq/client.rb', line 78

def destroy
  @conn.destroy
  self
end

#fetch_response(channel_id, method, timeout: protocol_timeout) ⇒ Hash

Wait for a specific response on the given channel of the given type and return the event data for the response when it is received. Any other events received will be processed or stored internally.

Raises:



# File 'lib/rabbitmq/client.rb', line 125

def fetch_response(channel_id, method, timeout: protocol_timeout)
  methods = Array(method).map(&:to_sym)
  timeout = Float(timeout) if timeout
  fetch_response_internal(Integer(channel_id), methods, timeout)
end
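The argument coercion in the listing above means that either a single method name or a list of names is accepted, as strings or symbols, and that a nil timeout is passed through unchanged. A minimal standalone sketch of just that normalization step follows; normalize_fetch_args is a hypothetical name, and the method names are illustrative.

```ruby
# Hypothetical helper that mirrors the coercion done by #fetch_response
# before it delegates to the internal implementation.
def normalize_fetch_args(channel_id, method, timeout)
  methods = Array(method).map(&:to_sym) # one name or many, strings or symbols
  timeout = Float(timeout) if timeout   # nil is left as nil
  [Integer(channel_id), methods, timeout]
end

single   = normalize_fetch_args("1", "queue_declare_ok", 5)
multiple = normalize_fetch_args(2, [:basic_get_ok, :basic_get_empty], nil)

# Values that cannot be coerced fail early with ArgumentError.
invalid_timeout =
  begin
    normalize_fetch_args(1, :queue_declare_ok, "soon")
  rescue ArgumentError
    :rejected
  end
```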

#host ⇒ Object



# File 'lib/rabbitmq/client.rb', line 91

def host;           @conn.options.fetch(:host);           end

#max_channels ⇒ Object



# File 'lib/rabbitmq/client.rb', line 95

def max_channels;   @conn.options.fetch(:max_channels);   end

#max_frame_size ⇒ Object



# File 'lib/rabbitmq/client.rb', line 96

def max_frame_size; @conn.options.fetch(:max_frame_size); end

#on_event(channel_id, method, callable = nil, &block) {|event| ... } ⇒ Proc, ...

Register a handler for events on the given channel of the given type. Only one handler for each event type may be registered at a time; registering another replaces the existing one. A block or callable is required. To remove a handler, use #clear_event_handler.

Yield Parameters:

  • event (Hash)

    The event passed to the handler.

Raises:

  • (ArgumentError)

    If neither a block nor a callable object is given.


# File 'lib/rabbitmq/client.rb', line 142

def on_event(channel_id, method, callable=nil, &block)
  handler = block || callable
  raise ArgumentError, "expected block or callable as the event handler" \
    unless handler.respond_to?(:call)
  
  @event_handlers[Integer(channel_id)][method.to_sym] = handler
  handler
end
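The registry behavior of #on_event and #clear_event_handler can be tried in isolation. The sketch below copies the two method bodies from the listings on this page into a hypothetical HandlerTable class (not part of the library) that uses the same nested-hash layout as @event_handlers in #initialize; handler_for is an added lookup helper and :basic_deliver is an illustrative event name.

```ruby
# Hypothetical container reusing the registry logic shown on this page.
class HandlerTable
  def initialize
    # channel id => { method name => handler }
    @event_handlers = Hash.new { |h, k| h[k] = {} }
  end

  def on_event(channel_id, method, callable = nil, &block)
    handler = block || callable
    raise ArgumentError, "expected block or callable as the event handler" \
      unless handler.respond_to?(:call)

    @event_handlers[Integer(channel_id)][method.to_sym] = handler
  end

  def clear_event_handler(channel_id, method)
    @event_handlers[Integer(channel_id)].delete(method.to_sym)
  end

  # Lookup helper added for this sketch only.
  def handler_for(channel_id, method)
    @event_handlers[Integer(channel_id)][method.to_sym]
  end
end

table  = HandlerTable.new
first  = table.on_event(1, :basic_deliver) { |event| :first }
second = table.on_event("1", "basic_deliver", ->(event) { :second })

# The second registration replaced the first: one handler per event type.
replaced = table.handler_for(1, :basic_deliver).equal?(second)

# Clearing returns the handler that was removed.
removed = table.clear_event_handler(1, :basic_deliver)

# Registering without a block or callable raises ArgumentError.
error_message =
  begin
    table.on_event(1, :basic_deliver)
  rescue ArgumentError => e
    e.message
  end
```

Note that channel ids and method names are coerced with Integer() and #to_sym, so "1" and 1, or "basic_deliver" and :basic_deliver, address the same slot.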

#password ⇒ Object



# File 'lib/rabbitmq/client.rb', line 90

def password;       @conn.options.fetch(:password);       end

#port ⇒ Object



# File 'lib/rabbitmq/client.rb', line 93

def port;           @conn.options.fetch(:port);           end

#run_loop!(timeout: protocol_timeout, &block) ⇒ undefined

Fetch and handle events in a loop that blocks the calling thread. The loop will continue until the #break! method is called from within an event handler, or until the given timeout duration has elapsed.



# File 'lib/rabbitmq/client.rb', line 173

def run_loop!(timeout: protocol_timeout, &block)
  timeout = Float(timeout) if timeout
  @breaking = false
  fetch_events(timeout, &block)
  nil
end
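The interaction between #run_loop! and #break! can be pictured with a toy model. This is a sketch under assumptions, not the library's event loop: ToyLoop is hypothetical, it iterates over a fixed array instead of reading from a connection, it assumes each event is yielded to the block, and it does not model the timeout. It only illustrates that the break flag is examined after the current handler has finished.

```ruby
# Toy model of a flag-based loop; not the library's implementation.
class ToyLoop
  def initialize(events)
    @events = events
  end

  def break!
    @breaking = true
    nil
  end

  def run_loop!
    @breaking = false
    @events.each do |event|
      yield event        # the handler always runs to completion
      break if @breaking # the flag is checked only between events
    end
    nil
  end
end

toy = ToyLoop.new([:a, :b, :c])
log = []

toy.run_loop! do |event|
  toy.break! if event == :b
  log << event # still executes for :b, after break! was called
end
```

Here log ends up containing :a and :b but not :c: the handler for :b finishes its work before the loop stops.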

#send_request(channel_id, method, properties = {}) ⇒ Object

Send a request on the given channel with the given type and properties.

Raises:



# File 'lib/rabbitmq/client.rb', line 105

def send_request(channel_id, method, properties={})
  Util.error_check :"sending a request",
    @conn.send_method(Integer(channel_id), method.to_sym, properties)
  
  nil
end

#ssl? ⇒ Boolean



# File 'lib/rabbitmq/client.rb', line 94

def ssl?;           @conn.options.fetch(:ssl);            end

#start ⇒ Object

Initiate the connection with the server. It is necessary to call this before any other communication, including creating a #channel.



# File 'lib/rabbitmq/client.rb', line 62

def start
  close # Close if already open
  @conn.start
  self
end

#user ⇒ Object



# File 'lib/rabbitmq/client.rb', line 89

def user;           @conn.options.fetch(:user);           end

#vhost ⇒ Object



# File 'lib/rabbitmq/client.rb', line 92

def vhost;          @conn.options.fetch(:vhost);          end