Class: SimpleQueues::Redis

Inherits:
Object
  • Object
show all
Defined in:
lib/simple_queues/redis.rb

Overview

The Redis version of SimpleQueues.

Messages are enqueued to the right, dequeued from the left - thus the most recent messages are at the end of the list.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(redis = ::Redis.new, options = {}) ⇒ Redis

Returns a new instance of Redis.

Parameters:

  • redis (defaults to: ::Redis.new)

    A Redis instance, or something that looks like Redis.

  • options (Hash) (defaults to: {})

    A set of options.

  • :encoder (Hash)

    a customizable set of options

Raises:

  • (ArgumentError)


15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/simple_queues/redis.rb', line 15

def initialize(redis = ::Redis.new, options={})
  @encoder = case options[:encoder]
             when :messagepack, :msgpack
               SimpleQueues::MessagePackEncoder.new
             when :json, nil
               SimpleQueues::JsonEncoder.new
             when :identity
               SimpleQueues::IdentityEncoder.new
             else # Use whatever was provided
               options[:encoder]
             end
  raise ArgumentError, "Provided encoder #{@encoder.inspect} does not handle #encode and #decode" unless @encoder.respond_to?(:encode) && @encoder.respond_to?(:decode)

  @redis  = redis
  @queues = Hash.new
end

Instance Attribute Details

#encoderObject

Returns the value of attribute encoder.



9
10
11
# File 'lib/simple_queues/redis.rb', line 9

def encoder
  @encoder
end

Instance Method Details

#clear(queue_name) ⇒ Object



74
75
76
# File 'lib/simple_queues/redis.rb', line 74

def clear(queue_name)
  @redis.ltrim(q_name(queue_name), 1, 0)
end

#decode(message) ⇒ Object



47
48
49
# File 'lib/simple_queues/redis.rb', line 47

def decode(message)
  encoder.decode(message) if message
end

#dequeue_blocking(queue_name) ⇒ String

Dequeues a message, and waits forever for one to arrive.

Parameters:

  • queue_name (String, Symbol)

    The queue name to read from.

Returns:

  • (String)

    The first message in the queue.

Raises:

  • ArgumentError If queue_name is nil or the empty String.



70
71
72
# File 'lib/simple_queues/redis.rb', line 70

def dequeue_blocking(queue_name)
  dequeue_with_timeout(queue_name, 0)
end

#dequeue_with_timeout(*args) ⇒ String?

Dequeues a message, or returns nil if the timeout is exceeded.

Parameters:

  • queue_name (String, Symbol)

    The queue name to read from. Optional if you used #on_dequeue.

  • timeout (#to_f)

    The number of seconds to wait before returning nil.

Returns:

  • (String, nil)

    When given two arguments, returns the message, or nil if the timeout was exceeded. When given a timeout only, always returns nil.

Raises:

  • ArgumentError If queue_name is absent and no #on_dequeue blocks were added.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/simple_queues/redis.rb', line 88

def dequeue_with_timeout(*args)
  case args.length
  when 1 # Timeout only
    timeout = args.shift
    raise ArgumentError, "Timeout must not be nil" if timeout.nil? || timeout.to_s.empty?

    queue, result = @redis.blpop(*[@queues.keys, timeout.to_i].flatten)
    if queue then
      block = @queues.fetch(queue)
      message = decode(result)

      if block.arity == 1 then
        block.call(message)
      else
        block.call(queue, message)
      end
    end
    queue
  when 2
    queue_name, timeout = args.shift, args.shift
    _, result = @redis.blpop(q_name(queue_name), timeout.to_i)
    decode(result)
  else
    raise ArgumentError, "Expected 1 (timeout) or 2 (queue name, timeout) arguments, not #{args.length}"
  end

end

#encode(message) ⇒ Object

Raises:

  • (ArgumentError)


42
43
44
45
# File 'lib/simple_queues/redis.rb', line 42

def encode(message)
  raise ArgumentError, "message must be non-nil" if message.nil?
  encoder.encode(message)
end

#enqueue(queue_name, message) ⇒ Object

Enqueues a new message to the Redis backend.

Parameters:

  • queue_name (String, Symbol)

    The queue name, which must not be nil or the empty String.

  • message (#to_s)

    The message to be enqueued. The message will be turned into a String through #to_s before being enqueued. Must not be nil, but the empty string is accepted, although it seems meaningless to do so.

Returns:

  • No useful value.

Raises:

  • ArgumentError Whenever the queue name or the message are nil, or the queue name is empty.



57
58
59
60
61
62
63
# File 'lib/simple_queues/redis.rb', line 57

def enqueue(queue_name, message)
  raise ArgumentError, "Only hashes are accepted as messages" unless message.is_a?(Hash)

  msg = encode(message)
  @redis.rpush(q_name(queue_name), msg)
  msg
end

#on_dequeue(queue_name, &block) ⇒ Object

Saves a block for later execution from #dequeue_with_timeout or #dequeue.

When the block’s arity is 1, only the message will be passed. When the block’s arity is 2, the queue’s name and the message will be passed along, in that order. When the block’s arity is negative (accepts a variable number of arguments), SimpleQueues::Redis behaves as if the block’s arity was 2.

Raises:

  • (ArgumentError)


37
38
39
40
# File 'lib/simple_queues/redis.rb', line 37

def on_dequeue(queue_name, &block)
  raise ArgumentError, "The provided block must accept at least one argument - #{block.inspect} accepts no arguments" if block.arity.zero?
  @queues[q_name(queue_name)] = block
end

#size(queue_name) ⇒ Object



78
79
80
# File 'lib/simple_queues/redis.rb', line 78

def size(queue_name)
  @redis.llen(q_name(queue_name))
end