Class: Celluloid::Actor

Inherits:
Object show all
Extended by:
Registry
Defined in:
lib/vendor/celluloid/lib/celluloid/actor.rb,
lib/vendor/celluloid/lib/celluloid/actor_pool.rb

Overview

Actors are Celluloid’s concurrency primitive. They’re implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.

Defined Under Namespace

Classes: Pool

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Registry

[], []=, registered

Constructor Details

#initialize(subject) ⇒ Actor

Wrap the given subject with an Actor



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 63

def initialize(subject)
  @subject = subject

  if subject.respond_to? :mailbox_factory
    @mailbox = subject.mailbox_factory
  else
    @mailbox = Mailbox.new
  end

  @links     = Links.new
  @signals   = Signals.new
  @receivers = Receivers.new
  @timers    = Timers.new
  @proxy     = ActorProxy.new(@mailbox, self.class.to_s)
  @running   = true

  @thread = Pool.get do
    Thread.current[:actor]   = self
    Thread.current[:mailbox] = @mailbox

    run
  end
end

Instance Attribute Details

Returns the value of attribute links.



25
26
27
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 25

def links
  @links
end

#mailboxObject (readonly)

Returns the value of attribute mailbox.



26
27
28
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 26

def mailbox
  @mailbox
end

#proxyObject (readonly)

Returns the value of attribute proxy.



24
25
26
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 24

def proxy
  @proxy
end

Class Method Details

.async(mailbox, meth, *args, &block) ⇒ Object

Invoke a method asynchronously on an actor via its mailbox



51
52
53
54
55
56
57
58
59
60
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 51

def self.async(mailbox, meth, *args, &block)
  begin
    mailbox << AsyncCall.new(Thread.mailbox, meth, args, block)
  rescue MailboxError
    # Silently swallow asynchronous calls to dead actors. There's no way
    # to reliably generate DeadActorErrors for async calls, so users of
    # async calls should find other ways to deal with actors dying
    # during an async call (i.e. linking/supervisors)
  end
end

.call(mailbox, meth, *args, &block) ⇒ Object

Invoke a method on the given actor via its mailbox



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 29

def self.call(mailbox, meth, *args, &block)
  call = SyncCall.new(Thread.mailbox, meth, args, block)

  begin
    mailbox << call
  rescue MailboxError
    raise DeadActorError, "attempted to call a dead actor"
  end

  if Celluloid.actor?
    response = Thread.current[:actor].wait [:call, call.id]
  else
    # Otherwise we're inside a normal thread, so block
    response = Thread.mailbox.receive do |msg|
      msg.respond_to?(:call_id) and msg.call_id == call.id
    end
  end

  response.value
end

Instance Method Details

#after(interval) ⇒ Object

Schedule a block to run at the given time



179
180
181
182
183
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 179

def after(interval)
  @timers.add(interval) do
    Task.new(:timer) { yield }.resume
  end
end

#alive?Boolean

Is this actor alive?

Returns:

  • (Boolean)


88
89
90
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 88

def alive?
  @running
end

#cleanup(exit_event) ⇒ Object

Handle cleaning up this actor after it exits



230
231
232
233
234
235
236
237
238
239
240
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 230

def cleanup(exit_event)
  @mailbox.shutdown
  @links.send_event exit_event
  tasks.each { |task, _| task.terminate }

  begin
    @subject.finalize if @subject.respond_to? :finalize
  rescue Exception => ex
    Logger.crash("#{@subject.class}#finalize crashed!", ex)
  end
end

#handle_crash(exception) ⇒ Object

Handle any exceptions that occur within a running actor



222
223
224
225
226
227
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 222

def handle_crash(exception)
  Logger.crash("#{@subject.class} crashed!", exception)
  cleanup ExitEvent.new(@proxy, exception)
rescue Exception => ex
  Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex)
end

#handle_exit_event(exit_event) ⇒ Object

Handle exit events received by this actor



210
211
212
213
214
215
216
217
218
219
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 210

def handle_exit_event(exit_event)
  exit_handler = @subject.class.exit_handler
  if exit_handler
    return @subject.send(exit_handler, exit_event.actor, exit_event.reason)
  end

  # Reraise exceptions from linked actors
  # If no reason is given, actor terminated cleanly
  raise exit_event.reason if exit_event.reason
end

#handle_message(message) ⇒ Object

Handle an incoming message



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 193

def handle_message(message)
  case message
  when Call
    Task.new(:message_handler) { message.dispatch(@subject) }.resume
  when Response
    handled_successfully = signal [:call, message.call_id], message

    unless handled_successfully
      Logger.debug("anomalous message! spurious response to call #{message.call_id}")
    end
  else
    @receivers.handle_message(message)
  end
  message
end

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message



109
110
111
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 109

def receive(timeout = nil, &block)
  @receivers.receive(timeout, &block)
end

#runObject

Run the actor loop



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 114

def run
  while @running
    begin
      message = @mailbox.receive(timeout)
    rescue ExitEvent => exit_event
      Task.new(:exit_handler) { handle_exit_event exit_event }.resume
      retry
    end

    if message
      handle_message message
    else
      # No message indicates a timeout
      @timers.fire
      @receivers.fire_timers
    end
  end

  cleanup ExitEvent.new(@proxy)
rescue MailboxShutdown
  # If the mailbox detects shutdown, exit the actor
  @running = false
rescue Exception => ex
  @running = false
  handle_crash(ex)
ensure
  Pool.put @thread
end

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods



99
100
101
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 99

def signal(name, value = nil)
  @signals.send name, value
end

#sleep(interval) ⇒ Object

Sleep for the given amount of time



186
187
188
189
190
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 186

def sleep(interval)
  task = Task.current
  @timers.add(interval) { task.resume }
  Task.suspend
end

#tasksObject

Obtain a hash of tasks that are currently waiting



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 158

def tasks
  # A hash of tasks to what they're waiting on is more meaningful to the
  # end-user, and lets us make a copy of the tasks table, rather than
  # handing them the one we're using internally across threads, a definite
  # thread safety shared state no-no
  tasks = {}
  current_task = Task.current rescue nil
  tasks[current_task] = :running if current_task

  @signals.waiting.each do |waitable, waiters|
    if waiters.is_a? Enumerable
      waiters.each { |waiter| tasks[waiter] = waitable }
    else
      tasks[waiters] = waitable
    end
  end

  tasks
end

#terminateObject

Terminate this actor



93
94
95
96
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 93

def terminate
  @running = false
  nil
end

#timeoutObject

How long to wait until the next timer fires



144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 144

def timeout
  i1 = @timers.wait_interval
  i2 = @receivers.wait_interval

  if i1 and i2
    i1 < i2 ? i1 : i2
  elsif i1
    i1
  else
    i2
  end
end

#wait(name) ⇒ Object

Wait for the given signal



104
105
106
# File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 104

def wait(name)
  @signals.wait name
end