Class: MobME::Infrastructure::Queue::Backends::Redis

Inherits:
MobME::Infrastructure::Queue::Backend show all
Defined in:
lib/mobme/infrastructure/queue/backends/redis.rb

Constant Summary collapse

NAMESPACE =

The namespace that all redis queue keys live inside Redis

'redis:queue:'
QUEUESET =

The set used to store all keys

'redis:queue:set'
UUID_SUFFIX =

The UUID suffix for keys that store values

':uuid'
QUEUE_SUFFIX =

The sorted set suffix for the list of all keys in a queue

':queue'
VALUE_SUFFIX =

The hash suffix for the hash that stores values of a queue

':values'

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Redis

Initialises the Queue

Parameters:

  • options (Hash) (defaults to: {})

    all options to pass to the queue

Options Hash (options):

  • :redis_options (Hash)

    is passed on to the underlying Redis client



23
24
25
26
27
28
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 23

def initialize(options = {})
  redis_options = options.delete(:redis_options) || {}

  # Connect to Redis!
  connect(redis_options)
end

Instance Method Details

#add(queue, item, metadata = {}) ⇒ String

Add a value to a queue

Parameters:

  • queue_name (String)

    The queue name to add to.

  • item (Object)

    is the item to add

  • metadata (Hash) (defaults to: {})

    is stored with the item and returned.

Options Hash (metadata):

  • dequeue-timestamp (Time)

    An item with a dequeue-timestamp is only dequeued after this timestamp.

  • priority (Integer)

    An item with a higher priority is dequeued first. Always between 1 and 100.

Returns:

  • (String)

    A unique key in the queue name where the item is set.

Raises:

  • (ArgumentError)


45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 45

def add(queue, item,  = {})
  raise ArgumentError, "Metadata must be a hash, but #{.class} given" unless .is_a? Hash

   = ()
  uuid = generate_uuid(queue)
  
  add_to_queueset(queue)
  write_value(queue, uuid, item, )
  add_to_queue(queue, uuid, ['dequeue-timestamp'], ['priority'])
  
  uuid
end

#add_bulk(queue, items = []) ⇒ Object

Add values to the queue in bulk This works by pipelining writes to Redis, so results are generally much faster

Parameters:

  • queue (String)

    The queue name to add to

  • items (Array) (defaults to: [])

    The items to add



62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 62

def add_bulk(queue, items = [])
   = {}
  
  # UUIDs have to be in sync!
  uuids = []
  items.each do |item|
    uuids << generate_uuid(queue)
  end
  
  add_to_queueset(queue)
  
  @redis.pipelined do
    items.each do |item|
      uuid = uuids.shift

      # write value
      value_hash = "#{NAMESPACE}#{queue}#{VALUE_SUFFIX}"
      @redis.hset value_hash, uuid, serialize_item(item, )

      # add to queue
      queue_key = NAMESPACE + queue.to_s + QUEUE_SUFFIX
      @redis.zadd queue_key, (['dequeue_timestamp'], ['priority']), uuid
    end
  end
end

#connect(options) ⇒ Object

Connect to Redis

Parameters:

  • options (Hash)

    to pass to the Redis client as is

Options Hash (options):

  • :connection (Object)

    Instead of making a new connection, the queue will reuse this existing Redis connection



33
34
35
36
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 33

def connect(options)
  @redis = options.delete(:connection)
  @redis ||= Redis.new(options)
end

#empty(queue) ⇒ Object

Clear the queue

Parameters:

  • queue_name (String)

    is the queue name to clear



167
168
169
170
171
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 167

def empty(queue)
  # Delete key and value stores.
  @redis.del "#{NAMESPACE}#{queue}#{VALUE_SUFFIX}"
  @redis.del "#{NAMESPACE}#{queue}#{QUEUE_SUFFIX}"
end

#list(queue) ⇒ Array<Object, Hash>

Lists all items in the queue. This is an expensive operation

Parameters:

  • queue_name (String)

    is the queue name

Returns:

  • (Array<Object, Hash>)

    An array of list items, the first element the object stored and the second, metadata



151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 151

def list(queue)
  batch_size = 1_000 # keep this low as the time complexity of zrangebyscore is O(log(N)+M) : M -> the size
  
  count = 0; values = []
  (size(queue)/batch_size + 1).times do |i|
    limit = [(batch_size * i), batch_size]
    uuids = range_in_queue(queue, limit)
    batch_values = uuids.map { |uuid| read_value(queue, uuid) }
    values.push(*batch_values)
  end
  
  values
end

#list_queuesArray

List all queues

Returns:

  • (Array)

    A list of queues (includes empty queues that were once available)



175
176
177
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 175

def list_queues
  @redis.smembers QUEUESET
end

#peek(queue) ⇒ [Object, Hash]

Peek into the first item in a queue without removing it

Parameters:

  • queue_name (String)

    is the queue name

Returns:

  • ([Object, Hash])

    The item plus the metadata in the queue



135
136
137
138
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 135

def peek(queue)
  uuid = first_in_queue(queue)
  read_value(queue, uuid)
end

#remove(queue) {|[Object, Hash]| ... } ⇒ [Object, Hash]

Remove an item from a queue. When a block is passed, the item is reserved instead and automatically put back in case of an error. Raise MobME::Infrastructure::QueueRemoveAbort within the block to manually abort the remove.

Parameters:

  • queue_name (String)

    is the queue name

Yields:

  • ([Object, Hash])

    An optional block that is passed the item being remove alongside metadata.

Returns:

  • ([Object, Hash])

    The item plus the metadata in the queue



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 95

def remove(queue, &block)
  begin
    # Remove the first item!
    uuid = first_in_queue(queue)
    if uuid
      # If we're not able to remove the key from the set here, it means that
      # some other thread (or evented operation) has done it before us, so
      # the current remove is invalid and we should retry!
      raise MobME::Infrastructure::Queue::RemoveConflictException unless remove_from_queue(queue, uuid)
    
      queue_item = read_value(queue, uuid)
      
      # When a block is given, safely reserve the queue item
      if block_given?
        begin
          block.call(queue_item)
          remove_value(queue, uuid)
        rescue #generic error
          put_back_in_queue(queue, uuid, queue_item)
          
          # And now re-raise the error
          raise
        rescue MobME::Infrastructure::Queue::RemoveAbort
          put_back_in_queue(queue, uuid, queue_item)
        end
      else
        remove_value(queue, uuid)
        queue_item
      end
    else
      nil
    end
  rescue MobME::Infrastructure::Queue::RemoveConflictException
    retry
  end
end

#remove_queues(*queues) ⇒ Object Also known as: remove_queue

Delete queues

Parameters:

  • queues (optional String ...)

    A list of queues to delete. If empty, all queues are deleted.



181
182
183
184
185
186
187
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 181

def remove_queues(*queues)
  queues = list_queues if queues.empty?
  queues.each do |queue_name|
    empty(queue_name)
    remove_from_queueset(queue_name)
  end
end

#size(queue) ⇒ Integer

Find the size of a queue

Parameters:

  • queue_name (String)

    is the queue name

Returns:

  • (Integer)

    The size of the queue



143
144
145
146
# File 'lib/mobme/infrastructure/queue/backends/redis.rb', line 143

def size(queue)
  queue = NAMESPACE + queue.to_s + QUEUE_SUFFIX
  length = (@redis.zcard queue)
end