Class: RabbitJobs::Publisher::Amqp
- Defined in:
- lib/rabbit_jobs/publisher/amqp.rb
Overview
AMQP publisher implementation.
Class Method Summary collapse
- .cleanup ⇒ Object
- .direct_publish_to(routing_key, payload, ex = {}) ⇒ Object
- .publish_to(routing_key, klass, *params) ⇒ Object
- .purge_queue(*routing_keys) ⇒ Object
- .queue_status(routing_key) ⇒ Object
Class Method Details
.cleanup ⇒ Object
11 12 13 |
# File 'lib/rabbit_jobs/publisher/amqp.rb', line 11 def cleanup amqp_cleanup end |
.direct_publish_to(routing_key, payload, ex = {}) ⇒ Object
22 23 24 25 26 27 28 29 30 31 |
# File 'lib/rabbit_jobs/publisher/amqp.rb', line 22 def direct_publish_to(routing_key, payload, ex = {}) exchange_name, exchange_opts = build_exchange(ex) publisher_channel.basic_publish(payload, exchange_name, routing_key, exchange_opts) fail "Disconnected from #{RJ.config.server}." unless amqp_connection.connected? true rescue RabbitJobs.logger.error $!. raise $! end |
.publish_to(routing_key, klass, *params) ⇒ Object
15 16 17 18 19 20 |
# File 'lib/rabbit_jobs/publisher/amqp.rb', line 15 def publish_to(routing_key, klass, *params) check_amqp_publishing_params(routing_key, klass) payload = Job.serialize(klass, *params) direct_publish_to(routing_key.to_sym, payload) end |
.purge_queue(*routing_keys) ⇒ Object
33 34 35 36 37 38 39 |
# File 'lib/rabbit_jobs/publisher/amqp.rb', line 33 def purge_queue(*routing_keys) fail ArgumentError unless routing_keys.present? routing_keys.map(&:to_sym).each do |routing_key| publisher_channel.queue_purge(routing_key) end end |
.queue_status(routing_key) ⇒ Object
41 42 43 44 |
# File 'lib/rabbit_jobs/publisher/amqp.rb', line 41 def queue_status(routing_key) check_queue_status_params(routing_key) publisher_channel.queue(routing_key, RabbitJobs.config[:queues][routing_key.to_sym]).status end |