Class: Pwrake::LocalityAwareQueue

Inherits:
TaskQueue
  • Object
show all
Defined in:
lib/pwrake/locality_aware_queue.rb

Defined Under Namespace

Classes: Throughput

Instance Attribute Summary collapse

Attributes inherited from TaskQueue

#enable_steal, #mutex

Instance Method Summary collapse

Methods inherited from TaskQueue

#after_check, #deq, #enq, #enq_body, #halt, #initialize, #reserve, #resume, #stop, #synchronize, #thread_end

Constructor Details

This class inherits a constructor from Pwrake::TaskQueue

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



151
152
153
# File 'lib/pwrake/locality_aware_queue.rb', line 151

# Reader for the queue's task counter: returns the current value of @size.
def size
  @size
end

Instance Method Details

#clear ⇒ Object



268
269
270
271
272
273
# File 'lib/pwrake/locality_aware_queue.rb', line 268

# Discards every pending task: empties each per-host queue, the remote
# queue, the "later" queue and the reserved queue, and resets the size
# counter so that #size agrees with the (now empty) queues.
#
# @return [Object] the result of clearing @reserved_q (unchanged from the
#   previous behaviour).
def clear
  # The counter is bumped once per enqueued task, so it must go back to
  # zero together with the queues it describes.
  @size = 0
  @q.each_value(&:clear)
  @q_remote.clear
  @q_later.clear
  @reserved_q.clear
end

#deq_impl(host, n) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/pwrake/locality_aware_queue.rb', line 176

# Picks the next task for +host+, trying the sources in this order:
#   1. the queue belonging to +host+ itself (via deq_locate),
#   2. the remote queue (@q_remote),
#   3. the "later" queue (@q_later),
#   4. another host's queue (via deq_steal), when stealing is enabled,
#      n > 0, and more than @steal_wait_after_enq seconds have passed
#      since the last enqueue.
# When nothing is available it waits on @cv and returns nil.
#
# @param host [Object] key into @q identifying the requesting host
# @param n [Integer] retry counter supplied by the caller; it gates
#   stealing and scales the wait time
# @return [Object, nil] the dequeued task, or nil when none was found
def deq_impl(host,n)
  # deq_locate already decrements @size for the task it returns.
  if t = deq_locate(host)
    Log.info "-- deq_locate n=#{n} task=#{t.name} host=#{host}"
    Log.debug "--- deq_impl\n#{inspect_q}"
    return t
  end

  if !@q_remote.empty?
    t = @q_remote.shift
    # enq_impl counted this task in @size, so removing it must uncount it.
    @size -= 1
    Log.info "-- deq_remote n=#{n} task=#{t.name} host=#{host}"
    Log.debug "--- deq_impl\n#{inspect_q}"
    return t
  end

  if !@q_later.empty?
    t = @q_later.shift
    # Same bookkeeping as for the remote queue.
    @size -= 1
    Log.info "-- deq_later n=#{n} task=#{t.name} host=#{host}"
    Log.debug "--- deq_impl\n#{inspect_q}"
    return t
  end

  if @enable_steal && n > 0 && Time.now-@last_enq_time > @steal_wait_after_enq
    # deq_steal goes through deq_locate, which adjusts @size itself.
    if t = deq_steal(host)
      Log.info "-- deq_steal n=#{n} task=#{t.name} host=#{host}"
      Log.debug "--- deq_impl\n#{inspect_q}"
      return t
    end
  end

  # Wait time doubles with each retry and is capped at @steal_wait_max.
  m = [@steal_wait*(2**n), @steal_wait_max].min
  # NOTE(review): presumably a timed wait that releases @mutex — confirm
  # against LocalityConditionVariable#wait.
  @cv.wait(@mutex,m)
  nil
end

#deq_locate(host) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/pwrake/locality_aware_queue.rb', line 211

# Takes the first task from the queue of +host+ and removes that task from
# the queue of every host listed in its +assigned+ list, then decrements
# the size counter.
#
# @param host [Object] key into @q
# @return [Object, nil] the task, or nil when +host+ has no queue or the
#   queue is empty
def deq_locate(host)
  own_queue = @q[host]
  return nil if !own_queue || own_queue.empty?

  task = own_queue.shift
  # The same task may sit in several host queues; drop one copy from each
  # queue it was assigned to.
  task.assigned.each do |name|
    peer_queue = @q[name]
    position = peer_queue.index(task)
    peer_queue.delete_at(position) if position
  end
  @size -= 1
  task
end

#deq_steal(host) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/pwrake/locality_aware_queue.rb', line 228

# Steals a task from the host whose queue currently holds the most tasks
# (the first such host wins a tie) by delegating to deq_locate for it.
# When every queue is empty, deq_locate is called with nil.
#
# @param host [Object] the requesting host; not used by the selection
# @return [Object, nil] whatever deq_locate returns for the chosen host
def deq_steal(host)
  busiest_host = nil
  longest = 0
  @q.each_pair do |name, tasks|
    next if tasks.empty?
    count = tasks.size
    next unless count > longest
    busiest_host = name
    longest = count
  end
  Log.info "-- deq_steal max_host=#{busiest_host} max_num=#{longest}"
  deq_locate(busiest_host)
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


275
276
277
278
279
280
# File 'lib/pwrake/locality_aware_queue.rb', line 275

# Reports whether no task is held anywhere: every per-host queue, the
# remote queue, the "later" queue and the reserved queue must all be empty.
#
# @return [Boolean]
def empty?
  return false unless @q.each_value.all? { |tasks| tasks.empty? }

  # Checked in the same order as before; stops at the first non-empty one.
  [@q_remote, @q_later, @reserved_q].all? { |pending| pending.empty? }
end

#enq_impl(t) ⇒ Object



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/pwrake/locality_aware_queue.rb', line 154

# Files task +t+ according to its location hints:
#   * no hints (nil or empty)          -> @q_later
#   * hints naming at least one host
#     that has a queue in @q           -> each such host's queue, and the
#                                         host is recorded in t.assigned
#   * hints naming only unknown hosts  -> @q_remote
# Afterwards the last-enqueue timestamp is refreshed and the size counter
# is incremented.
#
# @param t [Object] task responding to suggest_location and assigned
# @return [Integer] the new value of @size
def enq_impl(t)
  locations = t.suggest_location
  if locations.nil? || locations.empty?
    @q_later.push(t)
  else
    placed = 0
    locations.each do |name|
      target = @q[name]
      next unless target
      t.assigned.push(name)
      target.push(t)
      placed += 1
    end
    # None of the hinted hosts is one of ours.
    @q_remote.push(t) if placed.zero?
  end
  @last_enq_time = Time.now
  @size += 1
end

#finish ⇒ Object



282
283
284
# File 'lib/pwrake/locality_aware_queue.rb', line 282

# Adds nothing of its own: simply invokes the inherited #finish.
def finish
  super
end

#init_queue(hosts) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/pwrake/locality_aware_queue.rb', line 134

# Sets up the queue state for the given hosts: one queue per host in @q,
# a remote queue, a "later" queue, the size counter, and the work-stealing
# settings read from Pwrake.application.pwrake_options.
#
# Options consulted (with the fallback used when the key is absent):
#   DISABLE_STEAL        - stealing is enabled unless this is truthy
#   STEAL_WAIT           - 0, converted with to_i
#   STEAL_WAIT_MAX       - 10, converted with to_i
#   STEAL_WAIT_AFTER_ENQ - 0.1, converted with to_f
#
# @param hosts [Enumerable] host identifiers used as keys of @q
# @return [Object] the result of the final Log.info call
def init_queue(hosts)
  @cv = LocalityConditionVariable.new
  @hosts = hosts
  @throughput = Throughput.new
  @size = 0
  @q = {}
  @hosts.each{|h| @q[h]=@array_class.new}
  @q_remote = @array_class.new
  @q_later = Array.new
  @enable_steal = !Pwrake.application.pwrake_options['DISABLE_STEAL']
  @steal_wait = (Pwrake.application.pwrake_options['STEAL_WAIT'] || 0).to_i
  @steal_wait_max = (Pwrake.application.pwrake_options['STEAL_WAIT_MAX'] || 10).to_i
  @steal_wait_after_enq = (Pwrake.application.pwrake_options['STEAL_WAIT_AFTER_ENQ'] || 0.1).to_f
  @last_enq_time = Time.now
  # The last field previously lacked the '#', so the literal text
  # "{@steal_wait_after_enq}" was logged instead of the value.
  Log.info("-- @enable_steal=#{@enable_steal.inspect} @steal_wait=#{@steal_wait} @steal_wait_max=#{@steal_wait_max} @steal_wait_after_enq=#{@steal_wait_after_enq}")
end

#inspect_q ⇒ Object



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/pwrake/locality_aware_queue.rb', line 245

# Builds a multi-line summary of the queues for debug logging: one line per
# host queue in @q, followed by a "remote" line and a "later" line. Each
# line shows the queue size and the name of its first task (with ",.." when
# more tasks follow).
#
# @return [String]
def inspect_q
  summarize = lambda do |label, queue|
    header = " #{label}: size=#{queue.size} "
    preview =
      case queue.size
      when 0 then "[]\n"
      when 1 then "[#{queue[0].name}]\n"
      else "[#{queue[0].name},..]\n"
      end
    header + preview
  end

  rows = @q.map { |label, queue| summarize.call(label, queue) }
  rows.push(summarize.call("remote", @q_remote))
  rows.push(summarize.call("later", @q_later))
  rows.join
end