Class: Sidekiq::JobSet

Inherits:
SortedSet show all
Defined in:
lib/sidekiq/api.rb

Direct Known Subclasses

DeadSet, RetrySet, ScheduledSet

Instance Attribute Summary

Attributes inherited from SortedSet

#name

Instance Method Summary collapse

Methods inherited from SortedSet

#clear, #initialize, #scan, #size

Constructor Details

This class inherits a constructor from Sidekiq::SortedSet

Instance Method Details

#delete_by_jid(score, jid) ⇒ Object Also known as: delete



643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
# File 'lib/sidekiq/api.rb', line 643

def delete_by_jid(score, jid)
  Sidekiq.redis do |conn|
    elements = conn.zrangebyscore(name, score, score)
    elements.each do |element|
      if element.index(jid)
        message = Sidekiq.load_json(element)
        if message["jid"] == jid
          ret = conn.zrem(name, element)
          @_size -= 1 if ret
          break ret
        end
      end
    end
  end
end

#delete_by_value(name, value) ⇒ Object



635
636
637
638
639
640
641
# File 'lib/sidekiq/api.rb', line 635

def delete_by_value(name, value)
  Sidekiq.redis do |conn|
    ret = conn.zrem(name, value)
    @_size -= 1 if ret
    ret
  end
end

#eachObject



578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
# File 'lib/sidekiq/api.rb', line 578

def each
  initial_size = @_size
  offset_size = 0
  page = -1
  page_size = 50

  loop do
    range_start = page * page_size + offset_size
    range_end = range_start + page_size - 1
    elements = Sidekiq.redis { |conn|
      conn.zrange name, range_start, range_end, with_scores: true
    }
    break if elements.empty?
    page -= 1
    elements.reverse_each do |element, score|
      yield SortedEntry.new(self, score, element)
    end
    offset_size = initial_size - @_size
  end
end

#fetch(score, jid = nil) ⇒ Object

Fetch jobs that match a given time or Range. Job ID is an optional second argument.



602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
# File 'lib/sidekiq/api.rb', line 602

def fetch(score, jid = nil)
  begin_score, end_score =
    if score.is_a?(Range)
      [score.first, score.last]
    else
      [score, score]
    end

  elements = Sidekiq.redis { |conn|
    conn.zrangebyscore(name, begin_score, end_score, with_scores: true)
  }

  elements.each_with_object([]) do |element, result|
    data, job_score = element
    entry = SortedEntry.new(self, job_score, data)
    result << entry if jid.nil? || entry.jid == jid
  end
end

#find_job(jid) ⇒ Object

Find the job with the given JID within this sorted set. This is a slower O(n) operation. Do not use for app logic.



624
625
626
627
628
629
630
631
632
633
# File 'lib/sidekiq/api.rb', line 624

def find_job(jid)
  Sidekiq.redis do |conn|
    conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score|
      job = JSON.parse(entry)
      matched = job["jid"] == jid
      return SortedEntry.new(self, score, entry) if matched
    end
  end
  nil
end

#schedule(timestamp, message) ⇒ Object



572
573
574
575
576
# File 'lib/sidekiq/api.rb', line 572

def schedule(timestamp, message)
  Sidekiq.redis do |conn|
    conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message))
  end
end