Class: Bernstein::RedisQueue
- Inherits:
-
Object
- Object
- Bernstein::RedisQueue
- Includes:
- States
- Defined in:
- lib/bernstein/redis_queue.rb
Constant Summary collapse
- QUEUE_SET =
"queued_messages"
Constants included from States
Class Method Summary collapse
- .add(message) ⇒ Object
- .clear ⇒ Object
- .configure!(options = {}) ⇒ Object
- .dequeue(id, mark_as_sent = false) ⇒ Object
- .mark_as_sent(id) ⇒ Object
- .queued_messages ⇒ Object
- .status(id) ⇒ Object
Class Method Details
.add(message) ⇒ Object
16 17 18 19 20 21 22 |
# File 'lib/bernstein/redis_queue.rb', line 16 def self.add() @redis.multi do @redis.sadd QUEUE_SET, .id @redis.setex .id, @options[:key_expiry], .serialize @redis.setex status_key(.id), @options[:key_expiry], STATES[:queued] end end |
.clear ⇒ Object
43 44 45 |
# File 'lib/bernstein/redis_queue.rb', line 43 def self.clear @redis.del QUEUE_SET end |
.configure!(options = {}) ⇒ Object
11 12 13 14 |
# File 'lib/bernstein/redis_queue.rb', line 11 def self.configure!( = {}) @options.merge!( || {}) @redis = Redis::Namespace.new(:bernstein, :redis => Redis.new(@options[:redis])) end |
.dequeue(id, mark_as_sent = false) ⇒ Object
47 48 49 |
# File 'lib/bernstein/redis_queue.rb', line 47 def self.dequeue(id, mark_as_sent = false) remove_and_change_status(id, (mark_as_sent ? STATES[:sent] : STATES[:sending])) end |
.mark_as_sent(id) ⇒ Object
51 52 53 |
# File 'lib/bernstein/redis_queue.rb', line 51 def self.mark_as_sent(id) set_status(id, STATES[:sent]) end |
.queued_messages ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/bernstein/redis_queue.rb', line 28 def self. = @redis.smembers QUEUE_SET = [] unless .empty? = @redis.mget().compact unless .empty? .map!{|m| Message.deserialize(m)} end if .size < .size clean_up_queue( - .map{|m| m.id}) end end end |
.status(id) ⇒ Object
24 25 26 |
# File 'lib/bernstein/redis_queue.rb', line 24 def self.status(id) @redis.get(status_key(id)) || STATES[:not_yet_queued] end |