Class: Async::Task

Inherits:
Node
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/async/task.rb

Overview

A task represents the state associated with the execution of an asynchronous block.

Instance Attribute Summary collapse

Attributes inherited from Node

#annotation, #children, #parent

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Node

#annotate, #consume, #description, #print_hierarchy, #reap, #traverse

Constructor Details

#initialize(reactor, parent = Task.current?, logger: nil, &block) ⇒ Task

Create a new task.

Parameters:

  • reactor (Async::Reactor)

    the reactor this task will run within.

  • parent (Async::Task) (defaults to: Task.current?)

    the parent task.



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/async/task.rb', line 60

def initialize(reactor, parent = Task.current?, logger: nil, &block)
  super(parent || reactor)
  
  @reactor = reactor
  
  @status = :initialized
  @result = nil
  @finished = nil
  
  @logger = logger
  
  @fiber = make_fiber(&block)
end

Instance Attribute Details

#fiberObject (readonly)



92
93
94
# File 'lib/async/task.rb', line 92

def fiber
  @fiber
end

#reactorObject (readonly)



83
84
85
# File 'lib/async/task.rb', line 83

def reactor
  @reactor
end

#statusObject (readonly)



96
97
98
# File 'lib/async/task.rb', line 96

def status
  @status
end

Class Method Details

.currentAsync::Task

Lookup the Async::Task for the current fiber. Raise RuntimeError if none is available.

Returns:

Raises:

  • (RuntimeError)

    if task was not #set! for the current fiber.



156
157
158
# File 'lib/async/task.rb', line 156

def self.current
  Thread.current[:async_task] or raise RuntimeError, "No async task available!"
end

.current?Async::Task?

Check if there is a task defined for the current fiber.

Returns:



162
163
164
# File 'lib/async/task.rb', line 162

def self.current?
  Thread.current[:async_task]
end

.yield {|result| ... } ⇒ Object

Yield the unerlying result for the task. If the result is an Exception, then that result will be raised an its exception.

Yields:

  • (result)

    result of the task if a block if given.

Returns:

  • (Object)

    result of the task

Raises:

  • (Exception)

    if the result is an exception



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/async/task.rb', line 43

def self.yield
  if block_given?
    result = yield
  else
    result = Fiber.yield
  end
  
  if result.is_a? Exception
    raise result
  else
    return result
  end
end

Instance Method Details

#async(*args, **options, &block) ⇒ Object



108
109
110
111
112
113
114
# File 'lib/async/task.rb', line 108

def async(*args, **options, &block)
  task = Task.new(@reactor, self, **options, &block)
  
  task.run(*args)
  
  return task
end

#complete?Boolean

Returns:

  • (Boolean)


194
195
196
# File 'lib/async/task.rb', line 194

def complete?
  @status == :complete
end

#current?Boolean

Returns:

  • (Boolean)


166
167
168
# File 'lib/async/task.rb', line 166

def current?
  self.equal?(Thread.current[:async_task])
end

#failed?Boolean

Returns:

  • (Boolean)


182
183
184
# File 'lib/async/task.rb', line 182

def failed?
  @status == :failed
end

#finished?Boolean

Whether we can remove this node from the reactor graph.

Returns:

  • (Boolean)


178
179
180
# File 'lib/async/task.rb', line 178

def finished?
  super && @status != :running
end

#loggerObject



78
79
80
# File 'lib/async/task.rb', line 78

def logger
  @logger ||= @parent&.logger
end

#run(*args) ⇒ Object

Begin the execution of the task.



99
100
101
102
103
104
105
106
# File 'lib/async/task.rb', line 99

def run(*args)
  if @status == :initialized
    @status = :running
    @fiber.resume(*args)
  else
    raise RuntimeError, "Task already running!"
  end
end

#running?Boolean

Check if the task is running.

Returns:

  • (Boolean)


172
173
174
# File 'lib/async/task.rb', line 172

def running?
  @status == :running
end

#stopvoid

This method returns an undefined value.

Stop the task and all of its children.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/async/task.rb', line 136

def stop
  if self.stopping?
    # If we are already stopping this task... don't try to stop it again.
    return true
  elsif self.running?
    @status = :stopping
    
    if self.current?
      raise Stop, "Stopping current fiber!"
    elsif @fiber.alive?
      @fiber.resume(Stop.new)
    end
  end
ensure
  @children&.each(&:stop)
end

#stopped?Boolean

Returns:

  • (Boolean)


190
191
192
# File 'lib/async/task.rb', line 190

def stopped?
  @status == :stopped
end

#stopping?Boolean

Returns:

  • (Boolean)


186
187
188
# File 'lib/async/task.rb', line 186

def stopping?
  @status == :stopping
end

#to_sObject



74
75
76
# File 'lib/async/task.rb', line 74

def to_s
  "<#{self.description} #{@status}>"
end

#waitObject Also known as: result

Retrieve the current result of the task. Will cause the caller to wait until result is available.

Returns:

  • (Object)

    the final expression/result of the task's block.

Raises:

  • (RuntimeError)

    if the task's fiber is the current fiber.



119
120
121
122
123
124
125
126
127
128
# File 'lib/async/task.rb', line 119

def wait
  raise RuntimeError, "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)
  
  if running?
    @finished ||= Condition.new
    @finished.wait
  else
    Task.yield{@result}
  end
end

#yieldObject

Yield back to the reactor and allow other fibers to execute.



87
88
89
# File 'lib/async/task.rb', line 87

def yield
  reactor.yield
end