Class: Wakame::MasterManagers::LockQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/wakame/master_managers/action_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ LockQueue

Returns a new instance of LockQueue.



215
216
217
218
219
220
221
222
223
# File 'lib/wakame/master_managers/action_manager.rb', line 215

# Creates an empty lock queue.
#
# State set up here (usage as seen in the other methods of this class):
# * @locks           - resource name (String) => Array of job IDs, in the
#                      order they were registered through #set.
# * @id2res          - job ID => Hash of the resource names it registered.
# * @queue_by_thread - Thread => Queue used by #wait to receive the
#                      notifications that #quit publishes.
# * @self_m          - mutex taken around @locks / @id2res updates.
# * @qbt_m           - mutex taken around @queue_by_thread updates.
def initialize
  # Lock bookkeeping and its guard.
  @self_m = ::Mutex.new
  @locks = {}
  @id2res = {}

  # Per-thread notification queues and their guard.
  @qbt_m = ::Mutex.new
  @queue_by_thread = {}
end

Instance Method Details

#clear_resource(resource) ⇒ Object



290
291
# File 'lib/wakame/master_managers/action_manager.rb', line 290

# Currently a no-op: the argument is ignored and nothing is modified.
#
# @param resource [Object] unused
# @return [nil]
def clear_resource(resource)
  nil
end

#inspect ⇒ Object



293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/wakame/master_managers/action_manager.rb', line 293

# Renders the current lock table as an ASCII grid.
#
# Each entry of @locks becomes one row: the resource name followed by the
# job IDs queued on it, in queue order. Every column is padded to the width
# of its widest cell, and the rows are framed by a "+---+" border line.
#
# The number of columns is taken from the longest row, so no queued ID is
# dropped from the output regardless of how many resources are present.
#
# @return [String] the rendered table, or "" when no locks are registered
def inspect
  rows = @locks.map { |resource, ids| [resource, ids].flatten }
  return "" if rows.empty?

  # Width of each column = longest cell in it; missing cells count as "".
  column_count = rows.map(&:size).max
  widths = Array.new(column_count) { |col|
    rows.map { |row| row[col].to_s.length }.max
  }
  # Trailing columns that would render with zero width are not drawn.
  widths.pop while widths.last == 0

  textrows = rows.map { |row|
    cells = widths.each_with_index.map { |width, col|
      "|" + row[col].to_s.ljust(width)
    }
    cells.join + "|"
  }

  # The border spans the row width minus the two "+" corner characters.
  border = "+" + "-" * [textrows.first.length - 2, 0].max + "+"
  [border, *textrows, border].join("\n")
end

#quit(id) ⇒ Object



274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/wakame/master_managers/action_manager.rb', line 274

# Withdraws a job from the lock queue.
#
# Removes +id+ from every resource queue it was registered on, drops
# resource entries that became empty, forgets the job's reservation record,
# and then pushes +id+ onto the queue of every thread currently blocked in
# #wait so those waiters re-evaluate their state.
#
# The reservation lookup, the queue cleanup and the removal from @id2res
# all happen inside a single @self_m critical section, so no other thread
# can observe or modify a half-released job.
#
# A job with no recorded reservations is only forgotten; no notification is
# sent for it.
#
# @param id [Object] job ID previously passed to #set
# @return [Object] whatever Wakame.log.debug returns
def quit(id)
  released = @self_m.synchronize {
    reserved = @id2res.delete(id)
    if reserved.nil? || reserved.empty?
      false
    else
      reserved.each_key { |r|
        holders = @locks[r.to_s]
        holders.delete(id) if holders
      }
      @locks.delete_if { |_resource, holders| holders.nil? || holders.empty? }
      true
    end
  }

  if released
    # Wake every waiter; each one re-checks its own state in #wait.
    @qbt_m.synchronize {
      @queue_by_thread.each_value { |q| q.enq(id) }
    }
  end

  Wakame.log.debug("#{self.class}: quit(#{id})" + "\n#{self.inspect}")
end

#reset ⇒ Object



239
240
241
242
243
244
245
246
# File 'lib/wakame/master_managers/action_manager.rb', line 239

# Drops every registered lock and every job reservation.
#
# Both tables are emptied under @self_m. Emptying @locks outright (rather
# than leaving resource keys with empty lists) matches #quit, which also
# removes resource entries once their list is empty.
#
# NOTE(review): threads already blocked in #wait are not notified here;
# confirm whether callers rely on the #wait timeout in that case.
#
# @return [Hash] the (now empty) job reservation table
def reset()
  @self_m.synchronize {
    @locks.clear
    @id2res.clear
  }
end

#set(resource, id) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/wakame/master_managers/action_manager.rb', line 225

# Registers job +id+ at the tail of the queue for +resource+.
#
# The resource is keyed by its +to_s+ form. Registering the same job on the
# same resource twice is a no-op; in that case the method returns early and
# nothing is logged.
#
# @param resource [#to_s] resource to lock
# @param id [Object] job ID requesting the lock
# @return [Object, nil] result of Wakame.log.debug, or nil on the early return
def set(resource, id)
  @self_m.synchronize do
    res_key = resource.to_s

    # This job ID already holds/reserves the lock for this resource.
    already_reserved = @id2res.has_key?(id) && @id2res[id].has_key?(res_key)
    return if already_reserved

    (@id2res[id] ||= {})[res_key] = 1
    (@locks[res_key] ||= []) << id
  end
  Wakame.log.debug("#{self.class}: set(#{resource.to_s}, #{id})" + "\n#{self.inspect}")
end

#test(id) ⇒ Object



248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/wakame/master_managers/action_manager.rb', line 248

# Reports the lock state of job +id+.
#
# @param id [Object] job ID to look up
# @return [Symbol]
#   :pass     - the job has no recorded resources;
#   :runnable - the job is first in the queue of every resource it registered;
#   :wait     - at least one of its resources has another job at the head.
def test(id)
  @self_m.synchronize do
    reserved = @id2res[id]
    next :pass if reserved.nil? || reserved.empty?

    at_head_everywhere = reserved.keys.all? { |r| id == @locks[r.to_s][0] }
    at_head_everywhere ? :runnable : :wait
  end
end

#wait(id, tout = 60*30) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/wakame/master_managers/action_manager.rb', line 261

# Blocks the calling thread until job +id+ is no longer in the :wait state.
#
# A private queue is registered for the current thread so that #quit can
# wake it. Each time a notification arrives the job state is re-checked via
# #test; the loop also ends immediately when the notification carries this
# job's own ID (i.e. the job itself was quit). The per-thread queue is
# always unregistered on the way out.
#
# @param id [Object] job ID to wait for
# @param tout [Numeric] maximum time to wait, in seconds (default 30 minutes)
# @return [nil]
# @raise [Timeout::Error] if the job is still waiting after +tout+ seconds
def wait(id, tout=60*30)
  # Keep a local reference so the loop never reads @queue_by_thread without
  # holding @qbt_m.
  inbox = ::Queue.new
  @qbt_m.synchronize { @queue_by_thread[Thread.current] = inbox }

  ::Timeout.timeout(tout) {
    while test(id) == :wait
      # The job may have been dropped between the test above and this
      # lookup, so tolerate a missing reservation record.
      pending = (@id2res[id] || {}).keys
      Wakame.log.debug("#{self.class}: Job #{id} waits for locked resouces: #{pending.join(', ')}")
      break if id == inbox.deq
    end
  }
ensure
  @qbt_m.synchronize { @queue_by_thread.delete(Thread.current) }
end