Module: Celluloid

Extended by:
Celluloid, Forwardable
Included in:
Celluloid, Actor::Manager
Defined in:
lib/celluloid/deprecate.rb,
lib/celluloid.rb,
lib/celluloid/cell.rb,
lib/celluloid/task.rb,
lib/celluloid/calls.rb,
lib/celluloid/actor.rb,
lib/celluloid/group.rb,
lib/celluloid/future.rb,
lib/celluloid/thread.rb,
lib/celluloid/mailbox.rb,
lib/celluloid/notices.rb,
lib/celluloid/version.rb,
lib/celluloid/proxies.rb,
lib/celluloid/condition.rb,
lib/celluloid/call/sync.rb,
lib/celluloid/call/async.rb,
lib/celluloid/exceptions.rb,
lib/celluloid/call/block.rb,
lib/celluloid/group/pool.rb,
lib/celluloid/task/fibered.rb,
lib/celluloid/actor/system.rb,
lib/celluloid/task/threaded.rb,
lib/celluloid/group/spawner.rb,
lib/celluloid/system_events.rb,
lib/celluloid/actor/manager.rb,
lib/celluloid/mailbox/evented.rb

Overview

TODO: Remove link to Internals::Logger

Defined Under Namespace

Modules: ClassMethods, Feature, InstanceMethods, Notices, Proxy

Classes: AbortError, Actor, Call, Cell, Condition, ConditionError, DeadActorError, DeadTaskError, Error, ExitEvent, Future, Group, Interruption, LinkingRequest, LinkingResponse, Mailbox, MailboxDead, MailboxShutdown, NamingRequest, NotActive, NotActorError, NotTaskError, SignalConditionRequest, StillActive, SystemEvent, Task, TaskTerminated, TaskTimeout, TerminationRequest, Thread, ThreadLeak, TimedOut

Constant Summary

LINKING_TIMEOUT =

Linking times out after 5 seconds

5
BARE_OBJECT_WARNING_MESSAGE =

Warning message added to Celluloid objects accessed outside their actors

"WARNING: BARE CELLULOID OBJECT "
OWNER_IVAR =

reference to owning actor

:@celluloid_owner
VERSION =
"0.17.3"
SyncCall =
Call::Sync
EventedMailbox =
Mailbox::Evented
InternalPool =
Group::Pool
TaskThread =
Task::Threaded
TaskFiber =
Task::Fibered
ActorSystem =
Actor::System

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Class Attribute Details

.actor_systemObject



34
35
36
37
38
39
40
# File 'lib/celluloid.rb', line 34

# Resolve the actor system for the calling thread.
#
# A thread that answers true to #celluloid? must carry its system in the
# :celluloid_actor_system thread-local. Any other thread may fall back to
# the @actor_system instance variable. Fails with Error when nothing is found.
def actor_system
  thread = Thread.current

  if thread.celluloid?
    return thread[:celluloid_actor_system] || fail(Error, "actor system not running")
  end

  thread[:celluloid_actor_system] || @actor_system || fail(Error, "Celluloid is not yet started; use Celluloid.boot")
end

.group_classObject

Default internal thread group to use



30
31
32
# File 'lib/celluloid.rb', line 30

# Reader for the @group_class instance variable.
def group_class
  @group_class
end

.log_actor_crashesObject

Returns the value of attribute log_actor_crashes



29
30
31
# File 'lib/celluloid.rb', line 29

# Reader for the @log_actor_crashes instance variable.
def log_actor_crashes
  @log_actor_crashes
end

.loggerObject

Thread-safe logger class



28
29
30
# File 'lib/celluloid.rb', line 28

# Reader for the @logger instance variable.
def logger
  @logger
end

.shutdown_timeoutObject

How long actors have to terminate



32
33
34
# File 'lib/celluloid.rb', line 32

# Reader for the @shutdown_timeout instance variable.
def shutdown_timeout
  @shutdown_timeout
end

.task_classObject

Default task type to use



31
32
33
# File 'lib/celluloid.rb', line 31

# Reader for the @task_class instance variable.
def task_class
  @task_class
end

Class Method Details

.actor?Boolean

Are we currently inside of an actor?



81
82
83
# File 'lib/celluloid.rb', line 81

# True when the calling thread has a truthy :celluloid_actor thread-local,
# false otherwise. Always returns a strict boolean.
def actor?
  Thread.current[:celluloid_actor] ? true : false
end

.bootObject



145
146
147
148
# File 'lib/celluloid.rb', line 145

# Run init followed by start; returns whatever start returns.
def boot
  init
  start
end

.coresObject Also known as: cpus, ncpus

Obtain the number of CPUs in the system



96
97
98
# File 'lib/celluloid.rb', line 96

# Delegates to Internals::CPUCounter.cores and returns its result.
def cores
  Internals::CPUCounter.cores
end

.detect_recursionObject

Detect if a particular call is recursing through multiple actors



119
120
121
122
123
124
125
126
127
128
# File 'lib/celluloid.rb', line 119

# Report whether some *other* task of the current actor belongs to the same
# call chain as the running task.
#
# Returns nil when the calling thread has no :celluloid_actor or no
# :celluloid_task thread-local; otherwise true or false.
def detect_recursion
  thread = Thread.current
  owner = thread[:celluloid_actor]
  running_task = owner && thread[:celluloid_task]
  return unless running_task

  current_chain = Internals::CallChain.current_id
  owner.tasks.to_a.any? do |candidate|
    candidate != running_task && candidate.chain_id == current_chain
  end
end

.exception_handler(&block) ⇒ Object

Define an exception handler for actor crashes



131
132
133
# File 'lib/celluloid.rb', line 131

# Forwards the given block to Internals::Logger.exception_handler.
def exception_handler(&block)
  Internals::Logger.exception_handler(&block)
end

.included(klass) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/celluloid.rb', line 42

# Hook run when this module is included into +klass+.
#
# Mixes ClassMethods / InstanceMethods / Internals::Properties into klass,
# declares the per-class properties below, and (re)defines the class-level
# +trap_exit+ and +exclusive+ helpers on klass's singleton class.
def included(klass)
  klass.send :extend,  ClassMethods
  klass.send :include, InstanceMethods

  # Properties must be extended before any klass.property call below.
  klass.send :extend, Internals::Properties

  klass.property :mailbox_class, default: Celluloid::Mailbox
  klass.property :proxy_class,   default: Celluloid::Proxy::Cell
  klass.property :task_class,    default: Celluloid.task_class
  klass.property :group_class,   default: Celluloid.group_class
  klass.property :mailbox_size

  klass.property :exclusive_actor, default: false
  klass.property :exclusive_methods, multi: true
  klass.property :execute_block_on_receiver,
                 default: [:after, :every, :receive],
                 multi: true

  klass.property :finalizer
  klass.property :exit_handler_name

  # Drop any trap_exit / exclusive already defined directly on the singleton
  # class; errors (e.g. method not defined there) are deliberately ignored.
  singleton = class << klass; self; end
  singleton.send(:remove_method, :trap_exit) rescue nil
  singleton.send(:remove_method, :exclusive) rescue nil

  # trap_exit(*args) forwards to the exit_handler_name property.
  singleton.send(:define_method, :trap_exit) do |*args|
    exit_handler_name(*args)
  end

  # exclusive(:a, :b) appends to exclusive_methods; a bare `exclusive`
  # sets the exclusive_actor property to true.
  singleton.send(:define_method, :exclusive) do |*args|
    if args.any?
      exclusive_methods(*exclusive_methods, *args)
    else
      exclusive_actor true
    end
  end
end

.initObject



150
151
152
# File 'lib/celluloid.rb', line 150

# Replace @actor_system with a newly constructed Actor::System.
def init
  @actor_system = Actor::System.new
end

.mailboxObject

Retrieve the mailbox for the current thread or lazily initialize it



86
87
88
# File 'lib/celluloid.rb', line 86

# Return the calling thread's mailbox, creating and caching a new
# Celluloid::Mailbox in the :celluloid_mailbox thread-local on first use.
def mailbox
  thread = Thread.current
  cached = thread[:celluloid_mailbox]
  return cached if cached

  thread[:celluloid_mailbox] = Celluloid::Mailbox.new
end

.public_registryObject



114
115
116
# File 'lib/celluloid.rb', line 114

# Delegates to the current actor_system's public_registry.
def public_registry
  actor_system.public_registry
end

.register_shutdownObject



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/celluloid.rb', line 162

# Install, at most once, an at_exit hook that calls Celluloid.shutdown.
#
# Later calls are no-ops once @shutdown_registered is set. On MRI the hook
# re-issues the pending SystemExit status after shutdown, so the process exit
# code is not lost.
def register_shutdown
  return if defined?(@shutdown_registered) && @shutdown_registered

  # Terminate all actors at exit
  at_exit do
    sleep 0.126 # hax grace period for unnaturally terminating actors
    # allows "reason" in exit_handler to resolve before being destroyed
    if defined?(RUBY_ENGINE) && RUBY_ENGINE == "ruby" && RUBY_VERSION >= "1.9"
      # workaround for MRI bug losing exit status in at_exit block
      # http://bugs.ruby-lang.org/issues/5218
      #
      # Read $! directly rather than $ERROR_INFO: that alias only exists
      # after `require "English"`. Without it the global reads as nil and
      # the exit status would be silently dropped.
      pending = $! # rubocop:disable Style/SpecialGlobalVars
      exit_status = pending.status if pending.is_a?(SystemExit)
      Celluloid.shutdown
      exit exit_status if exit_status
    else
      Celluloid.shutdown
    end
  end
  @shutdown_registered = true
end

.running?Boolean



158
159
160
# File 'lib/celluloid.rb', line 158

# Is an actor system available and running?
#
# actor_system fails with Error when no system has been set up. A predicate
# should answer false in that case rather than raise, so that failure is
# translated to false here. Errors raised by the system's own #running? are
# not swallowed.
def running?
  begin
    system = actor_system
  rescue Error
    return false
  end

  system && system.running?
end

.shutdownObject

Shut down all running actors



183
184
185
# File 'lib/celluloid.rb', line 183

# Delegates to the current actor_system's shutdown.
def shutdown
  actor_system.shutdown
end

.stack_dump(output = STDERR) ⇒ Object Also known as: dump

Perform a stack dump of all actors to the given output object



103
104
105
# File 'lib/celluloid.rb', line 103

# Ask the actor system for a stack dump and print it to +output+
# (STDERR by default). Returns whatever the dump's #print returns.
def stack_dump(output = STDERR)
  dump = actor_system.stack_dump
  dump.print(output)
end

.stack_summary(output = STDERR) ⇒ Object Also known as: summarize

Perform a stack summary of all actors to the given output object



109
110
111
# File 'lib/celluloid.rb', line 109

# Ask the actor system for a stack summary and print it to +output+
# (STDERR by default). Returns whatever the summary's #print returns.
def stack_summary(output = STDERR)
  summary = actor_system.stack_summary
  summary.print(output)
end

.startObject



154
155
156
# File 'lib/celluloid.rb', line 154

# Delegates to the current actor_system's start.
def start
  actor_system.start
end

.suspend(status, waiter) ⇒ Object



135
136
137
138
139
140
141
142
143
# File 'lib/celluloid.rb', line 135

# Suspend the current task with +status+. When there is no current task, or
# Celluloid.exclusive? is true, call +waiter.wait+ instead.
#
# When suspending, +waiter.before_suspend(task)+ is invoked first if the
# waiter responds to it.
def suspend(status, waiter)
  running_task = Thread.current[:celluloid_task]
  return waiter.wait if !running_task || Celluloid.exclusive?

  waiter.before_suspend(running_task) if waiter.respond_to?(:before_suspend)
  Task.suspend(status)
end

.uuidObject

Generate a Universally Unique Identifier



91
92
93
# File 'lib/celluloid.rb', line 91

# Delegates to Internals::UUID.generate and returns its result.
def uuid
  Internals::UUID.generate
end

.versionObject



187
188
189
# File 'lib/celluloid.rb', line 187

# Returns the VERSION constant.
def version
  VERSION
end

Instance Method Details

#abort(cause) ⇒ Object

Raise an exception in sender context, but stay running



317
318
319
320
321
322
323
324
# File 'lib/celluloid.rb', line 317

# Raise an AbortError wrapping +cause+.
#
# A String cause is first wrapped in a RuntimeError; an Exception is used
# as-is; anything else fails with TypeError.
def abort(cause)
  # `Class === obj` is kept (rather than is_a?) to mirror case/when matching.
  wrapped =
    if String === cause
      RuntimeError.new(cause)
    elsif Exception === cause
      cause
    else
      fail TypeError, "Exception object/String expected, but #{cause.class} received"
    end

  fail AbortError.new(wrapped)
end

#after(interval, &block) ⇒ Object

Call a block after a given interval, returning a Celluloid::Timer object



431
432
433
# File 'lib/celluloid.rb', line 431

# Forward +interval+ and the block to the current thread's actor's #after.
def after(interval, &block)
  actor = Thread.current[:celluloid_actor]
  actor.after(interval, &block)
end

#async(meth = nil, *args, &block) ⇒ Object

Handle async calls within an actor itself



450
451
452
# File 'lib/celluloid.rb', line 450

# Forward an async call to the behavior proxy of the current thread's actor.
def async(meth = nil, *args, &block)
  proxy = Thread.current[:celluloid_actor].behavior_proxy
  proxy.async(meth, *args, &block)
end

#call_chain_idObject

Obtain the UUID of the current call chain



347
348
349
# File 'lib/celluloid.rb', line 347

# Delegates to Internals::CallChain.current_id and returns its result.
def call_chain_id
  Internals::CallChain.current_id
end

#current_actorObject

Obtain the current_actor



342
343
344
# File 'lib/celluloid.rb', line 342

# Delegates to Actor.current and returns its result.
def current_actor
  Actor.current
end

#defer(&block) ⇒ Object

Perform a blocking or computationally intensive action inside an asynchronous group of threads, allowing the sender to continue processing other messages in its mailbox in the meantime



443
444
445
446
447
# File 'lib/celluloid.rb', line 443

# Run the block through a Future and return its value.
def defer(&block)
  # Depends on how Celluloid::Future currently works: it runs the block on a
  # thread taken from InternalPool, and #value waits for the result.
  deferred = Future.new(&block)
  deferred.value
end

#every(interval, &block) ⇒ Object

Call a block every given interval, returning a Celluloid::Timer object



436
437
438
# File 'lib/celluloid.rb', line 436

# Forward +interval+ and the block to the current thread's actor's #every.
def every(interval, &block)
  actor = Thread.current[:celluloid_actor]
  actor.every(interval, &block)
end

#exclusive(&block) ⇒ Object

Run given block in an exclusive mode: all synchronous calls block the whole actor, not only current message processing.



420
421
422
# File 'lib/celluloid.rb', line 420

# Hand the block to the current thread's task via its #exclusive method.
def exclusive(&block)
  current_task = Thread.current[:celluloid_task]
  current_task.exclusive(&block)
end

#exclusive?Boolean

Are we currently exclusive



425
426
427
428
# File 'lib/celluloid.rb', line 425

# Ask the current thread's task whether it is exclusive. Returns the falsy
# thread-local value itself (normally nil) when there is no task.
def exclusive?
  current_task = Thread.current[:celluloid_task]
  return current_task unless current_task

  current_task.exclusive?
end

#future(meth = nil, *args, &block) ⇒ Object

Handle calls to future within an actor itself



455
456
457
# File 'lib/celluloid.rb', line 455

# Forward a future call to the behavior proxy of the current thread's actor.
def future(meth = nil, *args, &block)
  proxy = Thread.current[:celluloid_actor].behavior_proxy
  proxy.future(meth, *args, &block)
end

#link(actor) ⇒ Object

Link this actor to another, allowing it to crash or react to errors



372
373
374
# File 'lib/celluloid.rb', line 372

# Delegates to Actor.link with the given actor.
def link(actor)
  Actor.link(actor)
end

#linked_to?(actor) ⇒ Boolean

Is this actor linked to another?



387
388
389
# File 'lib/celluloid.rb', line 387

# Delegates to Actor.linked_to? with the given actor.
def linked_to?(actor)
  Actor.linked_to?(actor)
end

#links ⇒ Object

Obtain the Celluloid::Links for this actor



357
358
359
# File 'lib/celluloid.rb', line 357

# Return the #links of the current thread's actor.
def links
  actor = Thread.current[:celluloid_actor]
  actor.links
end

#monitor(actor) ⇒ Object

Watch for exit events from another actor



362
363
364
# File 'lib/celluloid.rb', line 362

# Delegates to Actor.monitor with the given actor.
def monitor(actor)
  Actor.monitor(actor)
end

#monitoring?(actor) ⇒ Boolean

Are we monitoring another actor?



382
383
384
# File 'lib/celluloid.rb', line 382

# Delegates to Actor.monitoring? with the given actor.
def monitoring?(actor)
  Actor.monitoring?(actor)
end

#receive(timeout = nil, &block) ⇒ Object

Receive an asynchronous message via the actor protocol



392
393
394
395
396
397
398
399
# File 'lib/celluloid.rb', line 392

# Receive a message through the current thread's actor, or through
# Celluloid.mailbox when the thread has no :celluloid_actor.
def receive(timeout = nil, &block)
  receiver = Thread.current[:celluloid_actor] || Celluloid.mailbox
  receiver.receive(timeout, &block)
end

#signal(name, value = nil) ⇒ Object

Send a signal with the given name to all waiting methods



332
333
334
# File 'lib/celluloid.rb', line 332

# Forward +name+ and +value+ to the current thread's actor's #signal.
def signal(name, value = nil)
  actor = Thread.current[:celluloid_actor]
  actor.signal(name, value)
end

#sleep(interval) ⇒ Object

Sleep letting the actor continue processing messages



402
403
404
405
406
407
408
409
# File 'lib/celluloid.rb', line 402

# Sleep via the current thread's actor when there is one; otherwise fall
# back to Kernel.sleep.
def sleep(interval)
  actor = Thread.current[:celluloid_actor]
  return Kernel.sleep(interval) unless actor

  actor.sleep(interval)
end

#tasksObject

Obtain the running tasks for this actor



352
353
354
# File 'lib/celluloid.rb', line 352

# Return the current thread's actor's tasks, converted to an Array.
def tasks
  actor = Thread.current[:celluloid_actor]
  actor.tasks.to_a
end

#terminateObject

Terminate this actor



327
328
329
# File 'lib/celluloid.rb', line 327

# Call terminate! on the behavior proxy of the current thread's actor.
def terminate
  proxy = Thread.current[:celluloid_actor].behavior_proxy
  proxy.terminate!
end

#timeout(duration) ⇒ Object

Timeout on task suspension (eg Sync calls to other actors)



412
413
414
415
416
# File 'lib/celluloid.rb', line 412

# Run the caller's block inside the current thread's actor's
# #timeout(duration) and return that call's result.
def timeout(duration)
  actor = Thread.current[:celluloid_actor]
  actor.timeout(duration) { yield }
end

#unlink(actor) ⇒ Object

Remove links to another actor



377
378
379
# File 'lib/celluloid.rb', line 377

# Delegates to Actor.unlink with the given actor.
def unlink(actor)
  Actor.unlink(actor)
end

#unmonitor(actor) ⇒ Object

Stop waiting for exit events from another actor



367
368
369
# File 'lib/celluloid.rb', line 367

# Delegates to Actor.unmonitor with the given actor.
def unmonitor(actor)
  Actor.unmonitor(actor)
end

#wait(name) ⇒ Object

Wait for the given signal



337
338
339
# File 'lib/celluloid.rb', line 337

# Forward +name+ to the current thread's actor's #wait.
def wait(name)
  actor = Thread.current[:celluloid_actor]
  actor.wait(name)
end