Class: BBK::AMQP::Consumer
- Inherits:
-
Object
- Object
- BBK::AMQP::Consumer
- Defined in:
- lib/bbk/amqp/consumer.rb
Constant Summary collapse
- DEFAULT_OPTIONS =
{ consumer_pool_size: 3, consumer_pool_abort_on_exception: true, prefetch_size: 10, consumer_tag: nil, rejection_policy: RejectionPolicies::Requeue.new }.freeze
- PROTOCOLS =
%w[mq amqp amqps].freeze
Instance Attribute Summary collapse
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
-
#rejection_policy ⇒ Object
readonly
Returns the value of attribute rejection_policy.
Instance Method Summary collapse
-
#ack(incoming, *args, answer: nil, **kwargs) ⇒ Object
Ack incoming message and not send answer.
-
#close ⇒ Object
Close consumer - try close amqp channel.
-
#initialize(connection, queue_name: nil, **options) ⇒ Consumer
constructor
A new instance of Consumer.
-
#nack(incoming, *args, error: nil, **_kwargs) ⇒ Object
Nack incoming message.
-
#protocols ⇒ Object
Return protocol list which consumer support.
-
#run(msg_stream) ⇒ Object
Running non blocking consumer.
-
#stop ⇒ Object
stop consuming messages.
Constructor Details
#initialize(connection, queue_name: nil, **options) ⇒ Consumer
Returns a new instance of Consumer.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/bbk/amqp/consumer.rb', line 20 def initialize(connection, queue_name: nil, **) @connection = connection @channel = .delete(:channel) @queue = .delete(:queue) if @queue.nil? && queue_name.nil? raise ArgumentError.new('queue_name or queue must be provided!') end @queue_name = @queue&.name || queue_name = DEFAULT_OPTIONS.merge() @rejection_policy = .delete(:rejection_policy) logger = .fetch(:logger, BBK::AMQP.logger) logger = logger.respond_to?(:tagged) ? logger : ActiveSupport::TaggedLogging.new(logger) @logger = BBK::Utils::ProxyLogger.new(logger, tags: [self.class.to_s, queue_name]) end |
Instance Attribute Details
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def connection @connection end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def logger @logger end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def queue @queue end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def queue_name @queue_name end |
#rejection_policy ⇒ Object (readonly)
Returns the value of attribute rejection_policy.
8 9 10 |
# File 'lib/bbk/amqp/consumer.rb', line 8 def rejection_policy @rejection_policy end |
Instance Method Details
#ack(incoming, *args, answer: nil, **kwargs) ⇒ Object
answer should processing amqp publisher
Ack incoming message and not send answer.
78 79 80 81 82 83 84 |
# File 'lib/bbk/amqp/consumer.rb', line 78 def ack(incoming, *args, answer: nil, **kwargs) # [] - для работы тестов. В реальности вернется объект VersionedDeliveryTag у # которого to_i (вызывается внутри channel.ack) вернет фактическоe число # logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] on channel: #{incoming.delivery_info[:channel]&.id}[#{incoming.delivery_info[:channel]&.object_id}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}" logger.debug "Ack message #{incoming.headers[:type]}[#{incoming.headers[:message_id]}] delivery tag: #{incoming.delivery_info[:delivery_tag].to_i}" incoming.delivery_info[:channel].ack incoming.delivery_info[:delivery_tag] end |
#close ⇒ Object
Close consumer - try close amqp channel
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/bbk/amqp/consumer.rb', line 104 def close @channel.tap do |c| return nil unless c logger.info 'Closing...' @channel = nil c.close logger.info 'Stopped' end end |
#nack(incoming, *args, error: nil, **_kwargs) ⇒ Object
Nack incoming message
88 89 90 |
# File 'lib/bbk/amqp/consumer.rb', line 88 def nack(incoming, *args, error: nil, **_kwargs) rejection_policy.call(incoming, error) end |
#protocols ⇒ Object
Return protocol list which consumer support
40 41 42 |
# File 'lib/bbk/amqp/consumer.rb', line 40 def protocols PROTOCOLS end |
#run(msg_stream) ⇒ Object
Running non blocking consumer
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/bbk/amqp/consumer.rb', line 46 def run(msg_stream) @channel ||= @connection.create_channel(nil, [:consumer_pool_size], [:consumer_pool_abort_on_exception]).tap do |ch| ch.prefetch([:prefetch_size]) end logger. "Ch##{@channel.id}" @queue ||= @channel.queue(queue_name, passive: true) subscribe_opts = { block: false, manual_ack: true, consumer_tag: [:consumer_tag], exclusive: .fetch(:exclusive, false) }.compact logger.info 'Starting...' @subscription = queue.subscribe(subscribe_opts) do |delivery_info, , payload| = Message.new(self, delivery_info, , payload) # logger.debug "Consumed message #{message.headers[:type]}[#{message.headers[:message_id]}] on channel: #{delivery_info.channel&.id}[#{delivery_info.channel&.object_id}] delivery tag: #{message.delivery_info[:delivery_tag].to_i}" logger.debug "Consumed message #{message.headers[:type]}[#{message.headers[:message_id]}] delivery tag: #{message.delivery_info[:delivery_tag].to_i}" msg_stream << end msg_stream end |
#stop ⇒ Object
stop consuming messages
93 94 95 96 97 98 99 100 101 |
# File 'lib/bbk/amqp/consumer.rb', line 93 def stop @subscription.tap do |s| return nil unless s logger.info 'Stopping...' @subscription = nil s.cancel end end |