Class: Bernstein::RedisQueue

Inherits:
Object
  • Object
show all
Includes:
States
Defined in:
lib/bernstein/redis_queue.rb

Constant Summary collapse

QUEUE_SET =
"queued_messages"

Constants included from States

States::STATES

Class Method Summary collapse

Class Method Details

.add(message) ⇒ Object



16
17
18
19
20
21
22
# File 'lib/bernstein/redis_queue.rb', line 16

# Enqueues a message by issuing three writes inside one Redis MULTI block:
# the message id is added to the QUEUE_SET set, the serialized message is
# stored under its id, and STATES[:queued] is stored under the message's
# status key. Both stored values expire after @options[:key_expiry].
#
# The commands are sent to the object yielded by +multi+ rather than to
# @redis itself: current redis-rb only queues commands issued on the yielded
# transaction, while older versions yield the client itself, so this form
# works with both.
#
# @param message [#id, #serialize] the message to enqueue
# @return [Object] whatever +@redis.multi+ returns
def self.add(message)
  expiry = @options[:key_expiry]

  @redis.multi do |transaction|
    transaction.sadd QUEUE_SET, message.id
    transaction.setex message.id, expiry, message.serialize
    transaction.setex status_key(message.id), expiry, STATES[:queued]
  end
end

.clear ⇒ Object



43
44
45
# File 'lib/bernstein/redis_queue.rb', line 43

# Deletes the QUEUE_SET key, emptying the set of queued message ids.
# Only that one key is deleted here; no other keys are touched.
#
# @return [Object] the reply from +@redis.del+
def self.clear
  @redis.del(QUEUE_SET)
end

.configure!(options = {}) ⇒ Object



11
12
13
14
# File 'lib/bernstein/redis_queue.rb', line 11

# Merges the given options into @options and (re)builds the Redis client.
#
# The client is a Redis::Namespace using the :bernstein namespace, wrapping
# a new Redis connection created from @options[:redis].
#
# @param options [Hash, nil] settings merged into @options; nil is treated
#   as "no overrides"
# @return [Redis::Namespace] the newly created namespaced client
def self.configure!(options = {})
  @options.merge!(options) if options

  connection = Redis.new(@options[:redis])
  @redis = Redis::Namespace.new(:bernstein, :redis => connection)
end

.dequeue(id, mark_as_sent = false) ⇒ Object



47
48
49
# File 'lib/bernstein/redis_queue.rb', line 47

# Delegates to +remove_and_change_status+ for the given message id, passing
# STATES[:sent] when +mark_as_sent+ is truthy and STATES[:sending] otherwise.
#
# @param id [Object] id of the message to dequeue
# @param mark_as_sent [Boolean] selects which of the two states is passed on
# @return [Object] whatever +remove_and_change_status+ returns
def self.dequeue(id, mark_as_sent = false)
  state_name = mark_as_sent ? :sent : :sending
  remove_and_change_status(id, STATES[state_name])
end

.mark_as_sent(id) ⇒ Object



51
52
53
# File 'lib/bernstein/redis_queue.rb', line 51

# Sets the status of the given message id to STATES[:sent] via +set_status+.
#
# @param id [Object] id of the message to mark
# @return [Object] whatever +set_status+ returns
def self.mark_as_sent(id)
  sent_state = STATES[:sent]
  set_status(id, sent_state)
end

.queued_messages ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/bernstein/redis_queue.rb', line 28

# Loads every message whose id is currently in the QUEUE_SET set.
#
# Ids are read from the set, their stored payloads are fetched in a single
# MGET, missing (nil) payloads are dropped, and the rest are passed through
# Message.deserialize. When fewer messages were found than ids were listed,
# the ids that have no deserialized message are handed to +clean_up_queue+.
#
# @return [Array] the deserialized messages; empty when nothing is queued
def self.queued_messages
  ids = @redis.smembers(QUEUE_SET)
  return [] if ids.empty?

  found = @redis.mget(ids).compact.map { |raw| Message.deserialize(raw) }

  # Some ids in the set no longer have a stored payload.
  clean_up_queue(ids - found.map(&:id)) if found.size < ids.size

  found
end

.status(id) ⇒ Object



24
25
26
# File 'lib/bernstein/redis_queue.rb', line 24

# Looks up the value stored under the message's status key.
#
# @param id [Object] id of the message to look up
# @return [Object] the stored status, or STATES[:not_yet_queued] when the
#   lookup returns nothing
def self.status(id)
  stored = @redis.get(status_key(id))
  return stored if stored

  STATES[:not_yet_queued]
end