Class: Tackle::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/tackle/consumer.rb,
lib/tackle/consumer/queue.rb,
lib/tackle/consumer/params.rb,
lib/tackle/consumer/message.rb,
lib/tackle/consumer/exchange.rb,
lib/tackle/consumer/dead_queue.rb,
lib/tackle/consumer/main_queue.rb,
lib/tackle/consumer/delay_queue.rb

Defined Under Namespace

Classes: DeadQueue, DelayQueue, Exchange, MainQueue, Message, Params, Queue

Instance Method Summary collapse

Constructor Details

#initialize(params) ⇒ Consumer

Returns a new instance of Consumer.



13
14
15
16
17
18
# File 'lib/tackle/consumer.rb', line 13

def initialize(params)
  @params = params
  @logger = @params.logger

  setup_rabbit_connections
end

Instance Method Details

#process_message(message, &block) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/tackle/consumer.rb', line 43

def process_message(message, &block)
  message.log_info "Calling message processor"

  response = block.call(message.payload)

  unless @params.manual_ack?
    response = Tackle::ACK
  end

  case response
  when Tackle::ACK
    message.ack
  when Tackle::NACK
    redeliver_message(message, "Received Tackle::NACK")
  else
    raise "Response must be either Tackle::ACK or Tackle::NACK"
  end
rescue Exception => ex
  redeliver_message(message, "Received exception '#{ex}'")

  raise ex
end

#redeliver_message(message, reason) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/tackle/consumer.rb', line 66

def redeliver_message(message, reason)
  message.log_error "Failed to process message. #{reason}"
  message.log_error "Retry count #{message.retry_count}/#{@params.retry_limit}"

  if message.retry_count < @params.retry_limit
    @delay_queue.publish(message)
  else
    @dead_queue.publish(message)
  end

  message.nack
end

#setup_rabbit_connectionsObject



20
21
22
23
24
25
26
27
28
29
# File 'lib/tackle/consumer.rb', line 20

def setup_rabbit_connections
  @connection = Tackle::Connection.new(@params.amqp_url, @params.exception_handler, @logger)

  @exchange    = Exchange.new(@params.service, @params.routing_key, @connection, @logger)
  @main_queue  = MainQueue.new(@exchange, @connection, @logger)
  @delay_queue = DelayQueue.new(@params.retry_delay, @exchange, @connection, @logger)
  @dead_queue  = DeadQueue.new(@exchange, @connection, @logger)

  @exchange.bind_to_exchange(@params.exchange)
end

#subscribe(&block) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
# File 'lib/tackle/consumer.rb', line 31

def subscribe(&block)
  @logger.info "Subscribing to the main queue '#{@main_queue.name}'"

  @main_queue.subscribe { |message| process_message(message, &block) }
rescue Interrupt => _
  @connection.close
rescue StandardError => ex
  @logger.error("An exception occured message='#{ex.message}'")

  raise ex
end