Class: Que::JobBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/que/job_buffer.rb

Defined Under Namespace

Classes: PriorityQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(maximum_size:, minimum_size:, priorities:) ⇒ JobBuffer

Since we use a mutex, which is not reentrant, we have to be a little careful to not call a method that locks the mutex when we’ve already locked it. So, as a general rule, public methods handle locking the mutex when necessary, while private methods handle the actual underlying data changes. This lets us reuse those private methods without running into locking issues.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/que/job_buffer.rb', line 18

def initialize(
  maximum_size:,
  minimum_size:,
  priorities:
)
  @maximum_size = Que.assert(Integer, maximum_size)
  Que.assert(maximum_size >= 0) { "maximum_size for a JobBuffer must be at least zero!" }

  @minimum_size = Que.assert(Integer, minimum_size)
  Que.assert(minimum_size >= 0) { "minimum_size for a JobBuffer must be at least zero!" }

  Que.assert(minimum_size <= maximum_size) do
    "minimum buffer size (#{minimum_size}) is " \
      "greater than the maximum buffer size (#{maximum_size})!"
  end

  @stop  = false
  @array = []
  @mutex = Mutex.new

  @priority_queues = Hash[
    # Make sure that priority = nil sorts highest.
    priorities.sort_by{|p| p || MAXIMUM_PRIORITY}.map do |p|
      [p, PriorityQueue.new(priority: p, job_buffer: self)]
    end
  ].freeze
end

Instance Attribute Details

#maximum_sizeObject (readonly)

Returns the value of attribute maximum_size.



9
10
11
# File 'lib/que/job_buffer.rb', line 9

def maximum_size
  @maximum_size
end

#minimum_sizeObject (readonly)

Returns the value of attribute minimum_size.



9
10
11
# File 'lib/que/job_buffer.rb', line 9

def minimum_size
  @minimum_size
end

#priority_queuesObject (readonly)

Returns the value of attribute priority_queues.



9
10
11
# File 'lib/que/job_buffer.rb', line 9

def priority_queues
  @priority_queues
end

Instance Method Details

#accept?(metajobs) ⇒ Boolean

Returns:

  • (Boolean)


83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/que/job_buffer.rb', line 83

def accept?(metajobs)
  metajobs.sort!

  sync do
    return [] if _stopping?

    start_index = _buffer_space
    final_index = metajobs.length - 1

    return metajobs if start_index > final_index
    index_to_lose = @array.length - 1

    start_index.upto(final_index) do |index|
      if index_to_lose >= 0 && (metajobs[index] <=> @array[index_to_lose]) < 0
        return metajobs if index == final_index
        index_to_lose -= 1
      else
        return metajobs.slice(0...index)
      end
    end

    []
  end
end

#available_prioritiesObject



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/que/job_buffer.rb', line 116

def available_priorities
  hash = {}
  lowest_priority = true

  priority_queues.reverse_each do |priority, pq|
    count = pq.waiting_count

    if lowest_priority
      count += buffer_space
      lowest_priority = false
    end

    hash[priority || MAXIMUM_PRIORITY] = count if count > 0
  end

  hash
end

#buffer_spaceObject



134
135
136
# File 'lib/que/job_buffer.rb', line 134

def buffer_space
  sync { _buffer_space }
end

#clearObject



151
152
153
# File 'lib/que/job_buffer.rb', line 151

def clear
  sync { pop(_size) }
end

#job_available?(priority) ⇒ Boolean

Returns:

  • (Boolean)


159
160
161
# File 'lib/que/job_buffer.rb', line 159

def job_available?(priority)
  (job = @array.first) && job.priority_sufficient?(priority)
end

#push(*metajobs) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/que/job_buffer.rb', line 46

def push(*metajobs)
  Que.internal_log(:job_buffer_push, self) do
    {
      maximum_size:  maximum_size,
      ids:           metajobs.map(&:id),
      current_queue: to_a,
    }
  end

  sync do
    return metajobs if _stopping?

    @array.concat(metajobs).sort!

    # Relying on the hash's contents being sorted, here.
    priority_queues.reverse_each do |_, pq|
      pq.populate do
        _shift_job(pq.priority)
      end
    end

    # If we passed the maximum buffer size, drop the lowest sort keys and
    # return their ids to be unlocked.
    overage = -_buffer_space
    pop(overage) if overage > 0
  end
end

#shift(priority = nil) ⇒ Object



74
75
76
77
# File 'lib/que/job_buffer.rb', line 74

def shift(priority = nil)
  queue = priority_queues.fetch(priority) { raise Error, "not a permitted priority! #{priority}" }
  queue.pop || shift_job(priority)
end

#shift_job(priority = nil) ⇒ Object



79
80
81
# File 'lib/que/job_buffer.rb', line 79

def shift_job(priority = nil)
  sync { _shift_job(priority) }
end

#sizeObject



138
139
140
# File 'lib/que/job_buffer.rb', line 138

def size
  sync { _size }
end

#stopObject



146
147
148
149
# File 'lib/que/job_buffer.rb', line 146

def stop
  sync { @stop = true }
  priority_queues.each_value(&:stop)
end

#stopping?Boolean

Returns:

  • (Boolean)


155
156
157
# File 'lib/que/job_buffer.rb', line 155

def stopping?
  sync { _stopping? }
end

#to_aObject



142
143
144
# File 'lib/que/job_buffer.rb', line 142

def to_a
  sync { @array.dup }
end

#waiting_countObject



108
109
110
111
112
113
114
# File 'lib/que/job_buffer.rb', line 108

def waiting_count
  count = 0
  priority_queues.each_value do |pq|
    count += pq.waiting_count
  end
  count
end