Class: Redis::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/queue.rb

Constant Summary collapse

VERSION =
"0.0.4"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_name, process_queue_name, options = {}) ⇒ Queue

Returns a new instance of Queue.

Raises:

  • (ArgumentError)


10
11
12
13
14
15
16
17
18
19
20
# File 'lib/redis/queue.rb', line 10

# Sets up a queue that works on two Redis lists: the main queue and a
# "process" queue that holds messages which were popped but not yet committed.
#
# @param queue_name [String] non-empty key of the main queue list
# @param process_queue_name [String] non-empty key of the processing list;
#   must differ from +queue_name+
# @param options [Hash] optional settings; the hash is only read, never modified
# @option options [Object] :redis client to use (falls back to Redis.current)
# @option options [Integer] :timeout value handed to BRPOPLPUSH by #pop
#   (falls back to 0 when missing or nil)
# @raise [ArgumentError] if either name is not a non-empty String, or if
#   both names are equal
def initialize(queue_name, process_queue_name, options = {})
  raise ArgumentError, 'First argument must be a non empty string'  if !queue_name.is_a?(String) || queue_name.empty?
  raise ArgumentError, 'Second argument must be a non empty string' if !process_queue_name.is_a?(String) || process_queue_name.empty?
  raise ArgumentError, 'Queue and Process queue have the same name'  if process_queue_name == queue_name

  @redis = options[:redis] || Redis.current
  @queue_name = queue_name
  @process_queue_name = process_queue_name
  @last_message = nil
  # Plain `||` instead of `||=`: the caller's hash must not gain a :timeout
  # key as a side effect, and frozen option hashes must keep working.
  @timeout = options[:timeout] || 0
end

Class Method Details

.version ⇒ Object



6
7
8
# File 'lib/redis/queue.rb', line 6

# Human-readable version banner built from the VERSION constant.
#
# @return [String] "redis-queue version " followed by VERSION
def self.version
  format('redis-queue version %s', VERSION)
end

Instance Method Details

#clear(clear_process_queue = false) ⇒ Object



26
27
28
29
# File 'lib/redis/queue.rb', line 26

# Deletes the main queue key and, on request, the process queue key as well.
#
# @param clear_process_queue [Boolean] also delete the process queue when truthy
# @return [Object, nil] result of the DEL on the process queue, or nil when
#   only the main queue was deleted
def clear(clear_process_queue = false)
  @redis.del(@queue_name)
  return unless clear_process_queue

  @redis.del(@process_queue_name)
end

#commit ⇒ Object



48
49
50
# File 'lib/redis/queue.rb', line 48

# Acknowledges the most recently popped message by removing it from the
# process queue. A count of 0 is passed to LREM, which removes every element
# equal to the message.
#
# @return [Object] whatever the Redis client returns from LREM
def commit
  acknowledged = @last_message
  @redis.lrem(@process_queue_name, 0, acknowledged)
end

#empty? ⇒ Boolean



31
32
33
# File 'lib/redis/queue.rb', line 31

# Tells whether the main queue currently holds no messages.
#
# @return [Boolean] true when #length is not greater than zero
def empty?
  length <= 0
end

#length ⇒ Object Also known as: size



22
23
24
# File 'lib/redis/queue.rb', line 22

# Number of messages waiting in the main queue, as reported by LLEN.
#
# @return [Object] the client's LLEN result for the main queue key
def length
  @redis.llen(@queue_name)
end

#pop(non_block = false) ⇒ Object Also known as: dec, shift



39
40
41
42
43
44
45
46
# File 'lib/redis/queue.rb', line 39

# Moves the next message from the main queue onto the process queue and
# remembers it as the last message (the one #commit will remove).
#
# @param non_block [Boolean] when truthy use RPOPLPUSH; otherwise use
#   BRPOPLPUSH with the stored timeout
# @return [Object, nil] the message returned by the Redis client
def pop(non_block = false)
  @last_message =
    if non_block
      @redis.rpoplpush(@queue_name, @process_queue_name)
    else
      @redis.brpoplpush(@queue_name, @process_queue_name, @timeout)
    end
end

#process(non_block = false, timeout = nil) ⇒ Object



52
53
54
55
56
57
58
59
60
61
# File 'lib/redis/queue.rb', line 52

# Pops messages in a loop, yields each one to the given block, and commits
# the message whenever the block returns a truthy value.
#
# The loop ends when #pop returns nil (nothing was delivered) or, in
# non-blocking mode, once the main queue is empty. A nil pop result is never
# yielded and never committed.
#
# @param non_block [Boolean] forwarded to #pop
# @param timeout [Integer, nil] when not nil, replaces the stored timeout
#   (the new value stays in effect after this call)
# @yieldparam message [Object] the popped message (never nil)
# @yieldreturn [Object] truthy to commit the message, falsy to leave it on
#   the process queue
# @return [nil]
def process(non_block = false, timeout = nil)
  @timeout = timeout unless timeout.nil?
  loop do
    message = pop(non_block)
    # Nothing was popped: stop before handing nil to the block or to commit.
    break if message.nil?

    ret = yield message if block_given?
    commit if ret
    break if non_block && empty?
  end
end

#push(obj) ⇒ Object Also known as: enc, <<



35
36
37
# File 'lib/redis/queue.rb', line 35

# Enqueues a message by LPUSHing it onto the main queue.
#
# @param obj [Object] message to enqueue
# @return [Object] whatever the Redis client returns from LPUSH
def push(obj)
  message = obj
  @redis.lpush(@queue_name, message)
end

#refill ⇒ Object



63
64
65
66
67
68
# File 'lib/redis/queue.rb', line 63

# Moves every message from the process queue back to the main queue: each
# one is LPOPped from the process queue and RPUSHed onto the main queue,
# until LPOP returns nothing.
#
# @return [true] always
def refill
  loop do
    pending = @redis.lpop(@process_queue_name)
    break unless pending

    @redis.rpush(@queue_name, pending)
  end
  true
end