Class: Sidekiq::PriorityQueue::ReliableFetch

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/sidekiq/priority_queue/reliable_fetch.rb

Defined Under Namespace

Classes: UnitOfWork

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ ReliableFetch

Returns a new instance of ReliableFetch.



39
40
41
42
43
44
45
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 39

# Builds the fetcher state from the fetch options hash.
#
# @param options [Hash] reads +:strict+, +:queues+ and +:index+
def initialize(options)
  strict = options[:strict] ? true : false
  names = options[:queues].map { |name| "priority-queue:#{name}" }

  @strictly_ordered_queues = strict
  # Duplicates are dropped only in strict mode; otherwise the list is kept
  # exactly as given (presumably so repeated names act as weights -- confirm).
  @queues = strict ? names.uniq : names
  @done = false
  # An explicit :index option wins over the PROCESS_INDEX environment variable.
  @process_index = options[:index] || ENV['PROCESS_INDEX']
end

Instance Method Details

#bulk_requeue(_inprogress, _options) ⇒ Object

This method is called when the Sidekiq process shuts down gracefully.



94
95
96
97
98
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 94

# Hook invoked on graceful shutdown. Emits a debug log line, then calls
# +requeue_wip_jobs+ followed by +unregister_super_process+.
#
# @param _inprogress [Object] ignored
# @param _options [Object] ignored
# @return [Object] whatever +unregister_super_process+ returns
def bulk_requeue(_inprogress, _options)
  Sidekiq.logger.debug do
    "Priority ReliableFetch: Re-queueing terminated jobs"
  end

  requeue_wip_jobs
  unregister_super_process
end

#queues_cmd ⇒ Object



85
86
87
88
89
90
91
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 85

# Returns the queue keys in the order they should be polled.
#
# In strict mode the configured list is returned as-is. Otherwise a shuffled
# copy with duplicates removed is returned, so the order varies per call.
#
# @return [Array<String>]
def queues_cmd
  return @queues if @strictly_ordered_queues

  @queues.shuffle.uniq
end

#retrieve_work ⇒ Object



60
61
62
63
64
65
66
67
68
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 60

# Fetches the next available job, trying each queue in turn.
#
# Queues are polled in the order produced by +queues_cmd+, so the strict /
# non-strict setting is honoured and each queue is tried at most once per call.
# (Iterating +@queues+ directly ignored the shuffle and re-polled duplicate
# entries.) For each queue, +zpopmin_sadd+ is called with the queue key and its
# work-in-progress key; the first truthy result wins.
#
# @return [UnitOfWork, nil] the fetched unit of work, or nil when the fetcher
#   is marked done or no queue yielded a job
def retrieve_work
  return nil if @done

  queues_cmd.each do |queue|
    wip = wip_queue(queue)
    job = zpopmin_sadd(queue, wip)
    return UnitOfWork.new(queue, job, wip) if job
  end

  nil
end

#setup ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 47

# Registers this fetcher's Sidekiq lifecycle callbacks:
#
# * +:startup+   - runs +cleanup_the_dead+, then +register_myself+
# * +:shutdown+  - sets the done flag so +retrieve_work+ stops handing out jobs
# * +:heartbeat+ - runs +register_myself+ again
def setup
  Sidekiq.on(:startup) do
    cleanup_the_dead
    register_myself
  end

  Sidekiq.on(:shutdown) { @done = true }

  Sidekiq.on(:heartbeat) { register_myself }
end

#spop(wip_queue) ⇒ Object



81
82
83
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 81

# Issues SPOP against the given work-in-progress key on a Sidekiq Redis
# connection.
#
# @param wip_queue [String] Redis key to pop from
# @return [Object] whatever the connection's +spop+ returns
def spop(wip_queue)
  Sidekiq.redis do |conn|
    conn.spop(wip_queue)
  end
end

#wip_queue(q) ⇒ Object



70
71
72
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 70

# Builds the work-in-progress key for a queue, scoped by this process's
# +identity+.
#
# @param q [String] queue key
# @return [String] key of the form "queue:spriorityq|<identity>|<q>"
def wip_queue(q)
  owner = identity
  "queue:spriorityq|#{owner}|#{q}"
end

#zpopmin_sadd(queue, wip_queue) ⇒ Object



74
75
76
77
78
79
# File 'lib/sidekiq/priority_queue/reliable_fetch.rb', line 74

# Runs the ZPOPMIN_SADD Lua script, passing +[queue, wip_queue]+ as its key
# list, and returns the script's result.
#
# The script is loaded once and its SHA cached in +@script_sha+. If Redis has
# since lost its script cache (restart, SCRIPT FLUSH), EVALSHA fails with a
# NOSCRIPT error; the cached SHA is then dropped and the call retried once so
# the script is reloaded instead of failing on every later fetch.
#
# @param queue [String] key of the priority queue
# @param wip_queue [String] key of the matching work-in-progress collection
# @return [Object, nil] the script's result
# @raise [StandardError] any non-NOSCRIPT error, or a NOSCRIPT error that
#   persists after one reload
def zpopmin_sadd(queue, wip_queue)
  reloaded = false

  begin
    Sidekiq.redis do |con|
      @script_sha ||= con.script(:load, Sidekiq::PriorityQueue::Scripts::ZPOPMIN_SADD)
      con.evalsha(@script_sha, [queue, wip_queue])
    end
  rescue StandardError => e
    # Match on the message rather than a client-specific error class so this
    # works whichever Redis client library backs Sidekiq.
    raise if reloaded || !e.message.to_s.start_with?('NOSCRIPT')

    @script_sha = nil
    reloaded = true
    retry
  end
end