Module: RabbitWQ::Work

Defined in:
lib/rabbit_wq/work.rb

Class Method Summary collapse

Class Method Details

.enqueue(worker, options = {}) ⇒ Object



6
7
8
9
# File 'lib/rabbit_wq/work.rb', line 6

def self.enqueue( worker, options={} )
  payload = worker.to_yaml
  enqueue_payload( payload, options )
end

.enqueue_error_payload(payload, options = {}) ⇒ Object



41
42
43
44
45
46
47
48
# File 'lib/rabbit_wq/work.rb', line 41

def self.enqueue_error_payload( payload, options={} )
  with_channel do |channel|
    error_q = channel.queue( RabbitWQ.configuration.error_queue, durable: true )
    error_q.publish( payload, durable: true,
                              content_type: 'application/yaml',
                              headers: options )
  end
end

.enqueue_payload(payload, options = {}) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/rabbit_wq/work.rb', line 11

def self.enqueue_payload( payload, options={} )
  delay = options.delete( :delay )
  delay = nil if delay && delay < 5000

  if delay
    with_channel do |channel|
      delay_x = channel.direct( "#{RabbitWQ.configuration.delayed_exchange_prefix}-#{delay}ms", durable: true )
      work_x  = channel.direct( RabbitWQ.configuration.work_exchange, durable: true )

      channel.queue( "#{RabbitWQ.configuration.delayed_queue_prefix}-#{delay}ms",
                     durable: true,
                     arguments: { "x-dead-letter-exchange" => work_x.name,
                                  "x-message-ttl" => delay } ).
              bind( delay_x )

      delay_x.publish( payload, durable: true,
                                content_type: 'application/yaml',
                                headers: options )
    end

    return
  end

  with_work_exchange do |work_x, work_q|
    work_x.publish( payload, durable: true,
                             content_type: 'application/yaml',
                             headers: options )
  end
end

.with_channelObject



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rabbit_wq/work.rb', line 63

def self.with_channel
  Bunny.new.tap do |b|
    b.start
    begin
      b.create_channel.tap do |c|
        yield c
      end
    ensure
      b.stop
    end
  end
end

.with_work_exchangeObject



50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/rabbit_wq/work.rb', line 50

def self.with_work_exchange
  with_channel do |channel|
    begin
      exchange = channel.direct( RabbitWQ.configuration.work_exchange, durable: true )
      channel.queue( RabbitWQ.configuration.work_queue, durable: true ).tap do |q|
        q.bind( exchange )
        yield exchange, q
      end
    ensure
    end
  end
end