Class: Celluloid::Actor
- Extended by:
- Registry
- Defined in:
- lib/vendor/celluloid/lib/celluloid/actor.rb,
lib/vendor/celluloid/lib/celluloid/actor_pool.rb
Overview
Actors are Celluloid’s concurrency primitive. They’re implemented as normal Ruby objects wrapped in threads which communicate with asynchronous messages.
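The idea in the overview can be sketched in plain Ruby. This is an illustration under stated assumptions, not Celluloid's implementation: `TinyActor` and `Counter` are hypothetical names, and a core `Queue` stands in for the mailbox.

```ruby
# Hypothetical illustration only: TinyActor is not part of Celluloid.
class TinyActor
  def initialize(subject)
    @subject = subject
    @mailbox = Queue.new           # thread-safe FIFO from Ruby core
    @thread  = Thread.new { run }  # one thread per actor
  end

  # Enqueue a method invocation and return immediately.
  def async(meth, *args)
    @mailbox << [meth, args]
    nil
  end

  # Ask the actor to stop after earlier messages are handled, then wait for it.
  def shutdown
    @mailbox << :stop
    @thread.join
  end

  private

  # The actor loop: take one message at a time and dispatch it to the subject.
  def run
    loop do
      message = @mailbox.pop
      break if message == :stop
      meth, args = message
      @subject.public_send(meth, *args)
    end
  end
end

# Hypothetical subject: an ordinary object with no threading code of its own.
class Counter
  attr_reader :count

  def initialize
    @count = 0
  end

  def increment(by = 1)
    @count += by
  end
end

counter = Counter.new
actor = TinyActor.new(counter)
3.times { actor.async(:increment, 2) }
actor.shutdown
puts counter.count # prints 6
```

Because only the actor's thread touches the subject, `Counter` needs no locking even though the callers never wait for a reply.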
Defined Under Namespace
Classes: Pool
Instance Attribute Summary
-
#links ⇒ Object
readonly
Returns the value of attribute links.
-
#mailbox ⇒ Object
readonly
Returns the value of attribute mailbox.
-
#proxy ⇒ Object
readonly
Returns the value of attribute proxy.
Class Method Summary
-
.async(mailbox, meth, *args, &block) ⇒ Object
Invoke a method asynchronously on an actor via its mailbox.
-
.call(mailbox, meth, *args, &block) ⇒ Object
Invoke a method on the given actor via its mailbox.
Instance Method Summary
-
#after(interval) ⇒ Object
Schedule a block to run after the given interval.
-
#alive? ⇒ Boolean
Is this actor alive?
-
#cleanup(exit_event) ⇒ Object
Handle cleaning up this actor after it exits.
-
#handle_crash(exception) ⇒ Object
Handle any exceptions that occur within a running actor.
-
#handle_exit_event(exit_event) ⇒ Object
Handle exit events received by this actor.
-
#handle_message(message) ⇒ Object
Handle an incoming message.
-
#initialize(subject) ⇒ Actor
constructor
Wrap the given subject with an Actor.
-
#receive(timeout = nil, &block) ⇒ Object
Receive an asynchronous message.
-
#run ⇒ Object
Run the actor loop.
-
#signal(name, value = nil) ⇒ Object
Send a signal with the given name to all waiting methods.
-
#sleep(interval) ⇒ Object
Sleep for the given amount of time.
-
#tasks ⇒ Object
Obtain a hash of tasks that are currently waiting.
-
#terminate ⇒ Object
Terminate this actor.
-
#timeout ⇒ Object
How long to wait until the next timer fires.
-
#wait(name) ⇒ Object
Wait for the given signal.
Methods included from Registry
[], []=, registered
Constructor Details
#initialize(subject) ⇒ Actor
Wrap the given subject with an Actor
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 63
    def initialize(subject)
      @subject = subject

      if subject.respond_to? :mailbox_factory
        @mailbox = subject.mailbox_factory
      else
        @mailbox = Mailbox.new
      end

      @links = Links.new
      @signals = Signals.new
      @receivers = Receivers.new
      @timers = Timers.new
      @proxy = ActorProxy.new(@mailbox, self.class.to_s)
      @running = true

      @thread = Pool.get do
        Thread.current[:actor] = self
        Thread.current[:mailbox] = @mailbox

        run
      end
    end
Instance Attribute Details
#links ⇒ Object (readonly)
Returns the value of attribute links.
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 25
    def links
      @links
    end
#mailbox ⇒ Object (readonly)
Returns the value of attribute mailbox.
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 26
    def mailbox
      @mailbox
    end
#proxy ⇒ Object (readonly)
Returns the value of attribute proxy.
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 24
    def proxy
      @proxy
    end
Class Method Details
.async(mailbox, meth, *args, &block) ⇒ Object
Invoke a method asynchronously on an actor via its mailbox
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 51
    def self.async(mailbox, meth, *args, &block)
      begin
        mailbox << AsyncCall.new(Thread.mailbox, meth, args, block)
      rescue MailboxError
        # Silently swallow asynchronous calls to dead actors. There's no way
        # to reliably generate DeadActorErrors for async calls, so users of
        # async calls should find other ways to deal with actors dying
        # during an async call (i.e. linking/supervisors)
      end
    end
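The swallow-on-dead-mailbox behaviour of `.async` can be sketched with a core `Queue`. This is only an analogy: `fire_and_forget` is a hypothetical helper, a closed `Queue` stands in for a dead actor's mailbox, and `ClosedQueueError` (Ruby 2.3+) plays the role of `MailboxError`. Unlike `.async`, the sketch returns a boolean so the outcome is visible.

```ruby
# Hypothetical helper; not part of Celluloid.
# Pushes +message+ onto +mailbox+ and reports whether it was accepted.
def fire_and_forget(mailbox, message)
  mailbox << message
  true
rescue ClosedQueueError
  # The receiver is gone: drop the message instead of raising to the caller.
  false
end

live = Queue.new
dead = Queue.new
dead.close # a closed queue rejects further pushes

fire_and_forget(live, :ping) # accepted
fire_and_forget(dead, :ping) # silently dropped
```

As the source comment notes, a caller that needs to learn about the receiver's death has to rely on another mechanism, such as linking or supervisors.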
.call(mailbox, meth, *args, &block) ⇒ Object
Invoke a method on the given actor via its mailbox
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 29
    def self.call(mailbox, meth, *args, &block)
      call = SyncCall.new(Thread.mailbox, meth, args, block)

      begin
        mailbox << call
      rescue MailboxError
        raise DeadActorError, "attempted to call a dead actor"
      end

      if Celluloid.actor?
        response = Thread.current[:actor].wait [:call, call.id]
      else
        # Otherwise we're inside a normal thread, so block
        response = Thread.mailbox.receive do |msg|
          msg.respond_to?(:call_id) and msg.call_id == call.id
        end
      end

      response.value
    end
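The non-actor branch of `.call` (send a call, then block until a response with the matching call id arrives) can be sketched as follows. All names here (`SketchCall`, `SketchResponse`, `start_server`, `sync_call`) are hypothetical, and core `Queue` objects stand in for mailboxes; it is a sketch of the request/response pattern, not of Celluloid's classes.

```ruby
# Hypothetical message types; not Celluloid's SyncCall or Response.
SketchCall     = Struct.new(:id, :reply_to, :meth, :args)
SketchResponse = Struct.new(:call_id, :value)

# Start a thread that serves calls against +subject+ and return its mailbox.
def start_server(subject)
  mailbox = Queue.new
  Thread.new do
    # Queue#pop blocks until a call arrives; it returns nil once the queue
    # is closed and empty, which ends the loop.
    while (call = mailbox.pop)
      value = subject.public_send(call.meth, *call.args)
      call.reply_to << SketchResponse.new(call.id, value)
    end
  end
  mailbox
end

# Send a call, then block until the response carrying the same id arrives.
def sync_call(mailbox, meth, *args)
  reply_to = Queue.new
  call = SketchCall.new(reply_to.object_id, reply_to, meth, args)
  mailbox << call
  loop do
    response = reply_to.pop
    return response.value if response.call_id == call.id
  end
end

mailbox = start_server([3, 1, 2])
p sync_call(mailbox, :sort) # prints [1, 2, 3]
```

The sketch uses a fresh reply queue per call, so the id check is redundant here; it is kept because `.call` replies to the caller's shared thread mailbox, where unrelated messages can arrive first.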
Instance Method Details
#after(interval) ⇒ Object
Schedule a block to run after the given interval
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 179
    def after(interval)
      @timers.add(interval) do
        Task.new(:timer) { yield }.resume
      end
    end
#alive? ⇒ Boolean
Is this actor alive?
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 88
    def alive?
      @running
    end
#cleanup(exit_event) ⇒ Object
Handle cleaning up this actor after it exits
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 230
    def cleanup(exit_event)
      @mailbox.shutdown
      @links.send_event exit_event
      tasks.each { |task, _| task.terminate }

      begin
        @subject.finalize if @subject.respond_to? :finalize
      rescue Exception => ex
        Logger.crash("#{@subject.class}#finalize crashed!", ex)
      end
    end
#handle_crash(exception) ⇒ Object
Handle any exceptions that occur within a running actor
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 222
    def handle_crash(exception)
      Logger.crash("#{@subject.class} crashed!", exception)
      cleanup ExitEvent.new(@proxy, exception)
    rescue Exception => ex
      Logger.crash("#{@subject.class}: ERROR HANDLER CRASHED!", ex)
    end
#handle_exit_event(exit_event) ⇒ Object
Handle exit events received by this actor
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 210
    def handle_exit_event(exit_event)
      exit_handler = @subject.class.exit_handler
      if exit_handler
        return @subject.send(exit_handler, exit_event.actor, exit_event.reason)
      end

      # Reraise exceptions from linked actors
      # If no reason is given, actor terminated cleanly
      raise exit_event.reason if exit_event.reason
    end
#handle_message(message) ⇒ Object
Handle an incoming message
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 193
    def handle_message(message)
      case message
      when Call
        Task.new(:message_handler) { message.dispatch(@subject) }.resume
      when Response
        handled_successfully = signal [:call, message.call_id], message

        unless handled_successfully
          Logger.debug("anomalous message! spurious response to call #{message.call_id}")
        end
      else
        @receivers.handle_message(message)
      end
      message
    end
#receive(timeout = nil, &block) ⇒ Object
Receive an asynchronous message
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 109
    def receive(timeout = nil, &block)
      @receivers.receive(timeout, &block)
    end
#run ⇒ Object
Run the actor loop
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 114
    def run
      while @running
        begin
          message = @mailbox.receive(timeout)
        rescue ExitEvent => exit_event
          Task.new(:exit_handler) { handle_exit_event exit_event }.resume
          retry
        end

        if message
          handle_message message
        else
          # No message indicates a timeout
          @timers.fire
          @receivers.fire_timers
        end
      end

      cleanup ExitEvent.new(@proxy)
    rescue MailboxShutdown
      # If the mailbox detects shutdown, exit the actor
      @running = false
    rescue Exception => ex
      @running = false
      handle_crash(ex)
    ensure
      Pool.put @thread
    end
#signal(name, value = nil) ⇒ Object
Send a signal with the given name to all waiting methods
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 99
    def signal(name, value = nil)
      @signals.send name, value
    end
#sleep(interval) ⇒ Object
Sleep for the given amount of time
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 186
    def sleep(interval)
      task = Task.current
      @timers.add(interval) { task.resume }
      Task.suspend
    end
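The pattern in `#sleep` (remember the current task, register a timer that resumes it, then suspend) can be sketched with Ruby fibers. Assumptions: a `Fiber` stands in for Celluloid's `Task`, and `SketchTimers` and `task_sleep` are hypothetical names. The timer list is driven by an explicit clock value rather than wall-clock time, so the example is deterministic.

```ruby
require "fiber" # needed for Fiber.current on older Rubies; harmless on newer ones

# Hypothetical timer list; not Celluloid's Timers class.
class SketchTimers
  def initialize
    @timers = []
  end

  # Register +block+ to run once the clock reaches +at+.
  def add(at, &block)
    @timers << [at, block]
  end

  # Run and remove every timer that is due at time +now+, earliest first.
  def fire(now)
    due, @timers = @timers.partition { |at, _| at <= now }
    due.sort_by(&:first).each { |_, block| block.call }
  end
end

# Suspend the calling fiber until the timer list reaches +wake_at+.
# Must be called from inside a Fiber, not from the root fiber.
def task_sleep(timers, wake_at)
  task = Fiber.current
  timers.add(wake_at) { task.resume }
  Fiber.yield
end

timers = SketchTimers.new
log = []
Fiber.new { log << :before; task_sleep(timers, 5); log << :after }.resume
timers.fire(1) # nothing is due yet; log is [:before]
timers.fire(5) # the timer fires and the fiber resumes
p log # prints [:before, :after]
```

The sleeping task gives up control instead of blocking the thread, which is what lets the actor loop keep processing other messages in the meantime.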
#tasks ⇒ Object
Obtain a hash of tasks that are currently waiting
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 158
    def tasks
      # A hash of tasks to what they're waiting on is more meaningful to the
      # end-user, and lets us make a copy of the tasks table, rather than
      # handing them the one we're using internally across threads, a definite
      # thread safety shared state no-no
      tasks = {}

      current_task = Task.current rescue nil
      tasks[current_task] = :running if current_task

      @signals.waiting.each do |waitable, waiters|
        if waiters.is_a? Enumerable
          waiters.each { |waiter| tasks[waiter] = waitable }
        else
          tasks[waiters] = waitable
        end
      end

      tasks
    end
#terminate ⇒ Object
Terminate this actor
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 93
    def terminate
      @running = false
      nil
    end
#timeout ⇒ Object
How long to wait until the next timer fires
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 144
    def timeout
      i1 = @timers.wait_interval
      i2 = @receivers.wait_interval

      if i1 and i2
        i1 < i2 ? i1 : i2
      elsif i1
        i1
      else
        i2
      end
    end
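The branching in `#timeout` picks the smaller of two intervals, either of which may be `nil`. Assuming the intervals are always numbers or `nil`, the same selection can be written more compactly; `next_wait` is a hypothetical helper used only for illustration.

```ruby
# Hypothetical helper mirroring the branches of #timeout.
# Returns the smaller non-nil interval, or nil when neither is set.
def next_wait(i1, i2)
  [i1, i2].compact.min
end

p next_wait(0.5, 2)   # prints 0.5
p next_wait(nil, 2)   # prints 2
p next_wait(nil, nil) # prints nil
```

A `nil` result corresponds to waiting on the mailbox with no timeout at all.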
#wait(name) ⇒ Object
Wait for the given signal
    # File 'lib/vendor/celluloid/lib/celluloid/actor.rb', line 104
    def wait(name)
      @signals.wait name
    end
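How `#wait` and `#signal` cooperate can be sketched with fibers: a waiter parks itself under a name, and a later signal resumes it with a value. This is an assumption-laden illustration, not the `Signals` class itself: `SketchSignals` is a hypothetical stand-in, and a `Fiber` plays the role of a task.

```ruby
require "fiber" # needed for Fiber.current on older Rubies; harmless on newer ones

# Hypothetical stand-in for the Signals object used by the actor.
class SketchSignals
  def initialize
    @waiting = Hash.new { |hash, name| hash[name] = [] }
  end

  # Park the current fiber until +name+ is signaled; returns the signaled value.
  # Must be called from inside a Fiber, not from the root fiber.
  def wait(name)
    @waiting[name] << Fiber.current
    Fiber.yield
  end

  # Resume every fiber waiting on +name+, handing each one +value+.
  # Returns true if at least one waiter was resumed.
  def signal(name, value = nil)
    waiters = @waiting.delete(name) || []
    waiters.each { |fiber| fiber.resume(value) }
    !waiters.empty?
  end
end

signals = SketchSignals.new
results = []
Fiber.new { results << signals.wait(:ready) }.resume # parks inside wait
signals.signal(:ready, 42)                           # resumes the waiter
p results # prints [42]
```

The boolean return value mirrors how `#handle_message` uses the result of `signal` to detect a response that nobody was waiting for.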