Module: Mumukit::Nuntius::Consumer

Defined in:
lib/mumukit/nuntius/consumer.rb

Class Method Summary collapse

Class Method Details

.handle_message(channel, delivery_info, properties, body, &block) ⇒ Object



41
42
43
44
45
46
47
# File 'lib/mumukit/nuntius/consumer.rb', line 41

def handle_message(channel, delivery_info, properties, body, &block)
  block.call delivery_info, properties, parse_body(body)
  channel.ack delivery_info.delivery_tag
rescue => e
  Mumukit::Nuntius::Logger.warn "Failed to read body: #{e.message} \n #{e.backtrace}"
  channel.nack delivery_info.delivery_tag, false, true
end

.negligent_start!(queue_name, &block) ⇒ Object



22
23
24
25
26
27
28
29
30
# File 'lib/mumukit/nuntius/consumer.rb', line 22

def negligent_start!(queue_name, &block)
  start queue_name, queue_name do |_delivery_info, _properties, body|
    begin
      block.call(body)
    rescue => e
      Mumukit::Nuntius::Logger.error "#{queue_name} item couldn't be processed #{e}. body was: #{body}"
    end
  end
end

.parse_body(body) ⇒ Object



49
50
51
# File 'lib/mumukit/nuntius/consumer.rb', line 49

def parse_body(body)
  JSON.parse(body).with_indifferent_access
end

.start(queue_name, exchange_name, &block) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/mumukit/nuntius/consumer.rb', line 5

def start(queue_name, exchange_name, &block)
  Mumukit::Nuntius::Logger.info "Attaching to queue #{queue_name}"
  Mumukit::Nuntius::Connection.establish_connection
  channel, exchange = Mumukit::Nuntius::Connection.start_channel(exchange_name)
  queue = channel.queue(queue_name, durable: true)
  queue.bind(exchange)
  channel.prefetch(1)

  begin
    subscribe queue, channel, &block
  rescue Interrupt => _
    Mumukit::Nuntius::Logger.info "Leaving queue #{queue_name}"
  ensure
    channel.close
  end
end

.subscribe(queue, channel, &block) ⇒ Object



32
33
34
35
36
37
38
39
# File 'lib/mumukit/nuntius/consumer.rb', line 32

def subscribe(queue, channel, &block)
  Mumukit::Nuntius::Logger.debug "Subscribed to queue #{queue}"

  queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
    Mumukit::Nuntius::Logger.debug "Processing message #{body}"
    handle_message channel, delivery_info, properties, body, &block
  end
end