Class: BBK::AMQP::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/bbk/amqp/consumer.rb

Constant Summary collapse

# Default consumer settings; #initialize merges caller-supplied options over these.
DEFAULT_OPTIONS =
{
  # Passed as the second argument to connection.create_channel in #run.
  consumer_pool_size:               3,
  # Passed as the third argument to connection.create_channel in #run.
  consumer_pool_abort_on_exception: true,
  # Passed to channel.prefetch when #run creates the channel itself.
  prefetch_size:                    10,
  # Forwarded to queue.subscribe in #run; a nil value is dropped from the subscribe options.
  consumer_tag:                     nil,
  # Removed from the merged options in #initialize and invoked by #nack.
  rejection_policy:                 RejectionPolicies::Requeue.new
}.freeze
# Protocol names returned by #protocols.
PROTOCOLS =
%w[mq amqp amqps].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, queue_name: nil, **options) ⇒ Consumer

Returns a new instance of Consumer.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/bbk/amqp/consumer.rb', line 20

# Builds a consumer on top of an existing connection.
#
# @param connection [Object] connection that #run uses to create a channel
#   when no +:channel+ option is supplied
# @param queue_name [String, nil] name of the queue to consume from; when a
#   +:queue+ option is also given, the queue object's own name takes priority
# @param options [Hash] overrides merged over DEFAULT_OPTIONS; the keys below
#   are also recognized
# @option options [Object] :channel channel to reuse instead of creating one
# @option options [Object] :queue queue object to subscribe to
# @option options [Object] :logger logger used instead of BBK::AMQP.logger
# @option options [#call] :rejection_policy callable invoked by #nack
# @raise [ArgumentError] when neither +queue_name+ nor +:queue+ is provided
def initialize(connection, queue_name: nil, **options)
  @connection = connection
  @channel = options.delete(:channel)
  @queue = options.delete(:queue)

  raise ArgumentError, 'queue_name or queue must be provided!' if @queue.nil? && queue_name.nil?

  @queue_name = @queue&.name || queue_name

  @options = DEFAULT_OPTIONS.merge(options)
  @rejection_policy = @options.delete(:rejection_policy)

  logger = @options.fetch(:logger, BBK::AMQP.logger)
  logger = ActiveSupport::TaggedLogging.new(logger) unless logger.respond_to?(:tagged)
  # Tag with the resolved name: the bare queue_name argument is nil when the
  # caller passed only a queue object, which used to produce a nil tag.
  @logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, @queue_name])
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# Reader for @connection, which #initialize sets from its first positional argument.
def connection
  @connection
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# Reader for @logger, the BBK::Utils::ProxyLogger built in #initialize.
def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# Reader for @options: DEFAULT_OPTIONS merged with the caller's options in
# #initialize, after :channel, :queue and :rejection_policy have been removed.
def options
  @options
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# Reader for @queue: the :queue option given to #initialize, or, when none was
# given, the queue that #run looks up by name on the channel.
def queue
  @queue
end

#queue_name ⇒ Object (readonly)

Returns the value of attribute queue_name.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# Reader for @queue_name: the supplied queue object's name when it has one,
# otherwise the queue_name argument given to #initialize.
def queue_name
  @queue_name
end

#rejection_policy ⇒ Object (readonly)

Returns the value of attribute rejection_policy.



8
9
10
# File 'lib/bbk/amqp/consumer.rb', line 8

# Reader for @rejection_policy, which #initialize takes out of the merged
# options (DEFAULT_OPTIONS supplies one); #nack calls it.
def rejection_policy
  @rejection_policy
end

Instance Method Details

#ack(incoming, *args, answer: nil, **kwargs) ⇒ Object

Note:

The answer should be processed by the AMQP publisher.

Acknowledges the incoming message; does not send the answer.

Parameters:

  • incoming (BBK::AMQP::Message)

    consumed message from amqp channel

  • answer (BBK::App::Dispatcher::Result) (defaults to: nil)

    answer message



78
79
80
81
82
83
84
# File 'lib/bbk/amqp/consumer.rb', line 78

# Acknowledges a consumed message on the channel it was delivered on.
#
# The extra positional arguments, +answer+ and the remaining keyword
# arguments are accepted but not used by this method.
#
# @param incoming [BBK::AMQP::Message] consumed message; its delivery_info
#   must provide :channel and :delivery_tag
# @param answer [BBK::App::Dispatcher::Result, nil] answer message (unused here)
# @return [Object] whatever the channel's +ack+ returns
def ack(incoming, *args, answer: nil, **kwargs)
  headers = incoming.headers
  delivery_info = incoming.delivery_info
  # Looked up with [] so tests can work. Per the original note, in production
  # this is a VersionedDeliveryTag whose to_i (called inside channel.ack)
  # yields the actual number.
  delivery_tag = delivery_info[:delivery_tag]

  logger.debug "Ack message #{headers[:type]}[#{headers[:message_id]}]  delivery tag: #{delivery_tag.to_i}"
  delivery_info[:channel].ack delivery_tag
end

#close ⇒ Object

Closes the consumer by attempting to close its AMQP channel.



104
105
106
107
108
109
110
111
112
113
# File 'lib/bbk/amqp/consumer.rb', line 104

# Closes the consumer's channel, if there is one.
#
# The channel reference is cleared before +close+ is called on it, so a
# repeated call is a no-op.
#
# @return [Object, nil] the channel that was closed, or nil when there was none
def close
  channel = @channel
  return nil unless channel

  logger.info 'Closing...'
  @channel = nil
  channel.close
  logger.info 'Stopped'
  channel
end

#nack(incoming, *args, error: nil, **_kwargs) ⇒ Object

Nack incoming message

Parameters:



88
89
90
# File 'lib/bbk/amqp/consumer.rb', line 88

# Hands a message over to the configured rejection policy.
#
# @param incoming [Object] message to reject; passed to the policy unchanged
# @param args [Array] accepted but not used by this method
# @param error [Object, nil] passed to the policy as its second argument
# @return [Object] whatever the rejection policy's +call+ returns
def nack(incoming, *args, error: nil, **_kwargs)
  rejection_policy.call(incoming, error)
end

#protocols ⇒ Object

Returns the list of protocols that this consumer supports.



40
41
42
# File 'lib/bbk/amqp/consumer.rb', line 40

# @return [Array<String>] the frozen PROTOCOLS list
def protocols
  PROTOCOLS
end

#run(msg_stream) ⇒ Object

Starts the consumer in non-blocking mode.

Parameters:

  • msg_stream (Enumerable)
    • any object that responds to <<



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/bbk/amqp/consumer.rb', line 46

# Starts a non-blocking subscription that pushes every delivery into +msg_stream+.
#
# A channel is created on the connection (and the prefetch limit applied to
# it) unless one is already set, and the queue is looked up by name with
# +passive: true+ unless a queue object is already set.
#
# @param msg_stream [#<<] receives one Message per delivery
# @return [#<<] the same +msg_stream+ object that was passed in
def run(msg_stream)
  @channel ||= @connection.create_channel(nil, options[:consumer_pool_size],
                                          options[:consumer_pool_abort_on_exception]).tap do |ch|
    ch.prefetch(options[:prefetch_size])
  end

  logger.add_tags "Ch##{@channel.id}"

  @queue ||= @channel.queue(queue_name, passive: true)

  # compact drops consumer_tag when it is nil.
  subscribe_opts = {
    block:        false,
    manual_ack:   true,
    consumer_tag: options[:consumer_tag],
    exclusive:    options.fetch(:exclusive, false)
  }.compact

  logger.info 'Starting...'
  # The subscribe block receives three values; the middle one (the message
  # metadata) has to be named so it can be handed on to Message.new.
  @subscription = queue.subscribe(subscribe_opts) do |delivery_info, metadata, payload|
    message = Message.new(self, delivery_info, metadata, payload)
    logger.debug "Consumed message #{message.headers[:type]}[#{message.headers[:message_id]}] delivery tag: #{message.delivery_info[:delivery_tag].to_i}"

    msg_stream << message
  end
  msg_stream
end

#stop ⇒ Object

Stops consuming messages.



93
94
95
96
97
98
99
100
101
# File 'lib/bbk/amqp/consumer.rb', line 93

# Cancels the active subscription, if there is one.
#
# The subscription reference is cleared before +cancel+ is called on it, so a
# repeated call is a no-op.
#
# @return [Object, nil] the subscription that was cancelled, or nil when
#   there was none
def stop
  subscription = @subscription
  return nil unless subscription

  logger.info 'Stopping...'
  @subscription = nil
  subscription.cancel
  subscription
end