Class: Celluloid::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/celluloid/task.rb,
lib/celluloid/task/fibered.rb,
lib/celluloid/task/threaded.rb

Overview

Tasks are interruptible/resumable execution contexts used to run methods.

Direct Known Subclasses

Fibered, Threaded

Defined Under Namespace

Classes: Fibered, Threaded

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type, meta) ⇒ Task

Create a new task

Raises:

  • (NotActorError)

18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/celluloid/task.rb', line 18

# Create a new task for the actor registered on the current thread.
#
# @param type [Object] stored in @type; shown by #inspect and in log messages
# @param meta [Object, nil] stored in @meta; presumably a Hash, since its
#   :dangerous_suspend entry is looked up via #dup and #delete — TODO confirm
# @yield the work the task performs; invoked from inside the block handed
#   to #create
# @raise [NotActorError] when Thread.current[:celluloid_actor] is not set
def initialize(type, meta)
  @type     = type
  @meta     = meta
  @status   = :new

  @exclusive         = false
  # The flag is read from a copy, so #delete never mutates the caller's meta.
  @dangerous_suspend = @meta ? @meta.dup.delete(:dangerous_suspend) : false
  @guard_warnings    = false

  # Captured here so the block passed to #create closes over the creating
  # thread's actor and call chain id.
  actor     = Thread.current[:celluloid_actor]
  @chain_id = Internals::CallChain.current_id

  raise NotActorError, "can't create tasks outside of actors" unless actor
  # @guard_warnings is false at this point, so #guard raises (rather than
  # logs) when $CELLULOID_DEBUG is set and does nothing otherwise.
  guard "can't create tasks inside of tasks" if Thread.current[:celluloid_task]

  # The base #create only raises; the block below is the task body that a
  # concrete implementation is expected to run.
  create do
    begin
      @status = :running
      actor.setup_thread

      # NOTE(review): name_current_thread is defined outside this view; it is
      # called without arguments here and with nil in the ensure — confirm.
      name_current_thread 

      # Register this task and restore the creator's call chain id.
      Thread.current[:celluloid_task] = self
      Internals::CallChain.current_id = @chain_id

      actor.tasks << self
      yield
    rescue TaskTerminated
      # Task was explicitly terminated
    ensure
      # Runs on every exit path: mark the task dead and deregister it.
      name_current_thread nil
      @status = :dead
      actor.tasks.delete self
    end
  end
end

Instance Attribute Details

#chain_id ⇒ Object

Returns the value of attribute chain_id.



15
16
17
# File 'lib/celluloid/task.rb', line 15

# Returns the value of @chain_id, which #initialize takes from
# Internals::CallChain.current_id at creation time.
#
# @return [Object] the stored call chain id
def chain_id
  @chain_id
end

#guard_warnings ⇒ Object

Returns the value of attribute guard_warnings.



15
16
17
# File 'lib/celluloid/task.rb', line 15

# Returns the value of @guard_warnings. When it is truthy, #guard logs a
# warning instead of raising. #initialize sets it to false.
#
# @return [Object] the stored flag
def guard_warnings
  @guard_warnings
end

#meta ⇒ Object (readonly)

Returns the value of attribute meta.



14
15
16
# File 'lib/celluloid/task.rb', line 14

# Returns the meta value that was passed to #initialize.
#
# @return [Object, nil] the stored meta
def meta
  @meta
end

#status ⇒ Object (readonly)

Returns the value of attribute status.



14
15
16
# File 'lib/celluloid/task.rb', line 14

# Returns the current status of the task: :new after construction, :running
# while the body executes, whatever was passed to #suspend while suspended,
# and :dead once the body has finished.
#
# @return [Object] the stored status
def status
  @status
end

#type ⇒ Object (readonly)

Returns the value of attribute type.



14
15
16
# File 'lib/celluloid/task.rb', line 14

# Returns the type value that was passed to #initialize.
#
# @return [Object] the stored type
def type
  @type
end

Class Method Details

.current ⇒ Object

Obtain the current task



5
6
7
# File 'lib/celluloid/task.rb', line 5

# Obtain the task registered for the calling thread.
#
# @return [Object] the value stored under Thread.current[:celluloid_task]
# @raise [NotTaskError] when nothing is registered under that key
def self.current
  task = Thread.current[:celluloid_task]
  raise NotTaskError, "not within a task context" unless task

  task
end

.suspend(status) ⇒ Object

Suspend the running task, deferring to the scheduler



10
11
12
# File 'lib/celluloid/task.rb', line 10

# Suspend the running task, deferring to the scheduler.
#
# @param status [Object] forwarded unchanged to the current task's #suspend
# @return [Object] whatever the current task's #suspend returns
def self.suspend(status)
  running_task = Task.current
  running_task.suspend(status)
end

Instance Method Details

#backtrace ⇒ Object



137
# File 'lib/celluloid/task.rb', line 137

# Backtrace of the task. This implementation has nothing to report and
# returns nil.
#
# @return [nil]
def backtrace
  nil
end

#create(&_block) ⇒ Object



55
56
57
# File 'lib/celluloid/task.rb', line 55

# Hook for building the execution context that runs the task body. This
# version only reports that the receiver's class has not supplied one.
#
# @param _block [Proc] the task body; unused here
# @raise [RuntimeError] always, naming the receiver's class
def create(&_block)
  owner = self.class
  raise "Implement #{owner}#create"
end

#exclusive ⇒ Object

Execute a code block in exclusive mode.



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/celluloid/task.rb', line 96

# Execute a code block in exclusive mode.
#
# If the task is already exclusive, the block simply runs and the flag is
# left untouched. Otherwise the flag is raised for the duration of the block
# and cleared afterwards, even if the block raises.
#
# @yield the code to run exclusively
# @return [Object] the block's value
def exclusive
  return yield if @exclusive

  @exclusive = true
  begin
    yield
  ensure
    @exclusive = false
  end
end

#exclusive? ⇒ Boolean

Is this task running in exclusive mode?

Returns:

  • (Boolean)


133
134
135
# File 'lib/celluloid/task.rb', line 133

# Is this task running in exclusive mode?
#
# @return [Boolean] the value of @exclusive, which #initialize sets to false
#   and #exclusive sets to true while its block runs
def exclusive?
  @exclusive
end

#guard(message) ⇒ Object



149
150
151
152
153
154
155
156
157
158
# File 'lib/celluloid/task.rb', line 149

# Report a usage violation. Only active when $CELLULOID_DEBUG is set: the
# message is raised unless @guard_warnings is truthy, in which case it is
# logged as a warning instead.
#
# @param message [String] description of the violation
# @return [Object, nil] nil when debugging is off, otherwise the logger's
#   return value
# @raise [RuntimeError] when debugging is on and @guard_warnings is falsy
def guard(message)
  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  return unless $CELLULOID_DEBUG
  # rubocop:enable Style/GlobalVars

  raise message unless @guard_warnings

  Internals::Logger.warn message
end

#inspect ⇒ Object

Nicer string inspect for tasks



145
146
147
# File 'lib/celluloid/task.rb', line 145

# Nicer string inspect for tasks: class name, hexadecimal object id, and the
# inspected @type, @meta and @status.
#
# @return [String]
def inspect
  hex_id = object_id.to_s(16)
  fields = "@type=#{@type.inspect}, @meta=#{@meta.inspect}, @status=#{@status.inspect}"
  "#<#{self.class}:0x#{hex_id} #{fields}>"
end

#resume(value = nil) ⇒ Object

Resume a suspended task, giving it a value to return if needed



83
84
85
86
87
88
89
90
91
92
93
# File 'lib/celluloid/task.rb', line 83

# Resume a suspended task, giving it a value to return if needed.
#
# A task that is no longer running is not resumed; a warning is logged
# instead.
#
# @param value [Object, nil] handed to #deliver for the suspended task
# @return [nil] always
def resume(value = nil)
  inside_task = Thread.current[:celluloid_task]
  guard "Cannot resume a task from inside of a task" if inside_task

  unless running?
    # rubocop:disable Metrics/LineLength
    Internals::Logger.warn "Attempted to resume a dead task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}"
    # rubocop:enable Metrics/LineLength
    return
  end

  deliver(value)
  nil
end

#running? ⇒ Boolean

Is the current task still running?

Returns:

  • (Boolean)


140
141
142
# File 'lib/celluloid/task.rb', line 140

# Is the current task still running?
#
# "Running" here means "not dead": the result is true for every status other
# than :dead, including :new and any status set by #suspend.
#
# @return [Boolean]
def running?
  @status != :dead
end

#suspend(status) ⇒ Object

Suspend the current task, changing the status to the given argument



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/celluloid/task.rb', line 60

# Suspend the current task, changing the status to the given argument.
#
# @param status [Object] stored in @status until #signal returns
# @return [Object] the value returned by #signal (presumably the value the
#   resumer passed in — #signal is defined outside this view, TODO confirm)
# @raise [RuntimeError] if the task is in exclusive mode, or if the caller
#   is not running inside this very task
# @raise [Celluloid::Interruption] when the value returned by #signal is an
#   Interruption; it is raised instead of being returned
def suspend(status)
  raise "Cannot suspend while in exclusive mode" if exclusive?
  raise "Cannot suspend a task from outside of itself" unless Task.current == self

  @status = status

  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  if $CELLULOID_DEBUG && @dangerous_suspend
    # Log with a six-entry slice of the call stack, skipping the two nearest
    # entries returned by #caller.
    Internals::Logger.with_backtrace(caller[2...8]) do |logger|
      logger.warn "Dangerously suspending task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}"
    end
  end
  # rubocop:enable Style/GlobalVars

  # Execution continues past this call only once #signal returns a value.
  value = signal

  # Restore the running status before either raising or returning.
  @status = :running
  raise value if value.is_a?(Celluloid::Interruption)
  value
end

#terminate ⇒ Object

Terminate this task



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/celluloid/task.rb', line 110

# Terminate this task by resuming it with a TaskTerminated exception.
#
# @return [Object] whatever #resume returns
# @raise [RuntimeError] if the task is in exclusive mode
# @raise [DeadTaskError] if the task is no longer running
def terminate
  raise "Cannot terminate an exclusive task" if exclusive?
  raise DeadTaskError, "task is already dead" unless running?

  # !!! DO NOT INTRODUCE ADDITIONAL GLOBAL VARIABLES !!!
  # rubocop:disable Style/GlobalVars
  if $CELLULOID_DEBUG
    Internals::Logger.with_backtrace(backtrace) do |logger|
      # Tasks flagged as dangerous to suspend are logged at warn level.
      severity = @dangerous_suspend ? :warn : :debug
      logger.send(severity, "Terminating task: type=#{@type.inspect}, meta=#{@meta.inspect}, status=#{@status.inspect}")
    end
  end
  # rubocop:enable Style/GlobalVars

  # The exception carries the terminator's call stack as its backtrace.
  termination = TaskTerminated.new("task was terminated")
  termination.set_backtrace(caller)
  resume termination
end