Class: Pwrake::TaskQueue
- Inherits: Object
- Inheritance chain: Object → Pwrake::TaskQueue
- Defined in:
- lib/pwrake/queue/task_queue.rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#enable_steal ⇒ Object
Returns the value of attribute enable_steal.
Instance Method Summary collapse
- #_qstr(h, q) ⇒ Object
- #clear ⇒ Object
- #deq_impl(host_info = nil, turn = nil) ⇒ Object
- #deq_noaction_task(&block) ⇒ Object
-
#deq_task(&block) ⇒ Object
locality version.
- #deq_turn(turn, &block) ⇒ Object
- #drop_host(host_info) ⇒ Object
- #empty? ⇒ Boolean
-
#enq(tw) ⇒ Object
enq.
- #enq_body(tw) ⇒ Object
- #enq_impl(tw) ⇒ Object
- #init_queue(group_map = nil) ⇒ Object
-
#initialize(hostinfo_by_id, group_map = nil) ⇒ TaskQueue
constructor
A new instance of TaskQueue.
- #inspect_q ⇒ Object
- #turn_empty?(turn) ⇒ Boolean
Constructor Details
#initialize(hostinfo_by_id, group_map = nil) ⇒ TaskQueue
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/pwrake/queue/task_queue.rb', line 8

# Sets up the internal queues and picks the queue-array class used for
# tasks that have input files.
#
# The class is chosen by matching the QUEUE_PRIORITY option (default
# "LIHR") case-insensitively against prio / fifo / lifo / lihr / prhr /
# rank, in that order; the first pattern that matches wins.
#
# NOTE(review): the collapsed listing read
# `Rake.application.['QUEUE_PRIORITY']`, which does not parse because the
# accessor name was lost. `pwrake_options` is assumed here -- confirm
# against the application class.
#
# @param hostinfo_by_id stored in @hostinfo_by_id; #deq_turn iterates it
#   with #each_value
# @param group_map passed through unchanged to #init_queue
# @raise [RuntimeError] when QUEUE_PRIORITY matches none of the patterns
def initialize(hostinfo_by_id, group_map=nil)
  @enable_steal = true
  @q_no_action = NoActionQueue.new
  @q_reserved = Hash.new
  @hostinfo_by_id = hostinfo_by_id

  pri = Rake.application.pwrake_options['QUEUE_PRIORITY'] || "LIHR"
  case pri
  when /prio/i
    @array_class = PriorityQueueArray
  when /fifo/i
    @array_class = FifoQueueArray # Array # Fifo
  when /lifo/i
    @array_class = LifoQueueArray
  when /lihr/i
    @array_class = LifoHrfQueueArray
  when /prhr/i
    @array_class = PriorityHrfQueueArray
  when /rank/i
    @array_class = RankQueueArray
  else
    # Interpolate rather than concatenate so that a non-String option
    # value still produces this RuntimeError instead of a TypeError.
    raise RuntimeError, "unknown option for QUEUE_PRIORITY: #{pri}"
  end
  Log.debug "@array_class=#{@array_class.inspect}"
  init_queue(group_map)
end
Instance Attribute Details
#enable_steal ⇒ Object
Returns the value of attribute enable_steal.
42 43 44 |
# File 'lib/pwrake/queue/task_queue.rb', line 42

# Returns the value of attribute enable_steal.
#
# @return the current value of @enable_steal (#initialize sets it to true)
def enable_steal
  @enable_steal
end
Instance Method Details
#_qstr(h, q) ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/pwrake/queue/task_queue.rb', line 143

# Formats a one-line summary of a queue for debug output.
#
# Shows the label, the queue size, and the names of the first and last
# entries (all entries when there are at most two).
#
# @param h label printed before the size
# @param q queue-like object responding to #size, #first and #last, whose
#   entries respond to #name
# @return [String] the summary, terminated by a newline
def _qstr(h, q)
  n = q.size
  s = " #{h}: size=#{n} "
  case n
  when 0
    s << "[]\n"
  when 1
    s << "[#{q.first.name}]\n"
  when 2
    s << "[#{q.first.name}, #{q.last.name}]\n"
  else
    s << "[#{q.first.name},..,#{q.last.name}]\n"
  end
  s
end
#clear ⇒ Object
129 130 131 132 133 134 |
# File 'lib/pwrake/queue/task_queue.rb', line 129

# Empties all four internal containers: the no-action queue, the
# per-host reservations, the input queue and the no-input queue.
#
# @return whatever the last #clear call (on @q_no_input) returns
def clear
  @q_no_action.clear
  @q_reserved.clear
  @q_input.clear
  @q_no_input.clear
end
#deq_impl(host_info = nil, turn = nil) ⇒ Object
123 124 125 126 127 |
# File 'lib/pwrake/queue/task_queue.rb', line 123

# Takes the next task, trying the no-action queue first, then the input
# queue, then the no-input queue.
#
# @param host_info forwarded to #shift on the input and no-input queues
# @param turn accepted for interface compatibility; not used by this
#   implementation
# @return the first truthy value returned by a queue's #shift, otherwise
#   the (falsy) result of the last #shift
def deq_impl(host_info=nil, turn=nil)
  @q_no_action.shift ||
    @q_input.shift(host_info) ||
    @q_no_input.shift(host_info)
end
#deq_noaction_task(&block) ⇒ Object
65 66 67 68 69 70 71 |
# File 'lib/pwrake/queue/task_queue.rb', line 65

# Drains the no-action queue, yielding each task to the given block.
#
# The loop stops at the first falsy value returned by @q_no_action.shift.
#
# @yieldparam tw the dequeued task (must respond to #name for the log line)
# @return [nil]
def deq_noaction_task(&block)
  Log.debug "deq_noaction_task:"+(empty? ? " (empty)" : "\n#{inspect_q}")
  while (tw = @q_no_action.shift)
    Log.debug "deq_noaction: #{tw.name}"
    yield(tw)
  end
end
#deq_task(&block) ⇒ Object
locality version
73 74 75 76 77 78 79 80 |
# File 'lib/pwrake/queue/task_queue.rb', line 73

# Dequeues tasks turn by turn, delegating each non-empty turn to
# #deq_turn with the caller's block.
#
# The collapsed listing accumulated the per-turn counts in `queued` but
# never used the total, so the method returned the result of
# Integer#times. The total is now returned, matching #deq_turn. The
# result is still an Integer, so truthiness for callers is unchanged.
#
# @yield forwarded to #deq_turn
# @return [Integer] sum of the values returned by #deq_turn
def deq_task(&block) # locality version
  Log.debug "deq_task from:"+(empty? ? " (empty)" : "\n#{inspect_q}")
  queued = 0
  @n_turn.times do |turn|
    next if turn_empty?(turn)
    queued += deq_turn(turn,&block)
  end
  queued
end
#deq_turn(turn, &block) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/pwrake/queue/task_queue.rb', line 82

# Hands out tasks for one turn by sweeping over the hosts repeatedly
# until a full sweep dispatches nothing or the turn becomes empty.
#
# For each host with idle cores, a task already reserved for that host is
# preferred; otherwise a task is taken via #deq_impl. If the task needs
# more cores than are idle, it is kept (or newly stored) in @q_reserved
# for that host and the host is skipped for this sweep.
#
# @param turn forwarded to #turn_empty? and #deq_impl
# @yieldparam tw the task to run
# @yieldparam host_info the host the task was assigned to
# @yieldparam n_use number of cores reported by tw.n_used_cores(host_info)
# @return [Integer] number of tasks yielded
def deq_turn(turn,&block)
  queued = 0
  while true
    count = 0
    @hostinfo_by_id.each_value do |host_info|
      #Log.debug "TaskQueue#deq_turn host_info=#{host_info.name}"
      if turn_empty?(turn)
        return queued
      elsif (n_idle = host_info.idle_cores) && n_idle > 0
        if tw = @q_reserved[host_info]
          n_use = tw.n_used_cores(host_info)
          if n_idle < n_use
            # Still not enough idle cores: keep the reservation.
            next
          end
          @q_reserved.delete(host_info)
          Log.debug "deq_reserve: #{tw.name} n_use_cores=#{n_use}"
        elsif tw = deq_impl(host_info,turn)
          n_use = tw.n_used_cores(host_info)
          if n_idle < n_use
            # Task does not fit yet: park it on this host.
            @q_reserved[host_info] = tw
            Log.debug "reserve host: #{host_info.name} for #{tw.name} (#{n_use} cores)"
            next
          end
          Log.debug "deq: #{tw.name} n_use_cores=#{n_use}"
        end
        if tw
          yield(tw,host_info,n_use)
          count += 1
          queued += 1
        end
      end
    end
    # A sweep that dispatched nothing means no host can take more work.
    break if count == 0
  end
  queued
end
#drop_host(host_info) ⇒ Object
164 165 |
# File 'lib/pwrake/queue/task_queue.rb', line 164

# Does nothing in this class; the body is intentionally empty.
#
# @param host_info ignored
# @return [nil]
def drop_host(host_info)
end
#empty? ⇒ Boolean
136 137 138 139 140 141 |
# File 'lib/pwrake/queue/task_queue.rb', line 136

# Reports whether no task is held anywhere: the no-action queue, the
# per-host reservations, the input queue and the no-input queue must all
# be empty.
#
# @return [Boolean]
def empty?
  @q_no_action.empty? &&
    @q_reserved.empty? &&
    @q_input.empty? &&
    @q_no_input.empty?
end
#enq(tw) ⇒ Object
enq
45 46 47 48 49 50 51 |
# File 'lib/pwrake/queue/task_queue.rb', line 45

# Enqueues a task. A nil task, or one whose #actions is empty, goes to
# the no-action queue; anything else is handed to #enq_body.
#
# @param tw task wrapper responding to #actions, or nil
# @return the result of the push or of #enq_body
def enq(tw)
  if tw.nil? || tw.actions.empty?
    @q_no_action.push(tw)
  else
    enq_body(tw)
  end
end
#enq_body(tw) ⇒ Object
53 54 55 |
# File 'lib/pwrake/queue/task_queue.rb', line 53

# Enqueues a task that has actions; in this class it simply delegates to
# #enq_impl.
#
# @param tw the task wrapper to enqueue
# @return the result of #enq_impl
def enq_body(tw)
  enq_impl(tw)
end
#enq_impl(tw) ⇒ Object
57 58 59 60 61 62 63 |
# File 'lib/pwrake/queue/task_queue.rb', line 57

# Pushes the task onto the input queue when it reports an input file,
# otherwise onto the no-input queue.
#
# @param tw task wrapper responding to #has_input_file?
# @return the result of the #push call
def enq_impl(tw)
  if tw.has_input_file?
    @q_input.push(tw)
  else
    @q_no_input.push(tw)
  end
end
#init_queue(group_map = nil) ⇒ Object
36 37 38 39 40 |
# File 'lib/pwrake/queue/task_queue.rb', line 36

# Creates the input queue (an instance of the class selected in
# #initialize, constructed with 0), the no-input queue (always a
# FifoQueueArray) and sets the number of turns to 1.
#
# @param group_map not used by this implementation
# @return [Integer] 1, the value assigned to @n_turn
def init_queue(group_map=nil)
  @q_input = @array_class.new(0)
  @q_no_input = FifoQueueArray.new
  @n_turn = 1
end
#inspect_q ⇒ Object
158 159 160 161 162 |
# File 'lib/pwrake/queue/task_queue.rb', line 158

# Builds a debug dump of the no-action, input and no-input queues by
# concatenating their #_qstr summaries. The reservations in @q_reserved
# are not included.
#
# @return [String]
def inspect_q
  _qstr("noaction",@q_no_action) +
    _qstr("input", @q_input) +
    _qstr("no_input",@q_no_input)
end
#turn_empty?(turn) ⇒ Boolean
119 120 121 |
# File 'lib/pwrake/queue/task_queue.rb', line 119

# Reports whether the given turn has nothing to dequeue. This class does
# not distinguish between turns, so the answer is simply #empty?.
#
# @param turn ignored by this implementation
# @return [Boolean]
def turn_empty?(turn)
  empty?
end