Class: Consumers::Base

Inherits:
Object
  • Object
show all
Defined in:
lib/smart_que/consumers/base.rb

Direct Known Subclasses

SmartQue::Consumer

Constant Summary collapse

QUEUE_NAME =

The queue name should be defined here.

'smart_que.default'

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#queue_nameObject

Returns the value of attribute queue_name.



8
9
10
# File 'lib/smart_que/consumers/base.rb', line 8

# Reader for the queue name attribute.
#
# @return [Object, nil] the value currently held in @queue_name
def queue_name
  instance_variable_get(:@queue_name)
end

Instance Method Details

#channelObject

Create channel with the established connection.



27
28
29
# File 'lib/smart_que/consumers/base.rb', line 27

# Lazily opens a channel on the connection and reuses it afterwards.
#
# @return [Object] the memoized result of connection.create_channel
def channel
  return @channel if @channel

  @channel = connection.create_channel
end

#configObject



31
32
33
# File 'lib/smart_que/consumers/base.rb', line 31

# Convenience accessor that forwards to the top-level SmartQue.config.
#
# @return [Object] whatever ::SmartQue.config returns
def config
  smart_que = ::SmartQue
  smart_que.config
end

#connectionObject

Establish connection to Message Queues.



22
23
24
# File 'lib/smart_que/consumers/base.rb', line 22

# Obtains the message-queue connection by delegating to
# ::SmartQue.establish_connection.
#
# @return [Object] whatever ::SmartQue.establish_connection returns
def connection
  established = ::SmartQue.establish_connection
  established
end

#queueObject

Returns the default queue present in the message queues. Consumer-specific queues should be defined and implemented in the consumer subclasses.



17
18
19
# File 'lib/smart_que/consumers/base.rb', line 17

# Looks up the queue named by #queue_name on the channel, caching the
# result so later calls return the same object.
#
# @return [Object] the memoized result of channel.queue(queue_name)
def queue
  return @queue if @queue

  @queue = channel.queue(queue_name)
end

#startObject

Method which kick-starts the consumer process thread.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/smart_que/consumers/base.rb', line 36

# Subscribes to the queue and hands every delivery to #run, then blocks
# in #wait_for_threads.
#
# Each payload is parsed as JSON and passed to #run (expected to be
# provided by the consumer; it is not defined in this listing). The
# outcome decides how the delivery is settled on the channel:
#
# * :ok    - channel.ack(tag)
# * :retry - channel.reject(tag, true)
# * other  - channel.reject(tag, false); this includes nil and any
#            StandardError raised while parsing or running, which is
#            deliberately swallowed and mapped to :error.
#
# NOTE(review): the second argument of reject is presumably the broker
# client's "requeue" flag - confirm against the client library.
def start
  channel.prefetch(10)
  # The middle block argument (message metadata) is unused here.
  queue.subscribe(manual_ack: true, exclusive: false) do |delivery_info, _metadata, payload|
    status =
      begin
        run(JSON.parse(payload).with_indifferent_access)
      rescue StandardError
        :error
      end

    tag = delivery_info.delivery_tag
    case status
    when :ok    then channel.ack(tag)
    when :retry then channel.reject(tag, true)
    else             channel.reject(tag, false) # :error, nil etc
    end
  end

  wait_for_threads
end

#wait_for_threadsObject



58
59
60
# File 'lib/smart_que/consumers/base.rb', line 58

# Parks the calling thread. `sleep` is called with no duration, so this
# does not return on its own; #start calls it after subscribing.
def wait_for_threads
  sleep
end