Class: Totoro::BaseQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/totoro/base_queue.rb

Class Method Summary collapse

Class Method Details

.channel ⇒ Object



16
17
18
# File 'lib/totoro/base_queue.rb', line 16

# Returns the channel used by this queue class.
#
# The channel is created from +connection+ on first use and cached in
# +@channel+, so later calls return the same object.
#
# @return [Object] whatever +connection.create_channel+ returned
def channel
  return @channel if @channel

  @channel = connection.create_channel
end

.config ⇒ Object



8
9
10
# File 'lib/totoro/base_queue.rb', line 8

# Returns the configuration object for this queue class.
#
# A +Totoro::Config+ is instantiated on first use and cached in +@config+.
#
# @return [Totoro::Config] the cached configuration instance
def config
  return @config if @config

  @config = Totoro::Config.new
end

.connection ⇒ Object



12
13
14
# File 'lib/totoro/base_queue.rb', line 12

# Returns the started Bunny connection for this queue class.
#
# On first use a session is built from +config.connect+, started, and then
# cached in +@connection+. The cache is only written after +start+ returns,
# so a failed start is retried on the next call.
#
# @return [Object] the object returned by +Bunny.new+
def connection
  return @connection if @connection

  session = Bunny.new(config.connect)
  session.start
  @connection = session
end

.enqueue(id, payload) ⇒ Object

enqueue = publish to direct exchange



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/totoro/base_queue.rb', line 25

# Publishes +payload+, serialized as JSON, through +exchange+ using the name
# of the queue configured under +id+ as the routing key.
#
# If the broker cannot be reached, the error is logged and the message is
# recorded via +Totoro::TotoroFailedMessage.create+ instead of raising.
#
# @param id [Object] key passed to +config.queue+ to look up the queue arguments
# @param payload [Object] any value accepted by +JSON.dump+
# @return [Object] the result of +STDOUT.flush+ on success, or the result of
#   +Totoro::TotoroFailedMessage.create+ when a handled error occurred
def enqueue(id, payload)
  queue = channel.queue(*config.queue(id))
  # Keep the serialized form in its own local so the rescue below always
  # records the caller's original payload, regardless of which step failed.
  # Rebinding +payload+ here would store a JSON string when +publish+ fails
  # but the raw object when +channel.queue+ fails.
  message = JSON.dump(payload)
  exchange.publish(message, routing_key: queue.name)
  Rails.logger.info "send message to #{queue.name}"
  STDOUT.flush
rescue Bunny::TCPConnectionFailedForAllHosts,
       AMQ::Protocol::EmptyResponseError => error
  Rails.logger.error error.message
  Rails.logger.info 'Add failed message to resend list'
  STDOUT.flush
  Totoro::TotoroFailedMessage.create(
    class_name: to_s,
    queue_id: id,
    payload: payload
  )
end

.exchange ⇒ Object



20
21
22
# File 'lib/totoro/base_queue.rb', line 20

# Returns the exchange messages are published through.
#
# The channel's default exchange is fetched on first use and cached in
# +@exchange+.
#
# @return [Object] whatever +channel.default_exchange+ returned
def exchange
  return @exchange if @exchange

  @exchange = channel.default_exchange
end

.get_worker(worker_class) ⇒ Object



50
51
52
# File 'lib/totoro/base_queue.rb', line 50

# Builds a new worker instance for the given worker name.
#
# The name is stringified and camelized, then resolved as a constant under
# the top-level +::Worker+ namespace and instantiated with no arguments.
#
# @param worker_class [#to_s] worker name, e.g. +:foo_bar+
#   (NOTE(review): +camelize+ is presumably ActiveSupport's String#camelize — confirm)
# @return [Object] a fresh instance of the resolved worker class
def get_worker(worker_class)
  const_name = worker_class.to_s.camelize
  worker_const = ::Worker.const_get(const_name)
  worker_const.new
end

.subscribe(id) ⇒ Object



43
44
45
46
47
48
# File 'lib/totoro/base_queue.rb', line 43

# Subscribes to the queue configured under +id+ and forwards every delivery
# to the caller's block.
#
# The queue is obtained from +channel.queue+ using the arguments returned by
# +config.queue(id)+.
#
# @param id [Object] key passed to +config.queue+ to look up the queue arguments
# @yield [delivery_info, metadata, payload] the three values the queue hands
#   to its subscribe block, passed through unchanged
# @return [Object] whatever +queue.subscribe+ returned
def subscribe(id)
  queue = channel.queue(*config.queue(id))
  # The middle block argument had been lost, leaving `|delivery_info, , payload|`,
  # which does not parse; it is restored here as +metadata+.
  queue.subscribe do |delivery_info, metadata, payload|
    yield(delivery_info, metadata, payload)
  end
end