Module: RabbitWQ::Work
- Defined in:
- lib/rabbit_wq/work.rb
Class Method Summary
- .enqueue(worker, options = {}) ⇒ Object
- .enqueue_error_payload(payload, options = {}) ⇒ Object
- .enqueue_payload(payload, options = {}) ⇒ Object
- .with_channel ⇒ Object
- .with_work_exchange ⇒ Object
Class Method Details
.enqueue(worker, options = {}) ⇒ Object
6 7 8 9 |
# File 'lib/rabbit_wq/work.rb', line 6
#
# Serializes +worker+ to YAML and passes the result, together with
# +options+, to .enqueue_payload.
#
# @param worker [Object] object to enqueue; serialized with #to_yaml
# @param options [Hash] forwarded unchanged to .enqueue_payload
# @return [Object] whatever .enqueue_payload returns
def self.enqueue(worker, options = {})
  payload = worker.to_yaml
  enqueue_payload(payload, options)
end
.enqueue_error_payload(payload, options = {}) ⇒ Object
41 42 43 44 45 46 47 48 |
# File 'lib/rabbit_wq/work.rb', line 41
#
# Publishes +payload+ to the queue named by
# RabbitWQ.configuration.error_queue, declared as durable, using a channel
# obtained from .with_channel.
#
# @param payload [String] message body; published with content type
#   'application/yaml'
# @param options [Hash] sent as the message headers
# @return [Object] whatever .with_channel returns
def self.enqueue_error_payload(payload, options = {})
  with_channel do |channel|
    error_q = channel.queue(RabbitWQ.configuration.error_queue, durable: true)

    # NOTE(review): Bunny documents :persistent as the message-persistence
    # publish option; confirm that :durable has the intended effect here.
    error_q.publish(payload, durable: true,
                             content_type: 'application/yaml',
                             headers: options)
  end
end
.enqueue_payload(payload, options = {}) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/rabbit_wq/work.rb', line 11
#
# Publishes an already-serialized +payload+, either immediately to the work
# exchange or, when a sufficiently long :delay is given, to a per-delay
# exchange/queue pair.
#
# The :delay entry is removed from the headers before publishing. A delay
# below 5000 is discarded and the payload is published immediately. The
# delay value is used as the queue's "x-message-ttl" and in the
# "-<delay>ms" exchange/queue names; the delay queue names the work exchange
# as its "x-dead-letter-exchange".
#
# @param payload [String] message body; published with content type
#   'application/yaml'
# @param options [Hash] message headers; may carry :delay (Integer). The
#   hash passed by the caller is not modified.
# @return [nil, Object] nil on the delayed path, otherwise whatever
#   .with_work_exchange returns
def self.enqueue_payload(payload, options = {})
  # Work on a copy so removing :delay does not mutate the caller's hash.
  headers = options.dup
  delay = headers.delete(:delay)
  delay = nil if delay && delay < 5000

  if delay
    with_channel do |channel|
      config = RabbitWQ.configuration
      delay_x = channel.direct("#{config.delayed_exchange_prefix}-#{delay}ms", durable: true)
      work_x = channel.direct(config.work_exchange, durable: true)

      delay_q = channel.queue("#{config.delayed_queue_prefix}-#{delay}ms",
                              durable: true,
                              arguments: { "x-dead-letter-exchange" => work_x.name,
                                           "x-message-ttl" => delay })
      delay_q.bind(delay_x)

      delay_x.publish(payload, durable: true,
                               content_type: 'application/yaml',
                               headers: headers)
    end

    return
  end

  with_work_exchange do |work_x, _work_q|
    work_x.publish(payload, durable: true,
                            content_type: 'application/yaml',
                            headers: headers)
  end
end
.with_channel ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/rabbit_wq/work.rb', line 63
#
# Creates a new Bunny connection, starts it, yields a freshly created
# channel to the block, and stops the connection afterwards, even when the
# block raises.
#
# @yieldparam channel [Object] the result of +create_channel+ on the
#   connection
# @return [Object] the connection returned by +Bunny.new+, already stopped
def self.with_channel
  connection = Bunny.new
  connection.start

  begin
    yield connection.create_channel
  ensure
    # Always stop the connection, whether or not the block succeeded.
    connection.stop
  end

  connection
end
.with_work_exchange ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/rabbit_wq/work.rb', line 50
#
# Opens a channel via .with_channel, declares the durable direct exchange
# named by RabbitWQ.configuration.work_exchange and the durable queue named
# by RabbitWQ.configuration.work_queue, binds the queue to the exchange, and
# yields both to the block.
#
# @yieldparam exchange [Object] the declared work exchange
# @yieldparam queue [Object] the declared work queue, already bound
# @return [Object] whatever .with_channel returns
def self.with_work_exchange
  with_channel do |channel|
    config = RabbitWQ.configuration
    exchange = channel.direct(config.work_exchange, durable: true)
    queue = channel.queue(config.work_queue, durable: true)

    # Bind before yielding so the block sees a routable exchange/queue pair.
    queue.bind(exchange)
    yield exchange, queue
  end
end