Class: RightAMQP::BrokerClient
- Inherits:
-
Object
- Object
- RightAMQP::BrokerClient
- Includes:
- RightSupport::Log::Mixin
- Defined in:
- lib/right_amqp/ha_client/broker_client.rb
Overview
Client for accessing AMQP broker
Constant Summary collapse
- STATUS =
Set of possible broker connection status values
[ :connecting, # Initiated AMQP connection but not yet confirmed that connected :connected, # Confirmed AMQP connection :stopping, # Broker is stopping service and, although still connected, is no longer usable :disconnected, # Notified by AMQP that connection has been lost and attempting to reconnect :closed, # AMQP connection closed explicitly or because of too many failed connect attempts :failed # Failed to connect due to internal failure or AMQP failure to connect ]
Instance Attribute Summary collapse
-
#alias ⇒ Object
readonly
(String) Broker alias, used in logs.
-
#channel ⇒ Object
readonly
(AMQP::Channel) Channel of AMQP connection used by this client.
-
#disconnect_stats ⇒ Object
readonly
(RightSupport::Stats::Activity) AMQP lost connection statistics.
-
#failure_stats ⇒ Object
readonly
(RightSupport::Stats::Activity) AMQP connection failure statistics.
-
#host ⇒ Object
readonly
(String) Host name.
-
#identity ⇒ Object
readonly
(String) Broker identity.
-
#index ⇒ Object
readonly
(Integer) Unique index for broker within given set, used in alias.
-
#last_failed ⇒ Object
readonly
(Boolean) Whether last connect attempt failed.
-
#port ⇒ Object
readonly
(Integer) Port number.
-
#queues ⇒ Object
readonly
(Array) List of MQ::Queue queues currently subscribed.
-
#retries ⇒ Object
readonly
(Integer) Number of attempts to connect after failure.
-
#status ⇒ Object
readonly
(Symbol) AMQP connection STATUS value.
Instance Method Summary collapse
-
#close(propagate = true, normal = true, log = true, &block) ⇒ Object
Close broker connection.
-
#connected? ⇒ Boolean
Determine whether this client is currently connected to the broker.
-
#declare(type, name, options = {}) ⇒ Object
Declare queue or exchange object but do not subscribe to it.
-
#delete(name, options = {}) ⇒ Object
Delete queue.
-
#delete_amqp_resources(type, name) ⇒ Object
Delete resources from local AMQP cache.
-
#failed?(backoff = false) ⇒ Boolean
Determine whether the broker connection has failed.
-
#initialize(identity, address, serializer, exception_stats, non_delivery_stats, options, existing = nil) ⇒ BrokerClient
constructor
Create broker client.
-
#publish(exchange, packet, message, options = {}) ⇒ Object
Publish message to AMQP exchange.
-
#queue_status(queue_names, &block) ⇒ Object
Check status of specified queues Silently ignore unknown queues If a queue whose status is being checked does not exist in the broker, this broker connection will fail and become unusable.
-
#set_alias(index) ⇒ Object
Set alias for broker for use in logs.
-
#stats ⇒ Object
Get broker client statistics.
-
#subscribe(queue, exchange = nil, options = {}, &block) ⇒ Object
Subscribe an AMQP queue to an AMQP exchange Do not wait for confirmation from broker that subscription is complete When a message is received, acknowledge, unserialize, and log it as specified If the message is unserialized and it is not of the right type, it is dropped after logging an error.
-
#summary ⇒ Object
Get broker client information summarizing its status.
-
#unsubscribe(queue_names, &block) ⇒ Object
Unsubscribe from the specified queues Silently ignore unknown queues.
-
#update_status(status) ⇒ Object
Callback from AMQP with connection status or from HABrokerClient Makes client callback with :connected or :disconnected status if boundary crossed.
-
#usable? ⇒ Boolean
Determine whether the broker connection is usable, i.e., connecting or confirmed connected.
Constructor Details
#initialize(identity, address, serializer, exception_stats, non_delivery_stats, options, existing = nil) ⇒ BrokerClient
Create broker client
Parameters
- identity(String)
-
Broker identity
- address(Hash)
-
Broker address
- :host(String
-
IP host name or address
- :port(Integer)
-
TCP port number for individual broker
- :index(String)
-
Unique index for broker within set of brokers for use in forming alias
- serializer(Serializer)
-
Serializer used for unmarshaling received messages to packets
(responds to :load); if nil, has same effect as setting subscribe option :no_unserialize
- exception_stats(RightSupport::Stats::Exceptions)
-
Exception statistics container to be updated
whenever there is an unexpected exception
- non_delivery_stats(RightSupport::Stats::Activity)
-
Non-delivery statistics container to be
updated whenever a cannot be sent or received
- options(Hash)
-
Configuration options
- :user(String)
-
User name
- :pass(String)
-
Password
- :vhost(String)
-
Virtual host path name
- :ssl(Boolean)
-
Whether SSL is enabled
- :insist(Boolean)
-
Whether to suppress redirection of connection
- :reconnect_interval(Integer)
-
Number of seconds between reconnect attempts
- :heartbeat(Integer)
-
Number of seconds between AMQP connection heartbeats used to keep
connection alive, e.g., when AMQP broker is behind a firewall
:prefetch(Integer):: Maximum number of messages the AMQP broker is to prefetch for the agent
before it receives an ack. Value 1 ensures that only last unacknowledged gets redelivered
if the agent crashes. Value 0 means unlimited prefetch.
:fiber_pool(FiberPool):: Pool of initialized fibers to be used for asynchronous message
processing (can be overridden with subscribe option)
:exception_on_receive_callback(Proc):: Callback activated on a receive exception with parameters
message(Object):: Message received
exception(Exception):: Exception raised
:update_status_callback(Proc):: Callback activated on a connection status change with parameters
broker(BrokerClient):: Broker client
connected_before(Boolean):: Whether was connected prior to this status change
:return_message_callback(Proc):: Callback activated when a message is returned with parameters
to(String):: Queue to which message was published
reason(String):: Reason for return
"NO_ROUTE" - queue does not exist
"NO_CONSUMERS" - queue exists but it has no consumers, or if :immediate was specified,
all consumers are not immediately ready to consume
"ACCESS_REFUSED" - queue not usable because broker is in the process of stopping service
message(String):: Returned serialized message
- existing(BrokerClient|nil)
-
Existing broker client for this address, or nil if none
Raise
- ArgumentError
-
If serializer does not respond to :dump and :load
123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 123 def initialize(identity, address, serializer, exception_stats, non_delivery_stats, , existing = nil) @options = @identity = identity @host = address[:host] @port = address[:port].to_i @index = address[:index].to_i set_alias(@index) unless serializer.nil? || [:dump, :load].all? { |m| serializer.respond_to?(m) } raise ArgumentError, "serializer must be a class/object that responds to :dump and :load" end @serializer = serializer @queues = [] @last_failed = false @exception_stats = exception_stats @non_delivery_stats = non_delivery_stats @disconnect_stats = RightSupport::Stats::Activity.new(measure_rate = false) @failure_stats = RightSupport::Stats::Activity.new(measure_rate = false) @retries = 0 connect(address, @options[:reconnect_interval]) if existing @disconnect_stats = existing.disconnect_stats @failure_stats = existing.failure_stats @last_failed = existing.last_failed @retries = existing.retries update_failure if @status == :failed end end |
Instance Attribute Details
#alias ⇒ Object (readonly)
(String) Broker alias, used in logs
47 48 49 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 47 def alias @alias end |
#channel ⇒ Object (readonly)
(AMQP::Channel) Channel of AMQP connection used by this client
41 42 43 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 41 def channel @channel end |
#disconnect_stats ⇒ Object (readonly)
(RightSupport::Stats::Activity) AMQP lost connection statistics
68 69 70 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 68 def disconnect_stats @disconnect_stats end |
#failure_stats ⇒ Object (readonly)
(RightSupport::Stats::Activity) AMQP connection failure statistics
71 72 73 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 71 def failure_stats @failure_stats end |
#host ⇒ Object (readonly)
(String) Host name
50 51 52 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 50 def host @host end |
#identity ⇒ Object (readonly)
(String) Broker identity
44 45 46 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 44 def identity @identity end |
#index ⇒ Object (readonly)
(Integer) Unique index for broker within given set, used in alias
56 57 58 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 56 def index @index end |
#last_failed ⇒ Object (readonly)
(Boolean) Whether last connect attempt failed
65 66 67 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 65 def last_failed @last_failed end |
#port ⇒ Object (readonly)
(Integer) Port number
53 54 55 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 53 def port @port end |
#queues ⇒ Object (readonly)
(Array) List of MQ::Queue queues currently subscribed
62 63 64 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 62 def queues @queues end |
#retries ⇒ Object (readonly)
(Integer) Number of attempts to connect after failure
74 75 76 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 74 def retries @retries end |
#status ⇒ Object (readonly)
(Symbol) AMQP connection STATUS value
59 60 61 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 59 def status @status end |
Instance Method Details
#close(propagate = true, normal = true, log = true, &block) ⇒ Object
Close broker connection
Parameters
- propagate(Boolean)
-
Whether to propagate connection status updates, defaults to true
- normal(Boolean)
-
Whether this is a normal close vs. a failed connection, defaults to true
- log(Boolean)
-
Whether to log that closing, defaults to true
Block
Optional block with no parameters to be called after connection closed
Return
- true
-
Always return true
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 477 def close(propagate = true, normal = true, log = true, &block) final_status = normal ? :closed : :failed if ![:closed, :failed].include?(@status) begin logger.info("[stop] Closed connection to broker #{@alias}") if log update_status(final_status) if propagate @connection.close do @status = final_status yield if block_given? end rescue StandardError => e logger.exception("Failed to close broker #{@alias}", e, :trace) @exception_stats.track("close", e) @status = final_status yield if block_given? end else @status = final_status yield if block_given? end true end |
#connected? ⇒ Boolean
Determine whether this client is currently connected to the broker
Return
- (Boolean)
-
true if connected, otherwise false
177 178 179 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 177 def connected? @status == :connected end |
#declare(type, name, options = {}) ⇒ Object
Declare queue or exchange object but do not subscribe to it
Parameters
- type(Symbol)
-
Type of object: :queue, :direct, :fanout or :topic
- name(String)
-
Name of object
- options(Hash)
-
Standard AMQP declare options
Return
- (Boolean)
-
true if declare successfully, otherwise false
323 324 325 326 327 328 329 330 331 332 333 334 335 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 323 def declare(type, name, = {}) return false unless usable? begin logger.info("[setup] Declaring #{name} #{type.to_s} on broker #{@alias}") delete_amqp_resources(:queue, name) @channel.__send__(type, name, ) true rescue StandardError => e logger.exception("Failed declaring #{type.to_s} #{name} on broker #{@alias}", e, :trace) @exception_stats.track("declare", e) false end end |
#delete(name, options = {}) ⇒ Object
Delete queue
Parameters
- name(String)
-
Queue name
- options(Hash)
-
Queue declare options
Return
- (Boolean)
-
true if queue was successfully deleted, otherwise false
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 428 def delete(name, = {}) deleted = false if usable? begin @queues.reject! do |q| if q.name == name @channel.queue(name, .merge(:no_declare => true)).delete deleted = true end end unless deleted # Allowing declare to happen since queue may not exist and do not want NOT_FOUND # failure to cause AMQP channel to close @channel.queue(name, .merge(:no_declare => true)).delete deleted = true end rescue StandardError => e logger.exception("Failed deleting queue #{name.inspect} on broker #{@alias}", e, :trace) @exception_stats.track("delete", e) end end deleted end |
#delete_amqp_resources(type, name) ⇒ Object
Delete resources from local AMQP cache
Parameters
- type(Symbol)
-
Type of AMQP object
- name(String)
-
Name of object
Return
- true
-
Always return true
460 461 462 463 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 460 def delete_amqp_resources(type, name) @channel.__send__(type == :queue ? :queues : :exchanges).delete(name) true end |
#failed?(backoff = false) ⇒ Boolean
Determine whether the broker connection has failed
Return
- (Boolean)
-
true if failed, otherwise false
185 186 187 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 185 def failed?(backoff = false) @status == :failed end |
#publish(exchange, packet, message, options = {}) ⇒ Object
Publish message to AMQP exchange
Parameters
- exchange(Hash)
-
AMQP exchange to subscribe to with keys :type, :name, and :options,
which are the standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this exchange or queue on the broker
to cause its creation; for use when caller does not have create permission or
knows the object already exists and wants to avoid declare overhead
:declare(Boolean):: Whether to delete this exchange or queue from the AMQP cache
to force it to be declared on the broker and thus be created if it does not exist
- packet(Packet)
-
Message to serialize and publish (must respond to :to_s(log_filter,
protocol_version) unless :no_serialize specified; if responds to :type, :from, :token,
and/or :one_way, these value are used if this message is returned as non-deliverable)
- message(String)
-
Serialized message to be published
- options(Hash)
-
Publish options – standard AMQP ones plus
- :no_serialize(Boolean)
-
Do not serialize packet because it is already serialized
- :log_filter(Array(Symbol))
-
Filters to be applied in to_s when logging packet to :info
- :log_data(String)
-
Additional data to display at end of log entry
- :no_log(Boolean)
-
Disable publish logging unless debug level
Return
- (Boolean)
-
true if publish successfully, otherwise false
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 391 def publish(exchange, packet, , = {}) return false unless connected? begin = exchange[:options] || {} unless [:no_serialize] log_data = "" unless [:no_log] && logger.level != :debug re = "RE-" if packet.respond_to?(:tries) && !packet.tries.empty? log_filter = [:log_filter] unless logger.level == :debug log_data = "#{re}SEND #{@alias} #{packet.to_s(log_filter, :send_version)}" if logger.level == :debug log_data += ", publish options #{.inspect}, exchange #{exchange[:name]}, " + "type #{exchange[:type]}, options #{exchange[:options].inspect}" end log_data += ", #{[:log_data]}" if [:log_data] logger.info(log_data) unless log_data.empty? end end delete_amqp_resources(exchange[:type], exchange[:name]) if [:declare] @channel.__send__(exchange[:type], exchange[:name], ).publish(, ) true rescue StandardError => e logger.exception("Failed publishing to exchange #{exchange.inspect} on broker #{@alias}", e, :trace) @exception_stats.track("publish", e) update_non_delivery_stats("publish failure", e) false end end |
#queue_status(queue_names, &block) ⇒ Object
Check status of specified queues Silently ignore unknown queues If a queue whose status is being checked does not exist in the broker, this broker connection will fail and become unusable
Parameters
- queue_names(Array)
-
Names of queues previously subscribed to
Block
Optional block to be called each time that status for a queue is retrieved with parameters queue name, message count, and consumer count; the counts are nil if there was a failure while trying to retrieve them; the block is not called for queues to which this client is not currently subscribed
Return
- (Boolean)
-
true if connected, otherwise false, in which case block never gets called
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 353 def queue_status(queue_names, &block) return false unless connected? @queues.each do |q| if queue_names.include?(q.name) begin q.status { |, consumers| block.call(q.name, , consumers) if block } rescue StandardError => e logger.exception("Failed checking status of queue #{q.name} on broker #{@alias}", e, :trace) @exception_stats.track("queue_status", e) block.call(q.name, nil, nil) if block end end end true end |
#set_alias(index) ⇒ Object
Set alias for broker for use in logs
Parameters
- index(Integer)
-
Unique index for broker within given set
Return
- (String)
-
Broker alias
161 162 163 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 161 def set_alias(index) @alias = "b#{index}" end |
#stats ⇒ Object
Get broker client statistics
Return
- (Hash)
-
Broker client stats with keys
- “alias”(String)
-
Broker alias
- “identity”(String)
-
Broker identity
- “status”(Status)
-
Status of connection
- “disconnect last”(Hash|nil)
-
Last disconnect information with key “elapsed”, or nil if none
- “disconnects”(Integer|nil)
-
Number of times lost connection, or nil if none
- “failure last”(Hash|nil)
-
Last connect failure information with key “elapsed”, or nil if none
- “failures”(Integer|nil)
-
Number of failed attempts to connect to broker, or nil if none
- “retries”(Integer|nil)
-
Number of connect retries, or nil if none
533 534 535 536 537 538 539 540 541 542 543 544 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 533 def stats { "alias" => @alias, "identity" => @identity, "status" => @status.to_s, "disconnect last" => @disconnect_stats.last, "disconnects" => RightSupport::Stats.nil_if_zero(@disconnect_stats.total), "failure last" => @failure_stats.last, "failures" => RightSupport::Stats.nil_if_zero(@failure_stats.total), "retries" => RightSupport::Stats.nil_if_zero(@retries) } end |
#subscribe(queue, exchange = nil, options = {}, &block) ⇒ Object
Subscribe an AMQP queue to an AMQP exchange Do not wait for confirmation from broker that subscription is complete When a message is received, acknowledge, unserialize, and log it as specified If the message is unserialized and it is not of the right type, it is dropped after logging an error
Parameters
- queue(Hash)
-
AMQP queue being subscribed with keys :name and :options,
which are the standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this queue on the broker
to cause its creation; for use when caller does not have permission to create or
knows the queue already exists and wants to avoid declare overhead
- exchange(Hash|nil)
-
AMQP exchange to subscribe to with keys :type, :name, and :options,
nil means use empty exchange by directly subscribing to queue; the :options are the
standard AMQP ones plus
:no_declare(Boolean):: Whether to skip declaring this exchange on the broker
to cause its creation; for use when caller does not have create permission or
knows the exchange already exists and wants to avoid declare overhead
- options(Hash)
-
Subscribe options:
- :ack(Boolean)
-
Whether caller takes responsibility for explicitly acknowledging each
message received, defaults to implicit acknowledgement in AMQP as part of message receipt
:no_unserialize(Boolean):: Do not unserialize message, this is an escape for special
situations like enrollment, also implicitly disables receive filtering and logging;
this option is implicitly invoked if initialize without a serializer
(packet class)(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info,
only packet classes specified are accepted, others are not processed but are logged with error
:category(String):: Packet category description to be used in error messages
:log_data(String):: Additional data to display at end of log entry
:no_log(Boolean):: Disable receive logging unless debug level
:exchange2(Hash):: Additional exchange to which same queue is to be bound
:brokers(Array):: Identity of brokers for which to subscribe, defaults to all usable if nil or empty
:fiber_pool(FiberPool):: Pool of initialized fibers to be used for asynchronous message
processing (non-nil value will override constructor option setting)
Block
Required block with following parameters to be called each time exchange matches a message to the queue
identity(String):: Serialized identity of broker delivering the message
message(Packet|String):: Message received, which is unserialized unless :no_unserialize was specified
header(AMQP::Protocol::Header):: Message header (optional block parameter)
Raise
- ArgumentError
-
If a block is not supplied
Return
- (Boolean)
-
true if subscribe successfully or if already subscribed, otherwise false
233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 233 def subscribe(queue, exchange = nil, = {}, &block) raise ArgumentError, "Must call this method with a block" unless block return false unless usable? return true unless @queues.select { |q| q.name == queue[:name] }.empty? to_exchange = if exchange if [:exchange2] " to exchanges #{exchange[:name]} and #{[:exchange2][:name]}" else " to exchange #{exchange[:name]}" end end = queue[:options] || {} = (exchange && exchange[:options]) || {} begin logger.info("[setup] Subscribing queue #{queue[:name]}#{to_exchange} on broker #{@alias}") q = @channel.queue(queue[:name], ) @queues << q if exchange x = @channel.__send__(exchange[:type], exchange[:name], ) binding = q.bind(x, [:key] ? {:key => [:key]} : {}) if (exchange2 = [:exchange2]) q.bind(@channel.__send__(exchange2[:type], exchange2[:name], exchange2[:options] || {})) end q = binding end q.subscribe([:ack] ? {:ack => true} : {}) do |header, | begin if (pool = ([:fiber_pool] || @options[:fiber_pool])) pool.spawn { receive(queue[:name], header, , , &block) } else receive(queue[:name], header, , , &block) end rescue StandardError => e header.ack if [:ack] logger.exception("Failed setting up to receive message from queue #{queue.inspect} " + "on broker #{@alias}", e, :trace) @exception_stats.track("receive", e) update_non_delivery_stats("receive failure", e) end end rescue StandardError => e logger.exception("Failed subscribing queue #{queue.inspect}#{to_exchange} on broker #{@alias}", e, :trace) @exception_stats.track("subscribe", e) false end end |
#summary ⇒ Object
Get broker client information summarizing its status
Return
- (Hash)
-
Status of broker with keys
- :identity(String)
-
Serialized identity
- :alias(String)
-
Alias used in logs
- :status(Symbol)
-
Status of connection
- :disconnects(Integer)
-
Number of times lost connection
- :failures(Integer)
-
Number of times connect failed
- :retries(Integer)
-
Number of attempts to connect after failure
510 511 512 513 514 515 516 517 518 519 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 510 def summary { :identity => @identity, :alias => @alias, :status => @status, :retries => @retries, :disconnects => @disconnect_stats.total, :failures => @failure_stats.total, } end |
#unsubscribe(queue_names, &block) ⇒ Object
Unsubscribe from the specified queues Silently ignore unknown queues
Parameters
- queue_names(Array)
-
Names of queues previously subscribed to
Block
Optional block to be called with no parameters when each unsubscribe completes
Return
- true
-
Always return true
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 293 def unsubscribe(queue_names, &block) unless failed? @queues.reject! do |q| if queue_names.include?(q.name) begin logger.info("[stop] Unsubscribing queue #{q.name} on broker #{@alias}") q.unsubscribe { block.call if block } rescue StandardError => e logger.exception("Failed unsubscribing queue #{q.name} on broker #{@alias}", e, :trace) @exception_stats.track("unsubscribe", e) block.call if block end true else false end end end true end |
#update_status(status) ⇒ Object
Callback from AMQP with connection status or from HABrokerClient Makes client callback with :connected or :disconnected status if boundary crossed
Parameters
- status(Symbol)
-
Status of connection (:connected, :disconnected, :stopping, :failed, :closed)
Return
- true
-
Always return true
554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 554 def update_status(status) # Do not let closed connection regress to failed return true if status == :failed && @status == :closed # Wait until connection is ready (i.e. handshake with broker is completed) before # changing our status to connected return true if status == :connected status = :connected if status == :ready before = @status @status = status if status == :connected update_success elsif status == :failed update_failure elsif status == :disconnected && before != :disconnected @disconnect_stats.update end unless status == before || @options[:update_status_callback].nil? @options[:update_status_callback].call(self, before == :connected) end true end |
#usable? ⇒ Boolean
Determine whether the broker connection is usable, i.e., connecting or confirmed connected
Return
- (Boolean)
-
true if usable, otherwise false
169 170 171 |
# File 'lib/right_amqp/ha_client/broker_client.rb', line 169 def usable? [:connected, :connecting].include?(@status) end |