Class: RedisQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_queue.rb

Defined Under Namespace

Classes: RedisConnection

Constant Summary collapse

PUSH_CODE =
''"
if ARGV[3] == 'true' then
  local insert = redis.call('linsert', ARGV[1], 'before', '', ARGV[2])
  if insert == -1 or insert == 0 then
    redis.call('lpush', ARGV[1], '')
    redis.call('lpush', ARGV[1], ARGV[2])
  end
else
  redis.call('rpush', ARGV[1], ARGV[2])
end"''.freeze
SCRIPTS =
{
  push: PUSH_CODE,
  repush: ''"
    #{PUSH_CODE}
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  "'',
  fail: ''"
    redis.call('sadd', ARGV[1]..'_failed', ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  "'',
  done: ''"
    redis.call('sadd', ARGV[1]..'_done', ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  "'',
  unpop: ''"
    redis.call('lpush', ARGV[1], ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  "'',
  init_from: ''"
    local vals = redis.call('smembers', ARGV[2])
    for i = 1, table.getn(vals) do
      redis.call('lpush', ARGV[1], vals[i])
    end"''
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ RedisQueue

Returns a new instance of RedisQueue.



38
39
40
41
42
43
44
# File 'lib/redis_queue.rb', line 38

def initialize(args = {})
  args = { id: :messages, url: 'redis://localhost:6379/0' }.merge(args)
  @id             = args.delete(:id)
  @redis          = RedisConnection.new(args)
  @redis_blocking = RedisConnection.new(args)
  load_scripts
end

Instance Method Details

#clearObject



140
141
142
143
144
145
146
147
# File 'lib/redis_queue.rb', line 140

def clear
  @redis.run do |redis|
    redis.del @id
    redis.del "#{@id}_in_use"
    redis.del "#{@id}_done"
    redis.del "#{@id}_failed"
  end
end

#done(message) ⇒ Object



64
65
66
# File 'lib/redis_queue.rb', line 64

def done(message)
  script :done, @id, message
end

#done_listObject



114
115
116
# File 'lib/redis_queue.rb', line 114

def done_list
  @redis.run { |redis| redis.smembers "#{@id}_done" }
end

#done_sizeObject



98
99
100
# File 'lib/redis_queue.rb', line 98

def done_size
  @redis.run { |redis| redis.scard "#{@id}_done" }.to_i
end

#fail(message) ⇒ Object



60
61
62
# File 'lib/redis_queue.rb', line 60

def fail(message)
  script :fail, @id, message
end

#failed_listObject



118
119
120
# File 'lib/redis_queue.rb', line 118

def failed_list
  @redis.run { |redis| redis.smembers "#{@id}_failed" }
end

#failed_sizeObject



102
103
104
# File 'lib/redis_queue.rb', line 102

def failed_size
  @redis.run { |redis| redis.scard "#{@id}_failed" }.to_i
end

#forget(message) ⇒ Object



76
77
78
# File 'lib/redis_queue.rb', line 76

def forget(message)
  @redis.run { |redis| redis.srem "#{@id}_in_use", message }
end

#in_use_listObject



122
123
124
# File 'lib/redis_queue.rb', line 122

def in_use_list
  @redis.run { |redis| redis.smembers "#{@id}_in_use" }
end

#in_use_sizeObject



106
107
108
# File 'lib/redis_queue.rb', line 106

def in_use_size
  @redis.run { |redis| redis.scard "#{@id}_in_use" }.to_i
end

#init_from(set) ⇒ Object



90
91
92
# File 'lib/redis_queue.rb', line 90

def init_from(set)
  script(:init_from, @id, set)
end

#listObject



110
111
112
# File 'lib/redis_queue.rb', line 110

def list
  @redis.run { |redis| redis.lrange @id, 0, -1 }
end

#pop(block: true) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/redis_queue.rb', line 46

def pop(block: true)
  command = block ? :blpop : :lpop
  begin
    message = @redis_blocking.run { |redis| redis.send(command, @id) }
    message = message.last if command == :blpop
  end while message == ''
  @redis.run { |redis| redis.sadd "#{@id}_in_use", message } if message
  message
end


133
134
135
136
137
138
# File 'lib/redis_queue.rb', line 133

def print_contents
  puts "#{@id} enqueued: #{list}"
  puts "#{@id} in use:   #{in_use_list}"
  puts "#{@id} failed:   #{failed_list}"
  puts "#{@id} done:     #{done_list}"
end


126
127
128
129
130
131
# File 'lib/redis_queue.rb', line 126

def print_stats
  puts "#{@id} enqueued: #{size}"
  puts "#{@id} in use:   #{in_use_size}"
  puts "#{@id} failed:   #{failed_size}"
  puts "#{@id} done:     #{done_size}"
end

#push(message, priority = false) ⇒ Object



56
57
58
# File 'lib/redis_queue.rb', line 56

def push(message, priority = false)
  script :push, @id, message, priority
end

#repush(message, priority = false) ⇒ Object



72
73
74
# File 'lib/redis_queue.rb', line 72

def repush(message, priority = false)
  script :repush, @id, message, priority
end

#resetObject



80
81
82
83
# File 'lib/redis_queue.rb', line 80

def reset
  init_from "#{@id}_in_use"
  @redis.run { |redis| redis.del "#{@id}_in_use" }
end

#restartObject



85
86
87
88
# File 'lib/redis_queue.rb', line 85

def restart
  init_from "#{@id}_done"
  @redis.run { |redis| redis.del "#{@id}_done" }
end

#sizeObject



94
95
96
# File 'lib/redis_queue.rb', line 94

def size
  @redis.run { |redis| redis.llen @id }.to_i
end

#unpop(message) ⇒ Object



68
69
70
# File 'lib/redis_queue.rb', line 68

def unpop(message)
  script :unpop, @id, message
end