Class: Sidekiq::SortedSet

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/sidekiq/api.rb

Direct Known Subclasses

RetrySet, ScheduledSet

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ SortedSet

Returns a new instance of SortedSet.



235
236
237
# File 'lib/sidekiq/api.rb', line 235

def initialize(name)
  @zset = name
end

Instance Method Details

#clearObject



304
305
306
307
308
# File 'lib/sidekiq/api.rb', line 304

def clear
  Sidekiq.redis do |conn|
    conn.del(@zset)
  end
end

#delete(score, jid = nil) ⇒ Object



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/sidekiq/api.rb', line 282

def delete(score, jid = nil)
  if jid
    elements = Sidekiq.redis do |conn|
      conn.zrangebyscore(@zset, score, score)
    end

    elements_with_jid = elements.map do |element|
      message = Sidekiq.load_json(element)

      if message["jid"] == jid
        Sidekiq.redis { |conn| conn.zrem(@zset, element) }
      end
    end
    elements_with_jid.count != 0
  else
    count = Sidekiq.redis do |conn|
      conn.zremrangebyscore(@zset, score, score)
    end
    count != 0
  end
end

#each(&block) ⇒ Object



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/sidekiq/api.rb', line 249

def each(&block)
  # page thru the sorted set backwards so deleting entries doesn't screw up indexing
  page = -1
  page_size = 50

  loop do
    elements = Sidekiq.redis do |conn|
      conn.zrange @zset, page * page_size, (page * page_size) + (page_size - 1), :with_scores => true
    end
    break if elements.empty?
    page -= 1
    elements.each do |element, score|
      block.call SortedEntry.new(self, score, element)
    end
  end
end

#fetch(score, jid = nil) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/sidekiq/api.rb', line 266

def fetch(score, jid = nil)
  elements = Sidekiq.redis do |conn|
    conn.zrangebyscore(@zset, score, score)
  end

  elements.inject([]) do |result, element|
    entry = SortedEntry.new(self, score, element)
    if jid
      result << entry if entry.jid == jid
    else
      result << entry
    end
    result
  end
end

#schedule(timestamp, message) ⇒ Object



243
244
245
246
247
# File 'lib/sidekiq/api.rb', line 243

def schedule(timestamp, message)
  Sidekiq.redis do |conn|
    conn.zadd(@zset, timestamp.to_s, Sidekiq.dump_json(message))
  end
end

#sizeObject



239
240
241
# File 'lib/sidekiq/api.rb', line 239

def size
  Sidekiq.redis {|c| c.zcard(@zset) }
end