Class: Redis::Queue
- Inherits:
-
Object
- Object
- Redis::Queue
- Defined in:
- lib/redis/queue.rb
Constant Summary collapse
- VERSION =
'0.1.0'
Class Method Summary collapse
Instance Method Summary collapse
- #clear(clear_process_queue = false) ⇒ Object
- #commit ⇒ Object
- #empty? ⇒ Boolean
-
#initialize(queue_name, process_queue_name, options = {}) ⇒ Queue
constructor
A new instance of Queue.
- #length ⇒ Object (also: #size)
- #pop(non_block = false) ⇒ Object (also: #dec, #shift)
- #process(non_block = false, timeout = nil) ⇒ Object
- #push(obj) ⇒ Object (also: #enc, #<<)
- #refill ⇒ Object
Constructor Details
#initialize(queue_name, process_queue_name, options = {}) ⇒ Queue
Returns a new instance of Queue.
11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/redis/queue.rb', line 11 def initialize(queue_name, process_queue_name, = {}) raise ArgumentError, 'First argument must be a non empty string' if !queue_name.is_a?(String) || queue_name.empty? raise ArgumentError, 'Second argument must be a non empty string' if !process_queue_name.is_a?(String) || process_queue_name.empty? raise ArgumentError, 'Queue and Process queue have the same name' if process_queue_name == queue_name @redis = [:redis] || Redis.current @queue_name = queue_name @process_queue_name = process_queue_name @last_message = nil @timeout = [:timeout] ||= 0 end |
Class Method Details
.version ⇒ Object
7 8 9 |
# File 'lib/redis/queue.rb', line 7 def self.version "redis-queue version #{VERSION}" end |
Instance Method Details
#clear(clear_process_queue = false) ⇒ Object
27 28 29 30 |
# File 'lib/redis/queue.rb', line 27 def clear(clear_process_queue = false) @redis.del @queue_name @redis.del @process_queue_name if clear_process_queue end |
#commit ⇒ Object
49 50 51 |
# File 'lib/redis/queue.rb', line 49 def commit @redis.lrem(@process_queue_name, 0, @last_message) end |
#empty? ⇒ Boolean
32 33 34 |
# File 'lib/redis/queue.rb', line 32 def empty? length <= 0 end |
#length ⇒ Object Also known as: size
23 24 25 |
# File 'lib/redis/queue.rb', line 23 def length @redis.llen @queue_name end |
#pop(non_block = false) ⇒ Object Also known as: dec, shift
40 41 42 43 44 45 46 47 |
# File 'lib/redis/queue.rb', line 40 def pop(non_block = false) @last_message = if non_block @redis.rpoplpush(@queue_name, @process_queue_name) else @redis.brpoplpush(@queue_name, @process_queue_name, @timeout) end @last_message end |
#process(non_block = false, timeout = nil) ⇒ Object
53 54 55 56 57 58 59 60 61 |
# File 'lib/redis/queue.rb', line 53 def process(non_block = false, timeout = nil) @timeout = timeout unless timeout.nil? loop do = pop(non_block) ret = yield if block_given? commit if ret break if .nil? || (non_block && empty?) end end |
#push(obj) ⇒ Object Also known as: enc, <<
36 37 38 |
# File 'lib/redis/queue.rb', line 36 def push(obj) @redis.lpush(@queue_name, obj) end |
#refill ⇒ Object
63 64 65 66 67 68 |
# File 'lib/redis/queue.rb', line 63 def refill while ( = @redis.lpop(@process_queue_name)) @redis.rpush(@queue_name, ) end true end |