Class: Sidekiq::Addons::Prioritize::Enqr

Inherits:
Object
  • Object
show all
Defined in:
lib/sidekiq/addons/prioritize/enqr.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.enqueue_with_priority(con, queue, priority, msg) ⇒ Object



59
60
61
62
# File 'lib/sidekiq/addons/prioritize/enqr.rb', line 59

def enqueue_with_priority(con, queue, priority, msg)
  q_name = Sidekiq::Addons::Util.priority_job_queue_name(queue)
  return con.zadd(q_name, priority, msg.to_json)
end

.get_priority_from_msg(msg) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/sidekiq/addons/prioritize/enqr.rb', line 41

def get_priority_from_msg(msg)
  priority = nil

  if msg["args"].is_a?(Array)
    msg["args"].each do |param|
      if param.is_a?(Hash) and ( param.has_key?(:with_priority) \
         or param.has_key?("with_priority") )

        priority = param[:with_priority]
        priority = param["with_priority"] unless priority
        break
      end
    end
  end

  return priority.to_i
end

Instance Method Details

#call(worker_class, msg, queue, redis_pool) ⇒ Object



28
29
30
31
32
33
34
35
36
37
# File 'lib/sidekiq/addons/prioritize/enqr.rb', line 28

def call(worker_class, msg, queue, redis_pool)
  priority = compute_priority(queue, msg)
  if priority
    msg["queue"] = "queue:#{queue}"
    redis_pool.with {|con| self.class.enqueue_with_priority(con, queue, priority, msg) }
    return false
  else
    yield
  end
end

#compute_priority(queue, msg) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/sidekiq/addons/prioritize/enqr.rb', line 4

def compute_priority(queue, msg)
  to_ignore_qs = Sidekiq.options[:ignore_priority] || []
  min_priority = msg["min_priority"] || Sidekiq.options[:min_priority] || 0
  ignore_priority = msg["ignore_priority"]
  lazy_eval = msg["lazy_eval"]

  priority = min_priority
  if ignore_priority or (to_ignore_qs.include?(queue))
    priority = 0

  elsif lazy_eval
    priority = lazy_eval.call(msg["args"])
    unless priority.is_a?(Integer)
      priority = priority ? (min_priority + 1) : (min_priority - 1)
    end
    msg.delete("lazy_eval")
  else
    priority = self.class.get_priority_from_msg(msg)

  end

  return (priority > min_priority) ? priority : nil
end