Class: Liebre::Actor::Consumer::Core
- Inherits:
-
Object
- Object
- Liebre::Actor::Consumer::Core
- Defined in:
- lib/liebre/actor/consumer/core.rb
Constant Summary collapse
- OPTS =
{:block => false, :manual_ack => true}
Instance Method Summary collapse
- #ack(info, opts) ⇒ Object
- #clean ⇒ Object
- #consume(info, meta, payload) ⇒ Object
- #failed(info, error) ⇒ Object
-
#initialize(consumer, resources, context, callback_class) ⇒ Core
constructor
A new instance of Core.
- #nack(info, opts) ⇒ Object
- #reject(info, opts) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(consumer, resources, context, callback_class) ⇒ Core
Returns a new instance of Core.
8 9 10 11 12 13 |
# File 'lib/liebre/actor/consumer/core.rb', line 8 def initialize consumer, resources, context, callback_class @consumer = consumer @resources = resources @context = context @callback_class = callback_class end |
Instance Method Details
#ack(info, opts) ⇒ Object
34 |
# File 'lib/liebre/actor/consumer/core.rb', line 34 def ack(info, opts) queue.ack(info, opts); end |
#clean ⇒ Object
42 43 44 45 46 47 |
# File 'lib/liebre/actor/consumer/core.rb', line 42 def clean queue.delete exchange.delete dead_queue.delete dead_exchange.delete end |
#consume(info, meta, payload) ⇒ Object
26 27 28 29 30 31 32 |
# File 'lib/liebre/actor/consumer/core.rb', line 26 def consume info, , payload callback = callback_class.new(consumer, info) handler.call(payload, , callback) do |error| callback.failed(error) end end |
#failed(info, error) ⇒ Object
38 39 40 |
# File 'lib/liebre/actor/consumer/core.rb', line 38 def failed info, error queue.reject(info, {}) end |
#nack(info, opts) ⇒ Object
35 |
# File 'lib/liebre/actor/consumer/core.rb', line 35 def nack(info, opts) queue.nack(info, opts); end |
#reject(info, opts) ⇒ Object
36 |
# File 'lib/liebre/actor/consumer/core.rb', line 36 def reject(info, opts) queue.reject(info, opts); end |
#start ⇒ Object
15 16 17 18 19 20 |
# File 'lib/liebre/actor/consumer/core.rb', line 15 def start queue.subscribe(OPTS) do |info, , payload| consumer.consume(info, , payload) end dead_queue end |
#stop ⇒ Object
22 23 24 |
# File 'lib/liebre/actor/consumer/core.rb', line 22 def stop chan.close end |