Module: Asynk::Consumer
- Defined in:
- lib/asynk/consumer.rb
Defined Under Namespace
Modules: ClassMethods
Class Method Summary collapse
- .included(base) ⇒ Object
Instance Method Summary collapse
- #ack! ⇒ Object
- #initialize(channel, delivery_info, &block) ⇒ Object
- #invoke_processing(message) ⇒ Object
- #logger ⇒ Object
- #reject! ⇒ Object
- #requeue! ⇒ Object
- #respond(result) ⇒ Object
Class Method Details
.included(base) ⇒ Object
4 5 6 7 8 |
# File 'lib/asynk/consumer.rb', line 4
#
# Hook invoked with the class or module that mixes this module in.
# It extends +base+ with ClassMethods, includes ActiveSupport::Rescuable
# into it, and then hands +base+ to Asynk.register_consumer.
#
# @param base [Module] the class or module including this module
# @return [Object] whatever Asynk.register_consumer returns
def self.included(base)
  # Object#extend returns its receiver, so the include lands on +base+ too.
  base.extend(ClassMethods).include(ActiveSupport::Rescuable)
  Asynk.register_consumer(base)
end
Instance Method Details
#ack! ⇒ Object
16 17 18 |
# File 'lib/asynk/consumer.rb', line 16
#
# Calls +ack+ on the stored channel, passing the delivery tag read from
# the stored delivery info.
#
# @return [Object] whatever the channel's +ack+ returns
def ack!
  tag = @delivery_info.delivery_tag
  @channel.ack(tag)
end
#initialize(channel, delivery_info, &block) ⇒ Object
10 11 12 13 14 |
# File 'lib/asynk/consumer.rb', line 10
#
# Stores the channel, the delivery info and the optional block on the
# instance for later use by the other instance methods.
#
# @param channel [Object] object later sent +ack+ / +reject+
# @param delivery_info [Object] object later asked for +delivery_tag+
# @param block [Proc, nil] callback later invoked by #respond; nil when no
#   block is given
def initialize(channel, delivery_info, &block)
  @channel, @delivery_info = channel, delivery_info
  @callback_block = block
end
#invoke_processing(message) ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/asynk/consumer.rb', line 36
#
# Dispatches +message+ to a handler method on this consumer.
#
# When the consumer class reports +route_ending_as_action?+ and the message
# has a routing key, the handler name is obtained from
# +self.class.action_name_from_routing_key+; otherwise +process+ is used.
# The handler is called through +public_send+, so only public methods can
# be reached this way.
#
# @param message [Object] must respond to +routing_key+ when the class
#   reports +route_ending_as_action?+
# @return [Object, nil] the handler's return value, or nil when the handler
#   raised and +rescue_with_handler+ reported the exception as handled
# @raise [Exception] any exception from the handler that
#   +rescue_with_handler+ does not handle
def invoke_processing(message)
  method_for_exec =
    if self.class.route_ending_as_action? && message.routing_key
      self.class.action_name_from_routing_key(message.routing_key)
    else
      :process
    end

  begin
    public_send(method_for_exec, message)
  rescue Exception => ex # rubocop:disable Lint/RescueException
    # Every exception is offered to rescue_with_handler (presumably the
    # ActiveSupport::Rescuable one mixed in on inclusion — confirm);
    # anything it does not handle is re-raised unchanged.
    raise ex unless rescue_with_handler(ex)
  end
end
#reject! ⇒ Object
20 21 22 |
# File 'lib/asynk/consumer.rb', line 20
#
# Calls +reject+ on the stored channel with only the delivery tag read
# from the stored delivery info.
#
# @return [Object] whatever the channel's +reject+ returns
def reject!
  tag = @delivery_info.delivery_tag
  @channel.reject(tag)
end
#requeue! ⇒ Object
24 25 26 |
# File 'lib/asynk/consumer.rb', line 24
#
# Calls +reject+ on the stored channel with the delivery tag and +true+ as
# the second argument (presumably the channel's requeue flag — confirm
# against the channel API).
#
# @return [Object] whatever the channel's +reject+ returns
def requeue!
  requeue = true
  @channel.reject(@delivery_info.delivery_tag, requeue)
end
#respond(result) ⇒ Object
32 33 34 |
# File 'lib/asynk/consumer.rb', line 32
#
# Passes +result+ to the callback block captured by the constructor.
#
# @param result [Object] value forwarded to the callback unchanged
# @return [Object] whatever the callback returns
def respond(result)
  callback = @callback_block
  callback.call(result)
end