Class: Que::JobBuffer::PriorityQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/que/job_buffer.rb

Overview

A queue object dedicated to a specific worker priority. It’s basically a Queue object from the standard library, but it’s able to reach into the JobBuffer’s buffer in order to satisfy a pop.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_buffer:, priority:) ⇒ PriorityQueue

Returns a new instance of PriorityQueue.



199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/que/job_buffer.rb', line 199

def initialize(
  job_buffer:,
  priority:
)
  @job_buffer = job_buffer
  @priority   = priority
  @waiting    = 0
  @stopping   = false
  @items      = [] # Items pending distribution to waiting threads.
  @mutex      = Mutex.new
  @cv         = ConditionVariable.new
end

Instance Attribute Details

#job_bufferObject (readonly)

Returns the value of attribute job_buffer.



197
198
199
# File 'lib/que/job_buffer.rb', line 197

def job_buffer
  @job_buffer
end

#mutexObject (readonly)

Returns the value of attribute mutex.



197
198
199
# File 'lib/que/job_buffer.rb', line 197

def mutex
  @mutex
end

#priorityObject (readonly)

Returns the value of attribute priority.



197
198
199
# File 'lib/que/job_buffer.rb', line 197

def priority
  @priority
end

Instance Method Details

#popObject



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/que/job_buffer.rb', line 212

def pop
  sync do
    loop do
      if @stopping
        return false
      elsif item = @items.pop
        return item
      elsif job_buffer.job_available?(priority)
        return false
      end

      @waiting += 1
      @cv.wait(mutex)
      @waiting -= 1
    end
  end
end

#populateObject



237
238
239
240
241
242
243
244
245
# File 'lib/que/job_buffer.rb', line 237

def populate
  sync do
    waiting_count.times do
      job = yield
      break if job.nil? # False would mean we're stopping.
      _push(job)
    end
  end
end

#stopObject



230
231
232
233
234
235
# File 'lib/que/job_buffer.rb', line 230

def stop
  sync do
    @stopping = true
    @cv.broadcast
  end
end

#waiting_countObject



247
248
249
# File 'lib/que/job_buffer.rb', line 247

def waiting_count
  @waiting
end