Class: RightAMQP::HABrokerClient
- Inherits:
-
Object
- Object
- RightAMQP::HABrokerClient
- Includes:
- RightSupport::Log::Mixin
- Defined in:
- lib/right_amqp/ha_client/ha_broker_client.rb
Overview
Client for multiple AMQP brokers used together to achieve a high availability messaging routing service
Defined Under Namespace
Classes: Context, CountedDeferrable, NoBrokerHosts, NoConnectedBrokers, NoUserData, Published
Constant Summary collapse
- RECONNECT_INTERVAL =
Default number of seconds between reconnect attempts
60
Instance Attribute Summary collapse
-
#brokers ⇒ Object
(Array(Broker)) Priority ordered list of AMQP broker clients (exposed only for unit test purposes).
Class Method Summary collapse
-
.addresses(host, port) ⇒ Object
Parse host and port information to form list of broker address information.
-
.identities(host, port = nil) ⇒ Object
Parse host and port information to form list of broker identities.
-
.identity(host, port = nil) ⇒ Object
Construct a broker serialized identity from its host and port of the form rs-broker-host-port, with any ‘-’s in host replaced by ‘~’.
-
.parse_user_data(user_data) ⇒ Object
Parse agent user data to extract broker host and port configuration An agent is permitted to only support using one broker.
Instance Method Summary collapse
-
#alias_(identity) ⇒ Object
Convert broker serialized identity to its alias.
-
#aliases(identities) ⇒ Object
Convert broker identities to aliases.
-
#all ⇒ Object
Get serialized identity of all brokers.
-
#close(&blk) ⇒ Object
Close all broker client connections.
-
#close_one(identity, propagate = true, &blk) ⇒ Object
Close an individual broker client connection.
-
#connect(host, port, index, priority = nil, force = false, &blk) ⇒ Object
Make new connection to broker at specified address unless already connected or currently connecting.
-
#connected ⇒ Object
Get serialized identity of connected brokers.
-
#connected?(identity) ⇒ Boolean
Check whether connected to broker.
-
#connection_status(options = {}, &callback) ⇒ Object
Register callback to be activated when there is a change in connection status Can be called more than once without affecting previous callbacks.
-
#declare(type, name, options = {}) ⇒ Object
Declare queue or exchange object but do not subscribe to it.
-
#declare_unusable(identities) ⇒ Object
Declare a broker client as unusable.
-
#delete(name, options = {}) ⇒ Object
Delete queue in all usable brokers or all selected brokers that are usable.
-
#delete_amqp_resources(name, options = {}) ⇒ Object
Delete queue resources from AMQP in all usable brokers.
-
#failed ⇒ Object
Get serialized identity of failed broker clients, i.e., ones that were never successfully connected, not ones that are just disconnected.
-
#get(id) ⇒ Object
Get broker serialized identity if client exists.
-
#heartbeat=(heartbeat) ⇒ Object
Change connection heartbeat frequency to be used for any new connections.
-
#hosts ⇒ Object
Form string of hosts and associated indices.
-
#identity_parts(id) ⇒ Object
Break broker serialized identity down into individual parts if exists.
-
#initialize(serializer, options = {}) ⇒ HABrokerClient
constructor
Create connections to all configured AMQP brokers The constructed broker client list is in priority order.
-
#non_delivery(&blk) ⇒ Object
Provide callback to be activated when a message cannot be delivered.
-
#ports ⇒ Object
Form string of ports and associated indices.
-
#publish(exchange, packet, options = {}) ⇒ Object
Publish message to AMQP exchange of first connected broker.
-
#remove(host, port, &blk) ⇒ Object
Remove a broker client from the configuration Invoke connection status callbacks only if connection is not already disabled There is no check whether this is the last usable broker client.
-
#reset_stats ⇒ Object
Reset broker client statistics Do not reset disconnect and failure stats because they might then be inconsistent with underlying connection status.
-
#stats(reset = false) ⇒ Object
Get broker client statistics.
-
#status ⇒ Object
Get status summary.
-
#subscribe(queue, exchange = nil, options = {}, &blk) ⇒ Object
Subscribe an AMQP queue to an AMQP exchange on all broker clients that are connected or still connecting Allow connecting here because subscribing may happen before all have confirmed connected Do not wait for confirmation from broker client that subscription is complete When a message is received, acknowledge, unserialize, and log it as specified If the message is unserialized and it is not of the right type, it is dropped after logging a warning.
-
#unsubscribe(queue_names, timeout = nil, &blk) ⇒ Object
Unsubscribe from the specified queues on usable broker clients Silently ignore unknown queues.
-
#unusable ⇒ Object
Get serialized identity of unusable brokers.
-
#usable ⇒ Object
Get serialized identity of brokers that are usable, i.e., connecting or confirmed connected.
Constructor Details
#initialize(serializer, options = {}) ⇒ HABrokerClient
Create connections to all configured AMQP brokers The constructed broker client list is in priority order
Parameters
- serializer(Serializer)
-
Serializer used for marshaling packets being published or
unmarshaling received to packets (responds to :dump and :load); if nil, has
same effect as setting subscribe option :no_serialize and publish option :no_unserialize
- options(Hash)
-
Configuration options
- :user(String)
-
User name
- :pass(String)
-
Password
- :vhost(String)
-
Virtual host path name
- :ssl(Boolean)
-
Whether SSL is enabled
- :insist(Boolean)
-
Whether to suppress redirection of connection
- :reconnect_interval(Integer)
-
Number of seconds between reconnect attempts, defaults to RECONNECT_INTERVAL
- :heartbeat(Integer)
-
Number of seconds between AMQP connection heartbeats used to keep
connection alive (e.g., when AMQP broker is behind a firewall), nil or 0 means disable
:host{String):: Comma-separated list of AMQP broker host names; if only one, it is reapplied
to successive ports; if none, defaults to localhost; each host may be followed by ':'
and a short string to be used as a broker index; the index defaults to the list index,
e.g., "host_a:0, host_c:2"
:port(String|Integer):: Comma-separated list of AMQP broker port numbers corresponding to :host list;
if only one, it is incremented and applied to successive hosts; if none, defaults to AMQP::PORT
:prefetch(Integer):: Maximum number of the AMQP broker is to prefetch for the agent
before it receives an ack. Value 1 ensures that only last unacknowledged gets redelivered
if the agent crashes. Value 0 means unlimited prefetch.
:order(Symbol):: Broker selection order when publishing a message: :random or :priority,
defaults to :priority, value can be overridden on publish call
:exception_callback(Proc):: Callback activated on exception events with parameters
exception(Exception):: Exception
(Packet):: Message being processed
client(HABrokerClient):: Reference to this client
:exception_on_receive_callback(Proc):: Callback activated on a receive exception with parameters
(String):: Message content that caused an exception
exception(Exception):: Exception that was raised
Raise
- ArgumentError
-
If :host and :port are not matched lists or if serializer does not respond
to :dump and :load
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 136 def initialize(serializer, = {}) = .dup [:update_status_callback] = lambda { |b, c| update_status(b, c) } [:return_message_callback] = lambda { |i, t, r, m| handle_return(i, t, r, m) } [:reconnect_interval] ||= RECONNECT_INTERVAL @connection_status = {} unless serializer.nil? || [:dump, :load].all? { |m| serializer.respond_to?(m) } raise ArgumentError, "serializer must be a class/object that responds to :dump and :load" end @serializer = serializer @published = Published.new reset_stats @select = [:order] || :priority @brokers = connect_all @closed = false @brokers_hash = {} @brokers.each { |b| @brokers_hash[b.identity] = b } end |
Instance Attribute Details
#brokers ⇒ Object
(Array(Broker)) Priority ordered list of AMQP broker clients (exposed only for unit test purposes)
96 97 98 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 96 def brokers @brokers end |
Class Method Details
.addresses(host, port) ⇒ Object
Parse host and port information to form list of broker address information
Parameters
- host{String)
-
Comma-separated list of broker host names; if only one, it is reapplied
to successive ports; if none, defaults to localhost; each host may be followed by ':'
and a short string to be used as a broker index; the index defaults to the list index,
e.g., "host_a:0, host_c:2"
- port(String|Integer)
-
Comma-separated list of broker port numbers corresponding to :host list;
if only one, it is incremented and applied to successive hosts; if none, defaults to AMQP::PORT
Returns
- (Array(Hash))
-
List of broker addresses with keys :host, :port, :index
Raise
- ArgumentError
-
If host and port are not matched lists
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 208 def self.addresses(host, port) hosts = if host && !host.empty? then host.split(/,\s*/) else [ "localhost" ] end ports = if port && port.size > 0 then port.to_s.split(/,\s*/) else [ ::AMQP::PORT ] end if hosts.size != ports.size && hosts.size != 1 && ports.size != 1 raise ArgumentError.new("Unmatched AMQP host/port lists -- hosts: #{host.inspect} ports: #{port.inspect}") end i = -1 if hosts.size > 1 hosts.map do |host| i += 1 h = host.split(/:\s*/) port = if ports[i] then ports[i].to_i else ports[0].to_i end port = port.to_s.split(/:\s*/)[0] {:host => h[0], :port => port.to_i, :index => (h[1] || i.to_s).to_i} end else ports.map do |port| i += 1 p = port.to_s.split(/:\s*/) host = if hosts[i] then hosts[i] else hosts[0] end host = host.split(/:\s*/)[0] {:host => host, :port => p[0].to_i, :index => (p[1] || i.to_s).to_i} end end end |
.identities(host, port = nil) ⇒ Object
Parse host and port information to form list of broker identities
Parameters
- host{String)
-
Comma-separated list of broker host names; if only one, it is reapplied
to successive ports; if none, defaults to localhost; each host may be followed by ':'
and a short string to be used as a broker index; the index defaults to the list index,
e.g., "host_a:0, host_c:2"
- port(String|Integer)
-
Comma-separated list of broker port numbers corresponding to :host list;
if only one, it is incremented and applied to successive hosts; if none, defaults to AMQP::PORT
Returns
- (Array)
-
Identity of each broker
Raise
- ArgumentError
-
If host and port are not matched lists
249 250 251 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 249 def self.identities(host, port = nil) addresses(host, port).map { |a| identity(a[:host], a[:port]) } end |
.identity(host, port = nil) ⇒ Object
Construct a broker serialized identity from its host and port of the form rs-broker-host-port, with any ‘-’s in host replaced by ‘~’
Parameters
- host{String)
-
IP host name or address for individual broker
- port(Integer)
-
TCP port number for individual broker, defaults to ::AMQP::PORT
Returns
- (String)
-
Broker serialized identity
262 263 264 265 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 262 def self.identity(host, port = nil) port ||= ::AMQP::PORT "rs-broker-#{host.gsub('-', '~')}-#{port.to_i}" end |
.parse_user_data(user_data) ⇒ Object
Parse agent user data to extract broker host and port configuration An agent is permitted to only support using one broker
Parameters
- user_data(String)
-
Agent user data in <name>=<value>&<name>=<value>&… form
with required name RS_rn_url and optional names RS_rn_host and RS_rn_port
Return
- (Array)
-
Broker hosts and ports as comma-separated list in priority order in the form
<hostname>:<index>,<hostname>:<index>,...
<port>:<index>,<port>:<index>,... or nil if none specified
Raise
- NoUserData
-
If the user data is missing
- NoBrokerHosts
-
If no brokers could be extracted from the user data
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 170 def self.parse_user_data(user_data) raise NoUserData.new("User data is missing") if user_data.nil? || user_data.empty? hosts = "" ports = nil user_data.split("&").each do |data| name, value = data.split("=") if name == "RS_rn_url" h = value.split("@").last.split("/").first # Translate host name used by very old agents using only one broker h = "broker1-1.rightscale.com" if h == "broker.rightscale.com" hosts = h + hosts end if name == "RS_rn_host" hosts << value end if name == "RS_rn_port" ports = value end end raise NoBrokerHosts.new("No brokers found in user data") if hosts.empty? [hosts, ports] end |
Instance Method Details
#alias_(identity) ⇒ Object
Convert broker serialized identity to its alias
Parameters
- identity(String)
-
Broker serialized identity
Return
- (String|nil)
-
Broker alias, or nil if not a known broker
299 300 301 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 299 def alias_(identity) @brokers_hash[identity].alias rescue nil end |
#aliases(identities) ⇒ Object
Convert broker identities to aliases
Parameters
- identities(Array)
-
Broker identities
Return
- (Array)
-
Broker aliases
288 289 290 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 288 def aliases(identities) identities.map { |i| alias_(i) } end |
#all ⇒ Object
Get serialized identity of all brokers
Return
- (Array)
-
Serialized identity of all brokers
370 371 372 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 370 def all @brokers.map { |b| b.identity } end |
#close(&blk) ⇒ Object
Close all broker client connections
Block
Optional block with no parameters to be called after all connections are closed
Return
- true
-
Always return true
710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 710 def close(&blk) if @closed blk.call if blk else @closed = true @connection_status = {} handler = CountedDeferrable.new(@brokers.size) handler.callback { blk.call if blk } @brokers.each do |b| begin b.close(propagate = false) { handler.completed_one } rescue Exception => e handler.completed_one logger.exception("Failed to close broker #{b.alias}", e, :trace) @exception_stats.track("close", e) end end end true end |
#close_one(identity, propagate = true, &blk) ⇒ Object
Close an individual broker client connection
Parameters
- identity(String)
-
Broker serialized identity
- propagate(Boolean)
-
Whether to propagate connection status updates
Block
Optional block with no parameters to be called after connection closed
Return
- true
-
Always return true
Raise
- Exception
-
If broker unknown
745 746 747 748 749 750 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 745 def close_one(identity, propagate = true, &blk) broker = @brokers_hash[identity] raise Exception, "Cannot close unknown broker #{identity}" unless broker broker.close(propagate, &blk) true end |
#connect(host, port, index, priority = nil, force = false, &blk) ⇒ Object
Make new connection to broker at specified address unless already connected or currently connecting
Parameters
- host{String)
-
IP host name or address for individual broker
- port(Integer)
-
TCP port number for individual broker
- index(Integer)
-
Unique index for broker within set for use in forming alias
- priority(Integer|nil)
-
Priority position of this broker in set for use by this agent
with nil or a value that would leave a gap in the list meaning add to end of list
- force(Boolean)
-
Reconnect even if already connected
Block
Optional block with following parameters to be called after initiating the connection unless already connected to this broker:
identity(String):: Broker serialized identity
Return
- (Boolean)
-
true if connected, false if no connect attempt made
Raise
- Exception
-
If host and port do not match an existing broker but index does
416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 416 def connect(host, port, index, priority = nil, force = false, &blk) identity = self.class.identity(host, port) existing = @brokers_hash[identity] if existing && existing.usable? && !force logger.info("Ignored request to reconnect #{identity} because already #{existing.status.to_s}") false else old_identity = identity @brokers.each do |b| if index == b.index # Changing host and/or port of existing broker client old_identity = b.identity break end end unless existing address = {:host => host, :port => port, :index => index} broker = BrokerClient.new(identity, address, @serializer, @exception_stats, @non_delivery_stats, , existing) p = priority(old_identity) if priority && priority < p @brokers.insert(priority, broker) elsif priority && priority > p logger.info("Reduced priority setting for broker #{identity} from #{priority} to #{p} to avoid gap in list") @brokers.insert(p, broker) else @brokers[p].close if @brokers[p] @brokers[p] = broker end @brokers_hash[identity] = broker yield broker.identity if block_given? true end end |
#connected ⇒ Object
Get serialized identity of connected brokers
Return
- (Array)
-
Serialized identity of connected brokers
346 347 348 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 346 def connected @brokers.inject([]) { |c, b| if b.connected? then c << b.identity else c end } end |
#connected?(identity) ⇒ Boolean
Check whether connected to broker
Parameters
- identity{String)
-
Broker serialized identity
Return
- (Boolean)
-
true if connected to broker, otherwise false, or nil if broker unknown
338 339 340 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 338 def connected?(identity) @brokers_hash[identity].connected? rescue nil end |
#connection_status(options = {}, &callback) ⇒ Object
Register callback to be activated when there is a change in connection status Can be called more than once without affecting previous callbacks
Parameters
- options(Hash)
-
Connection status monitoring options
- :one_off(Integer)
-
Seconds to wait for status change; only send update once;
if timeout, report :timeout as the status
:boundary(Symbol):: :any if only report change on any (0/1) boundary,
:all if only report change on all (n-1/n) boundary, defaults to :any
:brokers(Array):: Only report a status change for these identified brokers
Block
Required block activated when connected count crosses a status boundary with following parameters
status(Symbol):: Status of connection: :connected, :disconnected, or :failed, with
:failed indicating that all selected brokers or all brokers have failed
Return
- id(String)
-
Identifier associated with connection status request
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 770 def connection_status( = {}, &callback) id = generate_id @connection_status[id] = {:boundary => [:boundary], :brokers => [:brokers], :callback => callback} if timeout = [:one_off] @connection_status[id][:timer] = EM::Timer.new(timeout) do if @connection_status[id] if @connection_status[id][:callback].arity == 2 @connection_status[id][:callback].call(:timeout, nil) else @connection_status[id][:callback].call(:timeout) end @connection_status.delete(id) end end end id end |
#declare(type, name, options = {}) ⇒ Object
Declare queue or exchange object but do not subscribe to it
Parameters
- type(Symbol)
-
Type of object: :queue, :direct, :fanout or :topic
- name(String)
-
Name of object
- options(Hash)
-
Standard AMQP declare options plus
- :brokers(Array)
-
Identity of brokers for which to declare, defaults to all usable if nil or empty
Return
- identities(Array)
-
Identity of brokers where successfully declared
536 537 538 539 540 541 542 543 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 536 def declare(type, name, = {}) identities = [] brokers = .delete(:brokers) each_usable(brokers) { |b| identities << b.identity if b.declare(type, name, ) } logger.info("Could not declare #{type.to_s} #{name.inspect} on brokers #{each_usable(brokers).inspect} " + "when selected #{brokers.inspect} from usable #{usable.inspect}") if identities.empty? identities end |
#declare_unusable(identities) ⇒ Object
Declare a broker client as unusable
Parameters
- identities(Array)
-
Identity of brokers
Return
- true
-
Always return true
Raises
- Exception
-
If identified broker is unknown
695 696 697 698 699 700 701 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 695 def declare_unusable(identities) identities.each do |id| broker = @brokers_hash[id] raise Exception, "Cannot mark unknown broker #{id} unusable" unless broker broker.close(propagate = true, normal = false, log = false) end end |
#delete(name, options = {}) ⇒ Object
Delete queue in all usable brokers or all selected brokers that are usable
Parameters
- name(String)
-
Queue name
- options(Hash)
-
Queue declare options plus
- :brokers(Array)
-
Identity of brokers in which queue is to be deleted
Return
- identities(Array)
-
Identity of brokers where queue was deleted
631 632 633 634 635 636 637 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 631 def delete(name, = {}) identities = [] u = usable brokers = .delete(:brokers) ((brokers || u) & u).each { |i| identities << i if (b = @brokers_hash[i]) && b.delete(name, ) } identities end |
#delete_amqp_resources(name, options = {}) ⇒ Object
Delete queue resources from AMQP in all usable brokers
Parameters
- name(String)
-
Queue name
- options(Hash)
-
Queue declare options plus
- :brokers(Array)
-
Identity of brokers in which queue is to be deleted
Return
- identities(Array)
-
Identity of brokers where queue was deleted
648 649 650 651 652 653 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 648 def delete_amqp_resources(name, = {}) identities = [] u = usable (([:brokers] || u) & u).each { |i| identities << i if (b = @brokers_hash[i]) && b.delete_amqp_resources(:queue, name) } identities end |
#failed ⇒ Object
Get serialized identity of failed broker clients, i.e., ones that were never successfully connected, not ones that are just disconnected
Return
- (Array)
-
Serialized identity of failed broker clients
379 380 381 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 379 def failed @brokers.inject([]) { |c, b| b.failed? ? c << b.identity : c } end |
#get(id) ⇒ Object
Get broker serialized identity if client exists
Parameters
- id(Integer|String)
-
Broker alias or serialized identity
Return
- (String|nil)
-
Broker serialized identity if client found, otherwise nil
326 327 328 329 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 326 def get(id) @brokers.each { |b| return b.identity if b.identity == id || b.alias == id } nil end |
#heartbeat=(heartbeat) ⇒ Object
Change connection heartbeat frequency to be used for any new connections
Parameters
- heartbeat(Integer)
-
Number of seconds between AMQP connection heartbeats used to keep
connection alive (e.g., when AMQP broker is behind a firewall), nil or 0 means disable
Return
- (Integer|nil)
-
New heartbeat setting
391 392 393 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 391 def heartbeat=(heartbeat) [:heartbeat] = heartbeat end |
#hosts ⇒ Object
Form string of hosts and associated indices
Return
- (String)
-
Comma separated list of host:index
307 308 309 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 307 def hosts @brokers.map { |b| "#{b.host}:#{b.index}" }.join(",") end |
#identity_parts(id) ⇒ Object
Break broker serialized identity down into individual parts if exists
Parameters
- id(Integer|String)
-
Broker alias or serialized identity
Return
- (Array)
-
Host, port, index, and priority, or all nil if broker not found
274 275 276 277 278 279 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 274 def identity_parts(id) @brokers.each do |b| return [b.host, b.port, b.index, priority(b.identity)] if b.identity == id || b.alias == id end [nil, nil, nil, nil] end |
#non_delivery(&blk) ⇒ Object
Provide callback to be activated when a message cannot be delivered
Block
Required block with parameters
reason(String):: Non-delivery reason
"NO_ROUTE" - queue does not exist
"NO_CONSUMERS" - queue exists but it has no consumers, or if :immediate was specified,
all consumers are not immediately ready to consume
"ACCESS_REFUSED" - queue not usable because broker is in the process of stopping service
type(String|nil):: Request type, or nil if not applicable
token(String|nil):: Generated identifier, or nil if not applicable
from(String|nil):: Identity of original sender of , or nil if not applicable
to(String):: Queue to which was published
Return
- true
-
Always return true
617 618 619 620 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 617 def non_delivery(&blk) @non_delivery = blk true end |
#ports ⇒ Object
Form string of ports and associated indices
Return
- (String)
-
Comma separated list of port:index
315 316 317 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 315 def ports @brokers.map { |b| "#{b.port}:#{b.index}" }.join(",") end |
#publish(exchange, packet, options = {}) ⇒ Object
Publish message to AMQP exchange of first connected broker
Parameters
- exchange(Hash)
-
AMQP exchange to subscribe to with keys :type, :name, and :options,
which are the standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this exchange or queue on the broker
to cause its creation; for use when client does not have create or
knows the object already exists and wants to avoid declare overhead
:declare(Boolean):: Whether to delete this exchange or queue from the AMQP cache
to force it to be declared on the broker and thus be created if it does not exist
- packet(Packet)
-
Message to serialize and publish
- options(Hash)
-
Publish options – standard AMQP ones plus
- :mandatory(Boolean)
-
Return message if the exchange does not have any associated queues
or if all the associated queues do not have any consumers
:immediate(Boolean):: Return for the same reasons as :mandatory plus if all
of the queues associated with the exchange are not immediately ready to consume the
:fanout(Boolean):: true means publish to all connected brokers
:brokers(Array):: Identity of brokers selected for use, defaults to all home brokers
if nil or empty
:order(Symbol):: Broker selection order: :random or :priority,
defaults to @select if :brokers is nil, otherwise defaults to :priority
:no_serialize(Boolean):: Do not serialize packet because it is already serialized,
this is an escape for special situations like enrollment, also implicitly disables
publish logging; this option is implicitly invoked if initialize without a serializer
:log_filter(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info
:log_data(String):: Additional data to display at end of log entry
:no_log(Boolean):: Disable publish logging unless debug level
Return
- identities(Array)
-
Identity of brokers where packet was successfully published
Raise
- NoConnectedBrokers
-
If cannot find a connected broker
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 578 def publish(exchange, packet, = {}) identities = [] no_serialize = [:no_serialize] || @serializer.nil? = if no_serialize then packet else @serializer.dump(packet) end brokers = use() brokers.each do |b| if b.publish(exchange, packet, , .merge(:no_serialize => no_serialize)) identities << b.identity if [:mandatory] && !no_serialize context = Context.new(packet, , brokers.map { |b| b.identity }) @published.store(, context) end break unless [:fanout] end end if identities.empty? selected = "selected " if [:brokers] list = aliases(brokers.map { |b| b.identity }).join(", ") raise NoConnectedBrokers, "None of #{selected}brokers [#{list}] are usable for publishing" end identities end |
#remove(host, port, &blk) ⇒ Object
Remove a broker client from the configuration Invoke connection status callbacks only if connection is not already disabled There is no check whether this is the last usable broker client
Parameters
- host{String)
-
IP host name or address for individual broker
- port(Integer)
-
TCP port number for individual broker
Block
Optional block with following parameters to be called after removing the connection unless broker is not configured
identity(String):: Broker serialized identity
Return
- identity(String|nil)
-
Serialized identity of broker removed, or nil if unknown
670 671 672 673 674 675 676 677 678 679 680 681 682 683 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 670 def remove(host, port, &blk) identity = self.class.identity(host, port) if broker = @brokers_hash[identity] logger.info("Removing #{identity}, alias #{broker.alias} from broker list") broker.close(propagate = true, normal = true, log = false) @brokers_hash.delete(identity) @brokers.reject! { |b| b.identity == identity } yield identity if block_given? else logger.info("Ignored request to remove #{identity} from broker list because unknown") identity = nil end identity end |
#reset_stats ⇒ Object
Reset broker client statistics Do not reset disconnect and failure stats because they might then be inconsistent with underlying connection status
Return
- true
-
Always return true
836 837 838 839 840 841 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 836 def reset_stats @return_stats = RightSupport::Stats::Activity.new @non_delivery_stats = RightSupport::Stats::Activity.new @exception_stats = RightSupport::Stats::Exceptions.new(self, [:exception_callback]) true end |
#stats(reset = false) ⇒ Object
Get broker client statistics
Parameters:
- reset(Boolean)
-
Whether to reset the statistics after getting the current ones
Return
- stats(Hash)
-
Broker client stats with keys
- “brokers”(Array)
-
Stats for each broker client in priority order
- “exceptions”(Hash|nil)
-
Exceptions raised per category, or nil if none
- “total”(Integer)
-
Total exceptions for this category
- “recent”(Array)
-
Most recent as a hash of “count”, “type”, “message”, “when”, and “where”
- “heartbeat”(Integer|nil)
-
Number of seconds between AMQP heartbeats, or nil if heartbeat disabled
- “non_deliveries”(Hash|nil)
-
Non-delivery activity stats with keys “total”, “percent”, “last”, and “rate”
with percentage breakdown per non-delivery reason, or nil if none
"returns"(Hash|nil):: Message return activity stats with keys "total", "percent", "last", and "rate"
with percentage breakdown per return reason, or nil if none
818 819 820 821 822 823 824 825 826 827 828 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 818 def stats(reset = false) stats = { "brokers" => @brokers.map { |b| b.stats }, "exceptions" => @exception_stats.stats, "heartbeat" => [:heartbeat], "non-deliveries" => @non_delivery_stats.all, "returns" => @return_stats.all } reset_stats if reset stats end |
#status ⇒ Object
Get status summary
Return
- (Array(Hash))
-
Status of each configured broker with keys
- :identity(String)
-
Broker serialized identity
- :alias(String)
-
Broker alias used in logs
- :status(Symbol)
-
Status of connection
- :disconnects(Integer)
-
Number of times lost connection
- :failures(Integer)
-
Number of times connect failed
- :retries(Integer)
-
Number of attempts to connect after failure
798 799 800 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 798 def status @brokers.map { |b| b.summary } end |
#subscribe(queue, exchange = nil, options = {}, &blk) ⇒ Object
Subscribe an AMQP queue to an AMQP exchange on all broker clients that are connected or still connecting Allow connecting here because subscribing may happen before all have confirmed connected Do not wait for confirmation from broker client that subscription is complete When a message is received, acknowledge, unserialize, and log it as specified If the message is unserialized and it is not of the right type, it is dropped after logging a warning
Parameters
- queue(Hash)
-
AMQP queue being subscribed with keys :name and :options,
which are the standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this queue on the broker
to cause its creation; for use when client does not have to create or
knows the queue already exists and wants to avoid declare overhead
- exchange(Hash|nil)
-
AMQP exchange to subscribe to with keys :type, :name, and :options,
nil means use empty exchange by directly subscribing to queue; the :options are the
standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this exchange on the broker
to cause its creation; for use when client does not have create or
knows the exchange already exists and wants to avoid declare overhead
- options(Hash)
-
Subscribe options:
- :ack(Boolean)
-
Whether client takes responsibility for explicitly acknowledging each
received, defaults to implicit acknowledgement in AMQP as part of receipt
:no_unserialize(Boolean):: Do not unserialize , this is an escape for special
situations like enrollment, also implicitly disables receive filtering and logging;
this option is implicitly invoked if initialize without a serializer
(packet class)(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info,
only packet classes specified are accepted, others are not processed but are logged with error
:category(String):: Packet category description to be used in error
:log_data(String):: Additional data to display at end of log entry
:no_log(Boolean):: Disable receive logging unless debug level
:exchange2(Hash):: Additional exchange to which same queue is to be bound
:brokers(Array):: Identity of brokers for which to subscribe, defaults to all usable if nil or empty
Block
Block with following parameters to be called each time exchange matches a message to the queue:
identity(String):: Serialized identity of broker delivering the
(Packet|String):: Message received, which is unserialized unless :no_unserialize was specified
header(AMQP::Protocol::Header):: Message header (optional block parameter)
Return
- identities(Array)
-
Identity of brokers where successfully subscribed
490 491 492 493 494 495 496 497 498 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 490 def subscribe(queue, exchange = nil, = {}, &blk) identities = [] brokers = .delete(:brokers) each_usable(brokers) { |b| identities << b.identity if b.subscribe(queue, exchange, , &blk) } logger.info("Could not subscribe to queue #{queue.inspect} on exchange #{exchange.inspect} " + "on brokers #{each_usable(brokers).inspect} when selected #{brokers.inspect} " + "from usable #{usable.inspect}") if identities.empty? identities end |
#unsubscribe(queue_names, timeout = nil, &blk) ⇒ Object
Unsubscribe from the specified queues on usable broker clients Silently ignore unknown queues
Parameters
- queue_names(Array)
-
Names of queues previously subscribed to
- timeout(Integer)
-
Number of seconds to wait for all confirmations, defaults to no timeout
Block
Optional block with no parameters to be called after all queues are unsubscribed
Return
- true
-
Always return true
512 513 514 515 516 517 518 519 520 521 522 523 524 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 512 def unsubscribe(queue_names, timeout = nil, &blk) count = each_usable.inject(0) do |c, b| c + b.queues.inject(0) { |c, q| c + (queue_names.include?(q.name) ? 1 : 0) } end if count == 0 blk.call if blk else handler = CountedDeferrable.new(count, timeout) handler.callback { blk.call if blk } each_usable { |b| b.unsubscribe(queue_names) { handler.completed_one } } end true end |
#unusable ⇒ Object
Get serialized identity of unusable brokers
Return
- (Array)
-
Serialized identity of unusable brokers
362 363 364 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 362 def unusable all - usable end |
#usable ⇒ Object
Get serialized identity of brokers that are usable, i.e., connecting or confirmed connected
Return
- (Array)
-
Serialized identity of usable brokers
354 355 356 |
# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 354 def usable each_usable.map { |b| b.identity } end |