Class: Dramatis::Runtime::Actor
- Inherits:
-
Object
- Object
- Dramatis::Runtime::Actor
- Defined in:
- lib/dramatis/runtime/actor.rb
Overview
:nodoc: all
Direct Known Subclasses
Defined Under Namespace
Classes: Main
Instance Attribute Summary collapse
-
#call_thread ⇒ Object
readonly
Returns the value of attribute call_thread.
-
#gate ⇒ Object
readonly
Returns the value of attribute gate.
-
#object ⇒ Object
readonly
Returns the value of attribute object.
-
#object_interface ⇒ Object
readonly
Returns the value of attribute object_interface.
Instance Method Summary collapse
- #actor_send(args, opts) ⇒ Object
- #bind(object) ⇒ Object
- #blocked! ⇒ Object
- #call_threading? ⇒ Boolean
- #common_send(dest, args, opts) ⇒ Object
- #current_call_thread?(that) ⇒ Boolean
- #deadlock(e) ⇒ Object
- #deliver(dest, args, continuation, call_thread) ⇒ Object
- #enable_call_threading ⇒ Object
- #exception(exception) ⇒ Object
-
#initialize(object = nil) ⇒ Actor
constructor
A new instance of Actor.
- #name ⇒ Object
- #object_initialize(*args) ⇒ Object
- #object_send(args, opts) ⇒ Object
- #register_continuation(c) ⇒ Object
- #runnable! ⇒ Object
- #runnable? ⇒ Boolean
-
#schedule(continuation = nil) ⇒ Object
Note: this method is also called from task.rb.
- #timeout(value, *args) ⇒ Object
- #yield ⇒ Object
Constructor Details
#initialize(object = nil) ⇒ Actor
Returns a new instance of Actor.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/dramatis/runtime/actor.rb', line 24
# Builds the runtime state for one actor and registers it with the
# current scheduler.
#
# object - the behavior object this actor wraps; may be nil, in which
#          case the gate refuses :object tasks (bind accepts them again
#          once an object is attached).
def initialize(object = nil)
  @object = object
  @call_thread = nil
  @call_threading = false

  @gate = Dramatis::Runtime::Gate.new
  @gate.refuse(:object) unless object
  @gate.always([:object, :dramatis_exception], true)

  # A new actor starts out blocked, with nothing queued.
  blocked!
  @queue = []
  @mutex = Mutex.new
  @continuations = {}

  @object_interface = Dramatis::Actor::Interface.new(self)
  Dramatis::Runtime::Scheduler.current << self
end
Instance Attribute Details
#call_thread ⇒ Object (readonly)
Returns the value of attribute call_thread.
22 23 24 |
# File 'lib/dramatis/runtime/actor.rb', line 22
# Reader for @call_thread (nil after construction; deliver swaps it in
# for the duration of a delivery and restores it afterwards).
def call_thread
  @call_thread
end
#gate ⇒ Object (readonly)
Returns the value of attribute gate.
21 22 23 |
# File 'lib/dramatis/runtime/actor.rb', line 21
# Reader for @gate, the Dramatis::Runtime::Gate created in the
# constructor.
def gate
  @gate
end
#object ⇒ Object (readonly)
Returns the value of attribute object.
20 21 22 |
# File 'lib/dramatis/runtime/actor.rb', line 20
# Reader for @object, the behavior object wrapped by this actor
# (nil until one is supplied to the constructor or to bind).
def object
  @object
end
#object_interface ⇒ Object (readonly)
Returns the value of attribute object_interface.
19 20 21 |
# File 'lib/dramatis/runtime/actor.rb', line 19
# Reader for @object_interface, the Dramatis::Actor::Interface built
# for this actor in the constructor.
def object_interface
  @object_interface
end
Instance Method Details
#actor_send(args, opts) ⇒ Object
92 93 94 |
# File 'lib/dramatis/runtime/actor.rb', line 92
# Queues a task addressed to the actor itself (destination :actor).
# Returns whatever common_send returns.
def actor_send(args, opts)
  common_send(:actor, args, opts)
end
#bind(object) ⇒ Object
61 62 63 64 65 66 |
# File 'lib/dramatis/runtime/actor.rb', line 61
# Attaches a behavior object to an actor that does not have one yet.
#
# Raises Dramatis::Error::Bind when an object is already attached.
# Otherwise stores the object, tells the gate to accept :object tasks,
# and returns the actor's name.
def bind(object)
  if @object
    raise Dramatis::Error::Bind
  end

  @object = object
  @gate.accept(:object)
  name
end
#blocked! ⇒ Object
218 219 220 221 |
# File 'lib/dramatis/runtime/actor.rb', line 218
# Marks the actor as blocked; returns :blocked.
def blocked!
  @state = :blocked
end
#call_threading? ⇒ Boolean
41 42 43 |
# File 'lib/dramatis/runtime/actor.rb', line 41
# Reports the @call_threading flag (false after construction, true once
# enable_call_threading has been called).
def call_threading?
  @call_threading
end
#common_send(dest, args, opts) ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/dramatis/runtime/actor.rb', line 107
# Wraps a message in a Task and either hands it to the scheduler right
# away or parks it on this actor's queue.
#
# dest - destination kind passed through to the Task (:actor, :object or
#        :continuation in the callers visible in this class).
# args - message arguments, passed through to the Task.
# opts - send options, passed through to the Task.
#
# Returns the value of task.queued.
def common_send(dest, args, opts)
  task = Dramatis::Runtime::Task.new(self, dest, args, opts)

  @mutex.synchronize do
    # FIX arguments to gate
    gate_args = [task.type, task.method] + task.arguments

    # Only an actor that is not already runnable may start a task here,
    # and only if the gate lets the task through or the task belongs to
    # the call thread currently being serviced.
    startable = !runnable? &&
                (@gate.accepts?(*gate_args) || current_call_thread?(task.call_thread))

    if startable
      runnable!
      Dramatis::Runtime::Scheduler.current.schedule(task)
    else
      @queue << task
    end
  end

  task.queued
end
#current_call_thread?(that) ⇒ Boolean
49 50 51 52 |
# File 'lib/dramatis/runtime/actor.rb', line 49
# Tells whether +that+ is the call thread this actor is currently
# servicing. Returns the falsy @call_thread itself when none is set,
# otherwise the result of comparing it with +that+.
def current_call_thread?(that)
  return @call_thread unless @call_thread

  @call_thread == that
end
#deadlock(e) ⇒ Object
77 78 79 80 81 82 83 84 |
# File 'lib/dramatis/runtime/actor.rb', line 77
# Fails every queued task with the exception +e+.
#
# The queue is copied and emptied while holding the mutex; the tasks are
# notified afterwards, outside the lock. Returns the array of tasks that
# were drained.
def deadlock(e)
  drained = @mutex.synchronize do
    snapshot = @queue.dup
    @queue.clear
    snapshot
  end

  drained.each { |task| task.exception(e) }
end
#deliver(dest, args, continuation, call_thread) ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/dramatis/runtime/actor.rb', line 132
# Executes one message against this actor and reports the outcome to
# +continuation+.
#
# dest         - :actor (call a method on this runtime actor), :object
#                (call a method on the wrapped object) or :continuation
#                (resume a registered continuation). Anything else
#                raises, and the error is routed to +continuation+.
# args         - message array; the first element is shifted off and
#                used as the method name (or, for :continuation, the
#                continuation key followed by :result / :exception).
#                NOTE: args is mutated.
# continuation - receives #result(value) on success or
#                #exception(error) on failure.
# call_thread  - installed as @call_thread while the message runs.
#
# Whatever happens, the previous @call_thread is restored and schedule
# is invoked so the next queued task can be picked.
def deliver(dest, args, continuation, call_thread)
  old_call_thread = @call_thread
  begin
    @call_thread = call_thread
    method = args.shift
    result =
      case dest
      when :actor
        # FIX: do name folding; needs test
        self.send method, *args
      when :object
        v = @object.send method, *args
        # Never leak the raw wrapped object: when the call returns the
        # object itself, hand back the actor name instead.
        if v.object_id == @object.object_id
          v = name
        end
        v
      when :continuation
        # FIX: name folding?
        continuation_name = method
        c = @continuations[continuation_name]
        raise "hell 0 #{Thread.current}" if !c
        method = args.shift
        method = case method
                 when :result then :continuation_result
                 when :exception then :continuation_exception
                 else raise "hell *"
                 end
        c.send method, *args
        # The continuation is single-use; the deleted entry is the value
        # of this branch.
        @continuations.delete continuation_name
      else
        # Report the offending destination. (This used to read the
        # never-assigned @dest, so the message was always empty.)
        raise "hell 1: " + dest.to_s
      end
    continuation.result result
  rescue Exception => exception
    # Exception (not only StandardError) is rescued so that every failure
    # is forwarded to the continuation.
    begin
      continuation.exception exception
    rescue Exception => e
      warn "double exception fault: #{e}"
      pp e
      raise e
    end
  ensure
    @call_thread = old_call_thread
    schedule
  end
end
#enable_call_threading ⇒ Object
45 46 47 |
# File 'lib/dramatis/runtime/actor.rb', line 45
# Switches the @call_threading flag on; returns true.
def enable_call_threading
  @call_threading = true
end
#exception(exception) ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/dramatis/runtime/actor.rb', line 68
# Routes an exception to whoever should handle it.
#
# If the wrapped object responds to dramatis_exception it gets the
# exception; otherwise the exception goes to the current runtime.
# Returns self in both cases.
def exception(exception)
  unless @object.respond_to?(:dramatis_exception)
    Dramatis::Runtime.current.exception(exception)
    return self
  end

  @object.dramatis_exception(exception)
  self
end
#name ⇒ Object
15 16 17 |
# File 'lib/dramatis/runtime/actor.rb', line 15
# Returns the Dramatis::Actor::Name for this actor, creating it on the
# first call and reusing it afterwards.
def name
  @name = Dramatis::Actor::Name.new(self) unless @name
  @name
end
#object_initialize(*args) ⇒ Object
54 55 56 57 |
# File 'lib/dramatis/runtime/actor.rb', line 54
# Tells the gate to accept :object tasks, then runs the wrapped
# object's initialize with +args+. Returns whatever that initialize
# call returns.
def object_initialize(*args)
  @gate.accept(:object)
  # initialize is normally private, hence the dynamic send.
  @object.send(:initialize, *args)
end
#object_send(args, opts) ⇒ Object
96 97 98 99 100 101 102 103 104 105 |
# File 'lib/dramatis/runtime/actor.rb', line 96
# Queues a message for the wrapped object, or for a continuation when
# opts[:continuation_send] is set.
#
# In the continuation case the option's value is pushed onto the front
# of +args+ (mutating it) and the task is sent with destination
# :continuation; otherwise the destination is :object.
# Returns whatever common_send returns.
def object_send(args, opts)
  continuation_id = opts[:continuation_send]

  if continuation_id
    args.unshift(continuation_id)
    common_send(:continuation, args, opts)
  else
    common_send(:object, args, opts)
  end
end
#register_continuation(c) ⇒ Object
86 87 88 89 90 |
# File 'lib/dramatis/runtime/actor.rb', line 86
# Remembers continuation +c+ under its string form (c.to_s) so it can
# later be looked up by that key. Returns +c+.
def register_continuation(c)
  key = c.to_s
  @continuations.store(key, c)
end
#runnable! ⇒ Object
223 224 225 226 |
# File 'lib/dramatis/runtime/actor.rb', line 223
# Marks the actor as runnable; returns :runnable.
def runnable!
  @state = :runnable
end
#runnable? ⇒ Boolean
228 229 230 231 |
# File 'lib/dramatis/runtime/actor.rb', line 228
# True when the actor's state is :runnable, false otherwise.
def runnable?
  :runnable == @state
end
#schedule(continuation = nil) ⇒ Object
Note: this method is also called from task.rb.
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/dramatis/runtime/actor.rb', line 194
# Picks the next deliverable task from the queue, if any.
#
# While holding the mutex, the queue is scanned front to back for the
# first task that the gate accepts or that belongs to the call thread
# currently being serviced. That task is removed from the queue and
# handed to the current scheduler; if none qualifies the actor is
# marked blocked.
#
# continuation - accepted but not used by this method.
def schedule(continuation = nil)
  @mutex.synchronize do
    position = @queue.index do |candidate|
      # FIX arguments?
      gate_args = [candidate.type, candidate.method] + candidate.arguments
      @gate.accepts?(*gate_args) || current_call_thread?(candidate.call_thread)
    end

    if position
      Dramatis::Runtime::Scheduler.current.schedule(@queue.delete_at(position))
    else
      blocked!
    end
  end
end
#timeout(value, *args) ⇒ Object
233 234 235 |
# File 'lib/dramatis/runtime/actor.rb', line 233
# Returns this actor's Dramatis::Runtime::Timer, creating it on first
# use.
#
# NOTE(review): +value+ and +args+ are accepted but never used here —
# this looks unfinished; confirm the intended behavior.
def timeout(value, *args)
  @timer = Dramatis::Runtime::Timer.new unless @timer
  @timer
end
#yield ⇒ Object
59 |
# File 'lib/dramatis/runtime/actor.rb', line 59
# Intentionally empty method; does nothing and returns nil.
def yield
end