Class: RedisQueue
- Inherits:
-
Object
- Object
- RedisQueue
- Defined in:
- lib/redis_queue.rb
Defined Under Namespace
Classes: RedisConnection
Constant Summary collapse
- PUSH_CODE =
''" if ARGV[3] == 'true' then local insert = redis.call('linsert', ARGV[1], 'before', '', ARGV[2]) if insert == -1 or insert == 0 then redis.call('lpush', ARGV[1], '') redis.call('lpush', ARGV[1], ARGV[2]) end else redis.call('rpush', ARGV[1], ARGV[2]) end"''.freeze
- SCRIPTS =
{ push: PUSH_CODE, nonblpop: ''" local message = '' while message == '' do message = redis.call('lpop', ARGV[1]) end if message then redis.call('sadd', ARGV[1]..'_in_use', message) end return message "'', repush: ''" #{PUSH_CODE} redis.call('srem', ARGV[1]..'_in_use', ARGV[2]) "'', fail: ''" redis.call('sadd', ARGV[1]..'_failed', ARGV[2]) redis.call('srem', ARGV[1]..'_in_use', ARGV[2]) "'', done: ''" redis.call('sadd', ARGV[1]..'_done', ARGV[2]) redis.call('srem', ARGV[1]..'_in_use', ARGV[2]) "'', unpop: ''" redis.call('lpush', ARGV[1], ARGV[2]) redis.call('srem', ARGV[1]..'_in_use', ARGV[2]) "'', init_from: ''" local vals = redis.call('smembers', ARGV[2]) for i = 1, table.getn(vals) do redis.call('lpush', ARGV[1], vals[i]) end"'' }.freeze
Instance Method Summary collapse
- #clear ⇒ Object
- #done(message) ⇒ Object
- #done_list ⇒ Object
- #done_size ⇒ Object
- #fail(message) ⇒ Object
- #failed_list ⇒ Object
- #failed_size ⇒ Object
- #forget(message) ⇒ Object
- #in_use_list ⇒ Object
- #in_use_size ⇒ Object
- #init_from(set) ⇒ Object
-
#initialize(args = {}) ⇒ RedisQueue
constructor
A new instance of RedisQueue.
- #list ⇒ Object
- #pop(block: true) ⇒ Object
- #print_contents ⇒ Object
- #print_stats ⇒ Object
- #push(message, priority = false) ⇒ Object
- #repush(message, priority = false) ⇒ Object
- #reset ⇒ Object
- #restart ⇒ Object
- #size ⇒ Object
- #unpop(message) ⇒ Object
Constructor Details
#initialize(args = {}) ⇒ RedisQueue
Returns a new instance of RedisQueue.
48 49 50 51 52 53 54 |
# File 'lib/redis_queue.rb', line 48 def initialize(args = {}) args = { id: :messages, url: 'redis://localhost:6379/0' }.merge(args) @id = args.delete(:id) @redis = RedisConnection.new(args) @redis_blocking = RedisConnection.new(args) load_scripts end |
Instance Method Details
#clear ⇒ Object
149 150 151 152 153 154 155 156 |
# File 'lib/redis_queue.rb', line 149 def clear @redis.run do |redis| redis.del @id redis.del "#{@id}_in_use" redis.del "#{@id}_done" redis.del "#{@id}_failed" end end |
#done(message) ⇒ Object
73 74 75 |
# File 'lib/redis_queue.rb', line 73 def done() script :done, @id, end |
#done_list ⇒ Object
123 124 125 |
# File 'lib/redis_queue.rb', line 123 def done_list @redis.run { |redis| redis.smembers "#{@id}_done" } end |
#done_size ⇒ Object
107 108 109 |
# File 'lib/redis_queue.rb', line 107 def done_size @redis.run { |redis| redis.scard "#{@id}_done" }.to_i end |
#fail(message) ⇒ Object
69 70 71 |
# File 'lib/redis_queue.rb', line 69 def fail() script :fail, @id, end |
#failed_list ⇒ Object
127 128 129 |
# File 'lib/redis_queue.rb', line 127 def failed_list @redis.run { |redis| redis.smembers "#{@id}_failed" } end |
#failed_size ⇒ Object
111 112 113 |
# File 'lib/redis_queue.rb', line 111 def failed_size @redis.run { |redis| redis.scard "#{@id}_failed" }.to_i end |
#forget(message) ⇒ Object
85 86 87 |
# File 'lib/redis_queue.rb', line 85 def forget() @redis.run { |redis| redis.srem "#{@id}_in_use", } end |
#in_use_list ⇒ Object
131 132 133 |
# File 'lib/redis_queue.rb', line 131 def in_use_list @redis.run { |redis| redis.smembers "#{@id}_in_use" } end |
#in_use_size ⇒ Object
115 116 117 |
# File 'lib/redis_queue.rb', line 115 def in_use_size @redis.run { |redis| redis.scard "#{@id}_in_use" }.to_i end |
#init_from(set) ⇒ Object
99 100 101 |
# File 'lib/redis_queue.rb', line 99 def init_from(set) script(:init_from, @id, set) end |
#list ⇒ Object
119 120 121 |
# File 'lib/redis_queue.rb', line 119 def list @redis.run { |redis| redis.lrange @id, 0, -1 } end |
#pop(block: true) ⇒ Object
56 57 58 59 60 61 62 63 |
# File 'lib/redis_queue.rb', line 56 def pop(block: true) return nonblpop unless block begin = @redis_blocking.run { |redis| redis.blpop(@id) }.last end while == '' @redis.run { |redis| redis.sadd "#{@id}_in_use", } if end |
#print_contents ⇒ Object
142 143 144 145 146 147 |
# File 'lib/redis_queue.rb', line 142 def print_contents puts "#{@id} enqueued: #{list}" puts "#{@id} in use: #{in_use_list}" puts "#{@id} failed: #{failed_list}" puts "#{@id} done: #{done_list}" end |
#print_stats ⇒ Object
135 136 137 138 139 140 |
# File 'lib/redis_queue.rb', line 135 def print_stats puts "#{@id} enqueued: #{size}" puts "#{@id} in use: #{in_use_size}" puts "#{@id} failed: #{failed_size}" puts "#{@id} done: #{done_size}" end |
#push(message, priority = false) ⇒ Object
65 66 67 |
# File 'lib/redis_queue.rb', line 65 def push(, priority = false) script :push, @id, , priority end |
#repush(message, priority = false) ⇒ Object
81 82 83 |
# File 'lib/redis_queue.rb', line 81 def repush(, priority = false) script :repush, @id, , priority end |
#reset ⇒ Object
89 90 91 92 |
# File 'lib/redis_queue.rb', line 89 def reset init_from "#{@id}_in_use" @redis.run { |redis| redis.del "#{@id}_in_use" } end |
#restart ⇒ Object
94 95 96 97 |
# File 'lib/redis_queue.rb', line 94 def restart init_from "#{@id}_done" @redis.run { |redis| redis.del "#{@id}_done" } end |
#size ⇒ Object
103 104 105 |
# File 'lib/redis_queue.rb', line 103 def size @redis.run { |redis| redis.llen @id }.to_i end |
#unpop(message) ⇒ Object
77 78 79 |
# File 'lib/redis_queue.rb', line 77 def unpop() script :unpop, @id, end |