Class: Consumers::Base
- Inherits:
-
Object
- Object
- Consumers::Base
- Defined in:
- lib/smart_que/consumers/base.rb
Direct Known Subclasses
Constant Summary collapse
- QUEUE_NAME =
The queue name should be defined here.
'smart_que.default'
Instance Attribute Summary collapse
-
#queue_name ⇒ Object
Returns the value of attribute queue_name.
Instance Method Summary collapse
-
#channel ⇒ Object
Create channel with the established connection.
- #config ⇒ Object
-
#connection ⇒ Object
Establish connection to Message Queues.
-
#queue ⇒ Object
Returns the default queue that is present in the message queues.
-
#start ⇒ Object
Method which kick-starts the consumer process thread.
- #wait_for_threads ⇒ Object
Instance Attribute Details
#queue_name ⇒ Object
Returns the value of attribute queue_name.
8 9 10 |
# File 'lib/smart_que/consumers/base.rb', line 8 def queue_name @queue_name end |
Instance Method Details
#channel ⇒ Object
Create channel with the established connection.
27 28 29 |
# File 'lib/smart_que/consumers/base.rb', line 27 def channel @channel ||= connection.create_channel end |
#config ⇒ Object
31 32 33 |
# File 'lib/smart_que/consumers/base.rb', line 31 def config ::SmartQue.config end |
#connection ⇒ Object
Establish connection to Message Queues.
22 23 24 |
# File 'lib/smart_que/consumers/base.rb', line 22 def connection ::SmartQue.establish_connection end |
#queue ⇒ Object
Returns the default queue that is present in the message queues. Consumer-specific queues should be defined and implemented in the consumer subclasses.
17 18 19 |
# File 'lib/smart_que/consumers/base.rb', line 17 def queue @queue ||= channel.queue(queue_name) end |
#start ⇒ Object
Method which kick-starts the consumer process thread.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/smart_que/consumers/base.rb', line 36 def start channel.prefetch(10) queue.subscribe(manual_ack: true, exclusive: false) do |delivery_info, metadata, payload| begin body = JSON.parse(payload).with_indifferent_access status = run(body) rescue => e status = :error end if status == :ok channel.ack(delivery_info.delivery_tag) elsif status == :retry channel.reject(delivery_info.delivery_tag, true) else # :error, nil etc channel.reject(delivery_info.delivery_tag, false) end end wait_for_threads end |
#wait_for_threads ⇒ Object
58 59 60 |
# File 'lib/smart_que/consumers/base.rb', line 58 def wait_for_threads sleep end |