Class: Pwrake::TaskQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/pwrake/queue/task_queue.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_class, hostinfo_by_id, group_map = nil) ⇒ TaskQueue

Returns a new instance of TaskQueue.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/pwrake/queue/task_queue.rb', line 9

# Sets up the task queue front-end: a no-action queue, a per-host reservation
# table, and the main queue built from +queue_class+.
#
# @param queue_class [String, Symbol] name of a constant looked up under Pwrake
#   and instantiated as the main queue
# @param hostinfo_by_id [Hash] host-info objects keyed by id; every value must
#   respond to +ncore+
# @param group_map [Object, nil] handed unchanged to the main queue constructor
# @raise [RuntimeError] when the QUEUE_PRIORITY option is not FIFO, LIFO or LIHR
def initialize(queue_class, hostinfo_by_id, group_map=nil)
  @queue_class = Pwrake.const_get(queue_class)
  @hostinfo_by_id = hostinfo_by_id
  @lock = Monitor.new
  @q_no_action = NoActionQueue.new
  @nenq = 0
  @ndeq = 0

  # Reservation table (host_info => task). The two singleton methods below let
  # it be passed to _qstr, which calls q.first.name and q.last.name.
  @q_reserved = Hash.new
  def @q_reserved.first
    super.last
  end
  def @q_reserved.last
    self[keys.last]
  end

  # Choose the array class backing the main queue from the priority option.
  priority = Rake.application.pwrake_options['QUEUE_PRIORITY'] || "LIFO"
  @array_class =
    case priority
    when /^fifo$/i then FifoQueueArray
    when /^lifo$/i then LifoQueueArray
    when /^lihr$/i then LifoHrfQueueArray
    else
      raise RuntimeError,"unknown option for QUEUE_PRIORITY: "+priority
    end
  Log.debug "@array_class=#{@array_class.inspect}"

  # Median number of cores over all hosts (integer mean of the two middle
  # values when the host count is even).
  cores = @hostinfo_by_id.map{|_id,host_info| host_info.ncore}.sort
  host_count = cores.size
  middle = host_count/2
  @median_core =
    if host_count.even?
      (cores[middle-1]+cores[middle])/2
    else
      cores[middle]
    end

  @q = @queue_class.new(hostinfo_by_id,@array_class,@median_core,group_map)
end

Class Method Details

._qstr(h, q) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/pwrake/queue/task_queue.rb', line 130

# Formats a one-line summary of a queue for debug logging.
#
# Only the first and last element names are shown; queues longer than two
# elements get a ".." placeholder between them.
#
# @param h [#to_s] label printed in front of the summary
# @param q [#size, #first, #last] queue whose elements respond to +name+
# @return [String] the summary line, terminated by a newline
def self._qstr(h,q)
  count = q.size
  names =
    if count == 0
      ""
    elsif count == 1
      "#{q.first.name}"
    elsif count == 2
      "#{q.first.name}, #{q.last.name}"
    else
      "#{q.first.name}, .., #{q.last.name}"
    end
  " #{h}: size=#{count} [#{names}]\n"
end

.first ⇒ Object



17
18
19
# File 'lib/pwrake/queue/task_queue.rb', line 17

# Singleton method attached to the @q_reserved Hash inside #initialize.
# The inherited Hash#first yields a [key, value] pair; this override returns
# only the value of that pair, so callers such as _qstr can call +name+ on the
# result. Raises NoMethodError when the hash is empty (super returns nil).
def @q_reserved.first
  super.last
end

.last ⇒ Object



20
21
22
# File 'lib/pwrake/queue/task_queue.rb', line 20

# Singleton method attached to the @q_reserved Hash inside #initialize.
# Returns the value stored under the last key in the hash's key order, or nil
# when the hash has no keys (keys.last is nil and the hash has no default).
def @q_reserved.last
  self[keys.last]
end

Instance Method Details

#clear ⇒ Object



118
119
120
121
122
# File 'lib/pwrake/queue/task_queue.rb', line 118

# Empties the no-action queue, the reservation table and the main queue.
#
# @return [Object] whatever the main queue's +clear+ returns
def clear
  [@q_no_action, @q_reserved].each(&:clear)
  @q.clear
end

#deq_noaction_task(&block) ⇒ Object



75
76
77
78
79
80
# File 'lib/pwrake/queue/task_queue.rb', line 75

# Drains the no-action queue, yielding each task and counting it in @ndeq.
# Draining stops at the first falsy value returned by +shift+, which is either
# an exhausted queue or a nil entry.
#
# @yieldparam tw [Object] the dequeued task
# @return [nil]
def deq_noaction_task(&block)
  tw = @q_no_action.shift
  while tw
    yield(tw)
    @ndeq += 1
    tw = @q_no_action.shift
  end
end

#deq_reserve(&block) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/pwrake/queue/task_queue.rb', line 82

# Dispatches reserved tasks whose host now has enough idle cores.
#
# For every (host_info, task) reservation, the task is removed from the
# reservation table and yielded once the host reports at least as many idle
# cores as the task needs; each dispatch is counted in @ndeq.
#
# @yieldparam tw [Object] the reserved task being dispatched
# @yieldparam host_info [Object] the host the task was reserved for
# @yieldparam n_core [Integer] number of cores the task uses on that host
# @return [Hash] the reservation table
def deq_reserve(&block)
  # Walk a snapshot rather than the live hash: entries are deleted below, and
  # a yielded block that inserts a reservation would otherwise make Hash#each
  # raise "can't add a new key into hash during iteration".
  @q_reserved.to_a.each do |host_info,tw|
    # Skip entries a previous yield removed from the live table.
    next unless @q_reserved.key?(host_info)
    n_idle = host_info.idle_cores || 0
    n_core = tw.use_cores(host_info)
    if n_idle >= n_core
      @q_reserved.delete(host_info)
      Log.debug "deq_reserve: #{tw.name} n_use_cores=#{n_core}"
      yield(tw,host_info,n_core)
      @ndeq += 1
    end
  end
  # Keep the original return value (Hash#each returned the table itself).
  @q_reserved
end

#deq_task(&block) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/pwrake/queue/task_queue.rb', line 56

# Runs one dequeue pass while holding @lock.
#
# Order of work: log and reset the enqueue counter, drain no-action tasks,
# dispatch satisfiable reservations, call +deq_start+ on the main queue, run
# every turn of the main queue unless it is empty, then log and reset the
# dequeue counter.
#
# @yield forwarded to deq_noaction_task, deq_reserve and deq_turn
# @return [Integer, nil] 0 when tasks were dequeued in this pass, otherwise nil
def deq_task(&block)
  @lock.synchronize do
    if @nenq > 0
      enq_state = empty? ? " (empty)" : "\n"+inspect_q
      Log.debug "deq_task nenq=#{@nenq}:"+enq_state
      @nenq = 0
    end

    deq_noaction_task(&block)
    deq_reserve(&block)

    @q.deq_start
    @q.turns.each{|turn| deq_turn(turn,&block) } unless @q.empty?

    if @ndeq > 0
      deq_state = empty? ? " (empty)" : "\n"+inspect_q
      Log.debug "deq_task ndeq=#{@ndeq}:"+deq_state
      @ndeq = 0
    end
  end
end

#deq_turn(turn, &block) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/pwrake/queue/task_queue.rb', line 95

# Dequeues tasks for one turn, sweeping over all hosts repeatedly until a full
# sweep dispatches nothing or the turn has no tasks left.
#
# Hosts with no idle cores or with an outstanding reservation are skipped. A
# dequeued task that fits the host's idle cores is yielded; one that does not
# fit is parked in @q_reserved for that host.
#
# @param turn [Object] turn identifier passed through to the main queue
# @yieldparam tw [Object] the dequeued task
# @yieldparam host_info [Object] host chosen for the task
# @yieldparam n_core [Integer] number of cores the task uses on that host
# @return [nil]
def deq_turn(turn,&block)
  # Primed with a positive value so the first sweep always runs.
  dispatched = 1
  while dispatched > 0
    dispatched = 0
    @hostinfo_by_id.each_value do |host_info|
      return if @q.turn_empty?(turn)
      n_idle = host_info.idle_cores || 0
      next if n_idle == 0 || @q_reserved[host_info]
      tw = @q.deq_impl(host_info,turn)
      next unless tw
      n_core = tw.use_cores(host_info)
      if n_idle >= n_core
        Log.debug "deq: #{tw.name} n_use_cores=#{n_core}"
        yield(tw,host_info,n_core)
        dispatched += 1
        @ndeq += 1
      else
        # Not enough idle cores yet: hold the host for this task.
        @q_reserved[host_info] = tw
        Log.debug "reserve host: #{host_info.name} for #{tw.name} (#{n_core} cores)"
      end
    end
  end
end

#drop_host(host_info) ⇒ Object



151
152
153
# File 'lib/pwrake/queue/task_queue.rb', line 151

# Forwards the request to drop +host_info+ to the main queue (@q).
#
# NOTE(review): any entry for this host in @q_reserved is left in place, and
# @lock is not taken here — confirm that callers account for both.
#
# @param host_info [Object] host to remove from the main queue
# @return [Object] whatever the main queue's +drop_host+ returns
def drop_host(host_info)
  @q.drop_host(host_info)
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


124
125
126
127
128
# File 'lib/pwrake/queue/task_queue.rb', line 124

# Reports whether no task is held anywhere: not in the no-action queue, not
# in the reservation table and not in the main queue. Checked in that order,
# stopping at the first non-empty container.
#
# @return [Boolean]
def empty?
  [@q_no_action, @q_reserved, @q].all?(&:empty?)
end

#enq(tw) ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/pwrake/queue/task_queue.rb', line 45

# Enqueues a task while holding @lock and counts it in @nenq.
#
# A nil task, or a task whose +actions+ list is empty, goes to the no-action
# queue; every other task is handed to the main queue.
#
# @param tw [Object, nil] task to enqueue
# @return [Integer] the updated enqueue counter
def enq(tw)
  @lock.synchronize do
    has_actions = !tw.nil? && !tw.actions.empty?
    has_actions ? @q.enq_impl(tw) : @q_no_action.push(tw)
    @nenq += 1
  end
end

#inspect_q ⇒ Object



145
146
147
148
149
# File 'lib/pwrake/queue/task_queue.rb', line 145

# Builds a multi-part debug description of the whole queue: the no-action
# summary, the main queue's own +inspect_q+ output, then the reservation
# summary.
#
# @return [String]
def inspect_q
  noaction_part = TaskQueue._qstr("noaction",@q_no_action)
  main_part = @q.inspect_q
  reserved_part = TaskQueue._qstr("reserved",@q_reserved)
  noaction_part + main_part + reserved_part
end