Class: Dramatis::Runtime::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
lib/dramatis/runtime/scheduler.rb

Overview

:nodoc: all

Defined Under Namespace

Classes: Done

Constant Summary collapse

@@thread_pools =
[]
@@current =
nil

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.currentObject



29
30
31
# File 'lib/dramatis/runtime/scheduler.rb', line 29

def self.current
  @@current ||= self.new
end

.resetObject



19
20
21
22
23
24
25
# File 'lib/dramatis/runtime/scheduler.rb', line 19

def self.reset
  @@thread_pools.each do |thread_pool|
    thread_pool.reset
  end
  @@current.reset
  @@current = nil
end

Instance Method Details

#<<(actor) ⇒ Object



156
157
158
# File 'lib/dramatis/runtime/scheduler.rb', line 156

def << actor
  @actors << actor
end

#checkioObject



15
# File 'lib/dramatis/runtime/scheduler.rb', line 15

def checkio; false; end

#main_at_exit(quiescing = false) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/dramatis/runtime/scheduler.rb', line 106

def main_at_exit quiescing = false
  # warn "quiescing" if quiescing
  # warn "main has exited: waiting" if !quiescing
  @mutex.synchronize do
    @quiescing = quiescing
    checkio and warn "#{Thread.current} main maybe checkin 1 #{@running_threads} #{@state} #{@main_state} #{quiescing}"
    if @state != :idle
      @running_threads -= 1
      if @state == :waiting
        @wait.signal
      end
    end

    raise "hell #{@main_state.to_s}" if @main_state != :running and @main_state != :may_finish
    checkio and warn "#{Thread.current} main signaled #{@running_threads} #{@state} #{@main_state} #{quiescing}"

    @main_mutex.synchronize do
    if @main_state == :running
      if @state != :idle
        @main_state = :waiting
        begin
          @main_wait.wait @mutex
        rescue Exception => e
          # pp "wait said #{e}", e.backtrace
          raise e
        end
        @main_join.join
        @main_join = nil
        raise "hell #{@main_state.to_s}" if @main_state != :may_finish
      else
        begin
          maybe_deadlock
        rescue Dramatis::Deadlock => deadlock
          warn "Deadlock at exit: uncompleted tasks exist"
          raise deadlock
        end
        @main_state = :may_finish
      end
    end
    @main_state = :running if @quiescing
    end
    @quiescing = false
  end
  raise "hell #{@main_state.to_s}" if @main_state != :may_finish and @main_state != :running
  # warn "?threads? #{Thread.list.join(' ')}"
  # warn "main has exited: done"
  @thread_pool.reset()
  Dramatis::Runtime.current.maybe_raise_exceptions quiescing
end

#maybe_deadlockObject

must be called with @mutex locked must be called after @running_threads decremented



63
64
65
66
67
68
69
# File 'lib/dramatis/runtime/scheduler.rb', line 63

def maybe_deadlock
  # warn "maybe_deadlock #{Thread.current} #{Thread.main} threads #{@running_threads} queue #{@queue.length} #{Thread.list.join(" ")} qg #{@quiescing} scl #{@suspended_continuations.length}"
  if @running_threads == 0 and @queue.length == 0 and @suspended_continuations.length > 0 and !@quiescing
    # p "DEADLOCK!"
    raise Dramatis::Deadlock.new
  end
end

#quiesceObject



101
102
103
104
# File 'lib/dramatis/runtime/scheduler.rb', line 101

def quiesce
  Dramatis::Runtime::Actor::Main.current.quiesce
  main_at_exit true
end

#resetObject



33
34
35
# File 'lib/dramatis/runtime/scheduler.rb', line 33

def reset
  # pp @suspended_continuations
end

#schedule(task) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/dramatis/runtime/scheduler.rb', line 37

def schedule task
  @mutex.synchronize do
    # warn "sched #{@queue.length} #{@state} #{task}"
    begin
      raise "bad bad bad" if task == nil
    rescue Exception => e
      p "very bad very #{e}"
      pp e.backtrace
      raise e
    end
    @queue << task
    if @queue.length == 1
      if @state == :waiting
        @wait.signal
      elsif @state == :idle
        @state = :running
        @running_threads = 1
        checkio and warn "#{Thread.current} checkout main #{Thread.main} #{@running_threads}"
        @thread = Thread.new { run }
      end
    end
  end
end

#suspend_notification(continuation) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/dramatis/runtime/scheduler.rb', line 71

def suspend_notification continuation
  @mutex.synchronize do
    if @state == :idle
      @state = :running
      @running_threads = 1
      checkio and warn "#{Thread.current} checkout -1 #{Thread.main} #{@running_threads}"
      @thread = Thread.new { run }
    end
    checkio and warn "#{Thread.current} checkin 0 #{Thread.main} #{@running_threads}"
    @running_threads -= 1
    begin
      raise "cane" if @running_threads < 0
    rescue ::Exception => e
      pp e.backtrace
    end
    if @state == :waiting
      @wait.signal
    end
    @suspended_continuations[continuation.to_s] = continuation
  end
end

#thread_countObject



160
161
162
163
164
# File 'lib/dramatis/runtime/scheduler.rb', line 160

def thread_count
  @mutex.synchronize do
    @@thread_pools.inject(0) { |a,b| a+b.length }
  end
end

#wakeup_notification(continuation) ⇒ Object



93
94
95
96
97
98
99
# File 'lib/dramatis/runtime/scheduler.rb', line 93

def wakeup_notification continuation
  @mutex.synchronize do
    @suspended_continuations.delete continuation.to_s
    @running_threads += 1
    checkio and warn "#{Thread.current} checkout #{@running_threads}"
  end
end