Class: Async::Task

Inherits:
Node
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/async/task.rb

Overview

A task represents the state associated with the execution of an asynchronous block.

Instance Attribute Summary collapse

Attributes inherited from Node

#annotation, #children, #head, #parent, #tail

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Node

#annotate, #children?, #consume, #description, #print_hierarchy, #terminate, #transient?, #traverse

Constructor Details

#initialize(reactor, parent = Task.current?, logger: nil, finished: nil, **options, &block) ⇒ Task

Create a new task.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/async/task.rb', line 75

# Create a new task in the :initialized state; the block is not executed here.
#
# @param reactor the reactor this task belongs to; also used as the parent node when no parent is given.
# @param parent the parent node; defaults to Task.current? (may be nil).
# @param logger optional logger; when nil, the parent node's logger is used instead.
# @param finished optional object stored in @finished; #wait calls `wait` on it while the task is running.
# @param options remaining keyword options, forwarded unchanged to the superclass initializer.
# @param block the block handed to make_fiber to build the task's fiber.
def initialize(reactor, parent = Task.current?, logger: nil, finished: nil, **options, &block)
  # Attach to the given parent, falling back to the reactor itself as the parent node.
  super(parent || reactor, **options)
  
  @reactor = reactor
  
  # #run only starts a task whose status is still :initialized.
  @status = :initialized
  @result = nil
  @finished = finished
  
  # NOTE(review): relies on @parent having been assigned by the superclass initializer — confirm in Node.
  @logger = logger || @parent.logger
  
  @fiber = make_fiber(&block)
  
  # nil means no defer_stop block is currently active (see the tri-state description in #defer_stop).
  @defer_stop = nil
end

Instance Attribute Details

#fiberObject (readonly)



114
115
116
# File 'lib/async/task.rb', line 114

# Reader for the fiber assigned to this task at construction time.
#
# @return the value currently held in @fiber (may be nil).
def fiber
  return @fiber
end

#loggerObject (readonly)

Returns the value of attribute logger.



91
92
93
# File 'lib/async/task.rb', line 91

# Reader for the logger selected when the task was created.
#
# @return the value currently held in @logger.
def logger
  return @logger
end

#reactorObject (readonly)



104
105
106
# File 'lib/async/task.rb', line 104

# Reader for the reactor this task was created with.
#
# @return the value currently held in @reactor.
def reactor
  return @reactor
end

#statusObject (readonly)



121
122
123
# File 'lib/async/task.rb', line 121

# Reader for the task's lifecycle status.
#
# @return the value currently held in @status (for example :initialized or :running).
def status
  return @status
end

Class Method Details

.currentAsync::Task

Look up the Async::Task for the current fiber. Raises `RuntimeError` if none is available.

Raises:

  • (RuntimeError)

    if task was not #set! for the current fiber.



232
233
234
# File 'lib/async/task.rb', line 232

# Fetch the task registered under the :async_task key of the current thread's
# fiber-local storage.
#
# @return the registered task.
# @raise [RuntimeError] when no task is registered.
def self.current
  task = Thread.current[:async_task]
  
  raise RuntimeError, "No async task available!" unless task
  
  task
end

.current?Async::Task?

Check if there is a task defined for the current fiber.



238
239
240
# File 'lib/async/task.rb', line 238

# Non-raising lookup of the task registered under the :async_task key of the
# current thread's fiber-local storage.
#
# @return the registered task, or nil when none is registered.
def self.current?
  return Thread.current[:async_task]
end

.yield {|result| ... } ⇒ Object

Yield the underlying `result` for the task. If the result is an Exception, then that result will be raised as an exception.

Yields:

  • (result)

    result of the task if a block is given.

Raises:

  • (Exception)

    if the result is an exception



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/async/task.rb', line 58

# Obtain a result and either return it or raise it.
#
# With a block, the result is the block's value; without one, the current
# fiber is suspended via Fiber.yield and the result is whatever value it is
# resumed with.
#
# @yield produces the result when a block is given.
# @return the result, when it is not an Exception.
# @raise [Exception] the result itself, when it is an Exception instance.
def self.yield
  result =
    if block_given?
      yield
    else
      Fiber.yield
    end
  
  raise result if result.is_a?(Exception)
  
  return result
end

Instance Method Details

#alive?Boolean



116
117
118
# File 'lib/async/task.rb', line 116

# Report whether the task's fiber is still alive.
#
# @return [Boolean, nil] the fiber's own alive? answer, or nil when @fiber is nil.
def alive?
  return nil if @fiber.nil?
  
  @fiber.alive?
end

#async(*arguments, **options, &block) ⇒ Object



134
135
136
137
138
139
140
# File 'lib/async/task.rb', line 134

# Create a child task of this task on the same reactor and start it immediately.
#
# @param arguments positional arguments passed to the child's #run.
# @param options keyword options passed to Task.new.
# @param block the block the child task is created with.
# @return [Task] the newly created child task, after #run has been invoked on it.
def async(*arguments, **options, &block)
  Task.new(@reactor, self, **options, &block).tap do |child|
    child.run(*arguments)
  end
end

#backtrace(*arguments) ⇒ Object



94
95
96
# File 'lib/async/task.rb', line 94

# Delegate to the backtrace of the task's fiber.
#
# @param arguments forwarded unchanged to the fiber's backtrace method.
# @return the fiber's backtrace, or nil when @fiber is nil.
def backtrace(*arguments)
  return nil if @fiber.nil?
  
  @fiber.backtrace(*arguments)
end

#complete?Boolean



270
271
272
# File 'lib/async/task.rb', line 270

# Status predicate.
#
# @return [Boolean] true only when @status is :complete.
def complete?
  return @status == :complete
end

#current?Boolean



242
243
244
# File 'lib/async/task.rb', line 242

# Check whether this task is the one registered under the :async_task key of
# the current thread's fiber-local storage.
#
# @return [Boolean] true only when the registered task is this very object.
def current?
  registered = Thread.current[:async_task]
  
  registered.equal?(self)
end

#defer_stopObject

Defer the handling of stop. During the execution of the given block, if a stop is requested, it will be deferred until the block exits. This is useful for ensuring graceful shutdown of servers and other long-running tasks. You should wrap the response handling code in a defer_stop block to ensure that the task is stopped when the response is complete but not before.

You can nest calls to defer_stop, but the stop will only be deferred until the outermost block exits.

If stop is invoked a second time, it will be immediately executed.



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/async/task.rb', line 201

# Defer the handling of stop for the duration of the given block.
#
# While the outermost block is executing, #stop only records the request; the
# stop is then performed when that block exits. Nested calls simply yield.
# The deferral state is always cleared when the outermost block exits, so a
# later #stop or #defer_stop starts from a clean state.
#
# @yield the code during which stop requests are deferred.
# @return the value of the block.
# @raise [Stop] re-raised unchanged when the block itself exits with Stop.
def defer_stop
  # Tri-state variable for controlling stop:
  # - nil: defer_stop has not been called.
  # - false: defer_stop has been called and we are not stopping.
  # - true: defer_stop has been called and we will stop when exiting the block.
  if @defer_stop.nil?
    # If we are not deferring stop already, we can defer it now:
    @defer_stop = false
    
    begin
      yield
    rescue Stop
      # If we are exiting due to a stop, we shouldn't try to invoke stop again:
      @defer_stop = nil
      raise
    ensure
      stop_requested = @defer_stop
      
      # Always reset the state before leaving the block. Otherwise it would stay
      # `false` after a block that saw no stop request, and the next stop would
      # be deferred with no block left to carry it out.
      @defer_stop = nil
      
      # If we were asked to stop, we should do so now:
      self.stop if stop_requested
    end
  else
    # If we are deferring stop already, entering it again is a no-op.
    yield
  end
end

#failed?Boolean



258
259
260
# File 'lib/async/task.rb', line 258

# Status predicate.
#
# @return [Boolean] true only when @status is :failed.
def failed?
  return @status == :failed
end

#finished?Boolean

Whether we can remove this node from the reactor graph.



254
255
256
# File 'lib/async/task.rb', line 254

# Whether this node can be removed from the reactor graph.
#
# The superclass is consulted first; when it answers with a falsy value that
# value is returned as-is. Otherwise the task counts as finished as long as
# its status is not :running.
#
# @return the superclass's falsy answer, or a Boolean based on @status.
def finished?
  node_finished = super
  
  return node_finished unless node_finished
  
  @status != :running
end

#run(*arguments) ⇒ Object

Begin the execution of the task.



124
125
126
127
128
129
130
131
132
# File 'lib/async/task.rb', line 124

# Begin the execution of the task by resuming its fiber.
#
# @param arguments passed to the fiber's resume call.
# @return whatever the fiber's resume call returns.
# @raise [RuntimeError] when the task's status is no longer :initialized.
def run(*arguments)
  raise RuntimeError, "Task already running!" unless @status == :initialized
  
  # Flip the status before resuming so the block observes a running task.
  @status = :running
  
  @fiber.resume(*arguments)
end

#running?Boolean

Check if the task is running.



248
249
250
# File 'lib/async/task.rb', line 248

# Check if the task is running.
#
# @return [Boolean] true only when @status is :running.
def running?
  return @status == :running
end

#stop(later = false) ⇒ Object

Stop the task and all of its children.



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/async/task.rb', line 161

# Stop the task and all of its children.
#
# @param later when true and this task is the current task, the stop is queued
#   on the reactor as a Stop::Later instead of raising immediately.
# @return nil when already stopped, false when the stop was deferred by an
#   active defer_stop block, otherwise the value of the action that was taken.
# @raise [Stop] when invoked on the current task without `later`.
def stop(later = false)
  # A task that is already stopped must not be stopped again.
  return if self.stopped?
  
  # Inside a defer_stop block: only record the request, the block acts on it at exit.
  if @defer_stop == false
    @defer_stop = true
    return false
  end
  
  # We are not running, but children might be, so transition directly into stopped state:
  return stop! unless self.running?
  
  if self.current?
    return @reactor << Stop::Later.new(self) if later
    
    raise Stop, "Stopping current task!"
  end
  
  # Running, but not the current task: nothing to do unless the fiber is still alive.
  return nil unless @fiber&.alive?
  
  begin
    @fiber.resume(Stop.new)
  rescue FiberError
    # The fiber could not be resumed right now, so queue the stop on the reactor instead.
    @reactor << Stop::Later.new(self)
  end
end

#stopped?Boolean



266
267
268
# File 'lib/async/task.rb', line 266

# Status predicate.
#
# @return [Boolean] true only when @status is :stopped.
def stopped?
  return @status == :stopped
end

#stopping?Boolean



262
263
264
# File 'lib/async/task.rb', line 262

# Status predicate.
#
# @return [Boolean] true only when @status is :stopping.
def stopping?
  return @status == :stopping
end

#to_sObject



99
100
101
# File 'lib/async/task.rb', line 99

def to_s
  "\#<#{self.description} (#{@status})>"
end

#waitObject Also known as: result

Retrieve the current result of the task. Will cause the caller to wait until result is available.

Raises:

  • (RuntimeError)

    if the task’s fiber is the current fiber.



145
146
147
148
149
150
151
152
153
154
# File 'lib/async/task.rb', line 145

# Retrieve the result of the task, waiting for it while the task is running.
#
# A running task is waited on through @finished, which is created as a
# Condition on first use. A task that is not running hands its stored result
# to Task.yield.
#
# @return the value of @finished.wait for a running task, otherwise the value
#   of Task.yield applied to the stored result.
# @raise [RuntimeError] if the task's fiber is the current fiber.
def wait
  if Fiber.current.equal?(@fiber)
    raise RuntimeError, "Cannot wait on own fiber"
  end
  
  unless running?
    return Task.yield { @result }
  end
  
  @finished ||= Condition.new
  @finished.wait
end

#yieldObject

Yield back to the reactor and allow other fibers to execute.



109
110
111
# File 'lib/async/task.rb', line 109

# Yield back to the reactor and allow other fibers to execute.
#
# The value of the reactor's yield is routed through Task.yield.
#
# @return whatever Task.yield returns for the value of reactor.yield.
def yield
  Task.yield do
    reactor.yield
  end
end