Class: PikaQ::Consumers::Base
- Inherits: Object
  - Object
  - PikaQ::Consumers::Base
- Defined in: lib/pika_q/consumers/base.rb
Class Attribute Summary
- .consumer ⇒ Object (readonly)
  Returns the value of attribute consumer.
- .consumer_tag ⇒ Object (readonly)
  Returns the value of attribute consumer_tag.
- .exchange ⇒ Object (readonly)
  Returns the value of attribute exchange.
- .queue ⇒ Object (readonly)
  Returns the value of attribute queue.
- .routing_key ⇒ Object (readonly)
  Returns the value of attribute routing_key.
Class Method Summary
- .config(options = {}) ⇒ Object
- .start(channel, &block) ⇒ Object
Class Attribute Details
.consumer ⇒ Object (readonly)
Returns the value of attribute consumer.
    # File 'lib/pika_q/consumers/base.rb', line 7
    def consumer
      @consumer
    end
.consumer_tag ⇒ Object (readonly)
Returns the value of attribute consumer_tag.
    # File 'lib/pika_q/consumers/base.rb', line 7
    def consumer_tag
      @consumer_tag
    end
.exchange ⇒ Object (readonly)
Returns the value of attribute exchange.
    # File 'lib/pika_q/consumers/base.rb', line 7
    def exchange
      @exchange
    end
.queue ⇒ Object (readonly)
Returns the value of attribute queue.
    # File 'lib/pika_q/consumers/base.rb', line 7
    def queue
      @queue
    end
.routing_key ⇒ Object (readonly)
Returns the value of attribute routing_key.
    # File 'lib/pika_q/consumers/base.rb', line 7
    def routing_key
      @routing_key
    end
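The readers above are class-level and read-only, so each value lives in an instance variable on the class object itself. The following is a minimal sketch of how such readers are commonly declared in Ruby; it is not the library's source. `SketchBase`, `OrdersConsumer`, and `EmailsConsumer` are hypothetical names, and the one-key `config` is a simplified stand-in used only to set a value.

```ruby
# Hypothetical stand-in for a consumer base class (not PikaQ's code).
class SketchBase
  class << self
    # Class-level reader: defines SketchBase.queue but no SketchBase.queue=
    attr_reader :queue

    # Simplified setter-like entry point, assumed for illustration only.
    def config(options = {})
      @queue = options.fetch(:queue)
    end
  end
end

class OrdersConsumer < SketchBase; end
class EmailsConsumer < SketchBase; end

OrdersConsumer.config(queue: "orders")
EmailsConsumer.config(queue: "emails")

# Class-level instance variables are per class, so subclasses do not
# overwrite each other's values and the base class stays unset.
puts OrdersConsumer.queue
puts EmailsConsumer.queue
puts SketchBase.queue.inspect
puts OrdersConsumer.respond_to?(:queue=)
```

Under this pattern every consumer subclass carries its own configuration, which is why the attributes are documented as readonly: they are only assigned from inside the class methods.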
Class Method Details
.config(options = {}) ⇒ Object
    # File 'lib/pika_q/consumers/base.rb', line 10
    def self.config(options = {})
      options ||= {} if options.nil?
      options = default_config.merge(options)
      @consumer_tag = options.fetch(:consumer_tag)
      @exchange = options.fetch(:exchange)
      @routing_key = options.fetch(:routing_key)
      @queue = options.fetch(:queue)
    end
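To illustrate the merge-then-fetch behaviour of `.config`, here is a small self-contained sketch. `SketchConsumer` and `IncompleteConsumer` are hypothetical classes, and the contents of `default_config` are an assumption, since the real defaults are not shown on this page.

```ruby
# Hypothetical stand-in that mirrors only the .config logic.
class SketchConsumer
  class << self
    attr_reader :consumer_tag, :exchange, :routing_key, :queue

    # Assumption: illustrative defaults, not the library's real ones.
    def default_config
      { consumer_tag: "sketch-consumer", routing_key: "#" }
    end

    def config(options = {})
      # Caller-supplied keys win over defaults; nil is treated as {}.
      options = default_config.merge(options || {})
      # Hash#fetch raises KeyError when a required key is absent.
      @consumer_tag = options.fetch(:consumer_tag)
      @exchange     = options.fetch(:exchange)
      @routing_key  = options.fetch(:routing_key)
      @queue        = options.fetch(:queue)
    end
  end
end

SketchConsumer.config(exchange: :orders_exchange,
                      queue: :orders_queue,
                      routing_key: "orders.created")
puts SketchConsumer.routing_key   # value supplied by the caller
puts SketchConsumer.consumer_tag  # value taken from the assumed defaults

# A separate subclass keeps the failing call from touching the state above.
IncompleteConsumer = Class.new(SketchConsumer)
begin
  IncompleteConsumer.config(exchange: :orders_exchange)
rescue KeyError => e
  puts "missing required key: #{e.key}"
end
```

Because the values are read with `fetch`, any key that neither the caller nor the defaults provide surfaces immediately as a `KeyError` rather than as a `nil` attribute later on.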
.start(channel, &block) ⇒ Object
    # File 'lib/pika_q/consumers/base.rb', line 19
    def self.start(channel, &block)
      @exchange = exchange.establish(channel) if exchange.respond_to? :establish
      @queue = queue.create(channel) if queue.respond_to? :create

      unless exchange.predeclared?
        queue.bind(exchange, { routing_key: routing_key })
      end

      @consumer = queue.subscribe(manual_ack: true,
                                  block: true,
                                  consumer_tag: consumer_tag,
                                  &Proc.new)

      consumer
    rescue Interrupt
      consumer.cancel
      consumer.channel.close
    end
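The order of operations in `.start` (optionally establish the exchange and create the queue, bind unless the exchange is predeclared, then subscribe with manual acknowledgements) can be sketched without a broker. Everything below is hypothetical: `FakeExchange`, `FakeQueue`, and `start_sketch` are recording doubles written for this example, not PikaQ or client-library classes, and the sketch forwards the block with `&block`.

```ruby
# Hypothetical double: only answers the predeclared? question.
class FakeExchange
  def initialize(predeclared)
    @predeclared = predeclared
  end

  def predeclared?
    @predeclared
  end
end

# Hypothetical double: records calls instead of talking to a broker.
class FakeQueue
  attr_reader :calls

  def initialize
    @calls = []
  end

  def bind(_exchange, opts = {})
    @calls << [:bind, opts]
  end

  def subscribe(opts = {}, &block)
    @calls << [:subscribe, opts]
    block.call("payload") if block # simulate one delivery
    :fake_consumer
  end
end

# Mirrors the sequence of steps documented for .start.
def start_sketch(channel, exchange, queue, routing_key:, consumer_tag:, &block)
  exchange = exchange.establish(channel) if exchange.respond_to?(:establish)
  queue = queue.create(channel) if queue.respond_to?(:create)
  queue.bind(exchange, { routing_key: routing_key }) unless exchange.predeclared?
  queue.subscribe(manual_ack: true, block: true, consumer_tag: consumer_tag, &block)
end

received = []
bound_queue = FakeQueue.new
result = start_sketch(nil, FakeExchange.new(false), bound_queue,
                      routing_key: "orders.created",
                      consumer_tag: "sketch") { |body| received << body }
puts bound_queue.calls.map(&:first).inspect

# With a predeclared exchange the bind step is skipped.
unbound_queue = FakeQueue.new
start_sketch(nil, FakeExchange.new(true), unbound_queue,
             routing_key: "orders.created", consumer_tag: "sketch")
puts unbound_queue.calls.map(&:first).inspect
```

In the documented method, `block: true` keeps the calling thread inside `subscribe`, which is why the `rescue Interrupt` clause is the place where the consumer is cancelled and its channel closed.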