Class: Cola::Queue
- Inherits:
-
Object
- Object
- Cola::Queue
- Defined in:
- lib/cola/queue.rb
Instance Attribute Summary collapse
-
#deadletter_queue_name ⇒ Object
readonly
Returns the value of attribute deadletter_queue_name.
-
#process_queue_name ⇒ Object
readonly
Returns the value of attribute process_queue_name.
-
#queue_name ⇒ Object
readonly
Returns the value of attribute queue_name.
Instance Method Summary collapse
- #commit ⇒ Object
- #deadletter_count ⇒ Object
-
#destroy ⇒ Object
not thread safe.
- #empty? ⇒ Boolean
- #flush ⇒ Object
- #flush_deadletter ⇒ Object
- #flush_processing ⇒ Object
-
#initialize(options = {}) ⇒ Queue
constructor
A new instance of Queue.
- #len ⇒ Object
- #pop(non_block = false, timeout: @timeout) ⇒ Object
- #pop_with_envelope(non_block = false, timeout: @timeout) ⇒ Object
- #process(non_block = false, timeout: @timeout) ⇒ Object
- #processing_count ⇒ Object
- #push(obj, ttl: 0) ⇒ Object (also: #<<)
-
#refill ⇒ Object
not thread safe.
Constructor Details
#initialize(options = {}) ⇒ Queue
Returns a new instance of Queue.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/cola/queue.rb', line 8 def initialize( = {}) @redis = [:redis] || Redis.current prefix = [:prefix] || "rq_" @deadletter = [:deadletter] # default is false if [:queue_name] @queue_name = prefix + [:queue_name] @process_queue_name = prefix + "process_" + [:queue_name] @deadletter_queue_name = prefix + "deadletter_" + [:queue_name] else now = Time.new.to_i @queue_name = prefix + now.to_s @process_queue_name = prefix + "process_" + now.to_s @deadletter_queue_name = prefix + "deadletter_" + now.to_s end @timeout = [:timeout] ||= 0 @retries = [:retries] ||= 0 end |
Instance Attribute Details
#deadletter_queue_name ⇒ Object (readonly)
Returns the value of attribute deadletter_queue_name.
6 7 8 |
# File 'lib/cola/queue.rb', line 6 def deadletter_queue_name @deadletter_queue_name end |
#process_queue_name ⇒ Object (readonly)
Returns the value of attribute process_queue_name.
5 6 7 |
# File 'lib/cola/queue.rb', line 5 def process_queue_name @process_queue_name end |
#queue_name ⇒ Object (readonly)
Returns the value of attribute queue_name.
4 5 6 |
# File 'lib/cola/queue.rb', line 4 def queue_name @queue_name end |
Instance Method Details
#commit ⇒ Object
80 81 82 83 |
# File 'lib/cola/queue.rb', line 80 def commit @redis.lpop(@process_queue_name) return true end |
#deadletter_count ⇒ Object
58 59 60 |
# File 'lib/cola/queue.rb', line 58 def deadletter_count @redis.llen @deadletter_queue_name end |
#destroy ⇒ Object
not thread safe
44 45 46 47 48 |
# File 'lib/cola/queue.rb', line 44 def destroy flush_processing flush flush_deadletter end |
#empty? ⇒ Boolean
50 51 52 |
# File 'lib/cola/queue.rb', line 50 def empty? len <= 0 end |
#flush ⇒ Object
35 36 37 |
# File 'lib/cola/queue.rb', line 35 def flush @redis.del @queue_name end |
#flush_deadletter ⇒ Object
39 40 41 |
# File 'lib/cola/queue.rb', line 39 def flush_deadletter @redis.del @deadletter_queue_name end |
#flush_processing ⇒ Object
31 32 33 |
# File 'lib/cola/queue.rb', line 31 def flush_processing @redis.del @process_queue_name end |
#len ⇒ Object
27 28 29 |
# File 'lib/cola/queue.rb', line 27 def len @redis.llen @queue_name end |
#pop(non_block = false, timeout: @timeout) ⇒ Object
73 74 75 76 77 78 |
# File 'lib/cola/queue.rb', line 73 def pop(non_block = false, timeout: @timeout) obj = pop_with_envelope(non_block, timeout: timeout) return nil if obj.nil? return obj. end |
#pop_with_envelope(non_block = false, timeout: @timeout) ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/cola/queue.rb', line 119 def pop_with_envelope(non_block = false, timeout: @timeout) obj = non_block ? @redis.rpoplpush(@queue_name, @process_queue_name) : @redis.brpoplpush(@queue_name, @process_queue_name, timeout) env = Cola::Envelope.from_payload(obj) return env if env.nil? if env.expired? commit return nil else return env end end |
#process(non_block = false, timeout: @timeout) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/cola/queue.rb', line 85 def process(non_block = false, timeout: @timeout) loop do obj = pop_with_envelope(non_block, timeout: timeout) ret = yield obj. if !obj.nil? && block_given? commit if ret break if obj.nil? || (non_block && empty?) rescue => exc raise if obj.nil? # requeue if we should retry and it's not done if @retries != 0 if obj.retries < @retries obj.inc_retries(reason: exc.) push(obj) else mark_as_deadletter(obj) raise Cola::RetryError.new(obj, exc) end else mark_as_deadletter(obj) raise end end end |
#processing_count ⇒ Object
54 55 56 |
# File 'lib/cola/queue.rb', line 54 def processing_count @redis.llen @process_queue_name end |
#push(obj, ttl: 0) ⇒ Object Also known as: <<
62 63 64 65 66 67 68 69 70 71 |
# File 'lib/cola/queue.rb', line 62 def push(obj, ttl: 0) if obj.is_a? Cola::Envelope wrapped = obj else wrapped = Cola::Envelope.new(obj) wrapped.ttl = ttl end @redis.lpush(@queue_name, wrapped.to_json) end |
#refill ⇒ Object
not thread safe
110 111 112 113 114 115 |
# File 'lib/cola/queue.rb', line 110 def refill while (obj = @redis.lpop(@process_queue_name)) @redis.rpush(@queue_name, obj) end true end |