Class: BunnyPriorityQueue::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/bunny_priority_queue/queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(prefix, priorities) ⇒ Queue

Returns a new instance of Queue.

Parameters:

  • prefix (String)

    prefix of queue name

  • priorities (Array)

    each prirority queue names



9
10
11
12
# File 'lib/bunny_priority_queue/queue.rb', line 9

def initialize(prefix, priorities)
  @prefix = prefix
  @priorities = priorities
end

Instance Method Details

#bindObject

bind queue to exchange



55
56
57
58
59
60
# File 'lib/bunny_priority_queue/queue.rb', line 55

def bind
  @queues.each do |q|
    q.bind(@exchange, :routing_key => q.name)
  end
  self
end

#create(exchange, level_ttl = 2000, queue_opts = {}) ⇒ Queue

create priority queues

Parameters:

  • exchange (Bunny::Exchange)

    exchange

  • level_ttl (Integer) (defaults to: 2000)

    PriorityQueue Message TTL (msec)

  • queue_opts (Hash) (defaults to: {})

    Bunny::Queue options

Returns:



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/bunny_priority_queue/queue.rb', line 25

def create(exchange, level_ttl = 2000, queue_opts = {})
  @exchange = exchange

  queue_opts[:arguments] ||= {}
  args = queue_opts[:arguments]

  original_arguments = backup_arguments(args, ["x-dead-letter-exchange", "x-message-ttl", "x-dead-letter-routing-key"])

  @queues = @priorities.map.with_index do |priority, i|
    args["x-priority-level-index"] = i

    if i == 0
      original_arguments.each do |k, v|
        args[k] = v unless v.nil?
      end
    else
      # when a message is expired, its is deivered to upper priority queue fia DLX.
      args["x-dead-letter-routing-key"] = self.name(@priorities[i-1])
      args["x-message-ttl"] = level_ttl
      args["x-dead-letter-exchange"] = @exchange.name
    end

    @exchange.channel.queue("#{self.name(priority)}", queue_opts)
  end

  self
end

#name(priority) ⇒ String

Returns a priority queue name.

Returns:

  • (String)

    a priority queue name



15
16
17
# File 'lib/bunny_priority_queue/queue.rb', line 15

def name(priority)
  "#{@prefix}.#{priority}"
end

#subscribe(opts, &block) ⇒ Object

subscribe queue



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/bunny_priority_queue/queue.rb', line 64

def subscribe(opts, &block)
  manual_ack = opts[:ack] || opts[:manual_ack]
  consumers = @queues.map do |q|
    q.subscribe(:ack => true) do |delivery_info, properties, body|
      d, p, b = check_higher_queue(q.arguments["x-priority-level-index"])

      unless d.nil?
        channel.reject(delivery_info.delivery_tag, true)
        delivery_info = d
        properties = p
        body = b
      end

      channel.ack(delivery_info.delivery_tag) unless manual_ack
      yield delivery_info, properties, body
    end
  end

  if opts[:block]
    channel.work_pool.join
  end

  consumers
end