Module: Asynk::Consumer

Defined in:
lib/asynk/consumer.rb

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(base) ⇒ Object



4
5
6
7
8
# File 'lib/asynk/consumer.rb', line 4

def self.included(base)
  base.extend(ClassMethods)
  base.include ActiveSupport::Rescuable
  Asynk.register_consumer(base)
end

Instance Method Details

#ack!Object



16
17
18
# File 'lib/asynk/consumer.rb', line 16

def ack!
  @channel.ack(@delivery_info.delivery_tag)
end

#initialize(channel, delivery_info, &block) ⇒ Object



10
11
12
13
14
# File 'lib/asynk/consumer.rb', line 10

def initialize(channel, delivery_info, &block)
  @channel = channel
  @delivery_info = delivery_info
  @callback_block = block
end

#invoke_processing(message) ⇒ Object



36
37
38
39
40
41
42
43
44
45
# File 'lib/asynk/consumer.rb', line 36

def invoke_processing(message)
  method_for_exec = (self.class.route_ending_as_action? && message.routing_key) ?
                      self.class.action_name_from_routing_key(message.routing_key) : :process

  begin
    public_send(method_for_exec, message)
  rescue Exception => ex
    raise(ex) unless rescue_with_handler(ex)
  end
end

#loggerObject



28
29
30
# File 'lib/asynk/consumer.rb', line 28

def logger
  Asynk.logger
end

#reject!Object



20
21
22
# File 'lib/asynk/consumer.rb', line 20

def reject!
  @channel.reject(@delivery_info.delivery_tag)
end

#requeue!Object



24
25
26
# File 'lib/asynk/consumer.rb', line 24

def requeue!
  @channel.reject(@delivery_info.delivery_tag, true)
end

#respond(result) ⇒ Object



32
33
34
# File 'lib/asynk/consumer.rb', line 32

def respond(result)
  @callback_block.call(result)
end