Class: Wakame::LockQueue
- Inherits:
-
Object
- Object
- Wakame::LockQueue
- Defined in:
- lib/wakame/action_manager.rb
Instance Method Summary collapse
- #clear_resource(resource) ⇒ Object
-
#initialize ⇒ LockQueue
constructor
A new instance of LockQueue.
- #inspect ⇒ Object
- #quit(id) ⇒ Object
- #reset ⇒ Object
- #set(resource, id) ⇒ Object
- #test(id) ⇒ Object
- #wait(id, tout = 60*30) ⇒ Object
Constructor Details
#initialize ⇒ LockQueue
Returns a new instance of LockQueue.
197 198 199 200 201 202 203 |
# File 'lib/wakame/action_manager.rb', line 197
#
# Builds an empty lock queue.
#
# State created here:
#   @locks           - Hash, resource name => Array of job ids (see #set)
#   @id2res          - Hash, job id => Hash of resource names (see #set)
#   @queue_by_thread - Hash, Thread => Queue (see #wait / #quit)
#   @qbt_m           - Mutex used around accesses to @queue_by_thread
def initialize
  @queue_by_thread = {}
  @id2res = {}
  @locks = {}
  @qbt_m = ::Mutex.new
end
Instance Method Details
#clear_resource(resource) ⇒ Object
272 273 |
# File 'lib/wakame/action_manager.rb', line 272
#
# Currently a no-op: the body is empty, so nothing is touched and nil is
# returned regardless of +resource+.
def clear_resource(resource)
  nil
end
#inspect ⇒ Object
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
# File 'lib/wakame/action_manager.rb', line 275
#
# Renders @locks as an ASCII table, one row per resource:
#
#   +---------------+
#   |resource|id1|id2|
#   +---------------+
#
# The first cell of a row is the resource name, the remaining cells are the
# ids queued on that resource, in queue order. Cells are padded to the widest
# entry of their column. Returns "" when no resource is registered.
#
# The number of columns is taken from the longest row, so every queued id is
# shown (deriving it from the number of rows would cut long queues short).
# Cells are converted with #to_s so non-String ids can be displayed as well.
def inspect
  rows = @locks.map { |resource, ids| [resource, ids].flatten.map(&:to_s) }
  return "" if rows.empty?

  # Width of each column = longest cell in that column; missing cells count as "".
  widths = Array.new(rows.map(&:size).max) do |col|
    rows.map { |row| row[col].to_s.length }.max
  end
  # Trailing columns that contain nothing but empty cells are not drawn.
  widths.pop while !widths.empty? && widths.last.zero?

  lines = rows.map do |row|
    widths.each_with_index.map { |width, col| "|" + row[col].to_s.ljust(width) }.join + "|"
  end

  border = "+" + "-" * (lines.first.length - 2) + "+"
  [border, *lines, border].join("\n")
end
#quit(id) ⇒ Object
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/wakame/action_manager.rb', line 253
#
# Withdraws job +id+ from the lock queue.
#
# When #test reports the job as :runnable or :wait, the id is removed from
# the queue of every resource it registered for, resource entries whose queue
# became empty (or is nil) are dropped, and +id+ is pushed onto the queue of
# every thread currently registered in @queue_by_thread (the queues #wait
# blocks on). The job's entry in @id2res is removed in every case.
#
# Returns whatever Wakame.log.debug returns.
def quit(id)
  # Need to use EM.barrier while RuleEngine is using EventMachine's threads.
  # NOTE(review): EM.barrier is defined outside this file view - confirm its
  # semantics before relying on it for mutual exclusion.
  #StatusDB.barrier {
  EM.barrier do
    if [:runnable, :wait].include?(test(id))
      @id2res[id].each_key { |res| @locks[res.to_s].delete(id) }
      @locks.reject! { |_res, ids| ids.nil? || ids.empty? }
      @qbt_m.synchronize do
        @queue_by_thread.each_value { |queue| queue.enq(id) }
      end
    end

    @id2res.delete(id)
  end

  Wakame.log.debug("#{self.class}: quit(#{id})\n#{inspect}")
end
#reset ⇒ Object
221 222 223 224 225 226 |
# File 'lib/wakame/action_manager.rb', line 221
#
# Drops all lock state: empties the id queue of every known resource and
# forgets which resources each job id registered for.
#
# Hash#keys ignores a block, so iterating with `@locks.keys { ... }` never
# runs the block; each_value is used to actually clear the queues.
#
# Returns the (now empty) @id2res hash.
def reset
  @locks.each_value(&:clear)
  @id2res.clear
end
#set(resource, id) ⇒ Object
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/wakame/action_manager.rb', line 205
#
# Registers job +id+ in the queue of +resource+.
#
# The resource is keyed by its #to_s form. The id is appended to the end of
# that resource's queue in @locks and the resource is recorded under the id in
# @id2res. Calling it again for a pair that is already recorded is a no-op and
# returns nil; otherwise the result of Wakame.log.debug is returned.
def set(resource, id)
  res_key = resource.to_s

  # This job id already holds/reserves the lock regarding the resource.
  return if @id2res.key?(id) && @id2res[id].key?(res_key)

  # Need to use EM.barrier while RuleEngine is using EventMachine's threads.
  # NOTE(review): EM.barrier is defined outside this file view - confirm its
  # semantics before relying on it for mutual exclusion.
  #StatusDB.barrier {
  EM.barrier do
    @locks[res_key] ||= []
    (@id2res[id] ||= {})[res_key] = 1
    @locks[res_key] << id
  end

  Wakame.log.debug("#{self.class}: set(#{res_key}, #{id})\n#{inspect}")
end
#test(id) ⇒ Object
228 229 230 231 232 233 234 235 236 237 238 |
# File 'lib/wakame/action_manager.rb', line 228
#
# Reports the lock state of job +id+:
#
#   :pass     - the id has no resources recorded in @id2res
#   :runnable - the id is first in the queue of every resource it registered for
#   :wait     - at least one of its resources has a different id at the front
def test(id)
  held = @id2res[id]
  return :pass if held.nil? || held.empty?

  at_front = held.keys.all? { |res| id == @locks[res.to_s].first }
  at_front ? :runnable : :wait
end
#wait(id, tout = 60*30) ⇒ Object
240 241 242 243 244 245 246 247 248 249 250 251 |
# File 'lib/wakame/action_manager.rb', line 240
#
# Blocks the calling thread while #test reports :wait for job +id+.
#
# A Queue is registered for the current thread in @queue_by_thread; #quit
# pushes the id of every job that quits onto it. After each wake-up the loop
# re-checks #test, and it stops early when the id received is +id+ itself
# (i.e. this very job was quit).
#
# id   - job id to wait for.
# tout - maximum number of seconds to wait (default: 30 minutes).
#
# Raises Timeout::Error when +tout+ elapses first. The thread's queue is
# always unregistered on the way out.
#
# Timeout.timeout is called explicitly: the bare Object#timeout helper is
# deprecated and not defined by newer versions of the timeout library.
def wait(id, tout = 60 * 30)
  # Keep a local reference so the shared hash is only touched under the mutex.
  queue = ::Queue.new
  @qbt_m.synchronize { @queue_by_thread[Thread.current] = queue }

  Timeout.timeout(tout) do
    while test(id) == :wait
      Wakame.log.debug("#{self.class}: Job #{id} waits for locked resouces: #{@id2res[id].keys.join(', ')}")
      break if id == queue.deq
    end
  end
ensure
  @qbt_m.synchronize { @queue_by_thread.delete(Thread.current) }
end