Class: RedisQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_queue.rb

Defined Under Namespace

Classes: RedisConnection

Constant Summary collapse

# Lua source used to enqueue a message.
#
# Script arguments, as read by the Lua code below:
#   ARGV[1] - key of the Redis list backing the queue
#   ARGV[2] - the message to enqueue
#   ARGV[3] - the string 'true' requests a priority push
#
# A priority push inserts the message immediately before the empty-string
# marker element of the list. When LINSERT answers -1 or 0 (per the Redis
# documentation: pivot not found / key does not exist) the marker is pushed
# onto the head first and the message is then pushed in front of it.
# Any other value of ARGV[3] appends the message to the tail with RPUSH.
PUSH_CODE =
''"
if ARGV[3] == 'true' then
  local insert = redis.call('linsert', ARGV[1], 'before', '', ARGV[2])
  if insert == -1 or insert == 0 then
    redis.call('lpush', ARGV[1], '')
    redis.call('lpush', ARGV[1], ARGV[2])
  end
else
  redis.call('rpush', ARGV[1], ARGV[2])
end"''.freeze
# Lua sources keyed by script name. Every script reads the queue's list key
# from ARGV[1]; the bookkeeping sets are derived from it by appending
# '_in_use', '_failed' or '_done'.
SCRIPTS =
{
  # Enqueue ARGV[2]; see PUSH_CODE for the priority handling.
  push: PUSH_CODE,
  # Non-blocking pop: LPOP repeatedly while the popped element is the empty
  # string, then add any truthy result to the '_in_use' set and return it.
  nonblpop: ''"
    local message = ''
    while message == '' do
      message = redis.call('lpop', ARGV[1])
    end
    if message then
      redis.call('sadd', ARGV[1]..'_in_use', message)
    end
    return message
  "'',
  # Re-enqueue ARGV[2] with the PUSH_CODE logic (interpolated here) and
  # remove it from the '_in_use' set.
  repush: ''"
    #{PUSH_CODE}
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  "'',
  # Add ARGV[2] to the '_failed' set and remove it from '_in_use'.
  fail: ''"
    redis.call('sadd', ARGV[1]..'_failed', ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  "'',
  # Add ARGV[2] to the '_done' set and remove it from '_in_use'.
  done: ''"
    redis.call('sadd', ARGV[1]..'_done', ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  "'',
  # Put ARGV[2] back at the head of the list and remove it from '_in_use'.
  unpop: ''"
    redis.call('lpush', ARGV[1], ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  "'',
  # LPUSH every member of the set named by ARGV[2] onto the list ARGV[1].
  # The source set itself is not modified by this script.
  # NOTE(review): table.getn is deprecated in Lua 5.1 in favour of the
  # length operator (#vals) - confirm before upgrading the Lua runtime.
  init_from: ''"
    local vals = redis.call('smembers', ARGV[2])
    for i = 1, table.getn(vals) do
      redis.call('lpush', ARGV[1], vals[i])
    end"''
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ RedisQueue

Returns a new instance of RedisQueue.



48
49
50
51
52
53
54
# File 'lib/redis_queue.rb', line 48

# Builds a queue and opens its two Redis connections.
#
# @param args [Hash] options. +:id+ (default +:messages+) names the queue and
#   is removed before the remaining options (including +:url+, default
#   'redis://localhost:6379/0') are handed to RedisConnection.new.
def initialize(args = {})
  defaults = { id: :messages, url: 'redis://localhost:6379/0' }
  settings = defaults.merge(args)
  @id = settings.delete(:id)
  # Two connections built from the same settings: @redis_blocking is the one
  # used for BLPOP in #pop, @redis serves every other command.
  @redis          = RedisConnection.new(settings)
  @redis_blocking = RedisConnection.new(settings)
  load_scripts
end

Instance Method Details

#clear ⇒ Object



149
150
151
152
153
154
155
156
# File 'lib/redis_queue.rb', line 149

# Deletes the queue list and its three bookkeeping sets
# ('_in_use', '_done', '_failed'), in that order.
#
# @return [Object] the result of the connection's +run+ call, whose block
#   ends with the DEL on the '_failed' key
def clear
  @redis.run do |conn|
    conn.del(@id)
    %w[_in_use _done].each { |suffix| conn.del("#{@id}#{suffix}") }
    # Issued last so its reply stays the value of the block.
    conn.del("#{@id}_failed")
  end
end

#done(message) ⇒ Object



73
74
75
# File 'lib/redis_queue.rb', line 73

# Marks +message+ as processed by invoking the +:done+ script with the queue
# id and the message (presumably the SCRIPTS[:done] Lua source, which adds
# the message to '<id>_done' and removes it from '<id>_in_use').
#
# @param message [String] the queue entry to mark as done
# @return [Object] whatever +script+ returns
def done(message)
  script(:done, @id, message)
end

#done_list ⇒ Object



123
124
125
# File 'lib/redis_queue.rb', line 123

# Lists the members of the '<id>_done' set.
#
# @return [Object] the SMEMBERS reply as passed back by the connection's +run+
def done_list
  done_key = "#{@id}_done"
  @redis.run { |conn| conn.smembers(done_key) }
end

#done_size ⇒ Object



107
108
109
# File 'lib/redis_queue.rb', line 107

# Counts the members of the '<id>_done' set.
#
# @return [Integer] the SCARD reply converted with +to_i+
def done_size
  cardinality = @redis.run do |conn|
    conn.scard("#{@id}_done")
  end
  cardinality.to_i
end

#fail(message) ⇒ Object



69
70
71
# File 'lib/redis_queue.rb', line 69

# Marks +message+ as failed by invoking the +:fail+ script with the queue id
# and the message (presumably the SCRIPTS[:fail] Lua source, which adds the
# message to '<id>_failed' and removes it from '<id>_in_use').
#
# Note that inside this class the method shadows Kernel#fail.
#
# @param message [String] the queue entry to mark as failed
# @return [Object] whatever +script+ returns
def fail(message)
  script(:fail, @id, message)
end

#failed_list ⇒ Object



127
128
129
# File 'lib/redis_queue.rb', line 127

# Lists the members of the '<id>_failed' set.
#
# @return [Object] the SMEMBERS reply as passed back by the connection's +run+
def failed_list
  failed_key = "#{@id}_failed"
  @redis.run { |conn| conn.smembers(failed_key) }
end

#failed_size ⇒ Object



111
112
113
# File 'lib/redis_queue.rb', line 111

# Counts the members of the '<id>_failed' set.
#
# @return [Integer] the SCARD reply converted with +to_i+
def failed_size
  cardinality = @redis.run do |conn|
    conn.scard("#{@id}_failed")
  end
  cardinality.to_i
end

#forget(message) ⇒ Object



85
86
87
# File 'lib/redis_queue.rb', line 85

# Removes +message+ from the '<id>_in_use' set without touching the queue
# list or the other sets.
#
# @param message [String] the entry to drop from the in-use set
# @return [Object] the SREM reply as passed back by the connection's +run+
def forget(message)
  in_use_key = "#{@id}_in_use"
  @redis.run { |conn| conn.srem(in_use_key, message) }
end

#in_use_list ⇒ Object



131
132
133
# File 'lib/redis_queue.rb', line 131

# Lists the members of the '<id>_in_use' set.
#
# @return [Object] the SMEMBERS reply as passed back by the connection's +run+
def in_use_list
  in_use_key = "#{@id}_in_use"
  @redis.run { |conn| conn.smembers(in_use_key) }
end

#in_use_size ⇒ Object



115
116
117
# File 'lib/redis_queue.rb', line 115

# Counts the members of the '<id>_in_use' set.
#
# @return [Integer] the SCARD reply converted with +to_i+
def in_use_size
  cardinality = @redis.run do |conn|
    conn.scard("#{@id}_in_use")
  end
  cardinality.to_i
end

#init_from(set) ⇒ Object



99
100
101
# File 'lib/redis_queue.rb', line 99

# Invokes the +:init_from+ script with the queue id and +set+ (presumably the
# SCRIPTS[:init_from] Lua source, which LPUSHes every member of the named set
# onto the queue list and leaves the set itself untouched).
#
# @param set [String] key of the Redis set to copy entries from
# @return [Object] whatever +script+ returns
def init_from(set)
  script :init_from, @id, set
end

#list ⇒ Object



119
120
121
# File 'lib/redis_queue.rb', line 119

# Reads the whole queue list (LRANGE 0..-1).
#
# The raw list is returned, so it may contain the empty-string marker that a
# priority push inserts (see PUSH_CODE).
#
# @return [Object] the LRANGE reply as passed back by the connection's +run+
def list
  @redis.run do |conn|
    conn.lrange(@id, 0, -1)
  end
end

#pop(block: true) ⇒ Object



56
57
58
59
60
61
62
63
# File 'lib/redis_queue.rb', line 56

# Takes the next message off the queue and records it in the '<id>_in_use'
# set.
#
# With +block: false+ the work is delegated to +nonblpop+. Otherwise BLPOP is
# issued on the dedicated blocking connection and repeated while the popped
# element is the empty string (the marker a priority push leaves in the list).
#
# A nil reply from the blocking call (e.g. a timeout) is tolerated: the
# method returns nil without touching the in-use set, instead of raising
# NoMethodError on +nil.last+.
#
# @param block [Boolean] whether to wait for a message (default true)
# @return [String, nil] the popped message, or nil when none was obtained
def pop(block: true)
  return nonblpop unless block

  message = nil
  loop do
    reply = @redis_blocking.run { |redis| redis.blpop(@id) }
    # BLPOP answers [key, value]; Array() turns a nil reply into [] so that
    # +last+ yields nil rather than raising.
    message = Array(reply).last
    break unless message == ''
  end
  @redis.run { |redis| redis.sadd "#{@id}_in_use", message } if message
  message
end


142
143
144
145
146
147
# File 'lib/redis_queue.rb', line 142

# Prints four lines to standard output showing the entries of the queue list
# and of the in-use, failed and done sets, each prefixed with the queue id.
#
# @return [nil]
def print_contents
  [
    ['enqueued:', :list],
    ['in use:  ', :in_use_list],
    ['failed:  ', :failed_list],
    ['done:    ', :done_list]
  ].each do |label, reader|
    # Each reader is called right before its line is printed.
    puts "#{@id} #{label} #{send(reader)}"
  end
  nil
end


135
136
137
138
139
140
# File 'lib/redis_queue.rb', line 135

# Prints four lines to standard output showing the sizes of the queue list
# and of the in-use, failed and done sets, each prefixed with the queue id.
#
# @return [nil]
def print_stats
  [
    ['enqueued:', :size],
    ['in use:  ', :in_use_size],
    ['failed:  ', :failed_size],
    ['done:    ', :done_size]
  ].each do |label, counter|
    # Each counter is called right before its line is printed.
    puts "#{@id} #{label} #{send(counter)}"
  end
  nil
end

#push(message, priority = false) ⇒ Object



65
66
67
# File 'lib/redis_queue.rb', line 65

# Enqueues +message+ by invoking the +:push+ script with the queue id, the
# message and the priority flag (presumably the PUSH_CODE Lua source, which
# treats the string 'true' as a priority push).
#
# @param message [String] the entry to enqueue
# @param priority [Boolean] request a priority push (default false)
# @return [Object] whatever +script+ returns
def push(message, priority = false)
  script(:push, @id, message, priority)
end

#repush(message, priority = false) ⇒ Object



81
82
83
# File 'lib/redis_queue.rb', line 81

# Re-enqueues +message+ by invoking the +:repush+ script with the queue id,
# the message and the priority flag (presumably the SCRIPTS[:repush] Lua
# source, which pushes like #push and removes the message from
# '<id>_in_use').
#
# @param message [String] the entry to put back on the queue
# @param priority [Boolean] request a priority push (default false)
# @return [Object] whatever +script+ returns
def repush(message, priority = false)
  script(:repush, @id, message, priority)
end

#reset ⇒ Object



89
90
91
92
# File 'lib/redis_queue.rb', line 89

# Feeds the '<id>_in_use' set to #init_from and then deletes that set.
#
# NOTE(review): the copy and the delete are two separate calls, so an entry
# added to the set in between would be deleted without being re-queued -
# confirm this is acceptable for callers.
#
# @return [Object] the DEL reply as passed back by the connection's +run+
def reset
  in_use_key = "#{@id}_in_use"
  init_from(in_use_key)
  @redis.run { |conn| conn.del(in_use_key) }
end

#restart ⇒ Object



94
95
96
97
# File 'lib/redis_queue.rb', line 94

# Feeds the '<id>_done' set to #init_from and then deletes that set.
#
# NOTE(review): the copy and the delete are two separate calls, so an entry
# added to the set in between would be deleted without being re-queued -
# confirm this is acceptable for callers.
#
# @return [Object] the DEL reply as passed back by the connection's +run+
def restart
  done_key = "#{@id}_done"
  init_from(done_key)
  @redis.run { |conn| conn.del(done_key) }
end

#size ⇒ Object



103
104
105
# File 'lib/redis_queue.rb', line 103

# Reports the length of the queue list (LLEN).
#
# This is the raw list length, so the empty-string marker that a priority
# push inserts (see PUSH_CODE) is counted when present.
#
# @return [Integer] the LLEN reply converted with +to_i+
def size
  length = @redis.run do |conn|
    conn.llen(@id)
  end
  length.to_i
end

#unpop(message) ⇒ Object



77
78
79
# File 'lib/redis_queue.rb', line 77

# Returns +message+ to the queue by invoking the +:unpop+ script with the
# queue id and the message (presumably the SCRIPTS[:unpop] Lua source, which
# LPUSHes the message onto the list and removes it from '<id>_in_use').
#
# @param message [String] the entry to put back at the head of the queue
# @return [Object] whatever +script+ returns
def unpop(message)
  script(:unpop, @id, message)
end