Class: RedisQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(args = {id: :messages, url: 'redis://localhost:6379/0'}) ⇒ RedisQueue

Returns a new instance of RedisQueue.



4
5
6
7
8
# File 'lib/redis_queue.rb', line 4

# Sets up a queue identified by +:id+ and opens two Redis connections.
#
# @param args [Hash] options; +:id+ becomes the queue's base key and every
#   remaining entry is passed to RedisConnection.new unchanged.
def initialize(args = { id: :messages, url: 'redis://localhost:6379/0' })
  # Work on a copy: deleting :id from the caller's own hash would leave a
  # second queue built from the same options with a nil id, and would raise
  # on a frozen hash.
  connection_args = args.dup
  @id             = connection_args.delete(:id)
  # Two separate connections are built from the same options; #pop runs on
  # @redis_blocking, every other method on @redis.
  @redis          = RedisConnection.new(connection_args)
  @redis_blocking = RedisConnection.new(connection_args)
end

Instance Method Details

#clear ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/redis_queue.rb', line 112

# Deletes the queue's own key followed by its "_in_use", "_done" and
# "_failed" companion keys, one DEL call per key.
#
# @return [Object] whatever @redis.run returns for the block; the block's
#   value is the reply to the last delete (the "_failed" key).
def clear
  @redis.run do |conn|
    conn.del @id
    %w[in_use done failed].map { |suffix| conn.del "#{@id}_#{suffix}" }.last
  end
end

#done(task) ⇒ Object



29
30
31
32
33
34
35
36
# File 'lib/redis_queue.rb', line 29

# Records a task as finished: adds it to the "<id>_done" set and removes it
# from the "<id>_in_use" set, sending both commands in one pipeline.
#
# @param task [Object] the set member to move between the two sets
# @return [Object] whatever @redis.run returns for the block
def done(task)
  @redis.run do |redis|
    # Issue the commands on the object that #pipelined yields.
    # NOTE(review): assumes the yielded client is redis-rb. There, 5.x only
    # pipelines commands sent to the block argument, while older releases
    # yield the client itself, so this form works on both.
    redis.pipelined do |pipeline|
      pipeline.sadd "#{@id}_done", task
      pipeline.srem "#{@id}_in_use", task
    end
  end
end

#done_list ⇒ Object



86
87
88
# File 'lib/redis_queue.rb', line 86

# Fetches all members of the "<id>_done" set.
#
# @return [Object] the SMEMBERS reply as handed back by @redis.run
def done_list
  done_key = "#{@id}_done"
  @redis.run do |conn|
    conn.smembers(done_key)
  end
end

#done_size ⇒ Object



70
71
72
# File 'lib/redis_queue.rb', line 70

# Counts the members of the "<id>_done" set.
#
# @return [Integer] the SCARD reply coerced with #to_i
def done_size
  reply = @redis.run { |conn| conn.scard("#{@id}_done") }
  reply.to_i
end

#fail(task) ⇒ Object



20
21
22
23
24
25
26
27
# File 'lib/redis_queue.rb', line 20

# Records a task as failed: adds it to the "<id>_failed" set and removes it
# from the "<id>_in_use" set, sending both commands in one pipeline.
#
# @param task [Object] the set member to move between the two sets
# @return [Object] whatever @redis.run returns for the block
def fail(task)
  @redis.run do |redis|
    # Issue the commands on the object that #pipelined yields.
    # NOTE(review): assumes the yielded client is redis-rb. There, 5.x only
    # pipelines commands sent to the block argument, while older releases
    # yield the client itself, so this form works on both.
    redis.pipelined do |pipeline|
      pipeline.sadd "#{@id}_failed", task
      pipeline.srem "#{@id}_in_use", task
    end
  end
end

#failed_list ⇒ Object



90
91
92
# File 'lib/redis_queue.rb', line 90

# Fetches all members of the "<id>_failed" set.
#
# @return [Object] the SMEMBERS reply as handed back by @redis.run
def failed_list
  failed_key = "#{@id}_failed"
  @redis.run do |conn|
    conn.smembers(failed_key)
  end
end

#failed_size ⇒ Object



74
75
76
# File 'lib/redis_queue.rb', line 74

# Counts the members of the "<id>_failed" set.
#
# @return [Integer] the SCARD reply coerced with #to_i
def failed_size
  reply = @redis.run { |conn| conn.scard("#{@id}_failed") }
  reply.to_i
end

#in_use_list ⇒ Object



94
95
96
# File 'lib/redis_queue.rb', line 94

# Fetches all members of the "<id>_in_use" set.
#
# @return [Object] the SMEMBERS reply as handed back by @redis.run
def in_use_list
  in_use_key = "#{@id}_in_use"
  @redis.run do |conn|
    conn.smembers(in_use_key)
  end
end

#in_use_size ⇒ Object



78
79
80
# File 'lib/redis_queue.rb', line 78

# Counts the members of the "<id>_in_use" set.
#
# @return [Integer] the SCARD reply coerced with #to_i
def in_use_size
  reply = @redis.run { |conn| conn.scard("#{@id}_in_use") }
  reply.to_i
end

#init_from(set) ⇒ Object



57
58
59
60
61
62
63
64
# File 'lib/redis_queue.rb', line 57

# Appends every member of the given Redis set to the end of this queue's
# list, using a server-side Lua script.
#
# @param set [String, Symbol] key of the set whose members are copied
# @return [Object] whatever @redis.run returns for the block
def init_from(set)
  # Key names are passed through KEYS[] instead of being spliced into the Lua
  # source: a name containing a quote can no longer break or alter the
  # script, and the script text stays identical for every key pair.
  # `#vals` is Lua's length operator, replacing the deprecated table.getn.
  script = [
    "local vals = redis.call('smembers', KEYS[1])",
    'for i = 1, #vals do',
    "  redis.call('rpush', KEYS[2], vals[i])",
    'end'
  ].join("\n")

  @redis.run do |redis|
    redis.eval script, keys: [set.to_s, @id.to_s]
  end
end

#list ⇒ Object



82
83
84
# File 'lib/redis_queue.rb', line 82

# Fetches every element of the queue's list, from index 0 to -1.
#
# @return [Object] the LRANGE reply as handed back by @redis.run
def list
  @redis.run do |conn|
    conn.lrange(@id, 0, -1)
  end
end

#pop ⇒ Object



10
11
12
13
14
# File 'lib/redis_queue.rb', line 10

# Takes the next task off the queue with BLPOP on the dedicated blocking
# connection and records it in the "<id>_in_use" set.
#
# @return [Object] whatever @redis_blocking.run returns for the block; the
#   block's value is the last element of the BLPOP reply
def pop
  @redis_blocking.run do |conn|
    reply = conn.blpop(@id)
    msg = reply.last
    conn.sadd "#{@id}_in_use", msg
    msg
  end
end


105
106
107
108
109
110
# File 'lib/redis_queue.rb', line 105

# Prints the contents of the pending list and of the in-use, failed and done
# sets to standard output, one line each, prefixed with the queue id.
#
# @return [nil]
def print_contents
  [
    ['enqueued:', :list],
    ['in use:  ', :in_use_list],
    ['failed:  ', :failed_list],
    ['done:    ', :done_list]
  ].each do |label, reader|
    # Each collection is read right before its line is printed.
    puts "#{@id} #{label} #{send(reader)}"
  end
  nil
end


98
99
100
101
102
103
# File 'lib/redis_queue.rb', line 98

# Prints the sizes of the pending list and of the in-use, failed and done
# sets to standard output, one line each, prefixed with the queue id.
#
# @return [nil]
def print_stats
  [
    ['enqueued:', :size],
    ['in use:  ', :in_use_size],
    ['failed:  ', :failed_size],
    ['done:    ', :done_size]
  ].each do |label, reader|
    # Each counter is read right before its line is printed.
    puts "#{@id} #{label} #{send(reader)}"
  end
  nil
end

#push(task) ⇒ Object



16
17
18
# File 'lib/redis_queue.rb', line 16

# Appends a task to the tail of the queue's list with RPUSH.
#
# @param task [Object] the value to enqueue
# @return [Object] the RPUSH reply as handed back by @redis.run
def push(task)
  @redis.run do |conn|
    conn.rpush(@id, task)
  end
end

#reset ⇒ Object



47
48
49
50
# File 'lib/redis_queue.rb', line 47

# Puts every in-use task back on the queue: copies the "<id>_in_use" set into
# the list via #init_from, then deletes that set.
#
# @return [Object] the DEL reply as handed back by @redis.run
def reset
  in_use_key = "#{@id}_in_use"
  init_from in_use_key
  @redis.run do |conn|
    conn.del in_use_key
  end
end

#restart ⇒ Object



52
53
54
55
# File 'lib/redis_queue.rb', line 52

# Re-enqueues every finished task: copies the "<id>_done" set into the list
# via #init_from, then deletes that set.
#
# @return [Object] the DEL reply as handed back by @redis.run
def restart
  done_key = "#{@id}_done"
  init_from done_key
  @redis.run do |conn|
    conn.del done_key
  end
end

#size ⇒ Object



66
67
68
# File 'lib/redis_queue.rb', line 66

# Counts the elements waiting in the queue's list.
#
# @return [Integer] the LLEN reply coerced with #to_i
def size
  reply = @redis.run { |conn| conn.llen(@id) }
  reply.to_i
end

#unpop(task) ⇒ Object



38
39
40
41
42
43
44
45
# File 'lib/redis_queue.rb', line 38

# Returns a task to the queue: pushes it onto the head of the list and
# removes it from the "<id>_in_use" set, sending both commands in one pipeline.
#
# @param task [Object] the value to put back
# @return [Object] whatever @redis.run returns for the block
def unpop(task)
  @redis.run do |redis|
    # Issue the commands on the object that #pipelined yields.
    # NOTE(review): assumes the yielded client is redis-rb. There, 5.x only
    # pipelines commands sent to the block argument, while older releases
    # yield the client itself, so this form works on both.
    redis.pipelined do |pipeline|
      pipeline.lpush @id, task
      pipeline.srem "#{@id}_in_use", task
    end
  end
end