Class: RedisQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_queue.rb

Constant Summary collapse

# Lua sources for the queue's multi-step Redis operations, keyed by operation
# name. Each script receives the queue's list key as ARGV[1]; the companion
# sets are addressed by appending '_failed', '_done' or '_in_use' to that key.
#
# Ruby has no triple-quoted strings: """...""" parses as three adjacent string
# literals ("" "..." "") that are concatenated, so every value keeps its
# leading newline and indentation.
#
# NOTE(review): key names are passed through ARGV rather than KEYS; Redis
# recommends KEYS for anything used as a key name -- confirm this is only run
# against a single-node server.
#
# The hash is frozen so the shared constant cannot be modified at runtime.
SCRIPTS = {
  # ARGV[2] = task: add it to the failed set and remove it from the in-use set.
  fail: """
    redis.call('sadd', ARGV[1]..'_failed', ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  """,
  # ARGV[2] = task: add it to the done set and remove it from the in-use set.
  done: """
    redis.call('sadd', ARGV[1]..'_done', ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  """,
  # ARGV[2] = task: push it back onto the head of the list and remove it from
  # the in-use set.
  unpop: """
    redis.call('lpush', ARGV[1], ARGV[2])
    redis.call('srem', ARGV[1]..'_in_use', ARGV[2])
  """,
  # ARGV[2] = name of a set: append each of its members to the tail of the
  # list. Uses the length operator (#) instead of the deprecated table.getn.
  init_from: """
    local vals = redis.call('smembers', ARGV[2])
    for i = 1, #vals do
      redis.call('rpush', ARGV[1], vals[i])
    end"""
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(args = {id: :messages, url: 'redis://localhost:6379/0'}) ⇒ RedisQueue

Returns a new instance of RedisQueue.



24
25
26
27
28
29
# File 'lib/redis_queue.rb', line 24

# Sets up the queue: stores the queue id, opens two Redis connections and
# loads the Lua scripts.
#
# @param args [Hash] +:id+ becomes the queue id (nil when the key is absent);
#   every other entry (e.g. +:url+) is handed to RedisConnection.new.
def initialize(args = {id: :messages, url: 'redis://localhost:6379/0'})
  # Work on a copy: removing :id from +args+ itself would silently alter the
  # caller's hash (and raise on a frozen one).
  options         = args.dup
  @id             = options.delete(:id)
  @redis          = RedisConnection.new(options)
  # Second connection built from the same options; #pop issues its blpop on
  # this one.
  @redis_blocking = RedisConnection.new(options)
  load_scripts
end

Instance Method Details

#clear ⇒ Object



117
118
119
120
121
122
123
124
# File 'lib/redis_queue.rb', line 117

# Deletes the queue list and its three companion keys ("_in_use", "_done",
# "_failed"), one +del+ call per key, in that order.
#
# @return [Object] whatever +@redis.run+ returns; the block's value is the
#   result of the last +del+ (the "_failed" key).
def clear
  @redis.run do |redis|
    keys = [@id, "#{@id}_in_use", "#{@id}_done", "#{@id}_failed"]
    # map (not each) so the block still evaluates to the final del's result.
    keys.map { |key| redis.del(key) }.last
  end
end

#done(task) ⇒ Object



45
46
47
# File 'lib/redis_queue.rb', line 45

# Runs the +:done+ script for this queue and the given task.
# Presumably this evaluates SCRIPTS[:done] (move the task from the in-use set
# to the done set) -- +script+ is defined outside this view.
#
# @param task [Object] the task to mark as done
# @return [Object] whatever +script+ returns
def done(task)
  script(:done, @id, task)
end

#done_list ⇒ Object



91
92
93
# File 'lib/redis_queue.rb', line 91

# Fetches the members of the "<id>_done" set via +smembers+.
#
# @return [Object] whatever +@redis.run+ returns for the block
def done_list
  @redis.run do |redis|
    redis.smembers("#{@id}_done")
  end
end

#done_size ⇒ Object



75
76
77
# File 'lib/redis_queue.rb', line 75

# Reads the cardinality of the "<id>_done" set via +scard+.
#
# @return [Integer] the reply coerced with +to_i+
def done_size
  count = @redis.run do |redis|
    redis.scard("#{@id}_done")
  end
  count.to_i
end

#fail(task) ⇒ Object



41
42
43
# File 'lib/redis_queue.rb', line 41

# Runs the +:fail+ script for this queue and the given task.
# Presumably this evaluates SCRIPTS[:fail] (move the task from the in-use set
# to the failed set) -- +script+ is defined outside this view.
#
# Note that inside this class the method takes precedence over Kernel#fail.
#
# @param task [Object] the task to mark as failed
# @return [Object] whatever +script+ returns
def fail(task)
  script(:fail, @id, task)
end

#failed_list ⇒ Object



95
96
97
# File 'lib/redis_queue.rb', line 95

# Fetches the members of the "<id>_failed" set via +smembers+.
#
# @return [Object] whatever +@redis.run+ returns for the block
def failed_list
  @redis.run do |redis|
    redis.smembers("#{@id}_failed")
  end
end

#failed_size ⇒ Object



79
80
81
# File 'lib/redis_queue.rb', line 79

# Reads the cardinality of the "<id>_failed" set via +scard+.
#
# @return [Integer] the reply coerced with +to_i+
def failed_size
  count = @redis.run do |redis|
    redis.scard("#{@id}_failed")
  end
  count.to_i
end

#forget(task) ⇒ Object



53
54
55
# File 'lib/redis_queue.rb', line 53

# Removes the task from the "<id>_in_use" set via +srem+, without recording it
# anywhere else.
#
# @param task [Object] the member to remove
# @return [Object] whatever +@redis.run+ returns for the block
def forget(task)
  @redis.run do |redis|
    redis.srem("#{@id}_in_use", task)
  end
end

#in_use_list ⇒ Object



99
100
101
# File 'lib/redis_queue.rb', line 99

# Fetches the members of the "<id>_in_use" set via +smembers+.
#
# @return [Object] whatever +@redis.run+ returns for the block
def in_use_list
  @redis.run do |redis|
    redis.smembers("#{@id}_in_use")
  end
end

#in_use_size ⇒ Object



83
84
85
# File 'lib/redis_queue.rb', line 83

# Reads the cardinality of the "<id>_in_use" set via +scard+.
#
# @return [Integer] the reply coerced with +to_i+
def in_use_size
  count = @redis.run do |redis|
    redis.scard("#{@id}_in_use")
  end
  count.to_i
end

#init_from(set) ⇒ Object



67
68
69
# File 'lib/redis_queue.rb', line 67

# Runs the +:init_from+ script for this queue and the named set.
# Presumably this evaluates SCRIPTS[:init_from] (append every member of +set+
# to the queue list) -- +script+ is defined outside this view.
#
# @param set [Object] name of the Redis set to read from
# @return [Object] whatever +script+ returns
def init_from(set)
  script :init_from, @id, set
end

#list ⇒ Object



87
88
89
# File 'lib/redis_queue.rb', line 87

# Fetches the whole queue list: +lrange+ from index 0 to -1 (the last
# element).
#
# @return [Object] whatever +@redis.run+ returns for the block
def list
  @redis.run do |redis|
    redis.lrange(@id, 0, -1)
  end
end

#pop ⇒ Object



31
32
33
34
35
# File 'lib/redis_queue.rb', line 31

# Takes the next task off the queue list and records it in the "<id>_in_use"
# set.
#
# The +blpop+ goes through the separate +@redis_blocking+ connection
# (presumably so a waiting pop does not tie up +@redis+ -- TODO confirm).
# The pop and the +sadd+ are two independent commands.
#
# @return [Object] the last element of the +blpop+ reply
def pop
  reply = @redis_blocking.run do |redis|
    redis.blpop(@id)
  end
  task = reply.last
  @redis.run do |redis|
    redis.sadd("#{@id}_in_use", task)
  end
  task
end


110
111
112
113
114
115
# File 'lib/redis_queue.rb', line 110

# Writes four lines to standard output showing the contents of the queue
# list and of the in-use, failed and done sets, each prefixed with the queue
# id.
#
# @return [nil]
def print_contents
  [
    ['enqueued: ', :list],
    ['in use:   ', :in_use_list],
    ['failed:   ', :failed_list],
    ['done:     ', :done_list]
  ].each do |label, reader|
    # Each reader is called right before its own line is printed.
    puts "#{@id} #{label}#{__send__(reader)}"
  end
  nil
end


103
104
105
106
107
108
# File 'lib/redis_queue.rb', line 103

# Writes four lines to standard output showing the sizes of the queue list
# and of the in-use, failed and done sets, each prefixed with the queue id.
#
# @return [nil]
def print_stats
  [
    ['enqueued: ', :size],
    ['in use:   ', :in_use_size],
    ['failed:   ', :failed_size],
    ['done:     ', :done_size]
  ].each do |label, reader|
    # Each counter is read right before its own line is printed.
    puts "#{@id} #{label}#{__send__(reader)}"
  end
  nil
end

#push(task) ⇒ Object



37
38
39
# File 'lib/redis_queue.rb', line 37

# Appends the task to the tail of the queue list via +rpush+.
#
# @param task [Object] the value to enqueue
# @return [Object] whatever +@redis.run+ returns for the block
def push(task)
  @redis.run do |redis|
    redis.rpush(@id, task)
  end
end

#reset ⇒ Object



57
58
59
60
# File 'lib/redis_queue.rb', line 57

# Re-enqueues everything currently marked in use: feeds the "<id>_in_use" set
# to #init_from, then deletes that set.
#
# The two steps are issued one after the other, not as a single operation.
#
# @return [Object] whatever +@redis.run+ returns for the +del+ block
def reset
  init_from("#{@id}_in_use")
  @redis.run do |redis|
    redis.del("#{@id}_in_use")
  end
end

#restart ⇒ Object



62
63
64
65
# File 'lib/redis_queue.rb', line 62

# Re-enqueues everything already marked done: feeds the "<id>_done" set to
# #init_from, then deletes that set.
#
# The two steps are issued one after the other, not as a single operation.
#
# @return [Object] whatever +@redis.run+ returns for the +del+ block
def restart
  init_from("#{@id}_done")
  @redis.run do |redis|
    redis.del("#{@id}_done")
  end
end

#size ⇒ Object



71
72
73
# File 'lib/redis_queue.rb', line 71

# Reads the length of the queue list via +llen+.
#
# @return [Integer] the reply coerced with +to_i+
def size
  length = @redis.run do |redis|
    redis.llen(@id)
  end
  length.to_i
end

#unpop(task) ⇒ Object



49
50
51
# File 'lib/redis_queue.rb', line 49

# Runs the +:unpop+ script for this queue and the given task.
# Presumably this evaluates SCRIPTS[:unpop] (push the task back onto the head
# of the list and drop it from the in-use set) -- +script+ is defined outside
# this view.
#
# @param task [Object] the task to return to the queue
# @return [Object] whatever +script+ returns
def unpop(task)
  script(:unpop, @id, task)
end