Class: BBK::AMQP::Consumer
- Inherits:
-
Object
- Object
- BBK::AMQP::Consumer
- Defined in:
- lib/bbk/amqp/consumer.rb
Constant Summary collapse
- DEFAULT_OPTIONS =
{ consumer_pool_size: 3, consumer_pool_abort_on_exception: true, prefetch_size: 10, consumer_tag: nil, rejection_policy: RejectionPolicies::Requeue.new }.freeze
- PROTOCOLS =
%w[mq amqp amqps].freeze
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#publisher ⇒ Object
Returns the value of attribute publisher.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#rejection_policy ⇒ Object
readonly
Returns the value of attribute rejection_policy.
Instance Method Summary collapse
-
#ack(incoming, *args, answer: nil, **kwargs) ⇒ Object
Ack incoming message and not send answer.
-
#close ⇒ Object
Close consumer - try close amqp channel.
-
#initialize(connection, queue_name: nil, publisher: nil, **options) ⇒ Consumer
constructor
A new instance of Consumer.
-
#nack(incoming, *args, error: nil, **_kwargs) ⇒ Object
Nack incoming message.
-
#protocols ⇒ Object
Return protocol list which consumer support.
-
#run(msg_stream) ⇒ Object
Running non blocking consumer.
-
#stop ⇒ Object
stop consuming messages.
Constructor Details
#initialize(connection, queue_name: nil, publisher: nil, **options) ⇒ Consumer
Returns a new instance of Consumer.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/bbk/amqp/consumer.rb', line 21 def initialize(connection, queue_name: nil, publisher: nil, **) @connection = connection @channel = .delete(:channel) @queue = .delete(:queue) @publisher = publisher if @queue.nil? && queue_name.nil? raise ArgumentError.new('queue_name or queue must be provided!') end @queue_name = @queue&.name || queue_name @options = DEFAULT_OPTIONS.merge() @rejection_policy = @options.delete(:rejection_policy) logger = @options.fetch(:logger, BBK::AMQP.logger) logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger) @logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, queue_name]) end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def connection @connection end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def logger @logger end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def @options end |
#publisher ⇒ Object
Returns the value of attribute publisher.
9 10 11 |
# File 'lib/bbk/amqp/consumer.rb', line 9 def publisher @publisher end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def queue @queue end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def queue_name @queue_name end |
#rejection_policy ⇒ Object (readonly)
Returns the value of attribute rejection_policy.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def rejection_policy @rejection_policy end |
Instance Method Details
#ack(incoming, *args, answer: nil, **kwargs) ⇒ Object
answer should processing amqp publisher
Ack incoming message and not send answer.
80 81 82 83 84 85 86 87 |
# File 'lib/bbk/amqp/consumer.rb', line 80 def ack(incoming, *args, answer: nil, **kwargs) # [] - для работы тестов. В реальности вернется объект VersionedDeliveryTag у # которого to_i (вызывается внутри channel.ack) вернет фактическоe число # logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] on channel: #{incoming.delivery_info[:channel]&.id}[#{incoming.delivery_info[:channel]&.object_id}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}" send_answer(incoming, answer) unless answer.nil? logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}" incoming.delivery_info[:channel].ack incoming.delivery_info[:delivery_tag] end |
#close ⇒ Object
Close consumer - try close amqp channel
116 117 118 119 120 121 122 123 124 125 |
# File 'lib/bbk/amqp/consumer.rb', line 116 def close @channel.tap do |c| return nil unless c logger.info 'Closing...' @channel = nil c.close logger.info 'Stopped' end end |
#nack(incoming, *args, error: nil, **_kwargs) ⇒ Object
Nack incoming message
100 101 102 |
# File 'lib/bbk/amqp/consumer.rb', line 100 def nack(incoming, *args, error: nil, **_kwargs) rejection_policy.call(incoming, error) end |
#protocols ⇒ Object
Return protocol list which consumer support
42 43 44 |
# File 'lib/bbk/amqp/consumer.rb', line 42 def protocols PROTOCOLS end |
#run(msg_stream) ⇒ Object
Running non blocking consumer
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/bbk/amqp/consumer.rb', line 48 def run(msg_stream) @channel ||= @connection.create_channel(nil, [:consumer_pool_size], [:consumer_pool_abort_on_exception]).tap do |ch| ch.prefetch([:prefetch_size]) end logger. "Ch##{@channel.id}" @queue ||= @channel.queue(queue_name, passive: true) subscribe_opts = { block: false, manual_ack: true, consumer_tag: [:consumer_tag], exclusive: .fetch(:exclusive, false) }.compact logger.info 'Starting...' @subscription = queue.subscribe(subscribe_opts) do |delivery_info, , payload| = Message.new(self, delivery_info, , payload) # logger.debug "Consumed message #{message.headers[:type]}[#{message.headers[:message_id]}] on channel: #{delivery_info.channel&.id}[#{delivery_info.channel&.object_id}] delivery tag: #{message.delivery_info[:delivery_tag].to_i}" logger.debug "Consumed message #{.headers[:type]}[#{.headers[:message_id]}] delivery tag: #{.delivery_info[:delivery_tag].to_i}" msg_stream << end msg_stream end |
#stop ⇒ Object
stop consuming messages
105 106 107 108 109 110 111 112 113 |
# File 'lib/bbk/amqp/consumer.rb', line 105 def stop @subscription.tap do |s| return nil unless s logger.info 'Stopping...' @subscription = nil s.cancel end end |