Class: RightAMQP::BrokerClient

Inherits:
Object
  • Object
show all
Includes:
RightSupport::Log::Mixin
Defined in:
lib/right_amqp/ha_client/broker_client.rb

Overview

Client for accessing AMQP broker

Constant Summary collapse

STATUS =

Set of possible broker connection status values

[
  :connecting,   # Initiated AMQP connection but not yet confirmed that connected
  :connected,    # Confirmed AMQP connection
  :stopping,     # Broker is stopping service and, although still connected, is no longer usable
  :disconnected, # Notified by AMQP that connection has been lost and attempting to reconnect
  :closed,       # AMQP connection closed explicitly or because of too many failed connect attempts
  :failed        # Failed to connect due to internal failure or AMQP failure to connect
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(identity, address, serializer, exception_stats, non_delivery_stats, options, existing = nil) ⇒ BrokerClient

Create broker client

Parameters

identity(String)

Broker identity

address(Hash)

Broker address

:host(String

IP host name or address

:port(Integer)

TCP port number for individual broker

:index(String)

Unique index for broker within set of brokers for use in forming alias

serializer(Serializer)

Serializer used for unmarshaling received messages to packets

(responds to :load); if nil, has same effect as setting subscribe option :no_unserialize
exception_stats(RightSupport::Stats::Exceptions)

Exception statistics container to be updated

whenever there is an unexpected exception
non_delivery_stats(RightSupport::Stats::Activity)

Non-delivery statistics container to be

updated whenever a message cannot be sent or received
options(Hash)

Configuration options

:user(String)

User name

:pass(String)

Password

:vhost(String)

Virtual host path name

:ssl(Boolean)

Whether SSL is enabled

:insist(Boolean)

Whether to suppress redirection of connection

:reconnect_interval(Integer)

Number of seconds between reconnect attempts

:heartbeat(Integer)

Number of seconds between AMQP connection heartbeats used to keep

  connection alive, e.g., when AMQP broker is behind a firewall
:prefetch(Integer):: Maximum number of messages the AMQP broker is to prefetch for the agent
  before it receives an ack. Value 1 ensures that only last unacknowledged gets redelivered
  if the agent crashes. Value 0 means unlimited prefetch.
:fiber_pool(FiberPool):: Pool of initialized fibers to be used for asynchronous message
  processing (can be overridden with subscribe option)
:exception_on_receive_callback(Proc):: Callback activated on a receive exception with parameters
  message(Object):: Message received
  exception(Exception):: Exception raised
:update_status_callback(Proc):: Callback activated on a connection status change with parameters
  broker(BrokerClient):: Broker client
  connected_before(Boolean):: Whether was connected prior to this status change
:return_message_callback(Proc):: Callback activated when a message is returned with parameters
  to(String):: Queue to which message was published
  reason(String):: Reason for return
    "NO_ROUTE" - queue does not exist
    "NO_CONSUMERS" - queue exists but it has no consumers, or if :immediate was specified,
      all consumers are not immediately ready to consume
    "ACCESS_REFUSED" - queue not usable because broker is in the process of stopping service
  message(String):: Returned serialized message
existing(BrokerClient|nil)

Existing broker client for this address, or nil if none

Raise

ArgumentError

If serializer does not respond to :dump and :load



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/right_amqp/ha_client/broker_client.rb', line 123

def initialize(identity, address, serializer, exception_stats, non_delivery_stats, options, existing = nil)
  @options         = options
  @identity        = identity
  @host            = address[:host]
  @port            = address[:port].to_i
  @index           = address[:index].to_i
  set_alias(@index)

  unless serializer.nil? || [:dump, :load].all? { |m| serializer.respond_to?(m) }
    raise ArgumentError, "serializer must be a class/object that responds to :dump and :load"
  end
  @serializer         = serializer
  @queues             = []
  @last_failed        = false
  @exception_stats    = exception_stats
  @non_delivery_stats = non_delivery_stats
  @disconnect_stats   = RightSupport::Stats::Activity.new(measure_rate = false)
  @failure_stats      = RightSupport::Stats::Activity.new(measure_rate = false)
  @retries            = 0

  connect(address, @options[:reconnect_interval])

  if existing
    @disconnect_stats = existing.disconnect_stats
    @failure_stats    = existing.failure_stats
    @last_failed      = existing.last_failed
    @retries          = existing.retries
    update_failure if @status == :failed
  end
end

Instance Attribute Details

#aliasObject (readonly)

(String) Broker alias, used in logs



47
48
49
# File 'lib/right_amqp/ha_client/broker_client.rb', line 47

def alias
  @alias
end

#channelObject (readonly)

(AMQP::Channel) Channel of AMQP connection used by this client



41
42
43
# File 'lib/right_amqp/ha_client/broker_client.rb', line 41

def channel
  @channel
end

#disconnect_statsObject (readonly)

(RightSupport::Stats::Activity) AMQP lost connection statistics



68
69
70
# File 'lib/right_amqp/ha_client/broker_client.rb', line 68

def disconnect_stats
  @disconnect_stats
end

#failure_statsObject (readonly)

(RightSupport::Stats::Activity) AMQP connection failure statistics



71
72
73
# File 'lib/right_amqp/ha_client/broker_client.rb', line 71

def failure_stats
  @failure_stats
end

#hostObject (readonly)

(String) Host name



50
51
52
# File 'lib/right_amqp/ha_client/broker_client.rb', line 50

def host
  @host
end

#identityObject (readonly)

(String) Broker identity



44
45
46
# File 'lib/right_amqp/ha_client/broker_client.rb', line 44

def identity
  @identity
end

#indexObject (readonly)

(Integer) Unique index for broker within given set, used in alias



56
57
58
# File 'lib/right_amqp/ha_client/broker_client.rb', line 56

def index
  @index
end

#last_failedObject (readonly)

(Boolean) Whether last connect attempt failed



65
66
67
# File 'lib/right_amqp/ha_client/broker_client.rb', line 65

def last_failed
  @last_failed
end

#portObject (readonly)

(Integer) Port number



53
54
55
# File 'lib/right_amqp/ha_client/broker_client.rb', line 53

def port
  @port
end

#queuesObject (readonly)

(Array) List of MQ::Queue queues currently subscribed



62
63
64
# File 'lib/right_amqp/ha_client/broker_client.rb', line 62

def queues
  @queues
end

#retriesObject (readonly)

(Integer) Number of attempts to connect after failure



74
75
76
# File 'lib/right_amqp/ha_client/broker_client.rb', line 74

def retries
  @retries
end

#statusObject (readonly)

(Symbol) AMQP connection STATUS value



59
60
61
# File 'lib/right_amqp/ha_client/broker_client.rb', line 59

def status
  @status
end

Instance Method Details

#close(propagate = true, normal = true, log = true, &block) ⇒ Object

Close broker connection

Parameters

propagate(Boolean)

Whether to propagate connection status updates, defaults to true

normal(Boolean)

Whether this is a normal close vs. a failed connection, defaults to true

log(Boolean)

Whether to log that closing, defaults to true

Block

Optional block with no parameters to be called after connection closed

Return

true

Always return true



477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
# File 'lib/right_amqp/ha_client/broker_client.rb', line 477

def close(propagate = true, normal = true, log = true, &block)
  final_status = normal ? :closed : :failed
  if ![:closed, :failed].include?(@status)
    begin
      logger.info("[stop] Closed connection to broker #{@alias}") if log
      update_status(final_status) if propagate
      @connection.close do
        @status = final_status
        yield if block_given?
      end
    rescue StandardError => e
      logger.exception("Failed to close broker #{@alias}", e, :trace)
      @exception_stats.track("close", e)
      @status = final_status
      yield if block_given?
    end
  else
    @status = final_status
    yield if block_given?
  end
  true
end

#connected?Boolean

Determine whether this client is currently connected to the broker

Return

(Boolean)

true if connected, otherwise false

Returns:

  • (Boolean)


177
178
179
# File 'lib/right_amqp/ha_client/broker_client.rb', line 177

def connected?
  @status == :connected
end

#declare(type, name, options = {}) ⇒ Object

Declare queue or exchange object but do not subscribe to it

Parameters

type(Symbol)

Type of object: :queue, :direct, :fanout or :topic

name(String)

Name of object

options(Hash)

Standard AMQP declare options

Return

(Boolean)

true if declare successfully, otherwise false



323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/right_amqp/ha_client/broker_client.rb', line 323

def declare(type, name, options = {})
  return false unless usable?
  begin
    logger.info("[setup] Declaring #{name} #{type.to_s} on broker #{@alias}")
    delete_amqp_resources(:queue, name)
    @channel.__send__(type, name, options)
    true
  rescue StandardError => e
    logger.exception("Failed declaring #{type.to_s} #{name} on broker #{@alias}", e, :trace)
    @exception_stats.track("declare", e)
    false
  end
end

#delete(name, options = {}) ⇒ Object

Delete queue

Parameters

name(String)

Queue name

options(Hash)

Queue declare options

Return

(Boolean)

true if queue was successfully deleted, otherwise false



428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
# File 'lib/right_amqp/ha_client/broker_client.rb', line 428

def delete(name, options = {})
  deleted = false
  if usable?
    begin
      @queues.reject! do |q|
        if q.name == name
          @channel.queue(name, options.merge(:no_declare => true)).delete
          deleted = true
        end
      end
      unless deleted
        # Allowing declare to happen since queue may not exist and do not want NOT_FOUND
        # failure to cause AMQP channel to close
        @channel.queue(name, options.merge(:no_declare => true)).delete
        deleted = true
      end
    rescue StandardError => e
      logger.exception("Failed deleting queue #{name.inspect} on broker #{@alias}", e, :trace)
      @exception_stats.track("delete", e)
    end
  end
  deleted
end

#delete_amqp_resources(type, name) ⇒ Object

Delete resources from local AMQP cache

Parameters

type(Symbol)

Type of AMQP object

name(String)

Name of object

Return

true

Always return true



460
461
462
463
# File 'lib/right_amqp/ha_client/broker_client.rb', line 460

def delete_amqp_resources(type, name)
  @channel.__send__(type == :queue ? :queues : :exchanges).delete(name)
  true
end

#failed?(backoff = false) ⇒ Boolean

Determine whether the broker connection has failed

Return

(Boolean)

true if failed, otherwise false

Returns:

  • (Boolean)


185
186
187
# File 'lib/right_amqp/ha_client/broker_client.rb', line 185

def failed?(backoff = false)
  @status == :failed
end

#publish(exchange, packet, message, options = {}) ⇒ Object

Publish message to AMQP exchange

Parameters

exchange(Hash)

AMQP exchange to subscribe to with keys :type, :name, and :options,

which are the standard AMQP ones plus
  :no_declare(Boolean):: Whether to skip declaring this exchange or queue on the broker
    to cause its creation; for use when caller does not have create permission or
    knows the object already exists and wants to avoid declare overhead
  :declare(Boolean):: Whether to delete this exchange or queue from the AMQP cache
    to force it to be declared on the broker and thus be created if it does not exist
packet(Packet)

Message to serialize and publish (must respond to :to_s(log_filter,

protocol_version) unless :no_serialize specified; if responds to :type, :from, :token,
and/or :one_way, these value are used if this message is returned as non-deliverable)
message(String)

Serialized message to be published

options(Hash)

Publish options – standard AMQP ones plus

:no_serialize(Boolean)

Do not serialize packet because it is already serialized

:log_filter(Array(Symbol))

Filters to be applied in to_s when logging packet to :info

:log_data(String)

Additional data to display at end of log entry

:no_log(Boolean)

Disable publish logging unless debug level

Return

(Boolean)

true if publish successfully, otherwise false



391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
# File 'lib/right_amqp/ha_client/broker_client.rb', line 391

def publish(exchange, packet, message, options = {})
  return false unless connected?
  begin
    exchange_options = exchange[:options] || {}
    unless options[:no_serialize]
      log_data = ""
      unless options[:no_log] && logger.level != :debug
        re = "RE-" if packet.respond_to?(:tries) && !packet.tries.empty?
        log_filter = options[:log_filter] unless logger.level == :debug
        log_data = "#{re}SEND #{@alias} #{packet.to_s(log_filter, :send_version)}"
        if logger.level == :debug
          log_data += ", publish options #{options.inspect}, exchange #{exchange[:name]}, " +
                      "type #{exchange[:type]}, options #{exchange[:options].inspect}"
        end
        log_data += ", #{options[:log_data]}" if options[:log_data]
        logger.info(log_data) unless log_data.empty?
      end
    end
    delete_amqp_resources(exchange[:type], exchange[:name]) if exchange_options[:declare]
    @channel.__send__(exchange[:type], exchange[:name], exchange_options).publish(message, options)
    true
  rescue StandardError => e
    logger.exception("Failed publishing to exchange #{exchange.inspect} on broker #{@alias}", e, :trace)
    @exception_stats.track("publish", e)
    update_non_delivery_stats("publish failure", e)
    false
  end
end

#queue_status(queue_names, &block) ⇒ Object

Check status of specified queues Silently ignore unknown queues If a queue whose status is being checked does not exist in the broker, this broker connection will fail and become unusable

Parameters

queue_names(Array)

Names of queues previously subscribed to

Block

Optional block to be called each time that status for a queue is retrieved with parameters queue name, message count, and consumer count; the counts are nil if there was a failure while trying to retrieve them; the block is not called for queues to which this client is not currently subscribed

Return

(Boolean)

true if connected, otherwise false, in which case block never gets called



353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/right_amqp/ha_client/broker_client.rb', line 353

def queue_status(queue_names, &block)
  return false unless connected?
  @queues.each do |q|
    if queue_names.include?(q.name)
      begin
        q.status { |messages, consumers| block.call(q.name, messages, consumers) if block }
      rescue StandardError => e
        logger.exception("Failed checking status of queue #{q.name} on broker #{@alias}", e, :trace)
        @exception_stats.track("queue_status", e)
        block.call(q.name, nil, nil) if block
      end
    end
  end
  true
end

#set_alias(index) ⇒ Object

Set alias for broker for use in logs

Parameters

index(Integer)

Unique index for broker within given set

Return

(String)

Broker alias



161
162
163
# File 'lib/right_amqp/ha_client/broker_client.rb', line 161

def set_alias(index)
  @alias = "b#{index}"
end

#statsObject

Get broker client statistics

Return

(Hash)

Broker client stats with keys

“alias”(String)

Broker alias

“identity”(String)

Broker identity

“status”(Status)

Status of connection

“disconnect last”(Hash|nil)

Last disconnect information with key “elapsed”, or nil if none

“disconnects”(Integer|nil)

Number of times lost connection, or nil if none

“failure last”(Hash|nil)

Last connect failure information with key “elapsed”, or nil if none

“failures”(Integer|nil)

Number of failed attempts to connect to broker, or nil if none

“retries”(Integer|nil)

Number of connect retries, or nil if none



533
534
535
536
537
538
539
540
541
542
543
544
# File 'lib/right_amqp/ha_client/broker_client.rb', line 533

def stats
  {
    "alias"           => @alias,
    "identity"        => @identity,
    "status"          => @status.to_s,
    "disconnect last" => @disconnect_stats.last,
    "disconnects"     => RightSupport::Stats.nil_if_zero(@disconnect_stats.total),
    "failure last"    => @failure_stats.last,
    "failures"        => RightSupport::Stats.nil_if_zero(@failure_stats.total),
    "retries"         => RightSupport::Stats.nil_if_zero(@retries)
  }
end

#subscribe(queue, exchange = nil, options = {}, &block) ⇒ Object

Subscribe an AMQP queue to an AMQP exchange Do not wait for confirmation from broker that subscription is complete When a message is received, acknowledge, unserialize, and log it as specified If the message is unserialized and it is not of the right type, it is dropped after logging an error

Parameters

queue(Hash)

AMQP queue being subscribed with keys :name and :options,

which are the standard AMQP ones plus
  :no_declare(Boolean):: Whether to skip declaring this queue on the broker
    to cause its creation; for use when caller does not have permission to create or
    knows the queue already exists and wants to avoid declare overhead
exchange(Hash|nil)

AMQP exchange to subscribe to with keys :type, :name, and :options,

nil means use empty exchange by directly subscribing to queue; the :options are the
standard AMQP ones plus
  :no_declare(Boolean):: Whether to skip declaring this exchange on the broker
    to cause its creation; for use when caller does not have create permission or
    knows the exchange already exists and wants to avoid declare overhead
options(Hash)

Subscribe options:

:ack(Boolean)

Whether caller takes responsibility for explicitly acknowledging each

  message received, defaults to implicit acknowledgement in AMQP as part of message receipt
:no_unserialize(Boolean):: Do not unserialize message, this is an escape for special
  situations like enrollment, also implicitly disables receive filtering and logging;
  this option is implicitly invoked if initialize without a serializer
(packet class)(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info,
  only packet classes specified are accepted, others are not processed but are logged with error
:category(String):: Packet category description to be used in error messages
:log_data(String):: Additional data to display at end of log entry
:no_log(Boolean):: Disable receive logging unless debug level
:exchange2(Hash):: Additional exchange to which same queue is to be bound
:brokers(Array):: Identity of brokers for which to subscribe, defaults to all usable if nil or empty
:fiber_pool(FiberPool):: Pool of initialized fibers to be used for asynchronous message
  processing (non-nil value will override constructor option setting)

Block

Required block with following parameters to be called each time exchange matches a message to the queue

identity(String):: Serialized identity of broker delivering the message
message(Packet|String):: Message received, which is unserialized unless :no_unserialize was specified
header(AMQP::Protocol::Header):: Message header (optional block parameter)

Raise

ArgumentError

If a block is not supplied

Return

(Boolean)

true if subscribe successfully or if already subscribed, otherwise false

Raises:

  • (ArgumentError)


233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/right_amqp/ha_client/broker_client.rb', line 233

def subscribe(queue, exchange = nil, options = {}, &block)
  raise ArgumentError, "Must call this method with a block" unless block
  return false unless usable?
  return true unless @queues.select { |q| q.name == queue[:name] }.empty?

  to_exchange =  if exchange
    if options[:exchange2]
      " to exchanges #{exchange[:name]} and #{options[:exchange2][:name]}"
    else
      " to exchange #{exchange[:name]}"
    end
  end
  queue_options = queue[:options] || {}
  exchange_options = (exchange && exchange[:options]) || {}

  begin
    logger.info("[setup] Subscribing queue #{queue[:name]}#{to_exchange} on broker #{@alias}")
    q = @channel.queue(queue[:name], queue_options)
    @queues << q
    if exchange
      x = @channel.__send__(exchange[:type], exchange[:name], exchange_options)
      binding = q.bind(x, options[:key] ? {:key => options[:key]} : {})
      if (exchange2 = options[:exchange2])
        q.bind(@channel.__send__(exchange2[:type], exchange2[:name], exchange2[:options] || {}))
      end
      q = binding
    end
    q.subscribe(options[:ack] ? {:ack => true} : {}) do |header, message|
      begin
        if (pool = (options[:fiber_pool] || @options[:fiber_pool]))
          pool.spawn { receive(queue[:name], header, message, options, &block) }
        else
          receive(queue[:name], header, message, options, &block)
        end
      rescue StandardError => e
        header.ack if options[:ack]
        logger.exception("Failed setting up to receive message from queue #{queue.inspect} " +
                         "on broker #{@alias}", e, :trace)
        @exception_stats.track("receive", e)
        update_non_delivery_stats("receive failure", e)
      end
    end
  rescue StandardError => e
    logger.exception("Failed subscribing queue #{queue.inspect}#{to_exchange} on broker #{@alias}", e, :trace)
    @exception_stats.track("subscribe", e)
    false
  end
end

#summaryObject

Get broker client information summarizing its status

Return

(Hash)

Status of broker with keys

:identity(String)

Serialized identity

:alias(String)

Alias used in logs

:status(Symbol)

Status of connection

:disconnects(Integer)

Number of times lost connection

:failures(Integer)

Number of times connect failed

:retries(Integer)

Number of attempts to connect after failure



510
511
512
513
514
515
516
517
518
519
# File 'lib/right_amqp/ha_client/broker_client.rb', line 510

def summary
  {
    :identity    => @identity,
    :alias       => @alias,
    :status      => @status,
    :retries     => @retries,
    :disconnects => @disconnect_stats.total,
    :failures    => @failure_stats.total,
  }
end

#unsubscribe(queue_names, &block) ⇒ Object

Unsubscribe from the specified queues Silently ignore unknown queues

Parameters

queue_names(Array)

Names of queues previously subscribed to

Block

Optional block to be called with no parameters when each unsubscribe completes

Return

true

Always return true



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/right_amqp/ha_client/broker_client.rb', line 293

def unsubscribe(queue_names, &block)
  unless failed?
    @queues.reject! do |q|
      if queue_names.include?(q.name)
        begin
          logger.info("[stop] Unsubscribing queue #{q.name} on broker #{@alias}")
          q.unsubscribe { block.call if block }
        rescue StandardError => e
          logger.exception("Failed unsubscribing queue #{q.name} on broker #{@alias}", e, :trace)
          @exception_stats.track("unsubscribe", e)
          block.call if block
        end
        true
      else
        false
      end
    end
  end
  true
end

#update_status(status) ⇒ Object

Callback from AMQP with connection status or from HABrokerClient Makes client callback with :connected or :disconnected status if boundary crossed

Parameters

status(Symbol)

Status of connection (:connected, :disconnected, :stopping, :failed, :closed)

Return

true

Always return true



554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
# File 'lib/right_amqp/ha_client/broker_client.rb', line 554

def update_status(status)
  # Do not let closed connection regress to failed
  return true if status == :failed && @status == :closed

  # Wait until connection is ready (i.e. handshake with broker is completed) before
  # changing our status to connected
  return true if status == :connected
  status = :connected if status == :ready

  before = @status
  @status = status

  if status == :connected
    update_success
  elsif status == :failed
    update_failure
  elsif status == :disconnected && before != :disconnected
    @disconnect_stats.update
  end

  unless status == before || @options[:update_status_callback].nil?
    @options[:update_status_callback].call(self, before == :connected)
  end
  true
end

#usable?Boolean

Determine whether the broker connection is usable, i.e., connecting or confirmed connected

Return

(Boolean)

true if usable, otherwise false

Returns:

  • (Boolean)


169
170
171
# File 'lib/right_amqp/ha_client/broker_client.rb', line 169

def usable?
  [:connected, :connecting].include?(@status)
end