Class: RedisQueue
- Inherits:
-
Object
- Object
- RedisQueue
- Defined in:
- lib/redis_queue.rb
Defined Under Namespace
Classes: RedisConnection
Constant Summary collapse
# Lua source that enqueues ARGV[2] on the list named by ARGV[1].
#
# When ARGV[3] is the string 'true' the value is inserted just before the
# empty-string element of the list; if LINSERT reports that there is no such
# element (-1) or that the list is empty (0), an empty string and then the
# value are LPUSHed instead. Otherwise the value is RPUSHed onto the tail.
PUSH_CODE = " if ARGV[3] == 'true' then " \
            "local insert = redis.call('linsert', ARGV[1], 'before', '', ARGV[2]) " \
            "if insert == -1 or insert == 0 then " \
            "redis.call('lpush', ARGV[1], '') " \
            "redis.call('lpush', ARGV[1], ARGV[2]) " \
            "end " \
            "else " \
            "redis.call('rpush', ARGV[1], ARGV[2]) " \
            "end".freeze

# Lua snippets keyed by operation name. In every snippet ARGV[1] is the queue
# id and ARGV[2] is the message (for :init_from, the name of the source set).
#
# * :push      - PUSH_CODE as is.
# * :repush    - PUSH_CODE, then removes the message from "<id>_in_use".
# * :fail      - adds the message to "<id>_failed", removes it from "<id>_in_use".
# * :done      - adds the message to "<id>_done", removes it from "<id>_in_use".
# * :unpop     - LPUSHes the message back, removes it from "<id>_in_use".
# * :init_from - LPUSHes every member of the set ARGV[2] onto the list.
SCRIPTS = {
  push: PUSH_CODE,
  repush: " #{PUSH_CODE} redis.call('srem', ARGV[1]..'_in_use', ARGV[2]) ",
  fail: " redis.call('sadd', ARGV[1]..'_failed', ARGV[2]) " \
        "redis.call('srem', ARGV[1]..'_in_use', ARGV[2]) ",
  done: " redis.call('sadd', ARGV[1]..'_done', ARGV[2]) " \
        "redis.call('srem', ARGV[1]..'_in_use', ARGV[2]) ",
  unpop: " redis.call('lpush', ARGV[1], ARGV[2]) " \
         "redis.call('srem', ARGV[1]..'_in_use', ARGV[2]) ",
  # The length operator replaces table.getn, which Lua 5.1 deprecates.
  init_from: " local vals = redis.call('smembers', ARGV[2]) " \
             "for i = 1, #vals do " \
             "redis.call('lpush', ARGV[1], vals[i]) " \
             "end"
}.freeze
Instance Method Summary collapse
- #clear ⇒ Object
- #done(message) ⇒ Object
- #done_list ⇒ Object
- #done_size ⇒ Object
- #fail(message) ⇒ Object
- #failed_list ⇒ Object
- #failed_size ⇒ Object
- #forget(message) ⇒ Object
- #in_use_list ⇒ Object
- #in_use_size ⇒ Object
- #init_from(set) ⇒ Object
-
#initialize(args = {}) ⇒ RedisQueue
constructor
A new instance of RedisQueue.
- #list ⇒ Object
- #pop(block: true) ⇒ Object
- #print_contents ⇒ Object
- #print_stats ⇒ Object
- #push(message, priority = false) ⇒ Object
- #repush(message, priority = false) ⇒ Object
- #reset ⇒ Object
- #restart ⇒ Object
- #size ⇒ Object
- #unpop(message) ⇒ Object
Constructor Details
#initialize(args = {}) ⇒ RedisQueue
Returns a new instance of RedisQueue.
38 39 40 41 42 43 44 |
# File 'lib/redis_queue.rb', line 38
#
# Builds a queue bound to one list id and opens two connections: one for
# ordinary commands and one kept apart for pops.
#
# @param args [Hash] options; :id (default :messages) names the list and is
#   removed before the remaining options (including :url, default
#   'redis://localhost:6379/0') are handed to RedisConnection.new
# @return [RedisQueue]
def initialize(args = {})
  defaults = { id: :messages, url: 'redis://localhost:6379/0' }
  options = defaults.merge(args)
  @id = options.delete(:id)
  @redis = RedisConnection.new(options)
  @redis_blocking = RedisConnection.new(options)
  load_scripts
end
Instance Method Details
#clear ⇒ Object
140 141 142 143 144 145 146 147 |
# File 'lib/redis_queue.rb', line 140
#
# Deletes the queue list and its three companion sets ("_in_use", "_done",
# "_failed"), one DEL per key, in that order.
#
# @return [Object] the value of the block passed to @redis.run, i.e. the
#   reply to the last DEL
def clear
  keys = [@id, "#{@id}_in_use", "#{@id}_done", "#{@id}_failed"]
  @redis.run { |conn| keys.map { |key| conn.del(key) }.last }
end
#done(message) ⇒ Object
64 65 66 |
# File 'lib/redis_queue.rb', line 64
#
# Reports a message as finished by invoking the :done script with the queue
# id and the message. The Lua under SCRIPTS[:done] adds ARGV[2] to
# "<id>_done" and removes it from "<id>_in_use" (presumably `script`, defined
# outside this excerpt, runs that entry with these values as ARGV).
#
# @param message [String] the message previously handed out by #pop
# @return [Object] whatever `script` returns
def done(message)
  script :done, @id, message
end
#done_list ⇒ Object
114 115 116 |
# File 'lib/redis_queue.rb', line 114
#
# Reads every member of the "#{@id}_done" set (SMEMBERS).
#
# @return [Object] the value @redis.run hands back for the SMEMBERS call
def done_list
  done_key = "#{@id}_done"
  @redis.run { |conn| conn.smembers(done_key) }
end
#done_size ⇒ Object
98 99 100 |
# File 'lib/redis_queue.rb', line 98
#
# Counts the members of the "#{@id}_done" set (SCARD).
#
# @return [Integer] the reply coerced with #to_i
def done_size
  count = @redis.run { |conn| conn.scard("#{@id}_done") }
  count.to_i
end
#fail(message) ⇒ Object
60 61 62 |
# File 'lib/redis_queue.rb', line 60
#
# Reports a message as failed by invoking the :fail script with the queue id
# and the message. The Lua under SCRIPTS[:fail] adds ARGV[2] to "<id>_failed"
# and removes it from "<id>_in_use" (presumably `script`, defined outside
# this excerpt, runs that entry with these values as ARGV).
#
# NOTE: inside this class the method shadows Kernel#fail; use `raise` to
# raise exceptions.
#
# @param message [String] the message previously handed out by #pop
# @return [Object] whatever `script` returns
def fail(message)
  script :fail, @id, message
end
#failed_list ⇒ Object
118 119 120 |
# File 'lib/redis_queue.rb', line 118
#
# Reads every member of the "#{@id}_failed" set (SMEMBERS).
#
# @return [Object] the value @redis.run hands back for the SMEMBERS call
def failed_list
  failed_key = "#{@id}_failed"
  @redis.run { |conn| conn.smembers(failed_key) }
end
#failed_size ⇒ Object
102 103 104 |
# File 'lib/redis_queue.rb', line 102
#
# Counts the members of the "#{@id}_failed" set (SCARD).
#
# @return [Integer] the reply coerced with #to_i
def failed_size
  count = @redis.run { |conn| conn.scard("#{@id}_failed") }
  count.to_i
end
#forget(message) ⇒ Object
76 77 78 |
# File 'lib/redis_queue.rb', line 76
#
# Drops a message from the "#{@id}_in_use" set (SREM) without recording it
# anywhere else.
#
# @param message [String] the message to remove from the in-use set
# @return [Object] the value @redis.run hands back for the SREM call
def forget(message)
  @redis.run { |redis| redis.srem "#{@id}_in_use", message }
end
#in_use_list ⇒ Object
122 123 124 |
# File 'lib/redis_queue.rb', line 122
#
# Reads every member of the "#{@id}_in_use" set (SMEMBERS).
#
# @return [Object] the value @redis.run hands back for the SMEMBERS call
def in_use_list
  in_use_key = "#{@id}_in_use"
  @redis.run { |conn| conn.smembers(in_use_key) }
end
#in_use_size ⇒ Object
106 107 108 |
# File 'lib/redis_queue.rb', line 106
#
# Counts the members of the "#{@id}_in_use" set (SCARD).
#
# @return [Integer] the reply coerced with #to_i
def in_use_size
  count = @redis.run { |conn| conn.scard("#{@id}_in_use") }
  count.to_i
end
#init_from(set) ⇒ Object
90 91 92 |
# File 'lib/redis_queue.rb', line 90
#
# Invokes the :init_from script with the queue id and the name of a set.
# The Lua under SCRIPTS[:init_from] LPUSHes every member of the set ARGV[2]
# onto the list ARGV[1] (presumably `script`, defined outside this excerpt,
# runs that entry with these values as ARGV).
#
# @param set [String] key of the source set
# @return [Object] whatever `script` returns
def init_from(set)
  script :init_from, @id, set
end
#list ⇒ Object
110 111 112 |
# File 'lib/redis_queue.rb', line 110
#
# Reads the whole list stored at @id (LRANGE 0 -1) without removing anything.
#
# @return [Object] the value @redis.run hands back for the LRANGE call
def list
  @redis.run do |conn|
    conn.lrange(@id, 0, -1)
  end
end
#pop(block: true) ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/redis_queue.rb', line 46
#
# Takes the next message off the head of the list stored at @id and records
# it in the "#{@id}_in_use" set.
#
# Empty strings are skipped: the push script (PUSH_CODE) places '' in the
# list as a marker, so it is never returned as a message.
#
# NOTE(review): the pop and the SADD are two separate calls on two
# connections, so they are not atomic.
#
# @param block [Boolean] true issues BLPOP on the dedicated connection,
#   false issues LPOP
# @return [String, nil] the popped message, or nil when the pop yielded
#   nothing
def pop(block: true)
  command = block ? :blpop : :lpop
  message = nil
  loop do
    message = @redis_blocking.run { |redis| redis.public_send(command, @id) }
    # BLPOP answers with a [key, value] pair; guard against a nil reply so a
    # timed-out pop returns nil instead of raising NoMethodError.
    message = message.last if message && command == :blpop
    break unless message == ''
  end
  @redis.run { |redis| redis.sadd "#{@id}_in_use", message } if message
  message
end
#print_contents ⇒ Object
133 134 135 136 137 138 |
# File 'lib/redis_queue.rb', line 133
#
# Writes four lines to standard output showing the contents of the queue and
# of its in-use, failed and done sets, each prefixed with the queue id.
#
# @return [nil]
def print_contents
  sections = {
    'enqueued' => :list,
    'in use' => :in_use_list,
    'failed' => :failed_list,
    'done' => :done_list
  }
  # Each reader is called right before its line is printed.
  sections.each { |label, reader| puts "#{@id} #{label}: #{send(reader)}" }
  nil
end
#print_stats ⇒ Object
126 127 128 129 130 131 |
# File 'lib/redis_queue.rb', line 126
#
# Writes four lines to standard output showing the sizes of the queue and of
# its in-use, failed and done sets, each prefixed with the queue id.
#
# @return [nil]
def print_stats
  counters = {
    'enqueued' => :size,
    'in use' => :in_use_size,
    'failed' => :failed_size,
    'done' => :done_size
  }
  # Each counter is read right before its line is printed.
  counters.each { |label, reader| puts "#{@id} #{label}: #{send(reader)}" }
  nil
end
#push(message, priority = false) ⇒ Object
56 57 58 |
# File 'lib/redis_queue.rb', line 56
#
# Enqueues a message by invoking the :push script with the queue id, the
# message and the priority flag. The Lua in PUSH_CODE treats ARGV[3] == 'true'
# as "insert ahead of the '' marker" and anything else as "append to the
# tail" (presumably `script`, defined outside this excerpt, passes these
# values through as ARGV).
#
# @param message [String] the message to enqueue
# @param priority [Boolean] whether to enqueue ahead of normal messages
# @return [Object] whatever `script` returns
def push(message, priority = false)
  script :push, @id, message, priority
end
#repush(message, priority = false) ⇒ Object
72 73 74 |
# File 'lib/redis_queue.rb', line 72
#
# Puts an in-use message back on the queue by invoking the :repush script
# with the queue id, the message and the priority flag. The Lua under
# SCRIPTS[:repush] is PUSH_CODE followed by an SREM of ARGV[2] from
# "<id>_in_use" (presumably `script`, defined outside this excerpt, passes
# these values through as ARGV).
#
# @param message [String] the message previously handed out by #pop
# @param priority [Boolean] whether to enqueue ahead of normal messages
# @return [Object] whatever `script` returns
def repush(message, priority = false)
  script :repush, @id, message, priority
end
#reset ⇒ Object
80 81 82 83 |
# File 'lib/redis_queue.rb', line 80
#
# Feeds the "#{@id}_in_use" set to #init_from and then deletes that set.
#
# NOTE(review): the copy and the DEL are two separate calls.
#
# @return [Object] the value @redis.run hands back for the DEL call
def reset
  in_use_key = "#{@id}_in_use"
  init_from(in_use_key)
  @redis.run { |conn| conn.del(in_use_key) }
end
#restart ⇒ Object
85 86 87 88 |
# File 'lib/redis_queue.rb', line 85
#
# Feeds the "#{@id}_done" set to #init_from and then deletes that set.
#
# NOTE(review): the copy and the DEL are two separate calls.
#
# @return [Object] the value @redis.run hands back for the DEL call
def restart
  done_key = "#{@id}_done"
  init_from(done_key)
  @redis.run { |conn| conn.del(done_key) }
end
#size ⇒ Object
94 95 96 |
# File 'lib/redis_queue.rb', line 94
#
# Measures the length of the list stored at @id (LLEN).
#
# @return [Integer] the reply coerced with #to_i
def size
  length = @redis.run { |conn| conn.llen(@id) }
  length.to_i
end
#unpop(message) ⇒ Object
68 69 70 |
# File 'lib/redis_queue.rb', line 68
#
# Returns a message to the head of the queue by invoking the :unpop script
# with the queue id and the message. The Lua under SCRIPTS[:unpop] LPUSHes
# ARGV[2] onto the list and removes it from "<id>_in_use" (presumably
# `script`, defined outside this excerpt, runs that entry with these values
# as ARGV).
#
# @param message [String] the message previously handed out by #pop
# @return [Object] whatever `script` returns
def unpop(message)
  script :unpop, @id, message
end