Class: ThreadOrder
- Inherits:
-
Object
- Object
- ThreadOrder
- Defined in:
- lib/thread_order.rb,
lib/thread_order/mutex.rb,
lib/thread_order/version.rb
Constant Summary collapse
- Error =
Class.new RuntimeError
- CannotResume =
Class.new Error
- Mutex =
if defined? ::Mutex # On 1.9 and up, this is in core, so we just use the real one ::Mutex else # On 1.8.7, it's in the stdlib. # We don't want to load the stdlib, b/c this is a test tool, and can affect the test environment, # causing tests to pass where they should fail. # # So we're transcribing/modifying it from https://github.com/ruby/ruby/blob/v1_8_7_374/lib/thread.rb#L56 # Some methods we don't need are deleted. # Anything I don't understand (there's quite a bit, actually) is left in. Class.new do def initialize @waiting = [] @locked = false; @waiting.taint self.taint end def lock while (Thread.critical = true; @locked) @waiting.push Thread.current Thread.stop end @locked = true Thread.critical = false self end def unlock return unless @locked Thread.critical = true @locked = false begin t = @waiting.shift t.wakeup if t rescue ThreadError retry end Thread.critical = false begin t.run if t rescue ThreadError end self end def synchronize lock begin yield ensure unlock end end end end
- VERSION =
'1.1.1'
Instance Method Summary collapse
- #apocalypse!(thread_method = :kill) ⇒ Object
- #current ⇒ Object
- #declare(name, &block) ⇒ Object
- #enqueue(&block) ⇒ Object
-
#initialize ⇒ ThreadOrder
constructor
Note that this must be initialized in a threadsafe environment. Otherwise, syncing may occur before the mutex is set.
- #join_all ⇒ Object
- #pass_to(name, options = {}) ⇒ Object
- #wait_until(&condition) ⇒ Object
Constructor Details
#initialize ⇒ ThreadOrder
Note that this must be initialized in a threadsafe environment. Otherwise, syncing may occur before the mutex is set.
9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/thread_order.rb', line 9 def initialize @mutex = Mutex.new @bodies = {} @threads = [] @queue = [] # Queue is in stdlib, but half the purpose of this lib is to avoid such deps, so using an array in a Mutex @worker = Thread.new do Thread.current.abort_on_exception = true Thread.current[:thread_order_name] = :internal_worker loop { break if :shutdown == work() } end end |
Instance Method Details
#apocalypse!(thread_method = :kill) ⇒ Object
67 68 69 70 71 72 73 74 |
# File 'lib/thread_order.rb', line 67 def apocalypse!(thread_method=:kill) enqueue do @threads.each(&thread_method) @queue.clear :shutdown end @worker.join end |
#current ⇒ Object
25 26 27 |
# File 'lib/thread_order.rb', line 25 def current Thread.current[:thread_order_name] end |
#declare(name, &block) ⇒ Object
21 22 23 |
# File 'lib/thread_order.rb', line 21 def declare(name, &block) sync { @bodies[name] = block } end |
#enqueue(&block) ⇒ Object
76 77 78 |
# File 'lib/thread_order.rb', line 76 def enqueue(&block) sync { @queue << block if @worker.alive? } end |
#join_all ⇒ Object
63 64 65 |
# File 'lib/thread_order.rb', line 63 def join_all sync { @threads }.each { |th| th.join } end |
#pass_to(name, options = {}) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/thread_order.rb', line 29 def pass_to(name, options={}) child = nil parent = Thread.current resume_event = extract_resume_event!(options) enqueue do sync do @threads << Thread.new { child = Thread.current child[:thread_order_name] = name body = sync { @bodies.fetch(name) } wait_until { parent.stop? } :run == resume_event && parent.wakeup wake_on_sleep = lambda do child.status == 'sleep' ? parent.wakeup : child.status == nil ? :noop : child.status == false ? parent.raise(CannotResume.new "#{name} exited instead of sleeping") : enqueue(&wake_on_sleep) end :sleep == resume_event && enqueue(&wake_on_sleep) begin body.call parent rescue Exception => e enqueue { parent.raise e } raise ensure :exit == resume_event && enqueue { parent.wakeup } end } end end sleep child end |
#wait_until(&condition) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/thread_order.rb', line 80 def wait_until(&condition) return if condition.call thread = Thread.current wake_when_true = lambda do if thread.stop? && condition.call thread.wakeup else enqueue(&wake_when_true) end end enqueue(&wake_when_true) sleep end |