Class: Totoro::BaseQueue
- Inherits: Object
- Object
- Totoro::BaseQueue
- Defined in:
- lib/totoro/base_queue.rb
Class Method Summary collapse
- .channel ⇒ Object
- .config ⇒ Object
- .connection ⇒ Object
- .enqueue(id, payload) ⇒ Object
  Enqueues a payload by publishing it to the direct exchange.
- .exchange ⇒ Object
- .get_worker(worker_class) ⇒ Object
- .subscribe(id) ⇒ Object
Class Method Details
.channel ⇒ Object
16 17 18 |
# File 'lib/totoro/base_queue.rb', line 16
# Returns the memoized channel, creating it from the shared connection the
# first time it is requested.
def channel
  return @channel if @channel

  @channel = connection.create_channel
end
.config ⇒ Object
8 9 10 |
# File 'lib/totoro/base_queue.rb', line 8
# Returns the memoized Totoro::Config instance, building it on first use.
def config
  @config = Totoro::Config.new unless @config
  @config
end
.connection ⇒ Object
12 13 14 |
# File 'lib/totoro/base_queue.rb', line 12
# Returns the memoized Bunny session. On first use a session is built from
# `config.connect` and started; it is only cached once `start` has returned,
# so a failed start is retried on the next call.
def connection
  return @connection if @connection

  session = Bunny.new(config.connect)
  session.start
  @connection = session
end
.enqueue(id, payload) ⇒ Object
enqueue = publish to direct exchange
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/totoro/base_queue.rb', line 25
# Publishes +payload+ (serialized with JSON.dump) to the exchange, routed to
# the queue configured under +id+.
#
# If the broker cannot be reached, the error is logged and the message is
# recorded as a Totoro::TotoroFailedMessage instead of raising.
#
# @param id the queue identifier passed to `config.queue`
# @param payload the object to serialize and publish
def enqueue(id, payload)
  queue = channel.queue(*config.queue(id))
  # Keep the serialized form in its own local so the rescue below always
  # records the caller's original payload, regardless of which line raised.
  message = JSON.dump(payload)
  exchange.publish(message, routing_key: queue.name)
  Rails.logger.info "send message to #{queue.name}"
  STDOUT.flush
rescue Bunny::TCPConnectionFailedForAllHosts, AMQ::Protocol::EmptyResponseError => error
  Rails.logger.error error.message
  Rails.logger.info 'Add failed message to resend list'
  STDOUT.flush
  Totoro::TotoroFailedMessage.create(
    class_name: to_s,
    queue_id: id,
    payload: payload
  )
end
.exchange ⇒ Object
20 21 22 |
# File 'lib/totoro/base_queue.rb', line 20
# Returns the memoized default exchange of the current channel.
def exchange
  @exchange = channel.default_exchange unless @exchange
  @exchange
end
.get_worker(worker_class) ⇒ Object
50 51 52 |
# File 'lib/totoro/base_queue.rb', line 50
# Builds a new worker instance: +worker_class+ is stringified, camelized, and
# looked up as a constant under the top-level ::Worker namespace.
#
# @param worker_class the worker name (anything responding to `to_s`)
# @return a new instance of the resolved worker class
def get_worker(worker_class)
  constant_name = worker_class.to_s.camelize
  worker = ::Worker.const_get(constant_name)
  worker.new
end
.subscribe(id) ⇒ Object
43 44 45 46 47 48 |
# File 'lib/totoro/base_queue.rb', line 43
# Subscribes to the queue configured under +id+ and forwards every delivery
# to the caller's block.
#
# @param id the queue identifier passed to `config.queue`
# @yield [delivery_info, metadata, payload] the three values handed to the
#   queue's subscribe block, passed through unchanged
def subscribe(id)
  queue = channel.queue(*config.queue(id))
  queue.subscribe do |delivery_info, metadata, payload|
    yield(delivery_info, metadata, payload)
  end
end