Class: Wakame::LockQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/wakame/action_manager.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ LockQueue

Returns a new instance of LockQueue.



197
198
199
200
201
202
203
# File 'lib/wakame/action_manager.rb', line 197

# Builds an empty lock queue.
#
# State set up here:
# * @id2res          - job ID => { resource name => 1 }
# * @locks           - resource name => job IDs in queue order
# * @queue_by_thread - waiting Thread => its wake-up Queue
# * @qbt_m           - mutex guarding @queue_by_thread
def initialize
  @id2res = {}
  @locks = {}

  # Registry of waiting threads, plus the mutex that protects it.
  @queue_by_thread = {}
  @qbt_m = ::Mutex.new
end

Instance Method Details

#clear_resource(resource) ⇒ Object



272
273
# File 'lib/wakame/action_manager.rb', line 272

# Placeholder: accepts a resource and currently does nothing with it.
#
# @param resource ignored
# @return [nil]
def clear_resource(resource)
  nil
end

#inspect ⇒ Object



275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/wakame/action_manager.rb', line 275

# Renders the lock table as an ASCII grid, one row per resource: the
# resource name followed by the IDs queued on it, in queue order.
#
# @return [String] the boxed table, or "" when no locks are registered
def inspect
  rows = @locks.collect { |resource, ids|
    [resource, ids].flatten.collect { |cell| cell.to_s }
  }
  return "" if rows.empty?

  # Size the grid by the longest row so that no queued ID is dropped,
  # however many IDs a single resource has waiting.
  column_count = rows.collect { |row| row.size }.max
  widths = (0...column_count).collect { |n|
    rows.collect { |row| (row[n] || "").length }.max
  }

  # Trailing columns that hold only empty cells are not drawn.
  column_count -= 1 while column_count > 0 && widths[column_count - 1] == 0

  textrows = rows.collect { |row|
    cells = (0...column_count).collect { |n| "|" + (row[n] || "").ljust(widths[n]) }
    cells.join + "|"
  }

  # The border spans the row width minus the two corner characters; clamp at
  # zero so a table made only of empty cells still renders.
  border = "+" + ("-" * [textrows[0].length - 2, 0].max) + "+"
  [border, textrows.join("\n"), border].join("\n")
end

#quit(id) ⇒ Object



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/wakame/action_manager.rb', line 253

# Drops job +id+ from the lock table and wakes up waiting threads.
#
# When #test reports the job as :runnable or :wait, its ID is removed from
# the queue of every resource it registered on, resources left with no
# queued IDs are dropped, and the ID is pushed onto every per-thread queue.
# The job's entry in @id2res is removed in all cases.
#
# @param id the job ID that is finished with its locks
def quit(id)
  # Need to use EM.barrier while RuleEngine is using EventMachine's threads.
  EM.barrier do
    if [:runnable, :wait].include?(test(id))
      @id2res[id].each_key do |res|
        @locks[res.to_s].reject! { |queued| queued == id }
      end
      @locks.reject! { |_res, queued_ids| queued_ids.nil? || queued_ids.empty? }

      # Notify every waiter; each one decides for itself whether to go on.
      @qbt_m.synchronize do
        @queue_by_thread.each_value { |wakeup| wakeup.enq(id) }
      end
    end

    @id2res.delete(id)
  end
  Wakame.log.debug("#{self.class}: quit(#{id})\n#{inspect}")
end

#reset ⇒ Object



221
222
223
224
225
226
# File 'lib/wakame/action_manager.rb', line 221

# Empties every lock queue and forgets all job-to-resource registrations.
# Resource keys stay in @locks with empty lists, and threads blocked in
# #wait are not notified.
#
# @return [Hash] the emptied @id2res hash
def reset
  # Hash#keys ignores a block, so walk the queues explicitly to clear them.
  @locks.each_value { |ids| ids.clear }
  @id2res.clear
end

#set(resource, id) ⇒ Object



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/wakame/action_manager.rb', line 205

# Appends job +id+ to the lock queue of +resource+ and records the
# registration under the job. The resource is keyed by its string form.
# Does nothing when the job is already registered on that resource.
#
# @param resource the resource to lock (converted with #to_s)
# @param id       the job ID requesting the lock
def set(resource, id)
  name = resource.to_s

  # This job ID already holds/reserves the lock on the resource.
  return if @id2res.key?(id) && @id2res[id].key?(name)

  # Need to use EM.barrier while RuleEngine is using EventMachine's threads.
  EM.barrier do
    (@id2res[id] ||= {})[name] = 1
    (@locks[name] ||= []) << id
  end
  Wakame.log.debug("#{self.class}: set(#{name}, #{id})\n#{inspect}")
end

#test(id) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
# File 'lib/wakame/action_manager.rb', line 228

# Reports the lock state of job +id+.
#
# @param id the job ID to check
# @return [Symbol] :pass when the job has no resources registered,
#   :runnable when it is first in the queue of every resource it registered
#   on, :wait otherwise
def test(id)
  wanted = @id2res[id]
  return :pass if wanted.nil? || wanted.empty?

  # The job may run only when it heads every queue it is part of.
  first_everywhere = wanted.keys.all? { |res| id == @locks[res.to_s].first }
  first_everywhere ? :runnable : :wait
end

#wait(id, tout = 60*30) ⇒ Object



240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/wakame/action_manager.rb', line 240

# Blocks the calling thread while job +id+ is in the :wait state (per #test).
# Each time an ID arrives on this thread's wake-up queue the state is checked
# again; the wait also ends as soon as the job's own ID arrives.
#
# @param id   the job ID to wait for
# @param tout [Numeric] maximum number of seconds to block (default 30 minutes)
# @return [nil]
# @raise [Timeout::Error] when the wait is not over within +tout+ seconds
def wait(id, tout=60*30)
  # Keep a direct reference to the queue so the loop below never has to read
  # the shared @queue_by_thread hash outside of @qbt_m.
  wakeup = ::Queue.new
  @qbt_m.synchronize { @queue_by_thread[Thread.current] = wakeup }

  # Kernel#timeout is deprecated and not available on current Rubies; the
  # Timeout module function is the supported entry point.
  Timeout.timeout(tout) {
    while test(id) == :wait
      Wakame.log.debug("#{self.class}: Job #{id} waits for locked resouces: #{@id2res[id].keys.join(', ')}")
      break if id == wakeup.deq
    end
  }
ensure
  # Always unregister, including on timeout, so no stale queue is left behind.
  @qbt_m.synchronize { @queue_by_thread.delete(Thread.current) }
end