Class: Quiq::Queue
- Inherits: Object
  - Object
  - Quiq::Queue
- Defined in: lib/quiq/queue.rb
Constant Summary
- PREFIX = 'queue'
- PROCESSING_SUFFIX = 'processing'
- DEAD_LETTER_QUEUE = 'dead'
Instance Attribute Summary
- #name ⇒ Object (readonly): Returns the value of attribute name.
- #processing ⇒ Object (readonly): Returns the value of attribute processing.
Class Method Summary
- .delete(queue, job) ⇒ Object
- .formatted_name(name) ⇒ Object
- .processing_name(name) ⇒ Object
- .push(queue, job) ⇒ Object
- .send_to_dlq(job) ⇒ Object
Instance Method Summary
- #initialize(name) ⇒ Queue (constructor): A new instance of Queue.
- #pop ⇒ Object
- #purge_processing! ⇒ Object: Insert elements that weren’t fully processed at the tail of the queue to avoid loss.
- #push(job) ⇒ Object
Constructor Details
#initialize(name) ⇒ Queue
Returns a new instance of Queue.
    # File 'lib/quiq/queue.rb', line 11
    def initialize(name)
      @name = self.class.formatted_name(name)
      @processing = self.class.processing_name(name)
    end
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
    # File 'lib/quiq/queue.rb', line 9
    def name
      @name
    end
#processing ⇒ Object (readonly)
Returns the value of attribute processing.
    # File 'lib/quiq/queue.rb', line 9
    def processing
      @processing
    end
Class Method Details
.delete(queue, job) ⇒ Object
    # File 'lib/quiq/queue.rb', line 48
    def self.delete(queue, job)
      # LREM with a count of 0 removes every element equal to job.
      Quiq.redis.lrem(queue, 0, job)
    end
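The effect of passing a count of 0 to LREM can be illustrated without a Redis server. The sketch below is only an approximation: a plain Ruby array stands in for the Redis list, and `lrem_all` is a hypothetical helper, not part of Quiq or of any Redis client.

```ruby
# A plain array stands in for a Redis list; this is not the Redis client.
processing = ['job-7', 'job-9', 'job-7']

# Hypothetical helper mirroring "LREM key 0 value": remove every matching
# element and reply with how many were removed.
def lrem_all(list, value)
  removed = list.count(value)
  list.delete(value)
  removed
end

puts lrem_all(processing, 'job-7')  # prints 2
p processing                        # prints ["job-9"]
puts lrem_all(processing, 'job-1')  # prints 0
```

Because every matching element is removed, two identical job payloads in the same list would both disappear in one call.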
.formatted_name(name) ⇒ Object
    # File 'lib/quiq/queue.rb', line 52
    def self.formatted_name(name)
      "#{PREFIX}:#{name}"
    end
.processing_name(name) ⇒ Object
    # File 'lib/quiq/queue.rb', line 56
    def self.processing_name(name)
      "#{PREFIX}:#{name}:#{PROCESSING_SUFFIX}"
    end
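To make the key layout concrete, the sketch below copies the three constants and the two naming helpers into a standalone module so it runs without Quiq or Redis. The module name `QueueKeys` and the queue name 'default' are assumptions made for illustration.

```ruby
# Standalone sketch: constants and helpers are copied from the listings
# into a hypothetical module so the snippet runs without Quiq or Redis.
module QueueKeys
  PREFIX = 'queue'
  PROCESSING_SUFFIX = 'processing'
  DEAD_LETTER_QUEUE = 'dead'

  def self.formatted_name(name)
    "#{PREFIX}:#{name}"
  end

  def self.processing_name(name)
    "#{PREFIX}:#{name}:#{PROCESSING_SUFFIX}"
  end
end

puts QueueKeys.formatted_name('default')                     # prints "queue:default"
puts QueueKeys.processing_name('default')                    # prints "queue:default:processing"
puts QueueKeys.formatted_name(QueueKeys::DEAD_LETTER_QUEUE)  # prints "queue:dead"
```

Each queue therefore maps to two Redis keys: one for waiting jobs and one for jobs currently being processed.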
.push(queue, job) ⇒ Object
    # File 'lib/quiq/queue.rb', line 43
    def self.push(queue, job)
      @queue = new(queue)
      @queue.push(job)
    end
.send_to_dlq(job) ⇒ Object
    # File 'lib/quiq/queue.rb', line 60
    def self.send_to_dlq(job)
      # The dead-letter queue is built once and memoized at the class level.
      @dlq ||= Queue.new(DEAD_LETTER_QUEUE)
      @dlq.push(job)
    end
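The `@dlq ||=` idiom stores the dead-letter queue in a class-level instance variable, so repeated calls reuse one object. The sketch below shows that behavior in isolation. `DemoQueue`, its in-memory `jobs` array, and the `dlq` reader are hypothetical and exist only so the example runs without Redis.

```ruby
# Hypothetical class used only to illustrate class-level memoization;
# it does not talk to Redis.
class DemoQueue
  attr_reader :name, :jobs

  def initialize(name)
    @name = "queue:#{name}"
    @jobs = []
  end

  # Insert at the left end of the in-memory list, like LPUSH.
  def push(job)
    @jobs.unshift(job).length
  end

  # Same shape as Queue.send_to_dlq: build the queue once, then reuse it.
  def self.send_to_dlq(job)
    @dlq ||= new('dead')
    @dlq.push(job)
  end

  # Reader added for the demonstration only.
  def self.dlq
    @dlq
  end
end

DemoQueue.send_to_dlq('job-1')
first = DemoQueue.dlq
DemoQueue.send_to_dlq('job-2')

puts first.equal?(DemoQueue.dlq)  # prints true: the same object is reused
puts DemoQueue.dlq.name           # prints "queue:dead"
p DemoQueue.dlq.jobs              # prints ["job-2", "job-1"]
```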
Instance Method Details
#pop ⇒ Object
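    # File 'lib/quiq/queue.rb', line 24
    def pop
      # Blocks until a job is available (a timeout of 0 waits indefinitely),
      # then atomically moves it from @name to the @processing list.
      Quiq.redis.brpoplpush(@name, @processing, 0)
    end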
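Taken together, #push (LPUSH) and #pop (BRPOPLPUSH) give first-in, first-out ordering, and a popped job stays visible in the processing list. The sketch below simulates this with in-memory arrays. The `lists` hash and the `lpush` and `rpoplpush` lambdas are hypothetical stand-ins for Redis, and the blocking behavior of BRPOPLPUSH is not modeled.

```ruby
# Arrays stand in for Redis lists: index 0 is the left end, the last index
# is the right end. Nothing here talks to a real Redis server.
lists = Hash.new { |hash, key| hash[key] = [] }

# Hypothetical helpers mirroring LPUSH and a non-blocking RPOPLPUSH.
lpush = ->(key, value) { lists[key].unshift(value).length }
rpoplpush = lambda do |source, destination|
  value = lists[source].pop
  lists[destination].unshift(value) unless value.nil?
  value
end

lpush.call('queue:default', 'job-1')
lpush.call('queue:default', 'job-2')

job = rpoplpush.call('queue:default', 'queue:default:processing')
puts job                             # prints "job-1": the oldest job leaves first
p lists['queue:default']             # prints ["job-2"]
p lists['queue:default:processing']  # prints ["job-1"]
```

Keeping the job in the processing list until it is explicitly removed is what allows unfinished work to be recovered later by #purge_processing!.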
#purge_processing! ⇒ Object
Insert elements that weren’t fully processed at the tail of the queue to avoid loss.
Note: ideally these elements would be re-enqueued at the head of the queue, but Redis lacks an LPOPRPUSH command.
    # File 'lib/quiq/queue.rb', line 30
    def purge_processing!
      Async do
        Quiq.redis.pipeline do |pipe|
          # Move jobs back one at a time until the processing list is empty.
          loop do
            job = pipe.sync.call('RPOPLPUSH', @processing, @name)
            Quiq.logger.warn("Requeuing job #{job} in #{@name}") unless job.nil?
            break if job.nil?
          end
          pipe.close
        end
      end.wait
    end
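The requeue loop and the ordering caveat in the note can be traced with plain arrays. This is a sketch under stated assumptions: the arrays replace the Redis lists, the job names are invented, and `rpoplpush` is a hypothetical helper that mimics the Redis command of the same name.

```ruby
# Plain arrays stand in for the two Redis lists (index 0 = left end,
# last index = right end). Consumers take jobs from the right end.
queue      = ['job-4', 'job-3']  # waiting jobs; job-3 would be popped next
processing = ['job-2', 'job-1']  # jobs that were never fully processed

# Hypothetical helper mimicking RPOPLPUSH on arrays: move the rightmost
# element of source to the left end of destination, nil when source is empty.
def rpoplpush(source, destination)
  job = source.pop
  destination.unshift(job) unless job.nil?
  job
end

requeued = []
loop do
  job = rpoplpush(processing, queue)
  break if job.nil?

  requeued << job
end

p requeued    # prints ["job-1", "job-2"]
p queue       # prints ["job-2", "job-1", "job-4", "job-3"]
p processing  # prints []
```

The recovered jobs land at the left end of the queue, the same end that receives new jobs, so they run only after the jobs that were already waiting. This is the ordering the note describes as the tail of the queue.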
#push(job) ⇒ Object
    # File 'lib/quiq/queue.rb', line 16
    def push(job)
      # LPUSH replies with the list length after the push.
      pushed = Quiq.redis.lpush(@name, job)
      # Returns nil on success; logs and returns false otherwise.
      return unless pushed <= 0

      Quiq.logger.error("Could not push to the queue: #{@name}")
      false
    end
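One consequence of the listing above is that #push returns nil when the push succeeds and false when it fails, so both outcomes are falsy. The sketch below reproduces only that control flow. `push_outcome` and `describe` are hypothetical helpers, and `reply` plays the role of the integer returned by LPUSH.

```ruby
# Hypothetical stand-in that mirrors the control flow of Queue#push;
# `reply` plays the role of the integer returned by LPUSH.
def push_outcome(reply)
  return unless reply <= 0

  false
end

# Callers need an explicit comparison with false, since nil is also falsy.
def describe(result)
  result == false ? 'failed' : 'enqueued'
end

puts describe(push_outcome(3))  # prints "enqueued" (push returned nil)
puts describe(push_outcome(0))  # prints "failed" (push returned false)
```

A caller that writes `if queue.push(job)` would take the failure branch in both cases, so comparing against false (or checking `nil?`) appears to be the safer pattern here.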