Class: Reliable::Queue
- Inherits:
-
Object
- Object
- Reliable::Queue
- Defined in:
- lib/reliable/queue.rb
Constant Summary collapse
- FatalError =
Class.new(StandardError)
Instance Attribute Summary collapse
-
#base_key ⇒ Object
Returns the value of attribute base_key.
-
#uuid ⇒ Object
Returns the value of attribute uuid.
Instance Method Summary collapse
- #create_worker(&work) ⇒ Object
- #current_time ⇒ Object
- #each(&block) ⇒ Object
- #failed ⇒ Object
-
#initialize(name) ⇒ Queue
constructor
A new instance of Queue.
- #logger ⇒ Object
- #notify(e, other = {}) ⇒ Object
- #peach(opts = {}, &block) ⇒ Object
- #pending ⇒ Object
- #push(value) ⇒ Object (also: #<<)
- #take(number, &block) ⇒ Object
- #to_enum(&work) ⇒ Object
- #total_items ⇒ Object
- #total_processing ⇒ Object
Constructor Details
Instance Attribute Details
#base_key ⇒ Object
Returns the value of attribute base_key.
13 14 15 |
# File 'lib/reliable/queue.rb', line 13 def base_key @base_key end |
#uuid ⇒ Object
Returns the value of attribute uuid.
13 14 15 |
# File 'lib/reliable/queue.rb', line 13 def uuid @uuid end |
Instance Method Details
#create_worker(&work) ⇒ Object
42 43 44 |
# File 'lib/reliable/queue.rb', line 42 def create_worker(&work) Worker.new(self, &work) end |
#current_time ⇒ Object
23 24 25 |
# File 'lib/reliable/queue.rb', line 23 def current_time @redis.time end |
#each(&block) ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/reliable/queue.rb', line 61 def each(&block) if block_given? # This is because we confuse iteration with work here # So we need to front-load the work, then fake the iteration to_enum(&block).each { |item| item } else to_enum end end |
#failed ⇒ Object
38 39 40 |
# File 'lib/reliable/queue.rb', line 38 def failed @failed ||= List.new(@failed_key, @redis) end |
#logger ⇒ Object
100 101 102 |
# File 'lib/reliable/queue.rb', line 100 def logger Reliable.logger end |
#notify(e, other = {}) ⇒ Object
104 105 106 107 108 109 |
# File 'lib/reliable/queue.rb', line 104 def notify(e, other = {}) # TODO: make configurable logger.info e.inspect logger.info e.backtrace logger.info other.inspect end |
#peach(opts = {}, &block) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/reliable/queue.rb', line 71 def peach(opts = {}, &block) raise "must supply a block" unless block_given? concurrency = opts.fetch(:concurrency) threads = concurrency.times.map do Thread.new { each(&block) } end threads.map { |t| t.abort_on_exception = true } threads.map(&:join) end |
#pending ⇒ Object
34 35 36 |
# File 'lib/reliable/queue.rb', line 34 def pending @pending ||= List.new(@pending_key, @redis) end |
#push(value) ⇒ Object Also known as: <<
27 28 29 30 31 |
# File 'lib/reliable/queue.rb', line 27 def push(value) UUID.new(current_time) do |uuid| @redis.set_and_lpush(pending.key, uuid.to_s, value) end end |
#take(number, &block) ⇒ Object
57 58 59 |
# File 'lib/reliable/queue.rb', line 57 def take(number, &block) to_enum(&block).take(number) end |
#to_enum(&work) ⇒ Object
46 47 48 49 50 51 52 53 54 55 |
# File 'lib/reliable/queue.rb', line 46 def to_enum(&work) work ||= ->(item) { item } worker = create_worker(&work) Enumerator.new do |y| loop do # forever result = worker.next # do work y.yield result # then release control end end end |
#total_items ⇒ Object
96 97 98 |
# File 'lib/reliable/queue.rb', line 96 def total_items @redis.scan("reliable:items:*").length end |
#total_processing ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/reliable/queue.rb', line 84 def total_processing keys = @redis.scan "reliable:queues:*:workers:*:processing" lengths = @redis.pipeline do |pipe| keys.each do |key| pipe.llen key end end lengths.map(&:to_i).reduce(0, &:+) end |