Class: Pwrake::TaskQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/pwrake/queue/task_queue.rb

Direct Known Subclasses

LocalityAwareQueue

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(hostinfo_by_id, group_map = nil) ⇒ TaskQueue

Returns a new instance of TaskQueue.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/pwrake/queue/task_queue.rb', line 8

def initialize(hostinfo_by_id, group_map=nil)
  @enable_steal = true
  @q_no_action = NoActionQueue.new

  @hostinfo_by_id = hostinfo_by_id

  pri = Rake.application.pwrake_options['QUEUE_PRIORITY'] || "LIHR"
  case pri
  when /prio/i
    @array_class = PriorityQueueArray
  when /fifo/i
    @array_class = FifoQueueArray # Array # Fifo
  when /lifo/i
    @array_class = LifoQueueArray
  when /lihr/i
    @array_class = LifoHrfQueueArray
  when /prhr/i
    @array_class = PriorityHrfQueueArray
  when /rank/i
    @array_class = RankQueueArray
  else
    raise RuntimeError,"unknown option for QUEUE_PRIORITY: "+pri
  end
  Log.debug "@array_class=#{@array_class.inspect}"
  init_queue(group_map)
end

Instance Attribute Details

#enable_stealObject

Returns the value of attribute enable_steal.



41
42
43
# File 'lib/pwrake/queue/task_queue.rb', line 41

def enable_steal
  @enable_steal
end

Instance Method Details

#_qstr(h, q) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/pwrake/queue/task_queue.rb', line 136

def _qstr(h,q)
  s = " #{h}: size=#{q.size} "
  case q.size
  when 0
    s << "[]\n"
  when 1
    s << "[#{q.first.name}]\n"
  when 2
    s << "[#{q.first.name}, #{q.last.name}]\n"
  else
    s << "[#{q.first.name},..,#{q.last.name}]\n"
  end
  s
end

#clearObject



124
125
126
127
128
# File 'lib/pwrake/queue/task_queue.rb', line 124

def clear
  @q_no_action.clear
  @q_input.clear
  @q_no_input.clear
end

#deq_impl(host_info = nil, turn = nil) ⇒ Object



118
119
120
121
122
# File 'lib/pwrake/queue/task_queue.rb', line 118

def deq_impl(host_info=nil, turn=nil)
  @q_no_action.shift ||
    @q_input.shift(host_info) ||
    @q_no_input.shift(host_info)
end

#deq_noaction_task(&block) ⇒ Object



64
65
66
67
68
69
70
# File 'lib/pwrake/queue/task_queue.rb', line 64

def deq_noaction_task(&block)
  Log.debug "deq_noaction_task:"+(empty? ? " (empty)" : "\n#{inspect_q}")
  while tw = @q_no_action.shift
    Log.debug "deq_noaction: #{tw.name}"
    yield(tw)
  end
end

#deq_task(&block) ⇒ Object

locality version



72
73
74
75
76
77
78
79
80
81
82
# File 'lib/pwrake/queue/task_queue.rb', line 72

def deq_task(&block) # locality version
  Log.debug "deq_task:"+(empty? ? " (empty)" : "\n#{inspect_q}")
  queued = 0
  @n_turn.times do |turn|
    next if turn_empty?(turn)
    queued += deq_turn(turn,&block)
  end
  if queued>0
    Log.debug "deq_task: queued=#{queued}"
  end
end

#deq_turn(turn, &block) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/pwrake/queue/task_queue.rb', line 84

def deq_turn(turn,&block)
  queued = 0
  while true
    count = 0
    @hostinfo_by_id.each_value do |host_info|
      #Log.debug "TaskQueue#deq_turn host_info=#{host_info.name}"
      if (n = host_info.idle_cores) && n > 0
      if turn_empty?(turn)
        return queued
      elsif tw = deq_impl(host_info,turn)
        n_task_cores = tw.n_used_cores(host_info)
        Log.debug "deq: #{tw.name} n_task_cores=#{n_task_cores}"
        if host_info.idle_cores < n_task_cores
          m = "task.n_used_cores=#{n_task_cores} must be "+
            "<= host_info.idle_cores=#{host_info.idle_cores}"
          Log.fatal m
          raise RuntimeError,m
        else
          yield(tw,host_info,n_task_cores)
          count += 1
          queued += 1
        end
      end
      end
    end
    break if count == 0
  end
  queued
end

#drop_host(host_info) ⇒ Object



157
158
# File 'lib/pwrake/queue/task_queue.rb', line 157

def drop_host(host_info)
end

#empty?Boolean

Returns:

  • (Boolean)


130
131
132
133
134
# File 'lib/pwrake/queue/task_queue.rb', line 130

def empty?
  @q_no_action.empty? &&
    @q_input.empty? &&
    @q_no_input.empty?
end

#enq(tw) ⇒ Object

enq



44
45
46
47
48
49
50
# File 'lib/pwrake/queue/task_queue.rb', line 44

def enq(tw)
  if tw.nil? || tw.actions.empty?
    @q_no_action.push(tw)
  else
    enq_body(tw)
  end
end

#enq_body(tw) ⇒ Object



52
53
54
# File 'lib/pwrake/queue/task_queue.rb', line 52

def enq_body(tw)
  enq_impl(tw)
end

#enq_impl(tw) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/pwrake/queue/task_queue.rb', line 56

def enq_impl(tw)
  if tw.has_input_file?
    @q_input.push(tw)
  else
    @q_no_input.push(tw)
  end
end

#init_queue(group_map = nil) ⇒ Object



35
36
37
38
39
# File 'lib/pwrake/queue/task_queue.rb', line 35

def init_queue(group_map=nil)
  @q_input = @array_class.new(0)
  @q_no_input = FifoQueueArray.new
  @n_turn = 1
end

#inspect_qObject



151
152
153
154
155
# File 'lib/pwrake/queue/task_queue.rb', line 151

def inspect_q
  _qstr("noaction",@q_no_action) +
  _qstr("input",   @q_input) +
  _qstr("no_input",@q_no_input)
end

#turn_empty?(turn) ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/pwrake/queue/task_queue.rb', line 114

def turn_empty?(turn)
  empty?
end