Class: RightAMQP::HABrokerClient
- Inherits:
-
Object
- Object
- RightAMQP::HABrokerClient
- Includes:
- RightSupport::Log::Mixin
- Defined in:
- lib/right_amqp/ha_client/ha_broker_client.rb
Overview
Client for multiple AMQP brokers used together to achieve a high availability messaging routing service
Defined Under Namespace
Classes: Context, CountedDeferrable, NoBrokerHosts, NoConnectedBrokers, NoUserData, Published
Constant Summary collapse
- RECONNECT_INTERVAL =
Default number of seconds between reconnect attempts
60
Instance Attribute Summary collapse
-
#brokers ⇒ Object
(Array(Broker)) Priority ordered list of AMQP broker clients (exposed only for unit test purposes).
Class Method Summary collapse
-
.addresses(host, port) ⇒ Object
Parse host and port information to form list of broker address information.
-
.identities(host, port = nil) ⇒ Object
Parse host and port information to form list of broker identities.
-
.identity(host, port = nil) ⇒ Object
Construct a broker serialized identity from its host and port of the form rs-broker-host-port, with any ‘-’s in host replaced by ‘~’.
-
.parse_user_data(user_data) ⇒ Object
Parse agent user data to extract broker host and port configuration An agent is permitted to only support using one broker.
Instance Method Summary collapse
-
#alias_(identity) ⇒ Object
Convert broker serialized identity to its alias.
-
#aliases(identities) ⇒ Object
Convert broker identities to aliases.
-
#all ⇒ Object
Get serialized identity of all brokers.
-
#close(&blk) ⇒ Object
Close all broker client connections.
-
#close_one(identity, propagate = true, &blk) ⇒ Object
Close an individual broker client connection.
-
#connect(host, port, index, priority = nil, force = false, &blk) ⇒ Object
Make new connection to broker at specified address unless already connected or currently connecting.
-
#connected ⇒ Object
Get serialized identity of connected brokers.
-
#connected?(identity) ⇒ Boolean
Check whether connected to broker.
-
#connection_status(options = {}, &callback) ⇒ Object
Register callback to be activated when there is a change in connection status Can be called more than once without affecting previous callbacks.
-
#declare(type, name, options = {}) ⇒ Object
Declare queue or exchange object but do not subscribe to it.
-
#declare_unusable(identities) ⇒ Object
Declare a broker client as unusable.
-
#delete(name, options = {}) ⇒ Object
Delete queue in all usable brokers or all selected brokers that are usable.
-
#delete_amqp_resources(name, options = {}) ⇒ Object
Delete queue resources from AMQP in all usable brokers.
-
#failed ⇒ Object
Get serialized identity of failed broker clients, i.e., ones that were never successfully connected, not ones that are just disconnected.
-
#get(id) ⇒ Object
Get broker serialized identity if client exists.
-
#heartbeat=(heartbeat) ⇒ Object
Change connection heartbeat frequency to be used for any new connections.
-
#hosts ⇒ Object
Form string of hosts and associated indices.
-
#identity_parts(id) ⇒ Object
Break broker serialized identity down into individual parts if exists.
-
#initialize(serializer, options = {}) ⇒ HABrokerClient
constructor
Create connections to all configured AMQP brokers The constructed broker client list is in priority order.
-
#non_delivery(&blk) ⇒ Object
Provide callback to be activated when a message cannot be delivered.
-
#ports ⇒ Object
Form string of ports and associated indices.
-
#publish(exchange, packet, options = {}) ⇒ Object
Publish message to AMQP exchange of first connected broker.
-
#queue_status(queue_names, timeout = nil, &blk) ⇒ Object
Check status of specified queues for connected brokers Silently ignore unknown queues If a queue whose status is being checked does not exist, the associated broker connection will fail and be unusable.
-
#remove(host, port, &blk) ⇒ Object
Remove a broker client from the configuration Invoke connection status callbacks only if connection is not already disabled There is no check whether this is the last usable broker client.
-
#reset_stats ⇒ Object
Reset broker client statistics Do not reset disconnect and failure stats because they might then be inconsistent with underlying connection status.
-
#stats(reset = false) ⇒ Object
Get broker client statistics.
-
#status ⇒ Object
Get status summary.
-
#subscribe(queue, exchange = nil, options = {}, &blk) ⇒ Object
Subscribe an AMQP queue to an AMQP exchange on all broker clients that are connected or still connecting Allow connecting here because subscribing may happen before all have confirmed connected Do not wait for confirmation from broker client that subscription is complete When a message is received, acknowledge, unserialize, and log it as specified If the message is unserialized and it is not of the right type, it is dropped after logging a warning.
-
#unsubscribe(queue_names, timeout = nil, &blk) ⇒ Object
Unsubscribe from the specified queues on usable broker clients Silently ignore unknown queues.
-
#unusable ⇒ Object
Get serialized identity of unusable brokers.
-
#usable ⇒ Object
Get serialized identity of brokers that are usable, i.e., connecting or confirmed connected.
Constructor Details
#initialize(serializer, options = {}) ⇒ HABrokerClient
Create connections to all configured AMQP brokers The constructed broker client list is in priority order
Parameters
- serializer(Serializer)
-
Serializer used for marshaling packets being published or
unmarshaling received messages to packets (responds to :dump and :load); if nil, has
same effect as setting subscribe option :no_serialize and publish option :no_unserialize
- options(Hash)
-
Configuration options
- :user(String)
-
User name
- :pass(String)
-
Password
- :vhost(String)
-
Virtual host path name
- :ssl(Boolean)
-
Whether SSL is enabled
- :insist(Boolean)
-
Whether to suppress redirection of connection
- :reconnect_interval(Integer)
-
Number of seconds between reconnect attempts, defaults to RECONNECT_INTERVAL
- :heartbeat(Integer)
-
Number of seconds between AMQP connection heartbeats used to keep
connection alive (e.g., when AMQP broker is behind a firewall), nil or 0 means disable
:host{String):: Comma-separated list of AMQP broker host names; if only one, it is reapplied
to successive ports; if none, defaults to localhost; each host may be followed by ':'
and a short string to be used as a broker index; the index defaults to the list index,
e.g., "host_a:0, host_c:2"
:port(String|Integer):: Comma-separated list of AMQP broker port numbers corresponding to :host list;
if only one, it is incremented and applied to successive hosts; if none, defaults to AMQP::PORT
:prefetch(Integer):: Maximum number of messages the AMQP broker is to prefetch for the agent
before it receives an ack. Value 1 ensures that only last unacknowledged gets redelivered
if the agent crashes. Value 0 means unlimited prefetch.
:order(Symbol):: Broker selection order when publishing a message: :random or :priority,
defaults to :priority, value can be overridden on publish call
:exception_stats(RightSupport::Stats::Exceptions):: Exception statistics container to be used
instead of internally created one
:exception_callback(Proc):: Callback activated on exception events with parameters
exception(Exception):: Exception
message(Packet):: Message being processed
client(HABrokerClient):: Reference to this client
:exception_on_receive_callback(Proc):: Callback activated on a receive exception with parameters
message(String):: Message content that caused an exception
exception(Exception):: Exception that was raised
Raise
- ArgumentError
-
If :host and :port are not matched lists or if serializer does not respond
to :dump and :load
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 138 def initialize(serializer, = {}) @options = .dup @options[:update_status_callback] = lambda { |b, c| update_status(b, c) } @options[:return_message_callback] = lambda { |i, t, r, m| handle_return(i, t, r, m) } @options[:reconnect_interval] ||= RECONNECT_INTERVAL @connection_status = {} unless serializer.nil? || [:dump, :load].all? { |m| serializer.respond_to?(m) } raise ArgumentError, "serializer must be a class/object that responds to :dump and :load" end @serializer = serializer @published = Published.new reset_stats @select = @options[:order] || :priority @brokers = connect_all @closed = false @brokers_hash = {} @brokers.each { |b| @brokers_hash[b.identity] = b } end |
Instance Attribute Details
#brokers ⇒ Object
(Array(Broker)) Priority ordered list of AMQP broker clients (exposed only for unit test purposes)
96 97 98 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 96 def brokers @brokers end |
Class Method Details
.addresses(host, port) ⇒ Object
Parse host and port information to form list of broker address information
Parameters
- host{String)
-
Comma-separated list of broker host names; if only one, it is reapplied
to successive ports; if none, defaults to localhost; each host may be followed by ':'
and a short string to be used as a broker index; the index defaults to the list index,
e.g., "host_a:0, host_c:2"
- port(String|Integer)
-
Comma-separated list of broker port numbers corresponding to :host list;
if only one, it is incremented and applied to successive hosts; if none, defaults to AMQP::PORT
Returns
- (Array(Hash))
-
List of broker addresses with keys :host, :port, :index
Raise
- ArgumentError
-
If host and port are not matched lists
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 215 def self.addresses(host, port) hosts = if host && !host.empty? then host.split(/,\s*/) else [ "localhost" ] end ports = if port && port.size > 0 then port.to_s.split(/,\s*/) else [ ::AMQP::PORT ] end if hosts.size != ports.size && hosts.size != 1 && ports.size != 1 raise ArgumentError.new("Unmatched AMQP host/port lists -- hosts: #{host.inspect} ports: #{port.inspect}") end i = -1 if hosts.size > 1 hosts.map do |host| i += 1 h = host.split(/:\s*/) port = if ports[i] then ports[i].to_i else ports[0].to_i end port = port.to_s.split(/:\s*/)[0] {:host => h[0], :port => port.to_i, :index => (h[1] || i.to_s).to_i} end else ports.map do |port| i += 1 p = port.to_s.split(/:\s*/) host = if hosts[i] then hosts[i] else hosts[0] end host = host.split(/:\s*/)[0] {:host => host, :port => p[0].to_i, :index => (p[1] || i.to_s).to_i} end end end |
.identities(host, port = nil) ⇒ Object
Parse host and port information to form list of broker identities
Parameters
- host{String)
-
Comma-separated list of broker host names; if only one, it is reapplied
to successive ports; if none, defaults to localhost; each host may be followed by ':'
and a short string to be used as a broker index; the index defaults to the list index,
e.g., "host_a:0, host_c:2"
- port(String|Integer)
-
Comma-separated list of broker port numbers corresponding to :host list;
if only one, it is incremented and applied to successive hosts; if none, defaults to AMQP::PORT
Returns
- (Array)
-
Identity of each broker
Raise
- ArgumentError
-
If host and port are not matched lists
256 257 258 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 256 def self.identities(host, port = nil) addresses(host, port).map { |a| identity(a[:host], a[:port]) } end |
.identity(host, port = nil) ⇒ Object
Construct a broker serialized identity from its host and port of the form rs-broker-host-port, with any ‘-’s in host replaced by ‘~’
Parameters
- host{String)
-
IP host name or address for individual broker
- port(Integer)
-
TCP port number for individual broker, defaults to ::AMQP::PORT
Returns
- (String)
-
Broker serialized identity
269 270 271 272 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 269 def self.identity(host, port = nil) port ||= ::AMQP::PORT "rs-broker-#{host.gsub('-', '~')}-#{port.to_i}" end |
.parse_user_data(user_data) ⇒ Object
Parse agent user data to extract broker host and port configuration An agent is permitted to only support using one broker
Parameters
- user_data(String)
-
Agent user data in <name>=<value>&<name>=<value>&… form
with required name RS_rn_url and optional names RS_rn_host and RS_rn_port
Return
- (Array)
-
Broker hosts and ports as comma-separated list in priority order in the form
<hostname>:<index>,<hostname>:<index>,...
<port>:<index>,<port>:<index>,... or nil if none specified
Raise
- NoUserData
-
If the user data is missing
- NoBrokerHosts
-
If no brokers could be extracted from the user data
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 172 def self.parse_user_data(user_data) raise NoUserData.new("User data is missing") if user_data.nil? || user_data.empty? hosts = "" ports = nil parsed = {} user_data.split("&").each do |data| name, value = data.split("=") # Guard against repeats unless parsed[name] if name == "RS_rn_url" h = value.split("@").last.split("/").first # Translate host name used by very old agents using only one broker h = "broker1-1.rightscale.com" if h == "broker.rightscale.com" hosts = h + hosts end if name == "RS_rn_host" hosts << value end if name == "RS_rn_port" ports = value end parsed[name] = true end end raise NoBrokerHosts.new("No brokers found in user data") if hosts.empty? [hosts, ports] end |
Instance Method Details
#alias_(identity) ⇒ Object
Convert broker serialized identity to its alias
Parameters
- identity(String)
-
Broker serialized identity
Return
- (String|nil)
-
Broker alias, or nil if not a known broker
306 307 308 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 306 def alias_(identity) @brokers_hash[identity].alias rescue nil end |
#aliases(identities) ⇒ Object
Convert broker identities to aliases
Parameters
- identities(Array)
-
Broker identities
Return
- (Array)
-
Broker aliases
295 296 297 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 295 def aliases(identities) identities.map { |i| alias_(i) } end |
#all ⇒ Object
Get serialized identity of all brokers
Return
- (Array)
-
Serialized identity of all brokers
377 378 379 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 377 def all @brokers.map { |b| b.identity } end |
#close(&blk) ⇒ Object
Close all broker client connections
Block
Optional block with no parameters to be called after all connections are closed
Return
- true
-
Always return true
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 756 def close(&blk) if @closed blk.call if blk else @closed = true @connection_status = {} handler = CountedDeferrable.new(@brokers.size) handler.callback { blk.call if blk } @brokers.each do |b| begin b.close(propagate = false) { handler.completed_one } rescue StandardError => e handler.completed_one logger.exception("Failed to close broker #{b.alias}", e, :trace) @exception_stats.track("close", e) end end end true end |
#close_one(identity, propagate = true, &blk) ⇒ Object
Close an individual broker client connection
Parameters
- identity(String)
-
Broker serialized identity
- propagate(Boolean)
-
Whether to propagate connection status updates
Block
Optional block with no parameters to be called after connection closed
Return
- true
-
Always return true
Raise
- Exception
-
If broker unknown
791 792 793 794 795 796 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 791 def close_one(identity, propagate = true, &blk) broker = @brokers_hash[identity] raise Exception, "Cannot close unknown broker #{identity}" unless broker broker.close(propagate, &blk) true end |
#connect(host, port, index, priority = nil, force = false, &blk) ⇒ Object
Make new connection to broker at specified address unless already connected or currently connecting
Parameters
- host{String)
-
IP host name or address for individual broker
- port(Integer)
-
TCP port number for individual broker
- index(Integer)
-
Unique index for broker within set for use in forming alias
- priority(Integer|nil)
-
Priority position of this broker in set for use by this agent
with nil or a value that would leave a gap in the list meaning add to end of list
- force(Boolean)
-
Reconnect even if already connected
Block
Optional block with following parameters to be called after initiating the connection unless already connected to this broker:
identity(String):: Broker serialized identity
Return
- (Boolean)
-
true if connected, false if no connect attempt made
Raise
- Exception
-
If host and port do not match an existing broker but index does
423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 423 def connect(host, port, index, priority = nil, force = false, &blk) identity = self.class.identity(host, port) existing = @brokers_hash[identity] if existing && existing.usable? && !force logger.info("Ignored request to reconnect #{identity} because already #{existing.status.to_s}") false else old_identity = identity @brokers.each do |b| if index == b.index # Changing host and/or port of existing broker client old_identity = b.identity break end end unless existing address = {:host => host, :port => port, :index => index} broker = BrokerClient.new(identity, address, @serializer, @exception_stats, @non_delivery_stats, @options, existing) p = priority(old_identity) if priority && priority < p @brokers.insert(priority, broker) elsif priority && priority > p logger.info("Reduced priority setting for broker #{identity} from #{priority} to #{p} to avoid gap in list") @brokers.insert(p, broker) else @brokers[p].close if @brokers[p] @brokers[p] = broker end @brokers_hash[identity] = broker yield broker.identity if block_given? true end end |
#connected ⇒ Object
Get serialized identity of connected brokers
Return
- (Array)
-
Serialized identity of connected brokers
353 354 355 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 353 def connected @brokers.inject([]) { |c, b| if b.connected? then c << b.identity else c end } end |
#connected?(identity) ⇒ Boolean
Check whether connected to broker
Parameters
- identity{String)
-
Broker serialized identity
Return
- (Boolean)
-
true if connected to broker, otherwise false, or nil if broker unknown
345 346 347 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 345 def connected?(identity) @brokers_hash[identity].connected? rescue nil end |
#connection_status(options = {}, &callback) ⇒ Object
Register callback to be activated when there is a change in connection status Can be called more than once without affecting previous callbacks
Parameters
- options(Hash)
-
Connection status monitoring options
- :one_off(Integer)
-
Seconds to wait for status change; only send update once;
if timeout, report :timeout as the status
:boundary(Symbol):: :any if only report change on any (0/1) boundary,
:all if only report change on all (n-1/n) boundary, defaults to :any
:brokers(Array):: Only report a status change for these identified brokers
Block
Required block activated when connected count crosses a status boundary with following parameters
status(Symbol):: Status of connection: :connected, :disconnected, or :failed, with
:failed indicating that all selected brokers or all brokers have failed
Return
- id(String)
-
Identifier associated with connection status request
816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 816 def connection_status( = {}, &callback) id = generate_id @connection_status[id] = {:boundary => [:boundary], :brokers => [:brokers], :callback => callback} if timeout = [:one_off] @connection_status[id][:timer] = EM::Timer.new(timeout) do if @connection_status[id] if @connection_status[id][:callback].arity == 2 @connection_status[id][:callback].call(:timeout, nil) else @connection_status[id][:callback].call(:timeout) end @connection_status.delete(id) end end end id end |
#declare(type, name, options = {}) ⇒ Object
Declare queue or exchange object but do not subscribe to it
Parameters
- type(Symbol)
-
Type of object: :queue, :direct, :fanout or :topic
- name(String)
-
Name of object
- options(Hash)
-
Standard AMQP declare options plus
- :brokers(Array)
-
Identity of brokers for which to declare, defaults to all usable if nil or empty
Return
- identities(Array)
-
Identity of brokers where successfully declared
543 544 545 546 547 548 549 550 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 543 def declare(type, name, = {}) identities = [] brokers = .delete(:brokers) each(:usable, brokers) { |b| identities << b.identity if b.declare(type, name, ) } logger.info("Could not declare #{type.to_s} #{name.inspect} on brokers #{each(:usable, brokers).inspect} " + "when selected #{brokers.inspect} from usable #{usable.inspect}") if identities.empty? identities end |
#declare_unusable(identities) ⇒ Object
Declare a broker client as unusable
Parameters
- identities(Array)
-
Identity of brokers
Return
- true
-
Always return true
Raises
- Exception
-
If identified broker is unknown
741 742 743 744 745 746 747 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 741 def declare_unusable(identities) identities.each do |id| broker = @brokers_hash[id] raise Exception, "Cannot mark unknown broker #{id} unusable" unless broker broker.close(propagate = true, normal = false, log = false) end end |
#delete(name, options = {}) ⇒ Object
Delete queue in all usable brokers or all selected brokers that are usable
Parameters
- name(String)
-
Queue name
- options(Hash)
-
Queue declare options plus
- :brokers(Array)
-
Identity of brokers in which queue is to be deleted
Return
- identities(Array)
-
Identity of brokers where queue was deleted
677 678 679 680 681 682 683 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 677 def delete(name, = {}) identities = [] u = usable brokers = .delete(:brokers) ((brokers || u) & u).each { |i| identities << i if (b = @brokers_hash[i]) && b.delete(name, ) } identities end |
#delete_amqp_resources(name, options = {}) ⇒ Object
Delete queue resources from AMQP in all usable brokers
Parameters
- name(String)
-
Queue name
- options(Hash)
-
Queue declare options plus
- :brokers(Array)
-
Identity of brokers in which queue is to be deleted
Return
- identities(Array)
-
Identity of brokers where queue was deleted
694 695 696 697 698 699 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 694 def delete_amqp_resources(name, = {}) identities = [] u = usable (([:brokers] || u) & u).each { |i| identities << i if (b = @brokers_hash[i]) && b.delete_amqp_resources(:queue, name) } identities end |
#failed ⇒ Object
Get serialized identity of failed broker clients, i.e., ones that were never successfully connected, not ones that are just disconnected
Return
- (Array)
-
Serialized identity of failed broker clients
386 387 388 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 386 def failed @brokers.inject([]) { |c, b| b.failed? ? c << b.identity : c } end |
#get(id) ⇒ Object
Get broker serialized identity if client exists
Parameters
- id(Integer|String)
-
Broker alias or serialized identity
Return
- (String|nil)
-
Broker serialized identity if client found, otherwise nil
333 334 335 336 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 333 def get(id) @brokers.each { |b| return b.identity if b.identity == id || b.alias == id } nil end |
#heartbeat=(heartbeat) ⇒ Object
Change connection heartbeat frequency to be used for any new connections
Parameters
- heartbeat(Integer)
-
Number of seconds between AMQP connection heartbeats used to keep
connection alive (e.g., when AMQP broker is behind a firewall), nil or 0 means disable
Return
- (Integer|nil)
-
New heartbeat setting
398 399 400 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 398 def heartbeat=(heartbeat) @options[:heartbeat] = heartbeat end |
#hosts ⇒ Object
Form string of hosts and associated indices
Return
- (String)
-
Comma separated list of host:index
314 315 316 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 314 def hosts @brokers.map { |b| "#{b.host}:#{b.index}" }.join(",") end |
#identity_parts(id) ⇒ Object
Break broker serialized identity down into individual parts if exists
Parameters
- id(Integer|String)
-
Broker alias or serialized identity
Return
- (Array)
-
Host, port, index, and priority, or all nil if broker not found
281 282 283 284 285 286 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 281 def identity_parts(id) @brokers.each do |b| return [b.host, b.port, b.index, priority(b.identity)] if b.identity == id || b.alias == id end [nil, nil, nil, nil] end |
#non_delivery(&blk) ⇒ Object
Provide callback to be activated when a message cannot be delivered
Block
Required block with parameters
reason(String):: Non-delivery reason
"NO_ROUTE" - queue does not exist
"NO_CONSUMERS" - queue exists but it has no consumers, or if :immediate was specified,
all consumers are not immediately ready to consume
"ACCESS_REFUSED" - queue not usable because broker is in the process of stopping service
type(String|nil):: Request type, or nil if not applicable
token(String|nil):: Generated message identifier, or nil if not applicable
from(String|nil):: Identity of original sender of message, or nil if not applicable
to(String):: Queue to which message was published
Return
- true
-
Always return true
663 664 665 666 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 663 def non_delivery(&blk) @non_delivery = blk true end |
#ports ⇒ Object
Form string of ports and associated indices
Return
- (String)
-
Comma separated list of port:index
322 323 324 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 322 def ports @brokers.map { |b| "#{b.port}:#{b.index}" }.join(",") end |
#publish(exchange, packet, options = {}) ⇒ Object
Publish message to AMQP exchange of first connected broker
Parameters
- exchange(Hash)
-
AMQP exchange to subscribe to with keys :type, :name, and :options,
which are the standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this exchange or queue on the broker
to cause its creation; for use when client does not have create permission or
knows the object already exists and wants to avoid declare overhead
:declare(Boolean):: Whether to delete this exchange or queue from the AMQP cache
to force it to be declared on the broker and thus be created if it does not exist
- packet(Packet)
-
Message to serialize and publish
- options(Hash)
-
Publish options – standard AMQP ones plus
- :mandatory(Boolean)
-
Return message if the exchange does not have any associated queues
or if all the associated queues do not have any consumers
:immediate(Boolean):: Return message for the same reasons as :mandatory plus if all
of the queues associated with the exchange are not immediately ready to consume the message
:fanout(Boolean):: true means publish to all connected brokers
:brokers(Array):: Identity of brokers selected for use, defaults to all brokers
if nil or empty
:order(Symbol):: Broker selection order: :random or :priority,
defaults to @select if :brokers is nil, otherwise defaults to :priority
:no_serialize(Boolean):: Do not serialize packet because it is already serialized,
this is an escape for special situations like enrollment, also implicitly disables
publish logging; this option is implicitly invoked if initialize without a serializer
:log_filter(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info
:log_data(String):: Additional data to display at end of log entry
:no_log(Boolean):: Disable publish logging unless debug level
Return
- identities(Array)
-
Identity of brokers where packet was successfully published
Raise
- NoConnectedBrokers
-
If cannot find a connected broker
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 624 def publish(exchange, packet, = {}) identities = [] no_serialize = [:no_serialize] || @serializer.nil? = if no_serialize then packet else @serializer.dump(packet) end brokers = use() brokers.each do |b| if b.publish(exchange, packet, , .merge(:no_serialize => no_serialize)) identities << b.identity if [:mandatory] && !no_serialize context = Context.new(packet, , brokers.map { |b| b.identity }) @published.store(, context) end break unless [:fanout] end end if identities.empty? selected = "selected " if [:brokers] list = aliases(brokers.map { |b| b.identity }).join(", ") raise NoConnectedBrokers, "None of #{selected}brokers [#{list}] are usable for publishing" end identities end |
#queue_status(queue_names, timeout = nil, &blk) ⇒ Object
Check status of specified queues for connected brokers Silently ignore unknown queues If a queue whose status is being checked does not exist, the associated broker connection will fail and be unusable
Parameters
- queue_names(Array)
-
Names of queues previously subscribed to that are to be checked
- timeout(Integer)
-
Number of seconds to wait for all status checks, defaults to no timeout
Block
Optional block to be called after all queue statuses are obtained with hash parameter containing statuses with queue name as key and value that is a hash with broker identity as key and hash of :messages and :consumers as value; the :messages and :consumers values are nil if there is a failure retrieving them for the given queue
Return
- true
-
Always return true
569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 569 def queue_status(queue_names, timeout = nil, &blk) count = 0 status = {} each(:connected) { |b| b.queues.each { |q| count += 1 if queue_names.include?(q.name) } } if count == 0 blk.call(status) if blk else handler = CountedDeferrable.new(count, timeout) handler.callback { blk.call(status) if blk } each(:connected) do |b| if b.queue_status(queue_names) do |name, , consumers| (status[name] ||= {})[b.identity] = {:messages => , :consumers => consumers} handler.completed_one end else b.queues.each { |q| handler.completed_one if queue_names.include?(q.name) } end end end true end |
#remove(host, port, &blk) ⇒ Object
Remove a broker client from the configuration Invoke connection status callbacks only if connection is not already disabled There is no check whether this is the last usable broker client
Parameters
- host{String)
-
IP host name or address for individual broker
- port(Integer)
-
TCP port number for individual broker
Block
Optional block with following parameters to be called after removing the connection unless broker is not configured
identity(String):: Broker serialized identity
Return
- identity(String|nil)
-
Serialized identity of broker removed, or nil if unknown
716 717 718 719 720 721 722 723 724 725 726 727 728 729 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 716 def remove(host, port, &blk) identity = self.class.identity(host, port) if broker = @brokers_hash[identity] logger.info("Removing #{identity}, alias #{broker.alias} from broker list") broker.close(propagate = true, normal = true, log = false) @brokers_hash.delete(identity) @brokers.reject! { |b| b.identity == identity } yield identity if block_given? else logger.info("Ignored request to remove #{identity} from broker list because unknown") identity = nil end identity end |
#reset_stats ⇒ Object
Reset broker client statistics Do not reset disconnect and failure stats because they might then be inconsistent with underlying connection status
Return
- true
-
Always return true
882 883 884 885 886 887 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 882 def reset_stats @return_stats = RightSupport::Stats::Activity.new @non_delivery_stats = RightSupport::Stats::Activity.new @exception_stats = @options[:exception_stats] || RightSupport::Stats::Exceptions.new(self, @options[:exception_callback]) true end |
#stats(reset = false) ⇒ Object
Get broker client statistics
Parameters:
- reset(Boolean)
-
Whether to reset the statistics after getting the current ones
Return
- stats(Hash)
-
Broker client stats with keys
- “brokers”(Array)
-
Stats for each broker client in priority order
- “exceptions”(Hash|nil)
-
Exceptions raised per category, or nil if none
- “total”(Integer)
-
Total exceptions for this category
- “recent”(Array)
-
Most recent as a hash of “count”, “type”, “message”, “when”, and “where”
- “heartbeat”(Integer|nil)
-
Number of seconds between AMQP heartbeats, or nil if heartbeat disabled
- “non_deliveries”(Hash|nil)
-
Non-delivery activity stats with keys “total”, “percent”, “last”, and “rate”
with percentage breakdown per non-delivery reason, or nil if none
"returns"(Hash|nil):: Message return activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per return reason, or nil if none
864 865 866 867 868 869 870 871 872 873 874 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 864 def stats(reset = false) stats = { "brokers" => @brokers.map { |b| b.stats }, "heartbeat" => @options[:heartbeat], "non-deliveries" => @non_delivery_stats.all, "returns" => @return_stats.all } stats["exceptions"] = @exception_stats.stats unless @options[:exception_stats] reset_stats if reset stats end |
#status ⇒ Object
Get status summary
Return
- (Array(Hash))
-
Status of each configured broker with keys
- :identity(String)
-
Broker serialized identity
- :alias(String)
-
Broker alias used in logs
- :status(Symbol)
-
Status of connection
- :disconnects(Integer)
-
Number of times lost connection
- :failures(Integer)
-
Number of times connect failed
- :retries(Integer)
-
Number of attempts to connect after failure
844 845 846 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 844 def status @brokers.map { |b| b.summary } end |
#subscribe(queue, exchange = nil, options = {}, &blk) ⇒ Object
Subscribe an AMQP queue to an AMQP exchange on all broker clients that are connected or still connecting Allow connecting here because subscribing may happen before all have confirmed connected Do not wait for confirmation from broker client that subscription is complete When a message is received, acknowledge, unserialize, and log it as specified If the message is unserialized and it is not of the right type, it is dropped after logging a warning
Parameters
- queue(Hash)
-
AMQP queue being subscribed with keys :name and :options,
which are the standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this queue on the broker
to cause its creation; for use when client does not have permission to create or
knows the queue already exists and wants to avoid declare overhead
- exchange(Hash|nil)
-
AMQP exchange to subscribe to with keys :type, :name, and :options,
nil means use empty exchange by directly subscribing to queue; the :options are the
standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this exchange on the broker
to cause its creation; for use when client does not have create permission or
knows the exchange already exists and wants to avoid declare overhead
- options(Hash)
-
Subscribe options:
- :ack(Boolean)
-
Whether client takes responsibility for explicitly acknowledging each
message received, defaults to implicit acknowledgement in AMQP as part of message receipt
:no_unserialize(Boolean):: Do not unserialize message, this is an escape for special
situations like enrollment, also implicitly disables receive filtering and logging;
this option is implicitly invoked if initialize without a serializer
(packet class)(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info,
only packet classes specified are accepted, others are not processed but are logged with error
:category(String):: Packet category description to be used in error messages
:log_data(String):: Additional data to display at end of log entry
:no_log(Boolean):: Disable receive logging unless debug level
:exchange2(Hash):: Additional exchange to which same queue is to be bound
:brokers(Array):: Identity of brokers for which to subscribe, defaults to all usable if nil or empty
Block
Block with following parameters to be called each time exchange matches a message to the queue:
identity(String):: Serialized identity of broker delivering the message
message(Packet|String):: Message received, which is unserialized unless :no_unserialize was specified
header(AMQP::Protocol::Header):: Message header (optional block parameter)
Return
- identities(Array)
-
Identity of brokers where successfully subscribed
497 498 499 500 501 502 503 504 505 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 497 def subscribe(queue, exchange = nil, = {}, &blk) identities = [] brokers = .delete(:brokers) each(:usable, brokers) { |b| identities << b.identity if b.subscribe(queue, exchange, , &blk) } logger.info("Could not subscribe to queue #{queue.inspect} on exchange #{exchange.inspect} " + "on brokers #{each(:usable, brokers).inspect} when selected #{brokers.inspect} " + "from usable #{usable.inspect}") if identities.empty? identities end |
#unsubscribe(queue_names, timeout = nil, &blk) ⇒ Object
Unsubscribe from the specified queues on usable broker clients Silently ignore unknown queues
Parameters
- queue_names(Array)
-
Names of queues previously subscribed to
- timeout(Integer)
-
Number of seconds to wait for all confirmations, defaults to no timeout
Block
Optional block with no parameters to be called after all queues are unsubscribed
Return
- true
-
Always return true
519 520 521 522 523 524 525 526 527 528 529 530 531 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 519 def unsubscribe(queue_names, timeout = nil, &blk) count = each(:usable).inject(0) do |c, b| c + b.queues.inject(0) { |c, q| c + (queue_names.include?(q.name) ? 1 : 0) } end if count == 0 blk.call if blk else handler = CountedDeferrable.new(count, timeout) handler.callback { blk.call if blk } each(:usable) { |b| b.unsubscribe(queue_names) { handler.completed_one } } end true end |
#unusable ⇒ Object
Get serialized identity of unusable brokers
Return
- (Array)
-
Serialized identity of unusable brokers
369 370 371 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 369 def unusable all - usable end |
#usable ⇒ Object
Get serialized identity of brokers that are usable, i.e., connecting or confirmed connected
Return
- (Array)
-
Serialized identity of usable brokers
361 362 363 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 361 def usable each(:usable).map { |b| b.identity } end |