Class: Dramatis::Runtime::Actor

Inherits:
Object
  • Object
show all
Defined in:
lib/dramatis/runtime/actor.rb

Overview

:nodoc: all

Direct Known Subclasses

Main

Defined Under Namespace

Classes: Main

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(object = nil) ⇒ Actor

Returns a new instance of Actor.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/dramatis/runtime/actor.rb', line 24

# Sets up the runtime state for an actor, optionally wrapping +object+.
#
# When no object is given, the gate is told to refuse :object. The actor
# starts in the blocked state with an empty task queue and no registered
# continuations, and finally appends itself to the current scheduler.
#
# @param object [Object, nil] the object this actor wraps, if any
def initialize(object = nil)
  @object = object
  @call_threading = false
  @call_thread = nil

  @gate = Dramatis::Runtime::Gate.new
  @gate.refuse(:object) unless object
  @gate.always([:object, :dramatis_exception], true)

  blocked!
  @queue = []
  @mutex = Mutex.new
  @continuations = {}
  @object_interface = Dramatis::Actor::Interface.new(self)

  Dramatis::Runtime::Scheduler.current << self
end

Instance Attribute Details

#call_thread ⇒ Object (readonly)

Returns the value of attribute call_thread.



22
23
24
# File 'lib/dramatis/runtime/actor.rb', line 22

# Reader for the @call_thread instance variable.
#
# @return [Object, nil] whatever is currently stored in @call_thread
def call_thread() @call_thread end

#gate ⇒ Object (readonly)

Returns the value of attribute gate.



21
22
23
# File 'lib/dramatis/runtime/actor.rb', line 21

# Reader for the @gate instance variable.
#
# @return [Object, nil] whatever is currently stored in @gate
def gate() @gate end

#object ⇒ Object (readonly)

Returns the value of attribute object.



20
21
22
# File 'lib/dramatis/runtime/actor.rb', line 20

# Reader for the @object instance variable.
#
# @return [Object, nil] whatever is currently stored in @object
def object() @object end

#object_interface ⇒ Object (readonly)

Returns the value of attribute object_interface.



19
20
21
# File 'lib/dramatis/runtime/actor.rb', line 19

# Reader for the @object_interface instance variable.
#
# @return [Object, nil] whatever is currently stored in @object_interface
def object_interface() @object_interface end

Instance Method Details

#actor_send(args, opts) ⇒ Object



92
93
94
# File 'lib/dramatis/runtime/actor.rb', line 92

# Sends a message addressed to the actor itself by forwarding to
# common_send with the :actor destination.
#
# @param args [Array] passed through to common_send unchanged
# @param opts [Hash] passed through to common_send unchanged
# @return [Object] whatever common_send returns
def actor_send(args, opts)
  common_send(:actor, args, opts)
end

#bind(object) ⇒ Object



61
62
63
64
65
66
# File 'lib/dramatis/runtime/actor.rb', line 61

# Attaches +object+ to an actor that does not have one yet.
#
# After storing the object, the gate is told to accept :object.
#
# @param object [Object] the object to attach
# @return [Object] the result of calling +name+
# @raise [Dramatis::Error::Bind] if @object is already set to a truthy value
def bind(object)
  if @object
    raise Dramatis::Error::Bind
  end

  @object = object
  @gate.accept(:object)
  name
end

#blocked! ⇒ Object



218
219
220
221
# File 'lib/dramatis/runtime/actor.rb', line 218

# Records the actor's state as :blocked.
#
# @return [Symbol] :blocked
def blocked!() @state = :blocked end

#call_threading? ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/dramatis/runtime/actor.rb', line 41

# Reports the @call_threading flag.
#
# @return [Object] the current value of @call_threading
def call_threading?() @call_threading end

#common_send(dest, args, opts) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/dramatis/runtime/actor.rb', line 107

# Wraps a message in a Dramatis::Runtime::Task and, under the actor's
# mutex, either hands the task to the current scheduler or appends it to
# this actor's queue.
#
# The task is scheduled immediately only when the actor is not already
# runnable and either the gate accepts the task or the task's call thread
# matches the actor's current call thread; in that case the actor is
# marked runnable first. Otherwise the task waits in @queue.
#
# @param dest [Symbol] destination kind passed to the task
# @param args [Array] message arguments passed to the task
# @param opts [Hash] options passed to the task
# @return [Object] the result of +task.queued+
def common_send(dest, args, opts)
  task = Dramatis::Runtime::Task.new(self, dest, args, opts)

  @mutex.synchronize do
    # FIX arguments to gate
    must_wait = runnable? ||
                !(@gate.accepts?(*([task.type, task.method] + task.arguments)) ||
                  current_call_thread?(task.call_thread))

    if must_wait
      @queue << task
    else
      runnable!
      Dramatis::Runtime::Scheduler.current.schedule(task)
    end
  end

  task.queued
end

#current_call_thread?(that) ⇒ Boolean

Returns:

  • (Boolean)


49
50
51
52
# File 'lib/dramatis/runtime/actor.rb', line 49

# Tells whether +that+ equals the call thread currently recorded on the
# actor.
#
# @param that [Object] the call thread to compare against
# @return [Boolean, nil] the falsy @call_thread itself when none is set,
#   otherwise the result of comparing it with +that+
def current_call_thread?(that)
  return @call_thread unless @call_thread

  @call_thread == that
end

#deadlock(e) ⇒ Object



77
78
79
80
81
82
83
84
# File 'lib/dramatis/runtime/actor.rb', line 77

# Empties the actor's queue and passes +e+ to every task that was waiting.
#
# The queue is copied and cleared while holding the mutex; the tasks are
# notified after the mutex has been released.
#
# @param e [Object] value handed to each task's +exception+ method
# @return [Array] the tasks that were removed from the queue
def deadlock(e)
  drained = @mutex.synchronize do
    snapshot = @queue.dup
    @queue.clear
    snapshot
  end

  drained.each do |task|
    task.exception(e)
  end
end

#deliver(dest, args, continuation, call_thread) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/dramatis/runtime/actor.rb', line 132

# Executes one delivered message on this actor and reports the outcome to
# +continuation+.
#
# The first element of +args+ is shifted off and used as the method name
# (note: this mutates the caller's array). What happens next depends on
# +dest+:
#
# * :actor        - the method is sent to the actor itself.
# * :object       - the method is sent to @object; if the return value is
#                   @object itself, the actor's +name+ is substituted.
# * :continuation - the shifted value names an entry in @continuations; the
#                   next element of +args+ must be :result or :exception and
#                   selects continuation_result / continuation_exception on
#                   that entry, which is then removed from @continuations.
#                   The value reported is whatever Hash#delete returns.
# * anything else - raises a RuntimeError naming the destination.
#
# Any exception raised along the way is forwarded to
# +continuation.exception+. @call_thread is switched to +call_thread+ for
# the duration of the call and restored afterwards, and +schedule+ is always
# invoked at the end.
#
# @param dest [Symbol] :actor, :object or :continuation
# @param args [Array] method name followed by its arguments
# @param continuation [#result, #exception] receives the outcome
# @param call_thread [Object] call thread to record while the call runs
def deliver dest, args, continuation, call_thread
  old_call_thread = @call_thread
  begin
    @call_thread = call_thread
    method = args.shift
    result =
      case dest
      when :actor
        # FIX: do name folding; needs test
        self.send method, *args
      when :object
        v = @object.send method, *args
        # Hand back the actor's name rather than the object itself.
        if v.object_id == @object.object_id
          v = name
        end
        v
      when :continuation
        # FIX: name folding?
        continuation_name = method
        c = @continuations[continuation_name]
        raise "hell 0 #{Thread.current}" if !c
        method = args.shift
        method = case method
                   when :result; :continuation_result
                   when :exception; :continuation_exception
                   else; raise "hell *"
                 end
        c.send method, *args
        @continuations.delete continuation_name
      else
        # Report the destination that was actually passed in; the parameter
        # is the local +dest+, there is no @dest instance variable.
        raise "hell 1: " + dest.to_s
      end
    continuation.result result
  rescue Exception => exception
    # Every failure, including non-StandardError ones, is routed to the
    # continuation instead of propagating.
    begin
      continuation.exception exception
    rescue Exception => e
      warn "double exception fault: #{e}"
      pp e
      raise e
    end
  ensure
    @call_thread = old_call_thread
    schedule
  end
end

#enable_call_threading ⇒ Object



45
46
47
# File 'lib/dramatis/runtime/actor.rb', line 45

# Turns the @call_threading flag on.
#
# @return [Boolean] true
def enable_call_threading() @call_threading = true end

#exception(exception) ⇒ Object



68
69
70
71
72
73
74
75
# File 'lib/dramatis/runtime/actor.rb', line 68

# Routes +exception+ to the wrapped object when it responds to
# dramatis_exception, and to Dramatis::Runtime.current otherwise.
#
# @param exception [Object] the exception to report
# @return [self]
def exception(exception)
  unless @object.respond_to?(:dramatis_exception)
    Dramatis::Runtime.current.exception(exception)
    return self
  end

  @object.dramatis_exception(exception)
  self
end

#name ⇒ Object



15
16
17
# File 'lib/dramatis/runtime/actor.rb', line 15

# Returns the actor's name, creating a Dramatis::Actor::Name for this
# actor on first use and caching it in @name.
#
# @return [Object] the cached name
def name
  @name = Dramatis::Actor::Name.new(self) unless @name
  @name
end

#object_initialize(*args) ⇒ Object



54
55
56
57
# File 'lib/dramatis/runtime/actor.rb', line 54

# Tells the gate to accept :object, then invokes +initialize+ on the
# wrapped object with the given arguments.
#
# @param args [Array] arguments forwarded to the object's initialize
# @return [Object] whatever the object's initialize returns
def object_initialize(*args)
  @gate.accept(:object)
  @object.send(:initialize, *args)
end

#object_send(args, opts) ⇒ Object



96
97
98
99
100
101
102
103
104
105
# File 'lib/dramatis/runtime/actor.rb', line 96

# Sends a message either to the wrapped object or to a continuation.
#
# When opts[:continuation_send] is set, that value is pushed onto the front
# of +args+ (mutating it) and the message goes to the :continuation
# destination; otherwise it goes to :object.
#
# @param args [Array] message arguments
# @param opts [Hash] send options
# @return [Object] whatever common_send returns
def object_send(args, opts)
  dest = opts[:continuation_send] ? :continuation : :object
  args.unshift(opts[:continuation_send]) if dest == :continuation
  common_send(dest, args, opts)
end

#register_continuation(c) ⇒ Object



86
87
88
89
90
# File 'lib/dramatis/runtime/actor.rb', line 86

# Stores +c+ in @continuations under its string form (c.to_s).
#
# @param c [Object] the continuation to remember
# @return [Object] +c+
def register_continuation(c)
  @continuations.store(c.to_s, c)
end

#runnable! ⇒ Object



223
224
225
226
# File 'lib/dramatis/runtime/actor.rb', line 223

# Records the actor's state as :runnable.
#
# @return [Symbol] :runnable
def runnable!() @state = :runnable end

#runnable? ⇒ Boolean

Returns:

  • (Boolean)


228
229
230
231
# File 'lib/dramatis/runtime/actor.rb', line 228

# Tells whether the actor's recorded state is :runnable.
#
# @return [Boolean]
def runnable?() @state == :runnable end

#schedule(continuation = nil) ⇒ Object

note called from task.rb, too



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/dramatis/runtime/actor.rb', line 194

# Picks the next queued task that may run and hands it to the current
# scheduler; marks the actor blocked when no queued task qualifies.
#
# A task qualifies when the gate accepts it or when its call thread matches
# the actor's current call thread. The first qualifying task in queue order
# is removed from @queue. Everything happens while holding the mutex.
#
# note called from task.rb, too
#
# @param continuation [Object, nil] not used by this method's body
# @return [Object] the scheduler's return value, or :blocked
def schedule(continuation = nil)
  @mutex.synchronize do
    # FIX arguments?
    position = @queue.index do |candidate|
      @gate.accepts?(*([candidate.type, candidate.method] + candidate.arguments)) ||
        current_call_thread?(candidate.call_thread)
    end

    if position
      task = @queue.delete_at(position)
      Dramatis::Runtime::Scheduler.current.schedule(task)
    else
      blocked!
    end
  end
end

#timeout(value, *args) ⇒ Object



233
234
235
# File 'lib/dramatis/runtime/actor.rb', line 233

# Returns the actor's timer, creating a Dramatis::Runtime::Timer on first
# use and caching it in @timer.
#
# Neither +value+ nor +args+ is used by this method's body.
#
# @param value [Object] unused here
# @param args [Array] unused here
# @return [Object] the cached timer
def timeout(value, *args)
  @timer = Dramatis::Runtime::Timer.new unless @timer
  @timer
end

#yield ⇒ Object



59
# File 'lib/dramatis/runtime/actor.rb', line 59

# Does nothing and returns nil.
def yield
end