Module: Celluloid

Extended by:
Celluloid, Forwardable
Included in:
Celluloid, Group::Manager
Defined in:
lib/celluloid/deprecate.rb,
lib/celluloid.rb,
lib/celluloid/cell.rb,
lib/celluloid/task.rb,
lib/celluloid/calls.rb,
lib/celluloid/actor.rb,
lib/celluloid/group.rb,
lib/celluloid/future.rb,
lib/celluloid/thread.rb,
lib/celluloid/mailbox.rb,
lib/celluloid/notices.rb,
lib/celluloid/version.rb,
lib/celluloid/proxies.rb,
lib/celluloid/call/sync.rb,
lib/celluloid/condition.rb,
lib/celluloid/exceptions.rb,
lib/celluloid/proxy/cell.rb,
lib/celluloid/call/async.rb,
lib/celluloid/proxy/sync.rb,
lib/celluloid/group/pool.rb,
lib/celluloid/call/block.rb,
lib/celluloid/proxy/async.rb,
lib/celluloid/proxy/actor.rb,
lib/celluloid/proxy/block.rb,
lib/celluloid/actor_system.rb,
lib/celluloid/proxy/future.rb,
lib/celluloid/task/fibered.rb,
lib/celluloid/task/threaded.rb,
lib/celluloid/group/manager.rb,
lib/celluloid/system_events.rb,
lib/celluloid/group/spawner.rb,
lib/celluloid/proxy/abstract.rb,
lib/celluloid/mailbox/evented.rb

Overview

TODO: Remove link to Interal::Logger

Defined Under Namespace

Modules: ClassMethods, InstanceMethods, Notices, Proxy Classes: AbortError, Actor, ActorSystem, Call, Cell, Condition, ConditionError, DeadTaskError, ExitEvent, Future, Group, LinkingRequest, LinkingResponse, Mailbox, MailboxDead, MailboxShutdown, NamingRequest, NotTaskError, ResumableError, SignalConditionRequest, SystemEvent, Task, TerminationRequest, Thread

Constant Summary

LINKING_TIMEOUT =

Linking times out after 5 seconds

5
BARE_OBJECT_WARNING_MESSAGE =

Warning message added to Celluloid objects accessed outside their actors

"WARNING: BARE CELLULOID OBJECT "
OWNER_IVAR =

reference to owning actor

:@celluloid_owner
VERSION =
"0.17.0"
SyncCall =
Call::Sync
EventedMailbox =
Mailbox::Evented
InternalPool =
Group::Pool
TaskThread =
Task::Threaded
TaskFiber =
Task::Fibered
Error =

Base class of all Celluloid errors

Class.new(StandardError)
NotActorError =

Don't do Actor-like things outside Actor scope

Class.new(Celluloid::Error)
DeadActorError =

Trying to do something to a dead actor

Class.new(Celluloid::Error)
TimeoutError =

A timeout occured before the given request could complete

Class.new(Celluloid::Error)

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.actor_systemObject



33
34
35
36
37
38
39
# File 'lib/celluloid.rb', line 33

def actor_system
  if Thread.current.celluloid?
    Thread.current[:celluloid_actor_system] || fail(Error, "actor system not running")
  else
    Thread.current[:celluloid_actor_system] || @actor_system || fail(Error, "Celluloid is not yet started; use Celluloid.boot")
  end
end

.group_classObject

Default internal thread group to use



29
30
31
# File 'lib/celluloid.rb', line 29

def group_class
  @group_class
end

.log_actor_crashesObject

Returns the value of attribute log_actor_crashes



28
29
30
# File 'lib/celluloid.rb', line 28

def log_actor_crashes
  @log_actor_crashes
end

.loggerObject

Thread-safe logger class



27
28
29
# File 'lib/celluloid.rb', line 27

def logger
  @logger
end

.shutdown_timeoutObject

How long actors have to terminate



31
32
33
# File 'lib/celluloid.rb', line 31

def shutdown_timeout
  @shutdown_timeout
end

.task_classObject

Default task type to use



30
31
32
# File 'lib/celluloid.rb', line 30

def task_class
  @task_class
end

Class Method Details

.actor?Boolean

Are we currently inside of an actor?

Returns:

  • (Boolean)


80
81
82
# File 'lib/celluloid.rb', line 80

def actor?
  !!Thread.current[:celluloid_actor]
end

.bootObject



144
145
146
147
# File 'lib/celluloid.rb', line 144

def boot
  init
  start
end

.coresObject Also known as: cpus, ncpus

Obtain the number of CPUs in the system



95
96
97
# File 'lib/celluloid.rb', line 95

def cores
  Internals::CPUCounter.cores
end

.detect_recursionObject

Detect if a particular call is recursing through multiple actors



118
119
120
121
122
123
124
125
126
127
# File 'lib/celluloid.rb', line 118

def detect_recursion
  actor = Thread.current[:celluloid_actor]
  return unless actor

  task = Thread.current[:celluloid_task]
  return unless task

  chain_id = Internals::CallChain.current_id
  actor.tasks.to_a.any? { |t| t != task && t.chain_id == chain_id }
end

.exception_handler(&block) ⇒ Object

Define an exception handler for actor crashes



130
131
132
# File 'lib/celluloid.rb', line 130

def exception_handler(&block)
  Internals::Logger.exception_handler(&block)
end

.included(klass) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/celluloid.rb', line 41

def included(klass)
  klass.send :extend,  ClassMethods
  klass.send :include, InstanceMethods

  klass.send :extend, Internals::Properties

  klass.property :mailbox_class, default: Celluloid::Mailbox
  klass.property :proxy_class,   default: Celluloid::Proxy::Cell
  klass.property :task_class,    default: Celluloid.task_class
  klass.property :group_class,   default: Celluloid.group_class
  klass.property :mailbox_size

  klass.property :exclusive_actor, default: false
  klass.property :exclusive_methods, multi: true
  klass.property :execute_block_on_receiver,
                 default: [:after, :every, :receive],
                 multi: true

  klass.property :finalizer
  klass.property :exit_handler_name

  singleton = class << klass; self; end
  singleton.send(:remove_method, :trap_exit) rescue nil
  singleton.send(:remove_method, :exclusive) rescue nil

  singleton.send(:define_method, :trap_exit) do |*args|
    exit_handler_name(*args)
  end

  singleton.send(:define_method, :exclusive) do |*args|
    if args.any?
      exclusive_methods(*exclusive_methods, *args)
    else
      exclusive_actor true
    end
  end
end

.initObject



149
150
151
# File 'lib/celluloid.rb', line 149

def init
  @actor_system = ActorSystem.new
end

.mailboxObject

Retrieve the mailbox for the current thread or lazily initialize it



85
86
87
# File 'lib/celluloid.rb', line 85

def mailbox
  Thread.current[:celluloid_mailbox] ||= Celluloid::Mailbox.new
end

.public_registryObject



113
114
115
# File 'lib/celluloid.rb', line 113

def public_registry
  actor_system.public_registry
end

.register_shutdownObject



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/celluloid.rb', line 161

def register_shutdown
  return if defined?(@shutdown_registered) && @shutdown_registered

  # Terminate all actors at exit
  at_exit do
    if defined?(RUBY_ENGINE) && RUBY_ENGINE == "ruby" && RUBY_VERSION >= "1.9"
      # workaround for MRI bug losing exit status in at_exit block
      # http://bugs.ruby-lang.org/issues/5218
      exit_status = $ERROR_INFO.status if $ERROR_INFO.is_a?(SystemExit)
      Celluloid.shutdown
      exit exit_status if exit_status
    else
      Celluloid.shutdown
    end
  end
  @shutdown_registered = true
end

.running?Boolean

Returns:

  • (Boolean)


157
158
159
# File 'lib/celluloid.rb', line 157

def running?
  actor_system && actor_system.running?
end

.shutdownObject

Shut down all running actors



180
181
182
# File 'lib/celluloid.rb', line 180

def shutdown
  actor_system.shutdown
end

.stack_dump(output = STDERR) ⇒ Object Also known as: dump

Perform a stack dump of all actors to the given output object



102
103
104
# File 'lib/celluloid.rb', line 102

def stack_dump(output = STDERR)
  actor_system.stack_dump.print(output)
end

.stack_summary(output = STDERR) ⇒ Object Also known as: summarize

Perform a stack summary of all actors to the given output object



108
109
110
# File 'lib/celluloid.rb', line 108

def stack_summary(output = STDERR)
  actor_system.stack_summary.print(output)
end

.startObject



153
154
155
# File 'lib/celluloid.rb', line 153

def start
  actor_system.start
end

.suspend(status, waiter) ⇒ Object



134
135
136
137
138
139
140
141
142
# File 'lib/celluloid.rb', line 134

def suspend(status, waiter)
  task = Thread.current[:celluloid_task]
  if task && !Celluloid.exclusive?
    waiter.before_suspend(task) if waiter.respond_to?(:before_suspend)
    Task.suspend(status)
  else
    waiter.wait
  end
end

.uuidObject

Generate a Universally Unique Identifier



90
91
92
# File 'lib/celluloid.rb', line 90

def uuid
  Internals::UUID.generate
end

.versionObject



184
185
186
# File 'lib/celluloid.rb', line 184

def version
  VERSION
end

Instance Method Details

#abort(cause) ⇒ Object

Raise an exception in sender context, but stay running



314
315
316
317
318
319
320
321
# File 'lib/celluloid.rb', line 314

def abort(cause)
  cause = case cause
    when String then RuntimeError.new(cause)
    when Exception then cause
    else fail TypeError, "Exception object/String expected, but #{cause.class} received"
  end
  fail AbortError.new(cause)
end

#after(interval, &block) ⇒ Object

Call a block after a given interval, returning a Celluloid::Timer object



428
429
430
# File 'lib/celluloid.rb', line 428

def after(interval, &block)
  Thread.current[:celluloid_actor].after(interval, &block)
end

#async(meth = nil, *args, &block) ⇒ Object

Handle async calls within an actor itself



447
448
449
# File 'lib/celluloid.rb', line 447

def async(meth = nil, *args, &block)
  Thread.current[:celluloid_actor].behavior_proxy.async meth, *args, &block
end

#call_chain_idObject

Obtain the UUID of the current call chain



344
345
346
# File 'lib/celluloid.rb', line 344

def call_chain_id
  Internals::CallChain.current_id
end

#current_actorObject

Obtain the current_actor



339
340
341
# File 'lib/celluloid.rb', line 339

def current_actor
  Actor.current
end

#defer(&block) ⇒ Object

Perform a blocking or computationally intensive action inside an asynchronous group of threads, allowing the sender to continue processing other messages in its mailbox in the meantime



440
441
442
443
444
# File 'lib/celluloid.rb', line 440

def defer(&block)
  # This implementation relies on the present implementation of
  # Celluloid::Future, which uses a thread from InternalPool to run the block
  Future.new(&block).value
end

#every(interval, &block) ⇒ Object

Call a block every given interval, returning a Celluloid::Timer object



433
434
435
# File 'lib/celluloid.rb', line 433

def every(interval, &block)
  Thread.current[:celluloid_actor].every(interval, &block)
end

#exclusive(&block) ⇒ Object

Run given block in an exclusive mode: all synchronous calls block the whole actor, not only current message processing.



417
418
419
# File 'lib/celluloid.rb', line 417

def exclusive(&block)
  Thread.current[:celluloid_task].exclusive(&block)
end

#exclusive?Boolean

Are we currently exclusive

Returns:

  • (Boolean)


422
423
424
425
# File 'lib/celluloid.rb', line 422

def exclusive?
  task = Thread.current[:celluloid_task]
  task && task.exclusive?
end

#future(meth = nil, *args, &block) ⇒ Object

Handle calls to future within an actor itself



452
453
454
# File 'lib/celluloid.rb', line 452

def future(meth = nil, *args, &block)
  Thread.current[:celluloid_actor].behavior_proxy.future meth, *args, &block
end

Link this actor to another, allowing it to crash or react to errors



369
370
371
# File 'lib/celluloid.rb', line 369

def link(actor)
  Actor.link(actor)
end

#linked_to?(actor) ⇒ Boolean

Is this actor linked to another?

Returns:

  • (Boolean)


384
385
386
# File 'lib/celluloid.rb', line 384

def linked_to?(actor)
  Actor.linked_to?(actor)
end

Obtain the Celluloid::Links for this actor



354
355
356
# File 'lib/celluloid.rb', line 354

def links
  Thread.current[:celluloid_actor].links
end

#monitor(actor) ⇒ Object

Watch for exit events from another actor



359
360
361
# File 'lib/celluloid.rb', line 359

def monitor(actor)
  Actor.monitor(actor)
end

#monitoring?(actor) ⇒ Boolean

Are we monitoring another actor?

Returns:

  • (Boolean)


379
380
381
# File 'lib/celluloid.rb', line 379

def monitoring?(actor)
  Actor.monitoring?(actor)
end

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message via the actor protocol



389
390
391
392
393
394
395
396
# File 'lib/celluloid.rb', line 389

def receive(timeout = nil, &block)
  actor = Thread.current[:celluloid_actor]
  if actor
    actor.receive(timeout, &block)
  else
    Celluloid.mailbox.receive(timeout, &block)
  end
end

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods



329
330
331
# File 'lib/celluloid.rb', line 329

def signal(name, value = nil)
  Thread.current[:celluloid_actor].signal name, value
end

#sleep(interval) ⇒ Object

Sleep letting the actor continue processing messages



399
400
401
402
403
404
405
406
# File 'lib/celluloid.rb', line 399

def sleep(interval)
  actor = Thread.current[:celluloid_actor]
  if actor
    actor.sleep(interval)
  else
    Kernel.sleep interval
  end
end

#tasksObject

Obtain the running tasks for this actor



349
350
351
# File 'lib/celluloid.rb', line 349

def tasks
  Thread.current[:celluloid_actor].tasks.to_a
end

#terminateObject

Terminate this actor



324
325
326
# File 'lib/celluloid.rb', line 324

def terminate
  Thread.current[:celluloid_actor].behavior_proxy.terminate!
end

#timeout(duration) ⇒ Object

Timeout on task suspension (eg Sync calls to other actors)



409
410
411
412
413
# File 'lib/celluloid.rb', line 409

def timeout(duration)
  Thread.current[:celluloid_actor].timeout(duration) do
    yield
  end
end

Remove links to another actor



374
375
376
# File 'lib/celluloid.rb', line 374

def unlink(actor)
  Actor.unlink(actor)
end

#unmonitor(actor) ⇒ Object

Stop waiting for exit events from another actor



364
365
366
# File 'lib/celluloid.rb', line 364

def unmonitor(actor)
  Actor.unmonitor(actor)
end

#wait(name) ⇒ Object

Wait for the given signal



334
335
336
# File 'lib/celluloid.rb', line 334

def wait(name)
  Thread.current[:celluloid_actor].wait name
end