Class: RedisQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(args = {id: :messages, url: 'redis://localhost:6379/0'}) ⇒ RedisQueue

Returns a new instance of RedisQueue.



4
5
6
7
8
# File 'lib/redis_queue.rb', line 4

# Builds a queue bound to the Redis key named by +:id+.
#
# Two RedisConnection objects are created from the remaining options; #pop
# issues its blocking BLPOP through the second one so the first stays free
# for the other commands.
#
# @param args [Hash] options; +:id+ names the queue, every other entry is
#   handed to RedisConnection.new as-is
def initialize(args = {id: :messages, url: 'redis://localhost:6379/0'})
  # Work on a copy: removing :id from the caller's hash would leak out of the
  # constructor (and raise on a frozen hash), breaking reuse of the options.
  options = args.dup
  # A partial hash (e.g. only :url) replaces the whole signature default, so
  # fall back to the same queue name rather than leaving @id nil.
  @id             = options.delete(:id) || :messages
  @redis          = RedisConnection.new(options)
  @redis_blocking = RedisConnection.new(options)
end

Instance Method Details

#clear ⇒ Object



106
107
108
109
110
111
112
113
# File 'lib/redis_queue.rb', line 106

# Deletes the queue list and its three companion keys
# ("<id>_in_use", "<id>_done", "<id>_failed"), in that order.
#
# @return [Object] whatever @redis.run returns; the block's value is the
#   reply to the last DEL
def clear
  @redis.run do |conn|
    [@id, "#{@id}_in_use", "#{@id}_done", "#{@id}_failed"]
      .map { |key| conn.del(key) }
      .last
  end
end

#done(task) ⇒ Object



27
28
29
30
31
32
# File 'lib/redis_queue.rb', line 27

# Records +task+ in the "<id>_done" set, then removes it from the
# "<id>_in_use" set.
#
# @param task [Object] value passed to SADD / SREM
# @return [Object] whatever @redis.run returns; the block's value is the
#   SREM reply
def done(task)
  finished_key = "#{@id}_done"
  active_key   = "#{@id}_in_use"
  @redis.run do |conn|
    conn.sadd(finished_key, task)
    conn.srem(active_key, task)
  end
end

#done_list ⇒ Object



80
81
82
# File 'lib/redis_queue.rb', line 80

# Reads the members of the "<id>_done" set via SMEMBERS.
#
# @return [Object] whatever @redis.run returns for the SMEMBERS call
def done_list
  done_key = "#{@id}_done"
  @redis.run do |conn|
    conn.smembers(done_key)
  end
end

#done_size ⇒ Object



64
65
66
# File 'lib/redis_queue.rb', line 64

# Cardinality of the "<id>_done" set (SCARD), coerced with #to_i.
#
# @return [Integer]
def done_size
  count = @redis.run do |conn|
    conn.scard("#{@id}_done")
  end
  count.to_i
end

#fail(task) ⇒ Object



20
21
22
23
24
25
# File 'lib/redis_queue.rb', line 20

# Records +task+ in the "<id>_failed" set, then removes it from the
# "<id>_in_use" set.
#
# @param task [Object] value passed to SADD / SREM
# @return [Object] whatever @redis.run returns; the block's value is the
#   SREM reply
def fail(task)
  failed_key = "#{@id}_failed"
  active_key = "#{@id}_in_use"
  @redis.run do |conn|
    conn.sadd(failed_key, task)
    conn.srem(active_key, task)
  end
end

#failed_list ⇒ Object



84
85
86
# File 'lib/redis_queue.rb', line 84

# Reads the members of the "<id>_failed" set via SMEMBERS.
#
# @return [Object] whatever @redis.run returns for the SMEMBERS call
def failed_list
  failed_key = "#{@id}_failed"
  @redis.run do |conn|
    conn.smembers(failed_key)
  end
end

#failed_size ⇒ Object



68
69
70
# File 'lib/redis_queue.rb', line 68

# Cardinality of the "<id>_failed" set (SCARD), coerced with #to_i.
#
# @return [Integer]
def failed_size
  count = @redis.run do |conn|
    conn.scard("#{@id}_failed")
  end
  count.to_i
end

#in_use_list ⇒ Object



88
89
90
# File 'lib/redis_queue.rb', line 88

# Reads the members of the "<id>_in_use" set via SMEMBERS.
#
# @return [Object] whatever @redis.run returns for the SMEMBERS call
def in_use_list
  active_key = "#{@id}_in_use"
  @redis.run do |conn|
    conn.smembers(active_key)
  end
end

#in_use_size ⇒ Object



72
73
74
# File 'lib/redis_queue.rb', line 72

# Cardinality of the "<id>_in_use" set (SCARD), coerced with #to_i.
#
# @return [Integer]
def in_use_size
  count = @redis.run do |conn|
    conn.scard("#{@id}_in_use")
  end
  count.to_i
end

#init_from(set) ⇒ Object



51
52
53
54
55
56
57
58
# File 'lib/redis_queue.rb', line 51

# Appends every member of the Redis set +set+ to the tail of the queue list
# by running a Lua script on the server (SMEMBERS on the set, then one RPUSH
# per member).
#
# @param set [#to_s] name of the source set
# @return [Object] whatever @redis.run returns for the EVAL call; the script
#   itself has no return statement
def init_from(set)
  # Key names travel through KEYS[] instead of being spliced into the script
  # text: a quote in a name can no longer break or alter the Lua source, and
  # the script body is the same on every call.
  script = <<-'LUA'
    local vals = redis.call('smembers', KEYS[1])
    for i = 1, #vals do
      redis.call('rpush', KEYS[2], vals[i])
    end
  LUA
  @redis.run do |redis|
    # to_s mirrors the string interpolation the names previously went through.
    redis.eval(script, [set.to_s, @id.to_s])
  end
end

#list ⇒ Object



76
77
78
# File 'lib/redis_queue.rb', line 76

# Reads the whole queue list (LRANGE from index 0 to -1).
#
# @return [Object] whatever @redis.run returns for the LRANGE call
def list
  first_index = 0
  last_index  = -1
  @redis.run do |conn|
    conn.lrange(@id, first_index, last_index)
  end
end

#pop ⇒ Object



10
11
12
13
14
# File 'lib/redis_queue.rb', line 10

# Takes the next message off the head of the queue with BLPOP (issued on the
# dedicated @redis_blocking connection) and adds it to the "<id>_in_use" set.
#
# @return [Object] the last element of the BLPOP reply
def pop
  reply = @redis_blocking.run do |conn|
    conn.blpop(@id)
  end
  reply.last.tap do |message|
    @redis.run { |conn| conn.sadd("#{@id}_in_use", message) }
  end
end


99
100
101
102
103
104
# File 'lib/redis_queue.rb', line 99

# Prints four lines to standard output: the queue id followed by the
# enqueued, in-use, failed and done contents, in that order.
#
# @return [nil]
def print_contents
  [
    ['enqueued:', :list],
    ['in use:',   :in_use_list],
    ['failed:',   :failed_list],
    ['done:',     :done_list]
  ].each do |label, reader|
    # %-9s pads every label to the width of "enqueued:" so the values line up.
    puts format('%s %-9s %s', @id, label, send(reader))
  end
  nil
end


92
93
94
95
96
97
# File 'lib/redis_queue.rb', line 92

# Prints four lines to standard output: the queue id followed by the
# enqueued, in-use, failed and done counts, in that order.
#
# @return [nil]
def print_stats
  [
    ['enqueued:', :size],
    ['in use:',   :in_use_size],
    ['failed:',   :failed_size],
    ['done:',     :done_size]
  ].each do |label, counter|
    # %-9s pads every label to the width of "enqueued:" so the values line up.
    puts format('%s %-9s %s', @id, label, send(counter))
  end
  nil
end

#push(task) ⇒ Object



16
17
18
# File 'lib/redis_queue.rb', line 16

# Appends +task+ to the tail of the queue list (RPUSH).
#
# @param task [Object] value passed to RPUSH
# @return [Object] whatever @redis.run returns for the RPUSH call
def push(task)
  @redis.run do |conn|
    conn.rpush(@id, task)
  end
end

#reset ⇒ Object



41
42
43
44
# File 'lib/redis_queue.rb', line 41

# Re-enqueues every task held in the "<id>_in_use" set (via #init_from) and
# then deletes that set.
#
# @return [Object] whatever @redis.run returns for the DEL call
def reset
  active_key = "#{@id}_in_use"
  init_from(active_key)
  @redis.run do |conn|
    conn.del(active_key)
  end
end

#restart ⇒ Object



46
47
48
49
# File 'lib/redis_queue.rb', line 46

# Re-enqueues every task held in the "<id>_done" set (via #init_from) and
# then deletes that set.
#
# @return [Object] whatever @redis.run returns for the DEL call
def restart
  finished_key = "#{@id}_done"
  init_from(finished_key)
  @redis.run do |conn|
    conn.del(finished_key)
  end
end

#size ⇒ Object



60
61
62
# File 'lib/redis_queue.rb', line 60

# Length of the queue list (LLEN), coerced with #to_i.
#
# @return [Integer]
def size
  length = @redis.run do |conn|
    conn.llen(@id)
  end
  length.to_i
end

#unpop(task) ⇒ Object



34
35
36
37
38
39
# File 'lib/redis_queue.rb', line 34

# Puts +task+ back at the head of the queue list (LPUSH), then removes it
# from the "<id>_in_use" set.
#
# @param task [Object] value passed to LPUSH / SREM
# @return [Object] whatever @redis.run returns; the block's value is the
#   SREM reply
def unpop(task)
  active_key = "#{@id}_in_use"
  @redis.run do |conn|
    conn.lpush(@id, task)
    conn.srem(active_key, task)
  end
end