Class: RightAMQP::HABrokerClient

Inherits:
Object
Includes:
RightSupport::Log::Mixin
Defined in:
lib/right_amqp/ha_client/ha_broker_client.rb

Overview

Client for multiple AMQP brokers used together to achieve a high availability messaging routing service

Defined Under Namespace

Classes: Context, CountedDeferrable, NoBrokerHosts, NoConnectedBrokers, NoUserData, Published

Constant Summary

RECONNECT_INTERVAL = 60

Default number of seconds between reconnect attempts

Constructor Details

#initialize(serializer, options = {}) ⇒ HABrokerClient

Create connections to all configured AMQP brokers. The constructed broker client list is in priority order.

Parameters

serializer(Serializer)

Serializer used for marshaling packets being published or unmarshaling received messages to packets (responds to :dump and :load); if nil, has the same effect as setting subscribe option :no_unserialize and publish option :no_serialize

options(Hash)

Configuration options

:user(String)

User name

:pass(String)

Password

:vhost(String)

Virtual host path name

:ssl(Boolean)

Whether SSL is enabled

:insist(Boolean)

Whether to suppress redirection of connection

:reconnect_interval(Integer)

Number of seconds between reconnect attempts, defaults to RECONNECT_INTERVAL

:heartbeat(Integer)

Number of seconds between AMQP connection heartbeats used to keep connection alive (e.g., when AMQP broker is behind a firewall), nil or 0 means disable

:host(String)

Comma-separated list of AMQP broker host names; if only one, it is reapplied to successive ports; if none, defaults to localhost; each host may be followed by ':' and a short string to be used as a broker index; the index defaults to the list index, e.g., "host_a:0, host_c:2"

:port(String|Integer)

Comma-separated list of AMQP broker port numbers corresponding to :host list; if only one, it is applied to each successive host; if none, defaults to AMQP::PORT

:prefetch(Integer)

Maximum number of messages the AMQP broker is to prefetch for the agent before it receives an ack. Value 1 ensures that only the last unacknowledged message gets redelivered if the agent crashes. Value 0 means unlimited prefetch.

:order(Symbol)

Broker selection order when publishing a message: :random or :priority, defaults to :priority, value can be overridden on publish call

:exception_stats(RightSupport::Stats::Exceptions)

Exception statistics container to be used instead of internally created one

:exception_callback(Proc)

Callback activated on exception events with parameters

  exception(Exception):: Exception
  message(Packet):: Message being processed
  client(HABrokerClient):: Reference to this client

:exception_on_receive_callback(Proc)

Callback activated on a receive exception with parameters

  message(String):: Message content that caused an exception
  exception(Exception):: Exception that was raised

Raise

ArgumentError

If :host and :port are not matched lists or if serializer does not respond to :dump and :load


# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 138

def initialize(serializer, options = {})
  @options = options.dup
  @options[:update_status_callback] = lambda { |b, c| update_status(b, c) }
  @options[:return_message_callback] = lambda { |i, t, r, m| handle_return(i, t, r, m) }
  @options[:reconnect_interval] ||= RECONNECT_INTERVAL
  @connection_status = {}
  unless serializer.nil? || [:dump, :load].all? { |m| serializer.respond_to?(m) }
    raise ArgumentError, "serializer must be a class/object that responds to :dump and :load"
  end
  @serializer = serializer
  @published = Published.new
  reset_stats
  @select = @options[:order] || :priority
  @brokers = connect_all
  @closed = false
  @brokers_hash = {}
  @brokers.each { |b| @brokers_hash[b.identity] = b }
end
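
The serializer check in the constructor accepts nil or any object that responds to :dump and :load. The following is a minimal sketch of that duck-type test, assuming a JSON-based serializer; `JsonSerializer`, `acceptable_serializer?`, and every option value are hypothetical names made up for illustration and are not part of right_amqp.

```ruby
require 'json'

# Hypothetical serializer: any object responding to :dump and :load
# satisfies the constructor's check.
module JsonSerializer
  def self.dump(packet)
    JSON.generate(packet)
  end

  def self.load(message)
    JSON.parse(message)
  end
end

# Same duck-type test the constructor applies before accepting a serializer
def acceptable_serializer?(serializer)
  serializer.nil? || [:dump, :load].all? { |m| serializer.respond_to?(m) }
end

# Illustrative option values only; host names and credentials are made up
options = {
  :user  => "agent",
  :pass  => "secret",
  :vhost => "/right_net",
  :host  => "broker-a.example.com:0, broker-b.example.com:1",
  :port  => "5672",
  :order => :priority
}

puts acceptable_serializer?(JsonSerializer)  # true
puts acceptable_serializer?(Object.new)      # false

# Assuming the gem is loaded and its event loop is running (connection
# setup is not shown in this section), construction would look like:
#   client = RightAMQP::HABrokerClient.new(JsonSerializer, options)
```

Passing nil as the serializer is also accepted, in which case packets are published and received without conversion.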

Instance Attribute Details

#brokers ⇒ Object

(Array(Broker)) Priority ordered list of AMQP broker clients (exposed only for unit test purposes)



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 96

def brokers
  @brokers
end

Class Method Details

.addresses(host, port) ⇒ Object

Parse host and port information to form list of broker address information

Parameters

host(String)

Comma-separated list of broker host names; if only one, it is reapplied to successive ports; if none, defaults to localhost; each host may be followed by ':' and a short string to be used as a broker index; the index defaults to the list index, e.g., "host_a:0, host_c:2"

port(String|Integer)

Comma-separated list of broker port numbers corresponding to the host list; if only one, it is applied to each successive host; if none, defaults to AMQP::PORT

Returns

(Array(Hash))

List of broker addresses with keys :host, :port, :index

Raise

ArgumentError

If host and port are not matched lists



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 215

def self.addresses(host, port)
  hosts = if host && !host.empty? then host.split(/,\s*/) else [ "localhost" ] end
  ports = if port && port.size > 0 then port.to_s.split(/,\s*/) else [ ::AMQP::PORT ] end
  if hosts.size != ports.size && hosts.size != 1 && ports.size != 1
    raise ArgumentError.new("Unmatched AMQP host/port lists -- hosts: #{host.inspect} ports: #{port.inspect}")
  end
  i = -1
  if hosts.size > 1
    hosts.map do |host|
      i += 1
      h = host.split(/:\s*/)
      port = if ports[i] then ports[i].to_i else ports[0].to_i end
      port = port.to_s.split(/:\s*/)[0]
      {:host => h[0], :port => port.to_i, :index => (h[1] || i.to_s).to_i}
    end
  else
    ports.map do |port|
      i += 1
      p = port.to_s.split(/:\s*/)
      host = if hosts[i] then hosts[i] else hosts[0] end
      host = host.split(/:\s*/)[0]
      {:host => host, :port => p[0].to_i, :index => (p[1] || i.to_s).to_i}
    end
  end
end
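
The pairing rules are easier to follow with concrete inputs. The sketch below is a simplified, standalone re-statement of those rules, not the library method itself; `sketch_addresses` and `DEFAULT_PORT` are hypothetical names, and 5672 is assumed as the value of ::AMQP::PORT.

```ruby
DEFAULT_PORT = 5672 # assumption: stand-in for ::AMQP::PORT

# Simplified re-statement of the host/port pairing rules
def sketch_addresses(host, port)
  hosts = (host.nil? || host.empty?) ? ["localhost"] : host.split(/,\s*/)
  ports = (port.nil? || port.to_s.empty?) ? [DEFAULT_PORT.to_s] : port.to_s.split(/,\s*/)
  count = [hosts.size, ports.size].max
  # Lists must be the same length unless one of them has a single entry
  unless [hosts.size, ports.size].all? { |n| n == count || n == 1 }
    raise ArgumentError, "Unmatched host/port lists"
  end
  (0...count).map do |i|
    host_name, host_index = (hosts[i] || hosts[0]).split(/:\s*/)
    port_text, port_index = (ports[i] || ports[0]).split(/:\s*/)
    # The index suffix is read from the host list when it has several entries,
    # otherwise from the port list; it falls back to the list position
    index = hosts.size > 1 ? host_index : port_index
    { :host => host_name, :port => port_text.to_i, :index => (index || i).to_i }
  end
end

# Several hosts sharing one port: the port is reused for each host
many_hosts = sketch_addresses("host_a:0, host_c:2", "5672")
# => [{:host => "host_a", :port => 5672, :index => 0},
#     {:host => "host_c", :port => 5672, :index => 2}]

# One host with several ports: the host is reused for each port
many_ports = sketch_addresses("localhost", "5672:0, 5673:1")
# => [{:host => "localhost", :port => 5672, :index => 0},
#     {:host => "localhost", :port => 5673, :index => 1}]

# Nothing specified: localhost on the default port
defaults = sketch_addresses(nil, nil)
# => [{:host => "localhost", :port => 5672, :index => 0}]
```

Lists of different lengths, such as two hosts with three ports, raise ArgumentError in both the sketch and the method above.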

.identities(host, port = nil) ⇒ Object

Parse host and port information to form list of broker identities

Parameters

host(String)

Comma-separated list of broker host names; if only one, it is reapplied to successive ports; if none, defaults to localhost; each host may be followed by ':' and a short string to be used as a broker index; the index defaults to the list index, e.g., "host_a:0, host_c:2"

port(String|Integer)

Comma-separated list of broker port numbers corresponding to the host list; if only one, it is applied to each successive host; if none, defaults to AMQP::PORT

Returns

(Array)

Identity of each broker

Raise

ArgumentError

If host and port are not matched lists



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 256

def self.identities(host, port = nil)
  addresses(host, port).map { |a| identity(a[:host], a[:port]) }
end

.identity(host, port = nil) ⇒ Object

Construct a broker serialized identity from its host and port of the form rs-broker-host-port, with any ‘-’s in host replaced by ‘~’

Parameters

host(String)

IP host name or address for individual broker

port(Integer)

TCP port number for individual broker, defaults to ::AMQP::PORT

Returns

(String)

Broker serialized identity



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 269

def self.identity(host, port = nil)
  port ||= ::AMQP::PORT
  "rs-broker-#{host.gsub('-', '~')}-#{port.to_i}"
end
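
As an illustration of the identity format, here is a small standalone sketch. `sketch_identity` restates the one-line method above, while `sketch_parse_identity` is a hypothetical inverse that is not part of the documented API; 5672 is assumed as the value of ::AMQP::PORT.

```ruby
DEFAULT_AMQP_PORT = 5672 # assumption: stand-in for ::AMQP::PORT

# Format is rs-broker-<host>-<port>; '-' in the host is swapped for '~'
# so that '-' stays usable as the field separator
def sketch_identity(host, port = nil)
  port ||= DEFAULT_AMQP_PORT
  "rs-broker-#{host.gsub('-', '~')}-#{port.to_i}"
end

# Hypothetical inverse: split on '-' and restore the host's own '-' characters
def sketch_parse_identity(identity)
  _prefix, _kind, host, port = identity.split("-")
  [host.tr("~", "-"), port.to_i]
end

id = sketch_identity("broker1-1.example.com", 5672)
puts id                                 # rs-broker-broker1~1.example.com-5672
puts sketch_parse_identity(id).inspect  # ["broker1-1.example.com", 5672]
```

The inverse only round-trips for host names that do not themselves contain '~'.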

.parse_user_data(user_data) ⇒ Object

Parse agent user data to extract broker host and port configuration. An agent is permitted to only support using one broker.

Parameters

user_data(String)

Agent user data in <name>=<value>&<name>=<value>&… form with required name RS_rn_url and optional names RS_rn_host and RS_rn_port

Return

(Array)

Broker hosts and ports, each as a comma-separated list in priority order, in the form <hostname>:<index>,<hostname>:<index>,... for hosts and <port>:<index>,<port>:<index>,... for ports; ports is nil if none specified

Raise

NoUserData

If the user data is missing

NoBrokerHosts

If no brokers could be extracted from the user data

# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 172

def self.parse_user_data(user_data)
  raise NoUserData.new("User data is missing") if user_data.nil? || user_data.empty?
  hosts = ""
  ports = nil
  parsed = {}
  user_data.split("&").each do |data|
    name, value = data.split("=")
    # Guard against repeats
    unless parsed[name]
      if name == "RS_rn_url"
        h = value.split("@").last.split("/").first
        # Translate host name used by very old agents using only one broker
        h = "broker1-1.rightscale.com" if h == "broker.rightscale.com"
        hosts = h + hosts
      end
      if name == "RS_rn_host"
        hosts << value
      end
      if name == "RS_rn_port"
        ports = value
      end
      parsed[name] = true
    end
  end
  raise NoBrokerHosts.new("No brokers found in user data") if hosts.empty?
  [hosts, ports]
end
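
A worked example may help show how the three names combine. The sketch below restates the parsing loop in standalone form, leaving out the legacy host translation and the error checks; `sketch_parse_user_data`, the `amqp://` URL shape, and the host names are assumptions made up for illustration.

```ruby
# Standalone re-statement of the parsing loop, for illustration only
def sketch_parse_user_data(user_data)
  hosts = ""
  ports = nil
  seen = {}
  user_data.split("&").each do |pair|
    name, value = pair.split("=")
    next if seen[name] # the first occurrence of a name wins
    case name
    when "RS_rn_url"
      # Host is whatever sits between '@' and the next '/'; it is placed
      # in front of any RS_rn_host text collected so far
      hosts = value.split("@").last.split("/").first + hosts
    when "RS_rn_host"
      hosts += value
    when "RS_rn_port"
      ports = value
    end
    seen[name] = true
  end
  [hosts, ports]
end

full = sketch_parse_user_data(
  "RS_rn_url=amqp://user:pass@broker-a.example.com/right_net" \
  "&RS_rn_host=:0,broker-b.example.com:1" \
  "&RS_rn_port=5672:0,5673:1"
)
puts full.inspect
# ["broker-a.example.com:0,broker-b.example.com:1", "5672:0,5673:1"]

url_only = sketch_parse_user_data("RS_rn_url=amqp://user:pass@broker-a.example.com/right_net")
puts url_only.inspect
# ["broker-a.example.com", nil]
```

Because the URL host is prefixed rather than appended, the result is the same whether RS_rn_url appears before or after RS_rn_host in the user data.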

Instance Method Details

#alias_(identity) ⇒ Object

Convert broker serialized identity to its alias

Parameters

identity(String)

Broker serialized identity

Return

(String|nil)

Broker alias, or nil if not a known broker



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 306

def alias_(identity)
  @brokers_hash[identity].alias rescue nil
end

#aliases(identities) ⇒ Object

Convert broker identities to aliases

Parameters

identities(Array)

Broker identities

Return

(Array)

Broker aliases



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 295

def aliases(identities)
  identities.map { |i| alias_(i) }
end

#all ⇒ Object

Get serialized identity of all brokers

Return

(Array)

Serialized identity of all brokers



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 377

def all
  @brokers.map { |b| b.identity }
end

#close(&blk) ⇒ Object

Close all broker client connections

Block

Optional block with no parameters to be called after all connections are closed

Return

true

Always return true



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 756

def close(&blk)
  if @closed
    blk.call if blk
  else
    @closed = true
    @connection_status = {}
    handler = CountedDeferrable.new(@brokers.size)
    handler.callback { blk.call if blk }
    @brokers.each do |b|
      begin
        b.close(propagate = false) { handler.completed_one }
      rescue StandardError => e
        handler.completed_one
        logger.exception("Failed to close broker #{b.alias}", e, :trace)
        @exception_stats.track("close", e)
      end
    end
  end
  true
end
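
The close logic relies on a countdown: the block runs only after every broker has reported completion, whether its close succeeded or raised. A minimal sketch of that pattern follows; `SketchCountdown` is a hypothetical stand-in for CountedDeferrable, written without an event loop or timeout support, and the broker identities are made up.

```ruby
# Hypothetical stand-in for CountedDeferrable: fires its callbacks once
# completed_one has been called the expected number of times
class SketchCountdown
  def initialize(count)
    @remaining = count
    @callbacks = []
  end

  def callback(&blk)
    @callbacks << blk
    self
  end

  def completed_one
    @remaining -= 1
    @callbacks.each(&:call) if @remaining == 0
    true
  end
end

brokers = ["rs-broker-a-5672", "rs-broker-b-5672"]
fired = 0
handler = SketchCountdown.new(brokers.size)
handler.callback { fired += 1 }

handler.completed_one   # first broker closed; callback not yet fired
after_first = fired
handler.completed_one   # second broker closed; callback fires
puts after_first        # 0
puts fired              # 1
```

Counting a failed close as completed, as the rescue branch above does, keeps the final callback from being held up by one bad connection.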

#close_one(identity, propagate = true, &blk) ⇒ Object

Close an individual broker client connection

Parameters

identity(String)

Broker serialized identity

propagate(Boolean)

Whether to propagate connection status updates

Block

Optional block with no parameters to be called after connection closed

Return

true

Always return true

Raise

Exception

If broker unknown

# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 791

def close_one(identity, propagate = true, &blk)
  broker = @brokers_hash[identity]
  raise Exception, "Cannot close unknown broker #{identity}" unless broker
  broker.close(propagate, &blk)
  true
end

#connect(host, port, index, priority = nil, force = false, &blk) ⇒ Object

Make new connection to broker at specified address unless already connected or currently connecting

Parameters

host(String)

IP host name or address for individual broker

port(Integer)

TCP port number for individual broker

index(Integer)

Unique index for broker within set for use in forming alias

priority(Integer|nil)

Priority position of this broker in set for use by this agent, with nil or a value that would leave a gap in the list meaning add to end of list

force(Boolean)

Reconnect even if already connected

Block

Optional block with following parameters to be called after initiating the connection unless already connected to this broker:

identity(String):: Broker serialized identity

Return

(Boolean)

true if a connect attempt was made, false if no connect attempt made

Raise

Exception

If host and port do not match an existing broker but index does



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 423

def connect(host, port, index, priority = nil, force = false, &blk)
  identity = self.class.identity(host, port)
  existing = @brokers_hash[identity]
  if existing && existing.usable? && !force
    logger.info("Ignored request to reconnect #{identity} because already #{existing.status.to_s}")
    false
  else
    old_identity = identity
    @brokers.each do |b|
      if index == b.index
        # Changing host and/or port of existing broker client
        old_identity = b.identity
        break
      end
    end unless existing

    address = {:host => host, :port => port, :index => index}
    broker = BrokerClient.new(identity, address, @serializer, @exception_stats, @non_delivery_stats, @options, existing)
    p = priority(old_identity)
    if priority && priority < p
      @brokers.insert(priority, broker)
    elsif priority && priority > p
      logger.info("Reduced priority setting for broker #{identity} from #{priority} to #{p} to avoid gap in list")
      @brokers.insert(p, broker)
    else
      @brokers[p].close if @brokers[p]
      @brokers[p] = broker
    end
    @brokers_hash[identity] = broker
    yield broker.identity if block_given?
    true
  end
end

#connected ⇒ Object

Get serialized identity of connected brokers

Return

(Array)

Serialized identity of connected brokers



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 353

def connected
  @brokers.inject([]) { |c, b| if b.connected? then c << b.identity else c end }
end

#connected?(identity) ⇒ Boolean

Check whether connected to broker

Parameters

identity(String)

Broker serialized identity

Return

(Boolean)

true if connected to broker, otherwise false, or nil if broker unknown

# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 345

def connected?(identity)
  @brokers_hash[identity].connected? rescue nil
end

#connection_status(options = {}, &callback) ⇒ Object

Register callback to be activated when there is a change in connection status. Can be called more than once without affecting previous callbacks.

Parameters

options(Hash)

Connection status monitoring options

:one_off(Integer)

Seconds to wait for status change; only send update once; if timeout, report :timeout as the status

:boundary(Symbol)

:any if only report change on any (0/1) boundary, :all if only report change on all (n-1/n) boundary, defaults to :any

:brokers(Array)

Only report a status change for these identified brokers

Block

Required block activated when connected count crosses a status boundary with following parameters

status(Symbol):: Status of connection: :connected, :disconnected, or :failed, with
  :failed indicating that all selected brokers or all brokers have failed

Return

id(String)

Identifier associated with connection status request



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 816

def connection_status(options = {}, &callback)
  id = generate_id
  @connection_status[id] = {:boundary => options[:boundary], :brokers => options[:brokers], :callback => callback}
  if timeout = options[:one_off]
    @connection_status[id][:timer] = EM::Timer.new(timeout) do
      if @connection_status[id]
        if @connection_status[id][:callback].arity == 2
          @connection_status[id][:callback].call(:timeout, nil)
        else
          @connection_status[id][:callback].call(:timeout)
        end
        @connection_status.delete(id)
      end
    end
  end
  id
end
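
The timeout branch checks the callback's arity so that blocks declared with either one or two parameters can be registered. The sketch below isolates that dispatch; `notify_timeout` is a hypothetical helper, and the meaning of the second block parameter is not documented in this section, so it is simply called `extra` here.

```ruby
# Calls a status callback with one or two arguments depending on its arity,
# as the :one_off timer does when it expires
def notify_timeout(callback)
  if callback.arity == 2
    callback.call(:timeout, nil)
  else
    callback.call(:timeout)
  end
end

one_arg = lambda { |status| "status=#{status}" }
two_arg = lambda { |status, extra| "status=#{status} extra=#{extra.inspect}" }

puts notify_timeout(one_arg)  # status=timeout
puts notify_timeout(two_arg)  # status=timeout extra=nil

# Assuming a constructed client, registration might look like:
#   id = client.connection_status(:one_off => 30, :boundary => :all) { |status| ... }
```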

#declare(type, name, options = {}) ⇒ Object

Declare queue or exchange object but do not subscribe to it

Parameters

type(Symbol)

Type of object: :queue, :direct, :fanout or :topic

name(String)

Name of object

options(Hash)

Standard AMQP declare options plus

:brokers(Array)

Identity of brokers for which to declare, defaults to all usable if nil or empty

Return

identities(Array)

Identity of brokers where successfully declared



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 543

def declare(type, name, options = {})
  identities = []
  brokers = options.delete(:brokers)
  each(:usable, brokers) { |b| identities << b.identity if b.declare(type, name, options) }
  logger.info("Could not declare #{type.to_s} #{name.inspect} on brokers #{each(:usable, brokers).inspect} " +
              "when selected #{brokers.inspect} from usable #{usable.inspect}") if identities.empty?
  identities
end

#declare_unusable(identities) ⇒ Object

Declare a broker client as unusable

Parameters

identities(Array)

Identity of brokers

Return

true

Always return true

Raise

Exception

If identified broker is unknown



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 741

def declare_unusable(identities)
  identities.each do |id|
    broker = @brokers_hash[id]
    raise Exception, "Cannot mark unknown broker #{id} unusable" unless broker
    broker.close(propagate = true, normal = false, log = false)
  end
  true
end

#delete(name, options = {}) ⇒ Object

Delete queue in all usable brokers or all selected brokers that are usable

Parameters

name(String)

Queue name

options(Hash)

Queue declare options plus

:brokers(Array)

Identity of brokers in which queue is to be deleted

Return

identities(Array)

Identity of brokers where queue was deleted



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 677

def delete(name, options = {})
  identities = []
  u = usable
  brokers = options.delete(:brokers)
  ((brokers || u) & u).each { |i| identities << i if (b = @brokers_hash[i]) && b.delete(name, options) }
  identities
end
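
Broker selection here comes down to an array intersection: the requested identities are filtered against the usable ones, and a nil request falls back to all usable brokers. A small sketch of that expression, using a hypothetical helper `select_brokers` and made-up identities:

```ruby
# Same expression as in the method body: requested brokers, limited to usable
def select_brokers(requested, usable)
  (requested || usable) & usable
end

usable = ["rs-broker-a-5672", "rs-broker-b-5672"]

# Explicit selection: only requested brokers that are also usable remain
puts select_brokers(["rs-broker-b-5672", "rs-broker-c-5672"], usable).inspect
# ["rs-broker-b-5672"]

# No selection: every usable broker is chosen
puts select_brokers(nil, usable).inspect
# ["rs-broker-a-5672", "rs-broker-b-5672"]

# An empty array is truthy in Ruby, so it selects no brokers at all
puts select_brokers([], usable).inspect
# []
```

As written, an empty :brokers array therefore deletes nothing, unlike #declare and #subscribe, which document nil and empty as equivalent.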

#delete_amqp_resources(name, options = {}) ⇒ Object

Delete queue resources from AMQP in all usable brokers

Parameters

name(String)

Queue name

options(Hash)

Queue declare options plus

:brokers(Array)

Identity of brokers in which queue is to be deleted

Return

identities(Array)

Identity of brokers where queue was deleted



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 694

def delete_amqp_resources(name, options = {})
  identities = []
  u = usable
  ((options[:brokers] || u) & u).each { |i| identities << i if (b = @brokers_hash[i]) && b.delete_amqp_resources(:queue, name) }
  identities
end

#failed ⇒ Object

Get serialized identity of failed broker clients, i.e., ones that were never successfully connected, not ones that are just disconnected

Return

(Array)

Serialized identity of failed broker clients



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 386

def failed
  @brokers.inject([]) { |c, b| b.failed? ? c << b.identity : c }
end

#get(id) ⇒ Object

Get broker serialized identity if client exists

Parameters

id(Integer|String)

Broker alias or serialized identity

Return

(String|nil)

Broker serialized identity if client found, otherwise nil



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 333

def get(id)
  @brokers.each { |b| return b.identity if b.identity == id || b.alias == id }
  nil
end

#heartbeat=(heartbeat) ⇒ Object

Change connection heartbeat frequency to be used for any new connections

Parameters

heartbeat(Integer)

Number of seconds between AMQP connection heartbeats used to keep connection alive (e.g., when AMQP broker is behind a firewall), nil or 0 means disable

Return

(Integer|nil)

New heartbeat setting



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 398

def heartbeat=(heartbeat)
  @options[:heartbeat] = heartbeat
end

#hosts ⇒ Object

Form string of hosts and associated indices

Return

(String)

Comma separated list of host:index



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 314

def hosts
  @brokers.map { |b| "#{b.host}:#{b.index}" }.join(",")
end

#identity_parts(id) ⇒ Object

Break broker serialized identity down into individual parts if exists

Parameters

id(Integer|String)

Broker alias or serialized identity

Return

(Array)

Host, port, index, and priority, or all nil if broker not found



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 281

def identity_parts(id)
  @brokers.each do |b|
    return [b.host, b.port, b.index, priority(b.identity)] if b.identity == id || b.alias == id
  end
  [nil, nil, nil, nil]
end

#non_delivery(&blk) ⇒ Object

Provide callback to be activated when a message cannot be delivered

Block

Required block with parameters

reason(String):: Non-delivery reason
  "NO_ROUTE" - queue does not exist
  "NO_CONSUMERS" - queue exists but it has no consumers, or if :immediate was specified,
    all consumers are not immediately ready to consume
  "ACCESS_REFUSED" - queue not usable because broker is in the process of stopping service
type(String|nil):: Request type, or nil if not applicable
token(String|nil):: Generated message identifier, or nil if not applicable
from(String|nil):: Identity of original sender of message, or nil if not applicable
to(String):: Queue to which message was published

Return

true

Always return true



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 663

def non_delivery(&blk)
  @non_delivery = blk
  true
end

#ports ⇒ Object

Form string of ports and associated indices

Return

(String)

Comma separated list of port:index



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 322

def ports
  @brokers.map { |b| "#{b.port}:#{b.index}" }.join(",")
end

#publish(exchange, packet, options = {}) ⇒ Object

Publish message to AMQP exchange of first connected broker

Parameters

exchange(Hash)

AMQP exchange to publish to with keys :type, :name, and :options, which are the standard AMQP ones plus

  :no_declare(Boolean):: Whether to skip declaring this exchange or queue on the broker
    to cause its creation; for use when client does not have create permission or
    knows the object already exists and wants to avoid declare overhead
  :declare(Boolean):: Whether to delete this exchange or queue from the AMQP cache
    to force it to be declared on the broker and thus be created if it does not exist

packet(Packet)

Message to serialize and publish

options(Hash)

Publish options – standard AMQP ones plus

:mandatory(Boolean)

Return message if the exchange does not have any associated queues or if all the associated queues do not have any consumers

:immediate(Boolean)

Return message for the same reasons as :mandatory plus if all of the queues associated with the exchange are not immediately ready to consume the message

:fanout(Boolean)

true means publish to all connected brokers

:brokers(Array)

Identity of brokers selected for use, defaults to all brokers if nil or empty

:order(Symbol)

Broker selection order: :random or :priority, defaults to @select if :brokers is nil, otherwise defaults to :priority

:no_serialize(Boolean)

Do not serialize packet because it is already serialized, this is an escape for special situations like enrollment, also implicitly disables publish logging; this option is implicitly invoked if initialize without a serializer

:log_filter(Array(Symbol))

Filters to be applied in to_s when logging packet to :info

:log_data(String)

Additional data to display at end of log entry

:no_log(Boolean)

Disable publish logging unless debug level

Return

identities(Array)

Identity of brokers where packet was successfully published

Raise

NoConnectedBrokers

If cannot find a connected broker



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 624

def publish(exchange, packet, options = {})
  identities = []
  no_serialize = options[:no_serialize] || @serializer.nil?
  message = if no_serialize then packet else @serializer.dump(packet) end
  brokers = use(options)
  brokers.each do |b|
    if b.publish(exchange, packet, message, options.merge(:no_serialize => no_serialize))
      identities << b.identity
      if options[:mandatory] && !no_serialize
        context = Context.new(packet, options, brokers.map { |b| b.identity })
        @published.store(message, context)
      end
      break unless options[:fanout]
    end
  end
  if identities.empty?
    selected = "selected " if options[:brokers]
    list = aliases(brokers.map { |b| b.identity }).join(", ")
    raise NoConnectedBrokers, "None of #{selected}brokers [#{list}] are usable for publishing"
  end
  identities
end
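
The loop in #publish implements failover by default and broadcast when :fanout is set. The sketch below reproduces just that control flow with stub brokers; `StubBroker`, `sketch_publish`, and `NoConnectedBrokersSketch` are hypothetical names, and serialization, logging, and return-message tracking are left out.

```ruby
# Hypothetical stub standing in for a broker client; `up` decides whether
# a publish attempt succeeds
StubBroker = Struct.new(:identity, :up) do
  def publish(_message)
    up
  end
end

class NoConnectedBrokersSketch < StandardError; end

# Try brokers in the given order; stop at the first success unless :fanout
def sketch_publish(brokers, message, options = {})
  identities = []
  brokers.each do |b|
    next unless b.publish(message)
    identities << b.identity
    break unless options[:fanout]
  end
  raise NoConnectedBrokersSketch, "no usable broker" if identities.empty?
  identities
end

brokers = [
  StubBroker.new("rs-broker-a-5672", false), # highest priority, but down
  StubBroker.new("rs-broker-b-5672", true),
  StubBroker.new("rs-broker-c-5672", true)
]

puts sketch_publish(brokers, "hello").inspect
# ["rs-broker-b-5672"]
puts sketch_publish(brokers, "hello", :fanout => true).inspect
# ["rs-broker-b-5672", "rs-broker-c-5672"]
```

With every broker down, the sketch raises, which mirrors the NoConnectedBrokers case in the method above.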

#queue_status(queue_names, timeout = nil, &blk) ⇒ Object

Check status of specified queues for connected brokers. Silently ignore unknown queues. If a queue whose status is being checked does not exist, the associated broker connection will fail and be unusable.

Parameters

queue_names(Array)

Names of queues previously subscribed to that are to be checked

timeout(Integer)

Number of seconds to wait for all status checks, defaults to no timeout

Block

Optional block to be called after all queue statuses are obtained with hash parameter containing statuses with queue name as key and value that is a hash with broker identity as key and hash of :messages and :consumers as value; the :messages and :consumers values are nil if there is a failure retrieving them for the given queue

Return

true

Always return true



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 569

def queue_status(queue_names, timeout = nil, &blk)
  count = 0
  status = {}
  each(:connected) { |b| b.queues.each { |q| count += 1 if queue_names.include?(q.name) } }
  if count == 0
    blk.call(status) if blk
  else
    handler = CountedDeferrable.new(count, timeout)
    handler.callback { blk.call(status) if blk }
    each(:connected) do |b|
      if b.queue_status(queue_names) do |name, messages, consumers|
           (status[name] ||= {})[b.identity] = {:messages => messages, :consumers => consumers}
           handler.completed_one
         end
       else
         b.queues.each { |q| handler.completed_one if queue_names.include?(q.name) }
       end
    end
  end
  true
end
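
The hash passed to the block is nested two levels deep: queue name first, then broker identity. The sketch below shows that shape with made-up values and one way a caller might consume it; `total_messages` is a hypothetical helper, not part of the class.

```ruby
# Shape of the hash handed to the block, with made-up values
status = {
  "request" => {
    "rs-broker-a-5672" => { :messages => 3, :consumers => 1 },
    # nil values mean the status could not be retrieved for this broker
    "rs-broker-b-5672" => { :messages => nil, :consumers => nil }
  }
}

# Sum queued messages per queue, skipping brokers whose lookup failed
def total_messages(status)
  status.each_with_object({}) do |(queue, per_broker), totals|
    counts = per_broker.values.map { |s| s[:messages] }.compact
    totals[queue] = counts.inject(0) { |sum, n| sum + n }
  end
end

puts total_messages(status).inspect
```

The `compact` call matters because a failed lookup leaves nil in place of a count.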

#remove(host, port, &blk) ⇒ Object

Remove a broker client from the configuration. Invoke connection status callbacks only if connection is not already disabled. There is no check whether this is the last usable broker client.

Parameters

host(String)

IP host name or address for individual broker

port(Integer)

TCP port number for individual broker

Block

Optional block with following parameters to be called after removing the connection unless broker is not configured

identity(String):: Broker serialized identity

Return

identity(String|nil)

Serialized identity of broker removed, or nil if unknown



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 716

def remove(host, port, &blk)
  identity = self.class.identity(host, port)
  if broker = @brokers_hash[identity]
    logger.info("Removing #{identity}, alias #{broker.alias} from broker list")
    broker.close(propagate = true, normal = true, log = false)
    @brokers_hash.delete(identity)
    @brokers.reject! { |b| b.identity == identity }
    yield identity if block_given?
  else
    logger.info("Ignored request to remove #{identity} from broker list because unknown")
    identity = nil
  end
  identity
end

#reset_stats ⇒ Object

Reset broker client statistics. Do not reset disconnect and failure stats because they might then be inconsistent with underlying connection status.

Return

true

Always return true



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 882

def reset_stats
  @return_stats = RightSupport::Stats::Activity.new
  @non_delivery_stats = RightSupport::Stats::Activity.new
  @exception_stats = @options[:exception_stats] || RightSupport::Stats::Exceptions.new(self, @options[:exception_callback])
  true
end

#stats(reset = false) ⇒ Object

Get broker client statistics

Parameters

reset(Boolean)

Whether to reset the statistics after getting the current ones

Return

stats(Hash)

Broker client stats with keys

"brokers"(Array)

Stats for each broker client in priority order

"exceptions"(Hash|nil)

Exceptions raised per category, or nil if none; omitted when an :exception_stats container was supplied to the constructor

  "total"(Integer):: Total exceptions for this category
  "recent"(Array):: Most recent as a hash of "count", "type", "message", "when", and "where"

"heartbeat"(Integer|nil)

Number of seconds between AMQP heartbeats, or nil if heartbeat disabled

"non-deliveries"(Hash|nil)

Non-delivery activity stats with keys "total", "percent", "last", and "rate" with percentage breakdown per non-delivery reason, or nil if none

"returns"(Hash|nil)

Message return activity stats with keys "total", "percent", "last", and "rate" with percentage breakdown per return reason, or nil if none


# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 864

def stats(reset = false)
  stats = {
    "brokers"        => @brokers.map { |b| b.stats },
    "heartbeat"      => @options[:heartbeat],
    "non-deliveries" => @non_delivery_stats.all,
    "returns"        => @return_stats.all
  }
  stats["exceptions"] = @exception_stats.stats unless @options[:exception_stats]
  reset_stats if reset
  stats
end

#status ⇒ Object

Get status summary

Return

(Array(Hash))

Status of each configured broker with keys

:identity(String)

Broker serialized identity

:alias(String)

Broker alias used in logs

:status(Symbol)

Status of connection

:disconnects(Integer)

Number of times lost connection

:failures(Integer)

Number of times connect failed

:retries(Integer)

Number of attempts to connect after failure



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 844

def status
  @brokers.map { |b| b.summary }
end

#subscribe(queue, exchange = nil, options = {}, &blk) ⇒ Object

Subscribe an AMQP queue to an AMQP exchange on all broker clients that are connected or still connecting. Allow connecting here because subscribing may happen before all have confirmed connected. Do not wait for confirmation from broker client that subscription is complete. When a message is received, acknowledge, unserialize, and log it as specified. If the message is unserialized and it is not of the right type, it is dropped after logging a warning.

Parameters

queue(Hash)

AMQP queue being subscribed with keys :name and :options, which are the standard AMQP ones plus

  :no_declare(Boolean):: Whether to skip declaring this queue on the broker
    to cause its creation; for use when client does not have permission to create or
    knows the queue already exists and wants to avoid declare overhead

exchange(Hash|nil)

AMQP exchange to subscribe to with keys :type, :name, and :options, nil means use empty exchange by directly subscribing to queue; the :options are the standard AMQP ones plus

  :no_declare(Boolean):: Whether to skip declaring this exchange on the broker
    to cause its creation; for use when client does not have create permission or
    knows the exchange already exists and wants to avoid declare overhead

options(Hash)

Subscribe options:

:ack(Boolean)

Whether client takes responsibility for explicitly acknowledging each message received, defaults to implicit acknowledgement in AMQP as part of message receipt

:no_unserialize(Boolean)

Do not unserialize message, this is an escape for special situations like enrollment, also implicitly disables receive filtering and logging; this option is implicitly invoked if initialize without a serializer

(packet class)(Array(Symbol))

Filters to be applied in to_s when logging packet to :info, only packet classes specified are accepted, others are not processed but are logged with error

:category(String)

Packet category description to be used in error messages

:log_data(String)

Additional data to display at end of log entry

:no_log(Boolean)

Disable receive logging unless debug level

:exchange2(Hash)

Additional exchange to which same queue is to be bound

:brokers(Array)

Identity of brokers for which to subscribe, defaults to all usable if nil or empty

Block

Block with following parameters to be called each time exchange matches a message to the queue:

identity(String):: Serialized identity of broker delivering the message
message(Packet|String):: Message received, which is unserialized unless :no_unserialize was specified
header(AMQP::Protocol::Header):: Message header (optional block parameter)

Return

identities(Array)

Identity of brokers where successfully subscribed



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 497

def subscribe(queue, exchange = nil, options = {}, &blk)
  identities = []
  brokers = options.delete(:brokers)
  each(:usable, brokers) { |b| identities << b.identity if b.subscribe(queue, exchange, options, &blk) }
  logger.info("Could not subscribe to queue #{queue.inspect} on exchange #{exchange.inspect} " +
              "on brokers #{each(:usable, brokers).inspect} when selected #{brokers.inspect} " +
              "from usable #{usable.inspect}") if identities.empty?
  identities
end
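
To make the argument shapes concrete, here is a hedged sketch of the three hashes and a handler block. The queue and exchange names, the `:durable` flags, the broker identity, and the handler body are illustrative assumptions; the call itself is shown only as a comment because it needs a constructed client.

```ruby
# Queue: :name plus standard AMQP options and the :no_declare extension
queue = { :name => "agent-queue", :options => { :durable => true, :no_declare => true } }

# Exchange: :type, :name, and :options; nil would subscribe directly to the queue
exchange = { :type => :direct, :name => "request", :options => { :durable => true } }

# Subscribe options: limit to one broker and label packets for error messages
options = { :category => "request", :brokers => ["rs-broker-a-5672"] }

# Handler receives the delivering broker's identity, the message, and
# optionally the AMQP header
handler = lambda do |identity, message, header = nil|
  "#{identity} delivered #{message.inspect}"
end

puts handler.call("rs-broker-a-5672", "ping")
# rs-broker-a-5672 delivered "ping"

# Assuming a constructed client, the subscription might look like:
#   identities = client.subscribe(queue, exchange, options, &handler)
```

Note that #subscribe removes :brokers from the options hash it is given, so pass a copy if the same hash is reused elsewhere.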

#unsubscribe(queue_names, timeout = nil, &blk) ⇒ Object

Unsubscribe from the specified queues on usable broker clients. Silently ignore unknown queues.

Parameters

queue_names(Array)

Names of queues previously subscribed to

timeout(Integer)

Number of seconds to wait for all confirmations, defaults to no timeout

Block

Optional block with no parameters to be called after all queues are unsubscribed

Return

true

Always return true



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 519

def unsubscribe(queue_names, timeout = nil, &blk)
  count = each(:usable).inject(0) do |c, b|
    c + b.queues.inject(0) { |c, q| c + (queue_names.include?(q.name) ? 1 : 0) }
  end
  if count == 0
    blk.call if blk
  else
    handler = CountedDeferrable.new(count, timeout)
    handler.callback { blk.call if blk }
    each(:usable) { |b| b.unsubscribe(queue_names) { handler.completed_one } }
  end
  true
end

#unusable ⇒ Object

Get serialized identity of unusable brokers

Return

(Array)

Serialized identity of unusable brokers



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 369

def unusable
  all - usable
end

#usable ⇒ Object

Get serialized identity of brokers that are usable, i.e., connecting or confirmed connected

Return

(Array)

Serialized identity of usable brokers



# File 'lib/right_amqp/ha_client/ha_broker_client.rb', line 361

def usable
  each(:usable).map { |b| b.identity }
end