Class: Que::JobBuffer
- Inherits:
-
Object
- Object
- Que::JobBuffer
- Defined in:
- lib/que/job_buffer.rb
Defined Under Namespace
Classes: PriorityQueue
Instance Attribute Summary collapse
-
#maximum_size ⇒ Object
readonly
Returns the value of attribute maximum_size.
-
#minimum_size ⇒ Object
readonly
Returns the value of attribute minimum_size.
-
#priority_queues ⇒ Object
readonly
Returns the value of attribute priority_queues.
Instance Method Summary collapse
- #accept?(metajobs) ⇒ Boolean
- #available_priorities ⇒ Object
- #buffer_space ⇒ Object
- #clear ⇒ Object
-
#initialize(maximum_size:, minimum_size:, priorities:) ⇒ JobBuffer
constructor
Since we use a mutex, which is not reentrant, we have to be a little careful to not call a method that locks the mutex when we’ve already locked it.
- #job_available?(priority) ⇒ Boolean
- #push(*metajobs) ⇒ Object
- #shift(priority = nil) ⇒ Object
- #shift_job(priority = nil) ⇒ Object
- #size ⇒ Object
- #stop ⇒ Object
- #stopping? ⇒ Boolean
- #to_a ⇒ Object
- #waiting_count ⇒ Object
Constructor Details
#initialize(maximum_size:, minimum_size:, priorities:) ⇒ JobBuffer
Since we use a mutex, which is not reentrant, we have to be a little careful to not call a method that locks the mutex when we’ve already locked it. So, as a general rule, public methods handle locking the mutex when necessary, while private methods handle the actual underlying data changes. This lets us reuse those private methods without running into locking issues.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/que/job_buffer.rb', line 18
# Builds a job buffer with validated size limits and one PriorityQueue per
# requested priority.
#
# Locking convention for this class: the mutex is not reentrant, so public
# methods take the lock when necessary and private methods do the actual data
# changes, which lets the private methods be reused while the lock is held.
#
# @param maximum_size [Integer] upper size limit; must be >= 0.
# @param minimum_size [Integer] lower size limit; must be >= 0 and must not
#   exceed maximum_size.
# @param priorities [Enumerable] priorities to create queues for; may include
#   nil, which is ordered as if it were MAXIMUM_PRIORITY.
#
# NOTE(review): each check goes through Que.assert, which presumably raises
# when the check fails and returns the checked value otherwise — confirm.
def initialize(maximum_size:, minimum_size:, priorities:)
  @maximum_size = Que.assert(Integer, maximum_size)
  Que.assert(maximum_size >= 0) { "maximum_size for a JobBuffer must be at least zero!" }

  @minimum_size = Que.assert(Integer, minimum_size)
  Que.assert(minimum_size >= 0) { "minimum_size for a JobBuffer must be at least zero!" }

  Que.assert(minimum_size <= maximum_size) do
    "minimum buffer size (#{minimum_size}) is " \
      "greater than the maximum buffer size (#{maximum_size})!"
  end

  @stop  = false
  @array = []
  @mutex = Mutex.new

  # Make sure that priority = nil sorts highest.
  ordered_priorities = priorities.sort_by { |priority| priority || MAXIMUM_PRIORITY }

  # Hash insertion order follows the sort above; other methods rely on it.
  @priority_queues =
    ordered_priorities.each_with_object({}) do |priority, queues|
      queues[priority] = PriorityQueue.new(priority: priority, job_buffer: self)
    end.freeze
end
Instance Attribute Details
#maximum_size ⇒ Object (readonly)
Returns the value of attribute maximum_size.
9 10 11 |
# File 'lib/que/job_buffer.rb', line 9
# Read-only accessor for the maximum size stored by the constructor.
#
# @return [Object] the value of @maximum_size.
def maximum_size
  @maximum_size
end
#minimum_size ⇒ Object (readonly)
Returns the value of attribute minimum_size.
9 10 11 |
# File 'lib/que/job_buffer.rb', line 9
# Read-only accessor for the minimum size stored by the constructor.
#
# @return [Object] the value of @minimum_size.
def minimum_size
  @minimum_size
end
#priority_queues ⇒ Object (readonly)
Returns the value of attribute priority_queues.
9 10 11 |
# File 'lib/que/job_buffer.rb', line 9
# Read-only accessor for the priority => PriorityQueue hash that the
# constructor builds and freezes.
#
# @return [Object] the value of @priority_queues.
def priority_queues
  @priority_queues
end
Instance Method Details
#accept?(metajobs) ⇒ Boolean
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/que/job_buffer.rb', line 83
# Works out which of the given metajobs the buffer would take, without adding
# any of them.
#
# Despite the `?` suffix this returns an Array, not a strict boolean.
#
# @param metajobs [Array] candidate entries. Sorted in place with `sort!`, so
#   the caller's array is reordered. Entries must respond to `<=>` against
#   each other and against the entries already buffered.
# @return [Array] `[]` when the buffer is stopping; the whole (now sorted)
#   `metajobs` array when every candidate is acceptable; otherwise the leading
#   slice of candidates that are acceptable.
def accept?(metajobs)
  metajobs.sort!

  sync do
    return [] if _stopping?

    # Candidates below start_index are covered by `_buffer_space`
    # (presumably the number of free slots — confirm against its definition).
    start_index = _buffer_space
    final_index = metajobs.length - 1

    # Every candidate is covered by the available space.
    return metajobs if start_index > final_index

    # Walk the buffered entries from the end of the array backwards.
    index_to_lose = @array.length - 1

    start_index.upto(final_index) do |index|
      if index_to_lose >= 0 && (metajobs[index] <=> @array[index_to_lose]) < 0
        # This candidate sorts before the buffered entry it is compared with.
        return metajobs if index == final_index
        index_to_lose -= 1
      else
        # First candidate that does not win its comparison: accept only the
        # candidates before it.
        return metajobs.slice(0...index)
      end
    end

    # Fallback value; the loop above returns on or before final_index.
    []
  end
end
#available_priorities ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/que/job_buffer.rb', line 116
# Builds a hash of priority => count, visiting the priority queues in reverse
# order.
#
# Each queue contributes its `waiting_count`. The first queue visited (the
# last entry of `priority_queues`) additionally gets the current
# `buffer_space` added. Entries whose count is not greater than zero are left
# out. A nil priority is reported under the MAXIMUM_PRIORITY key.
#
# @return [Hash] counts keyed by priority.
def available_priorities
  first_visited = true

  priority_queues.reverse_each.each_with_object({}) do |(priority, pq), counts|
    available = pq.waiting_count

    if first_visited
      available += buffer_space
      first_visited = false
    end

    next unless available > 0

    counts[priority || MAXIMUM_PRIORITY] = available
  end
end
#buffer_space ⇒ Object
134 135 136 |
# File 'lib/que/job_buffer.rb', line 134
# Public wrapper that evaluates the private `_buffer_space` inside `sync`
# (presumably while holding the buffer's mutex — confirm against `sync`).
#
# @return [Object] whatever `_buffer_space` returns.
def buffer_space
  sync do
    _buffer_space
  end
end
#clear ⇒ Object
151 152 153 |
# File 'lib/que/job_buffer.rb', line 151
# Empties the buffer by popping as many entries as `_size` reports, all inside
# a single `sync` block.
#
# @return [Object] whatever `pop` returns for that count.
def clear
  sync do
    everything = _size
    pop(everything)
  end
end
#job_available?(priority) ⇒ Boolean
159 160 161 |
# File 'lib/que/job_buffer.rb', line 159
# Checks whether the first buffered entry satisfies the given priority.
#
# Reads @array directly and does not go through `sync`.
#
# @param priority [Object] passed straight to `priority_sufficient?`.
# @return [Object] nil when the buffer has no first entry, otherwise the
#   result of `priority_sufficient?(priority)` on that entry.
def job_available?(priority)
  head = @array.first
  head && head.priority_sufficient?(priority)
end
#push(*metajobs) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/que/job_buffer.rb', line 46
# Adds the given metajobs to the buffer, keeps the buffer sorted, offers
# entries to the priority queues, and trims the buffer if it overflowed.
#
# @param metajobs [Array] entries to add; each must respond to `id` (used for
#   the internal log payload) and be sortable alongside the buffered entries.
# @return [Object] the result of `pop(overage)` when the buffer ended up over
#   capacity (the dropped entries, to be unlocked by the caller), nil when it
#   did not, and the untouched `metajobs` when the buffer is stopping.
#
# NOTE(review): the early return value on stop is reconstructed as `metajobs`
# (nothing is buffered, so everything is handed back) — confirm against the
# upstream file.
def push(*metajobs)
  Que.internal_log(:job_buffer_push, self) do
    {
      maximum_size:  maximum_size,
      ids:           metajobs.map(&:id),
      current_queue: to_a,
    }
  end

  sync do
    return metajobs if _stopping?

    @array.concat(metajobs).sort!

    # Relying on the hash's contents being sorted, here.
    priority_queues.reverse_each do |_, pq|
      pq.populate do
        _shift_job(pq.priority)
      end
    end

    # If we passed the maximum buffer size, drop the lowest sort keys and
    # return their ids to be unlocked.
    overage = -_buffer_space
    pop(overage) if overage > 0
  end
end
#shift(priority = nil) ⇒ Object
74 75 76 77 |
# File 'lib/que/job_buffer.rb', line 74
# Takes the next entry for the given priority: first from that priority's
# queue, and if the queue yields nothing, from the buffer via `shift_job`.
#
# @param priority [Object, nil] must be a key of `priority_queues`.
# @return [Object] the value from `queue.pop`, or the result of
#   `shift_job(priority)` when that value is nil or false.
# @raise [Error] when `priority` is not a key of `priority_queues`.
def shift(priority = nil)
  queue =
    priority_queues.fetch(priority) do
      raise Error, "not a permitted priority! #{priority}"
    end

  popped = queue.pop
  popped || shift_job(priority)
end
#shift_job(priority = nil) ⇒ Object
79 80 81 |
# File 'lib/que/job_buffer.rb', line 79
# Public wrapper that runs the private `_shift_job` inside `sync`.
#
# @param priority [Object, nil] forwarded unchanged to `_shift_job`.
# @return [Object] whatever `_shift_job(priority)` returns.
def shift_job(priority = nil)
  sync do
    _shift_job(priority)
  end
end
#size ⇒ Object
138 139 140 |
# File 'lib/que/job_buffer.rb', line 138
# Public wrapper that evaluates the private `_size` inside `sync`.
#
# @return [Object] whatever `_size` returns.
def size
  sync do
    _size
  end
end
#stop ⇒ Object
146 147 148 149 |
# File 'lib/que/job_buffer.rb', line 146
# Marks the buffer as stopping and then stops every priority queue.
#
# The flag is set inside `sync`; the queues are stopped afterwards, outside
# that block.
#
# @return [Object] the result of `priority_queues.each_value`.
def stop
  sync do
    @stop = true
  end

  priority_queues.each_value { |pq| pq.stop }
end
#stopping? ⇒ Boolean
155 156 157 |
# File 'lib/que/job_buffer.rb', line 155
# Public wrapper that evaluates the private `_stopping?` inside `sync`.
#
# @return [Object] whatever `_stopping?` returns.
def stopping?
  sync do
    _stopping?
  end
end
#to_a ⇒ Object
142 143 144 |
# File 'lib/que/job_buffer.rb', line 142
# Returns a snapshot of the buffered entries.
#
# The copy is made with `dup` inside `sync`, so callers get a separate array
# object (a shallow copy: the entries themselves are shared).
#
# @return [Array] a duplicate of the internal array.
def to_a
  sync do
    @array.dup
  end
end
#waiting_count ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/que/job_buffer.rb', line 108
# Adds up `waiting_count` across every priority queue.
#
# @return [Integer] the total, or 0 when there are no priority queues.
def waiting_count
  priority_queues.each_value.reduce(0) do |total, pq|
    total + pq.waiting_count
  end
end