Class: Quiq::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/quiq/queue.rb

Constant Summary collapse

PREFIX =
'queue'
PROCESSING_SUFFIX =
'processing'
DEAD_LETTER_QUEUE =
'dead'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Queue

Returns a new instance of Queue.



11
12
13
14
# File 'lib/quiq/queue.rb', line 11

# Builds a queue handle for the given logical queue name.
#
# Both keys are derived through the class-level helpers so that the
# instance and the class methods agree on the naming scheme.
#
# @param name [Object] logical queue name, interpolated into both keys
def initialize(name)
  klass = self.class
  @name = klass.formatted_name(name)
  @processing = klass.processing_name(name)
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



9
10
11
# File 'lib/quiq/queue.rb', line 9

# Reader for the key stored in +@name+ by the constructor.
#
# @return [Object, nil] the current value of +@name+ (nil when unset)
def name
  instance_variable_get(:@name)
end

#processing ⇒ Object (readonly)

Returns the value of attribute processing.



9
10
11
# File 'lib/quiq/queue.rb', line 9

# Reader for the key stored in +@processing+ by the constructor.
#
# @return [Object, nil] the current value of +@processing+ (nil when unset)
def processing
  instance_variable_get(:@processing)
end

Class Method Details

.delete(queue, job) ⇒ Object



48
49
50
# File 'lib/quiq/queue.rb', line 48

# Removes a job from the given Redis list.
#
# Issues LREM with a count of 0; per the Redis documentation a count of 0
# removes every element equal to +job+.
#
# @param queue [String] Redis key of the list, passed to LREM unchanged
# @param job [Object] payload to remove
# @return [Object] whatever the Redis client returns for LREM
def self.delete(queue, job)
  redis = Quiq.redis
  redis.lrem(queue, 0, job)
end

.formatted_name(name) ⇒ Object



52
53
54
# File 'lib/quiq/queue.rb', line 52

# Builds the Redis key for a queue: the PREFIX constant, a colon, then the
# string form of +name+.
#
# @param name [Object] logical queue name (converted with #to_s)
# @return [String] the prefixed key
def self.formatted_name(name)
  format('%s:%s', PREFIX, name)
end

.processing_name(name) ⇒ Object



56
57
58
# File 'lib/quiq/queue.rb', line 56

# Builds the Redis key of the processing list that accompanies a queue:
# the prefixed queue key followed by a colon and PROCESSING_SUFFIX.
#
# @param name [Object] logical queue name (converted with #to_s)
# @return [String] the processing-list key
def self.processing_name(name)
  queue_key = "#{PREFIX}:#{name}"
  "#{queue_key}:#{PROCESSING_SUFFIX}"
end

.push(queue, job) ⇒ Object



43
44
45
46
# File 'lib/quiq/queue.rb', line 43

# Convenience wrapper: builds a queue for +queue+ and pushes +job+ onto it.
#
# The queue instance is deliberately not stored on the class. It is only
# needed for this single call, and keeping it in a class-level instance
# variable would leave shared mutable state behind between calls.
#
# @param queue [Object] logical queue name, handed to the constructor
# @param job [Object] payload to enqueue
# @return [Object] the result of the instance-level #push
def self.push(queue, job)
  new(queue).push(job)
end

.send_to_dlq(job) ⇒ Object



60
61
62
63
# File 'lib/quiq/queue.rb', line 60

# Pushes a job onto the dead-letter queue.
#
# The dead-letter queue object is created on first use from
# DEAD_LETTER_QUEUE and memoized in +@dlq+, so later calls reuse it.
#
# @param job [Object] payload to park in the dead-letter queue
# @return [Object] the result of the queue's #push
def self.send_to_dlq(job)
  dead_letters = (@dlq ||= Queue.new(DEAD_LETTER_QUEUE))
  dead_letters.push(job)
end

Instance Method Details

#pop ⇒ Object



24
25
26
# File 'lib/quiq/queue.rb', line 24

# Takes the next job off this queue and records it in the processing list.
#
# Uses BRPOPLPUSH so the job is pushed onto the processing list in the same
# command that pops it. The timeout argument is 0, which per the Redis
# documentation means the call blocks until an element is available.
#
# @return [Object] whatever the Redis client returns for BRPOPLPUSH
def pop
  source = @name
  destination = @processing
  Quiq.redis.brpoplpush(source, destination, 0)
end

#purge_processing! ⇒ Object

Note:

Ideally these jobs would be re-enqueued at the head of the queue, but Redis lacks an LPOPRPUSH command.

Inserts elements that weren’t fully processed at the tail of the queue to avoid losing them.



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/quiq/queue.rb', line 30

# Moves every job left in the processing list back onto the queue.
#
# Jobs are moved one at a time with RPOPLPUSH until the command returns
# nil, and a warning is logged for each job moved. They land at the tail
# of the queue rather than the head, so a requeued job may run out of its
# original order, but it is not lost.
#
# The work runs inside an Async task and this method waits for it.
#
# @return [Object] the value produced by the Async task
def purge_processing!
  requeue_task = Async do
    Quiq.redis.pipeline do |pipeline|
      loop do
        requeued = pipeline.sync.call('RPOPLPUSH', @processing, @name)
        # nil means nothing was left to move: stop without logging.
        break if requeued.nil?

        Quiq.logger.warn("Requeuing job #{requeued} in #{@name}")
      end
      pipeline.close
    end
  end
  requeue_task.wait
end

#push(job) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/quiq/queue.rb', line 16

# Enqueues a job with LPUSH on this queue's key.
#
# When LPUSH reports a count of zero or less, an error is logged and false
# is returned. On success the method returns nil, so callers cannot tell
# success from failure by truthiness alone.
#
# @param job [Object] payload to enqueue
# @return [false, nil] false when the push failed, nil otherwise
def push(job)
  list_length = Quiq.redis.lpush(@name, job)
  if list_length <= 0
    Quiq.logger.error("Could not push to the queue: #{@name}")
    false
  end
end