Class: Async::Task

Inherits:
Node
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/async/task.rb

Overview

A task represents the state associated with the execution of an asynchronous block.

Instance Attribute Summary collapse

Attributes inherited from Node

#annotation, #children, #parent

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Node

#annotate, #consume, #description, #print_hierarchy, #reap, #traverse

Constructor Details

#initialize(reactor, parent = Task.current?, logger: nil, &block) ⇒ Task

Create a new task.

Parameters:

  • reactor (Async::Reactor)

    the reactor this task will run within.

  • parent (Async::Task) (defaults to: Task.current?)

    the parent task.



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/async/task.rb', line 60

def initialize(reactor, parent = Task.current?, logger: nil, &block)
	super(parent || reactor)
	
	@reactor = reactor
	
	@status = :initialized
	@result = nil
	@finished = nil
	
	@logger = logger
	
	@fiber = make_fiber(&block)
end

Instance Attribute Details

#fiberObject (readonly)



92
93
94
# File 'lib/async/task.rb', line 92

def fiber
  @fiber
end

#reactorObject (readonly)



83
84
85
# File 'lib/async/task.rb', line 83

def reactor
  @reactor
end

#statusObject (readonly)



96
97
98
# File 'lib/async/task.rb', line 96

def status
  @status
end

Class Method Details

.currentAsync::Task

Lookup the Async::Task for the current fiber. Raise RuntimeError if none is available.

Returns:

Raises:

  • (RuntimeError)

    if task was not #set! for the current fiber.



149
150
151
# File 'lib/async/task.rb', line 149

def self.current
	Thread.current[:async_task] or raise RuntimeError, "No async task available!"
end

.current?Async::Task?

Check if there is a task defined for the current fiber.

Returns:



155
156
157
# File 'lib/async/task.rb', line 155

def self.current?
	Thread.current[:async_task]
end

.yield {|result| ... } ⇒ Object

Yield the unerlying result for the task. If the result is an Exception, then that result will be raised an its exception.

Yields:

  • (result)

    result of the task if a block if given.

Returns:

  • (Object)

    result of the task

Raises:

  • (Exception)

    if the result is an exception



43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/async/task.rb', line 43

def self.yield
	if block_given?
		result = yield
	else
		result = Fiber.yield
	end
	
	if result.is_a? Exception
		raise result
	else
		return result
	end
end

Instance Method Details

#async(*args, **options, &block) ⇒ Object



108
109
110
111
112
113
114
# File 'lib/async/task.rb', line 108

def async(*args, **options, &block)
	task = Task.new(@reactor, self, **options, &block)
	
	task.run(*args)
	
	return task
end

#current?Boolean

Returns:

  • (Boolean)


159
160
161
# File 'lib/async/task.rb', line 159

def current?
	self.equal?(Thread.current[:async_task])
end

#failed?Boolean

Returns:

  • (Boolean)


175
176
177
# File 'lib/async/task.rb', line 175

def failed?
	@status == :failed
end

#finished?Boolean

Whether we can remove this node from the reactor graph.

Returns:

  • (Boolean)


171
172
173
# File 'lib/async/task.rb', line 171

def finished?
	super && @status != :running
end

#loggerObject



78
79
80
# File 'lib/async/task.rb', line 78

def logger
	@logger ||= @parent&.logger
end

#run(*args) ⇒ Object

Begin the execution of the task.



99
100
101
102
103
104
105
106
# File 'lib/async/task.rb', line 99

def run(*args)
	if @status == :initialized
		@status = :running
		@fiber.resume(*args)
	else
		raise RuntimeError, "Task already running!"
	end
end

#running?Boolean

Check if the task is running.

Returns:

  • (Boolean)


165
166
167
# File 'lib/async/task.rb', line 165

def running?
	@status == :running
end

#stopvoid

This method returns an undefined value.

Stop the task and all of its children.



136
137
138
139
140
141
142
143
144
# File 'lib/async/task.rb', line 136

def stop
	@children&.each(&:stop)
	
	if self.current?
		raise Stop, "Stopping current fiber!"
	elsif @fiber.alive?
		@fiber.resume(Stop.new)
	end
end

#stopped?Boolean

Returns:

  • (Boolean)


179
180
181
# File 'lib/async/task.rb', line 179

def stopped?
	@status == :stopped
end

#to_sObject



74
75
76
# File 'lib/async/task.rb', line 74

def to_s
	"<#{self.description} #{@status}>"
end

#waitObject Also known as: result

Retrieve the current result of the task. Will cause the caller to wait until result is available.

Returns:

  • (Object)

    the final expression/result of the task's block.

Raises:

  • (RuntimeError)

    if the task's fiber is the current fiber.



119
120
121
122
123
124
125
126
127
128
# File 'lib/async/task.rb', line 119

def wait
	raise RuntimeError, "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)
	
	if running?
		@finished ||= Condition.new
		@finished.wait
	else
		Task.yield{@result}
	end
end

#yieldObject

Yield back to the reactor and allow other fibers to execute.



87
88
89
# File 'lib/async/task.rb', line 87

def yield
	reactor.yield
end