Class: ThreadOrder

Inherits:
Object
  • Object
show all
Defined in:
lib/thread_order.rb,
lib/thread_order/mutex.rb,
lib/thread_order/version.rb

Constant Summary collapse

Error =
Class.new RuntimeError
CannotResume =
Class.new Error
Mutex =
if defined? ::Mutex
  # On 1.9 and up, this is in core, so we just use the real one
  ::Mutex
else

  # On 1.8.7, it's in the stdlib.
  # We don't want to load the stdlib, b/c this is a test tool, and can affect the test environment,
  # causing tests to pass where they should fail.
  #
  # So we're transcribing/modifying it from https://github.com/ruby/ruby/blob/v1_8_7_374/lib/thread.rb#L56
  # Some methods we don't need are deleted.
  # Anything I don't understand (there's quite a bit, actually) is left in.
  Class.new do
    def initialize
      @waiting = []
      @locked = false;
      @waiting.taint
      self.taint
    end

    def lock
      while (Thread.critical = true; @locked)
        @waiting.push Thread.current
        Thread.stop
      end
      @locked = true
      Thread.critical = false
      self
    end

    def unlock
      return unless @locked
      Thread.critical = true
      @locked = false
      begin
        t = @waiting.shift
        t.wakeup if t
      rescue ThreadError
        retry
      end
      Thread.critical = false
      begin
        t.run if t
      rescue ThreadError
      end
      self
    end

    def synchronize
      lock
      begin
        yield
      ensure
        unlock
      end
    end
  end
end
VERSION =
'1.1.1'

Instance Method Summary collapse

Constructor Details

#initializeThreadOrder

Note that this must tbe initialized in a threadsafe environment Otherwise, syncing may occur before the mutex is set



9
10
11
12
13
14
15
16
17
18
19
# File 'lib/thread_order.rb', line 9

def initialize
  @mutex   = Mutex.new
  @bodies  = {}
  @threads = []
  @queue   = [] # Queue is in stdlib, but half the purpose of this lib is to avoid such deps, so using an array in a Mutex
  @worker  = Thread.new do
    Thread.current.abort_on_exception = true
    Thread.current[:thread_order_name] = :internal_worker
    loop { break if :shutdown == work() }
  end
end

Instance Method Details

#apocalypse!(thread_method = :kill) ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/thread_order.rb', line 67

def apocalypse!(thread_method=:kill)
  enqueue do
    @threads.each(&thread_method)
    @queue.clear
    :shutdown
  end
  @worker.join
end

#currentObject



25
26
27
# File 'lib/thread_order.rb', line 25

def current
  Thread.current[:thread_order_name]
end

#declare(name, &block) ⇒ Object



21
22
23
# File 'lib/thread_order.rb', line 21

def declare(name, &block)
  sync { @bodies[name] = block }
end

#enqueue(&block) ⇒ Object



76
77
78
# File 'lib/thread_order.rb', line 76

def enqueue(&block)
  sync { @queue << block if @worker.alive? }
end

#join_allObject



63
64
65
# File 'lib/thread_order.rb', line 63

def join_all
  sync { @threads }.each { |th| th.join }
end

#pass_to(name, options = {}) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/thread_order.rb', line 29

def pass_to(name, options={})
  child        = nil
  parent       = Thread.current
  resume_event = extract_resume_event!(options)
  enqueue do
    sync do
      @threads << Thread.new {
        child = Thread.current
        child[:thread_order_name] = name
        body = sync { @bodies.fetch(name) }
        wait_until { parent.stop? }
        :run == resume_event && parent.wakeup
        wake_on_sleep = lambda do
          child.status == 'sleep' ? parent.wakeup :
          child.status == nil     ? :noop         :
          child.status == false   ? parent.raise(CannotResume.new "#{name} exited instead of sleeping") :
                                    enqueue(&wake_on_sleep)
        end
        :sleep == resume_event && enqueue(&wake_on_sleep)
        begin
          body.call parent
        rescue Exception => e
          enqueue { parent.raise e }
          raise
        ensure
          :exit == resume_event && enqueue { parent.wakeup }
        end
      }
    end
  end
  sleep
  child
end

#wait_until(&condition) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/thread_order.rb', line 80

def wait_until(&condition)
  return if condition.call
  thread = Thread.current
  wake_when_true = lambda do
    if thread.stop? && condition.call
      thread.wakeup
    else
      enqueue(&wake_when_true)
    end
  end
  enqueue(&wake_when_true)
  sleep
end