Class: Celluloid::Actor

Inherits:
Object
  • Object
show all
Extended by:
Registry
Defined in:
lib/celluloid/actor.rb

Overview

Actors are Celluloid’s concurrency primitive. They’re implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Registry

[], []=, registered

Constructor Details

#initialize(subject) ⇒ Actor

Wrap the given subject with an Actor



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/celluloid/actor.rb', line 94

# Wrap the given subject with an Actor.
#
# Sets up the actor's bookkeeping state, creates the thread handle whose
# block runs the actor loop, and finally builds the proxy for this actor.
#
# subject - the object this actor wraps; its class must respond to
#           +mailbox_factory+, which supplies the actor's mailbox.
def initialize(subject)
  @subject   = subject
  @mailbox   = subject.class.mailbox_factory
  @tasks     = Set.new
  @links     = Links.new
  @signals   = Signals.new
  @receivers = Receivers.new
  @timers    = Timers.new
  @running   = true   # loop flag checked by #run; cleared by #terminate
  @exclusive = false
  @name      = nil    # assigned by #run when a NamingRequest is rescued

  # The block publishes the actor and its mailbox as thread-locals before
  # entering the actor loop.
  @thread = ThreadHandle.new do
    Thread.current[:actor]   = self
    Thread.current[:mailbox] = @mailbox
    run
  end
  
  # NOTE(review): @proxy is assigned only after the thread handle has been
  # created, so code running in that block may observe a nil proxy for a
  # short time - confirm when ThreadHandle actually starts the block.
  @proxy = ActorProxy.new(self)
end

Instance Attribute Details

#links ⇒ Object (readonly)

Returns the value of attribute links.



23
24
25
# File 'lib/celluloid/actor.rb', line 23

# Reader for the @links instance variable, which the constructor sets to
# Links.new.
def links
  @links
end

#mailbox ⇒ Object (readonly)

Returns the value of attribute mailbox.



23
24
25
# File 'lib/celluloid/actor.rb', line 23

# Reader for the @mailbox instance variable, which the constructor obtains
# from subject.class.mailbox_factory.
def mailbox
  @mailbox
end

#name ⇒ Object (readonly)

Returns the value of attribute name.



23
24
25
# File 'lib/celluloid/actor.rb', line 23

# Reader for the @name instance variable. The constructor sets it to nil;
# #run assigns it when a NamingRequest is rescued from the mailbox.
def name
  @name
end

#proxy ⇒ Object (readonly)

Returns the value of attribute proxy.



23
24
25
# File 'lib/celluloid/actor.rb', line 23

# Reader for the @proxy instance variable, which the constructor sets to
# ActorProxy.new(self) as its last step.
def proxy
  @proxy
end

#subject ⇒ Object (readonly)

Returns the value of attribute subject.



23
24
25
# File 'lib/celluloid/actor.rb', line 23

# Reader for the @subject instance variable: the object passed to the
# constructor and wrapped by this actor.
def subject
  @subject
end

#tasks ⇒ Object (readonly)

Returns the value of attribute tasks.



23
24
25
# File 'lib/celluloid/actor.rb', line 23

# Reader for the @tasks instance variable, which the constructor sets to an
# empty Set. #cleanup terminates every element of this collection.
def tasks
  @tasks
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



23
24
25
# File 'lib/celluloid/actor.rb', line 23

# Reader for the @thread instance variable: the ThreadHandle created by the
# constructor whose block runs the actor loop.
def thread
  @thread
end

Class Method Details

.all ⇒ Object

Obtain all running actors in the system



83
84
85
86
87
88
89
90
# File 'lib/celluloid/actor.rb', line 83

# Obtain the proxies of all running actors in the system.
#
# Walks Thread.list and, for every thread whose :actor thread-local is set,
# collects that actor's proxy.
#
# An actor thread publishes its :actor thread-local before the constructor
# has assigned @proxy, so an actor may briefly report a nil proxy. Such
# entries are skipped so callers never receive nil in the result.
#
# Returns an Array of actor proxies (empty when no actors are running).
def all
  Thread.list.each_with_object([]) do |thread, proxies|
    actor = thread[:actor]
    next unless actor

    proxy = actor.proxy
    proxies << proxy if proxy
  end
end

.async(mailbox, meth, *args, &block) ⇒ Object

Invoke a method asynchronously on an actor via its mailbox



64
65
66
67
68
69
70
71
72
73
# File 'lib/celluloid/actor.rb', line 64

# Fire-and-forget invocation of +meth+ on the actor that owns +mailbox+.
#
# An AsyncCall carrying the caller's mailbox (Thread.mailbox), the method
# name, the argument list and the optional block is pushed onto +mailbox+.
#
# Returns the result of the push, or nil when MailboxError was raised.
def async(mailbox, meth, *args, &block)
  request = AsyncCall.new(Thread.mailbox, meth, args, block)
  mailbox << request
rescue MailboxError
  # Silently swallow asynchronous calls to dead actors. There's no way
  # to reliably generate DeadActorErrors for async calls, so users of
  # async calls should find other ways to deal with actors dying
  # during an async call (i.e. linking/supervisors)
  nil
end

.call(mailbox, meth, *args, &block) ⇒ Object

Invoke a method on the given actor via its mailbox



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/celluloid/actor.rb', line 41

# Synchronously invoke +meth+ on the actor that owns +mailbox+ and return
# the value carried by the matching response.
#
# mailbox - destination mailbox; the request is pushed onto it with <<
# meth    - name of the method to invoke
# args    - arguments sent along with the request
# block   - optional block sent along with the request
#
# Raises DeadActorError when pushing onto +mailbox+ raises MailboxError.
def call(mailbox, meth, *args, &block)
  request = SyncCall.new(Thread.mailbox, meth, args, block)

  begin
    mailbox << request
  rescue MailboxError
    raise DeadActorError, "attempted to call a dead actor"
  end

  # Inside a non-exclusive actor the current task is suspended; it is
  # resumed automatically once the response arrives.
  return Task.suspend(:callwait).value if Celluloid.actor? && !Celluloid.exclusive?

  # Otherwise we're inside a normal thread (or exclusive mode), so block on
  # the caller's mailbox until the response for this request shows up.
  reply = Thread.mailbox.receive do |msg|
    msg.respond_to?(:call) && msg.call == request
  end
  reply.value
end

.current ⇒ Object

Obtain the current actor

Raises:

  • (NotActorError) — when the current thread has no actor



27
28
29
30
31
# File 'lib/celluloid/actor.rb', line 27

# Obtain the proxy of the actor running on the current thread.
#
# Looks up the :actor thread-local of Thread.current.
#
# Raises NotActorError when that thread-local is not set.
def current
  running_actor = Thread.current[:actor]
  return running_actor.proxy if running_actor

  raise NotActorError, "not in actor scope"
end

.future(mailbox, meth, *args, &block) ⇒ Object

Call a method asynchronously and retrieve its value later



76
77
78
79
80
# File 'lib/celluloid/actor.rb', line 76

# Call a method asynchronously and retrieve its value later.
#
# Builds a new Future, asks it to execute +meth+ against +mailbox+ with the
# given arguments and block, and hands the Future back to the caller.
#
# Returns the Future (not the result of Future#execute).
def future(mailbox, meth, *args, &block)
  Future.new.tap { |pending| pending.execute(mailbox, meth, args, block) }
end

.name ⇒ Object

Obtain the name of the current actor

Raises:

  • (NotActorError) — when the current thread has no actor



34
35
36
37
38
# File 'lib/celluloid/actor.rb', line 34

# Obtain the name of the actor running on the current thread.
#
# Looks up the :actor thread-local of Thread.current and returns that
# actor's name.
#
# Raises NotActorError when that thread-local is not set.
def name
  running_actor = Thread.current[:actor]
  return running_actor.name if running_actor

  raise NotActorError, "not in actor scope"
end

Instance Method Details

#after(interval) ⇒ Object

Schedule a block to run after the given interval



197
198
199
200
201
# File 'lib/celluloid/actor.rb', line 197

# Schedule the given block via @timers.add(interval).
#
# When the timer callback runs, the caller's block is executed inside a
# fresh Task of type :timer.
#
# Returns whatever @timers.add returns.
def after(interval)
  @timers.add(interval) { Task.new(:timer) { yield }.resume }
end

#cleanup(exit_event) ⇒ Object

Clean up after this actor



271
272
273
274
275
276
277
# File 'lib/celluloid/actor.rb', line 271

# Clean up after this actor.
#
# In order: shuts down the mailbox, sends +exit_event+ through @links, and
# terminates every task in #tasks. A StandardError raised by any step is
# logged through Logger.crash instead of propagating, and the remaining
# steps are skipped.
def cleanup(exit_event)
  @mailbox.shutdown
  @links.send_event(exit_event)
  tasks.each(&:terminate)
rescue => error
  Logger.crash("#{@subject.class}: CLEANUP CRASHED!", error)
end

#every(interval) ⇒ Object

Schedule a block to run repeatedly at the given interval



204
205
206
207
208
# File 'lib/celluloid/actor.rb', line 204

# Schedule the given block via @timers.add(interval, true).
#
# NOTE(review): the second argument presumably marks the timer as
# recurring - confirm against Timers#add.
#
# When the timer callback runs, the caller's block is executed inside a
# fresh Task of type :timer.
#
# Returns whatever @timers.add returns.
def every(interval)
  @timers.add(interval, true) { Task.new(:timer) { yield }.resume }
end

#exclusive ⇒ Object

Execute a code block in exclusive mode.



121
122
123
124
125
126
# File 'lib/celluloid/actor.rb', line 121

# Execute a code block in exclusive mode.
#
# Sets @exclusive to true while the block runs and restores the value it
# had on entry afterwards, even when the block raises. Restoring the
# previous value (rather than forcing false) keeps nested calls correct:
# leaving an inner exclusive block must not end the outer one.
#
# Returns the value of the block.
def exclusive
  previous = @exclusive
  @exclusive = true
  yield
ensure
  @exclusive = previous
end

#exclusive? ⇒ Boolean

Is this actor running in exclusive mode?

Returns:

  • (Boolean)


116
117
118
# File 'lib/celluloid/actor.rb', line 116

# Is this actor running in exclusive mode?
#
# Returns the current value of @exclusive, which the constructor
# initializes to false and #exclusive sets to true while its block runs.
def exclusive?
  @exclusive
end

#handle_crash(exception) ⇒ Object

Handle any exceptions that occur within a running actor



247
248
249
250
251
252
# File 'lib/celluloid/actor.rb', line 247

# Handle any exceptions that occur within a running actor.
#
# Logs the crash through Logger.crash, then shuts the actor down with an
# ExitEvent built from this actor's proxy and the exception. If logging or
# shutdown itself raises a StandardError, that second failure is logged too
# rather than propagated.
def handle_crash(exception)
  Logger.crash("#{@subject.class} crashed!", exception)
  crash_event = ExitEvent.new(@proxy, exception)
  shutdown(crash_event)
rescue => error
  Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", error)
end

#handle_exit_event(exit_event) ⇒ Object

Handle exit events received by this actor



235
236
237
238
239
240
241
242
243
244
# File 'lib/celluloid/actor.rb', line 235

# Handle exit events received by this actor.
#
# When the subject's class declares an exit handler, that method is invoked
# on the subject with the event's actor and reason, and its result is
# returned. Without a handler, the event's reason is re-raised if there is
# one; otherwise nil is returned.
def handle_exit_event(exit_event)
  handler = @subject.class.exit_handler
  return @subject.send(handler, exit_event.actor, exit_event.reason) if handler

  # Reraise exceptions from linked actors
  # If no reason is given, actor terminated cleanly
  raise exit_event.reason if exit_event.reason
end

#handle_message(message) ⇒ Object

Handle an incoming message



222
223
224
225
226
227
228
229
230
231
232
# File 'lib/celluloid/actor.rb', line 222

# Handle an incoming message.
#
# Call     - dispatched to the subject inside a new :message_handler Task
# Response - resumes the task of the call it answers, passing the response
# other    - handed to @receivers.handle_message
#
# Returns the message itself in every case.
def handle_message(message)
  case message
  when Call     then Task.new(:message_handler) { message.dispatch(@subject) }.resume
  when Response then message.call.task.resume(message)
  else               @receivers.handle_message(message)
  end

  message
end

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message



144
145
146
# File 'lib/celluloid/actor.rb', line 144

# Receive an asynchronous message.
#
# Delegates to @receivers.receive, forwarding the optional +timeout+
# (nil by default) and the given block, and returns whatever that call
# returns.
def receive(timeout = nil, &block)
  @receivers.receive(timeout, &block)
end

#run ⇒ Object

Run the actor loop



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/celluloid/actor.rb', line 149

# Run the actor loop.
#
# Repeatedly receives from the mailbox (waiting at most #timeout) and hands
# each message to #handle_message until @running becomes false, a
# TerminationRequest is rescued, or MailboxShutdown is raised; then calls
# #shutdown. Any exception escaping the loop or #shutdown is passed to
# #handle_crash and is re-raised only when it is not a StandardError.
def run
  begin
    while @running
      begin
        message = @mailbox.receive(timeout)
      rescue ExitEvent => exit_event
        # Process the exit event in its own task, then go back to waiting
        # on the mailbox (retry re-runs the receive above).
        Task.new(:exit_handler) { handle_exit_event exit_event }.resume
        retry
      rescue NamingRequest => ex
        # Record the requested name and resume waiting on the mailbox.
        @name = ex.name
        retry
      rescue TerminationRequest
        # Leave the loop; falls through to the regular shutdown below.
        break
      end

      if message
        handle_message message
      else
        # No message indicates a timeout
        @timers.fire
        @receivers.fire_timers
      end
    end
  rescue MailboxShutdown
    # If the mailbox detects shutdown, exit the actor
  end

  shutdown
rescue Exception => ex
  # Rescuing Exception is deliberate here: every failure is reported via
  # handle_crash, but only StandardErrors are swallowed afterwards.
  handle_crash(ex)
  raise unless ex.is_a? StandardError
end

#run_finalizer ⇒ Object

Run the user-defined finalizer, if one is set



264
265
266
267
268
# File 'lib/celluloid/actor.rb', line 264

# Run the user-defined finalizer, if one is set.
#
# Calls #finalize on the subject when the subject responds to it. A
# StandardError raised by the finalizer is logged through Logger.crash and
# not propagated.
#
# Returns the finalizer's result, nil when there is no finalizer, or the
# result of Logger.crash when the finalizer raised.
def run_finalizer
  return unless @subject.respond_to?(:finalize)

  @subject.finalize
rescue => error
  Logger.crash("#{@subject.class}#finalize crashed!", error)
end

#shutdown(exit_event = ExitEvent.new(@proxy)) ⇒ Object

Handle cleaning up this actor after it exits



255
256
257
258
259
260
261
# File 'lib/celluloid/actor.rb', line 255

# Handle cleaning up this actor after it exits.
#
# Runs the finalizer, then #cleanup with +exit_event+ (by default a fresh
# ExitEvent built from this actor's proxy). The :actor and :mailbox
# thread-locals of the current thread are always cleared afterwards, even
# if one of those steps raises.
#
# Returns the result of #cleanup.
def shutdown(exit_event = ExitEvent.new(@proxy))
  begin
    run_finalizer
    cleanup(exit_event)
  ensure
    [:actor, :mailbox].each { |key| Thread.current[key] = nil }
  end
end

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods



134
135
136
# File 'lib/celluloid/actor.rb', line 134

# Send a signal with the given name to all waiting methods.
#
# Forwards +name+ and +value+ (nil by default) to @signals.send and returns
# its result.
#
# NOTE(review): this presumably relies on Signals defining its own #send;
# if it did not, the call would resolve to Object#send and invoke a method
# called +name+ on @signals - confirm against the Signals class.
def signal(name, value = nil)
  @signals.send name, value
end

#sleep(interval) ⇒ Object

Sleep for the given amount of time



211
212
213
214
215
216
217
218
219
# File 'lib/celluloid/actor.rb', line 211

# Sleep for the given amount of time.
#
# In exclusive mode this blocks via Kernel.sleep. Otherwise the current
# task registers a timer that resumes it after +interval+ and then
# suspends itself with status :sleeping.
#
# Returns the result of Kernel.sleep or of Task.suspend respectively.
def sleep(interval)
  return Kernel.sleep(interval) if Celluloid.exclusive?

  sleeper = Task.current
  @timers.add(interval) { sleeper.resume }
  Task.suspend(:sleeping)
end

#terminate ⇒ Object

Terminate this actor



129
130
131
# File 'lib/celluloid/actor.rb', line 129

# Terminate this actor.
#
# Clears @running, the flag tested by the +while @running+ loop in #run,
# so that loop stops at its next check. Returns false.
def terminate
  @running = false
end

#timeout ⇒ Object

How long to wait until the next timer fires



183
184
185
186
187
188
189
190
191
192
193
194
# File 'lib/celluloid/actor.rb', line 183

# How long to wait until the next timer fires.
#
# Compares the wait intervals reported by @timers and @receivers and
# returns the smaller one. When only one of them reports an interval, that
# one is returned; when neither does, the result is nil.
def timeout
  timer_wait    = @timers.wait_interval
  receiver_wait = @receivers.wait_interval

  return receiver_wait unless timer_wait
  return timer_wait unless receiver_wait

  timer_wait < receiver_wait ? timer_wait : receiver_wait
end

#wait(name) ⇒ Object

Wait for the given signal



139
140
141
# File 'lib/celluloid/actor.rb', line 139

# Wait for the given signal.
#
# Delegates to @signals.wait with +name+ and returns its result.
def wait(name)
  @signals.wait name
end