Class: Performer::Task

Inherits:
Object
Defined in:
lib/performer/task.rb

Overview

A task is constructed with a callable object (like a proc), and provides ways to:

  • call the contained object, once and only once

  • retrieve the value produced by the callable, blocking until execution has finished

  • cancel the task, which wakes up every thread waiting for its value

Furthermore, the Task public API is thread-safe, and a resolved task will never change value.

Examples:

constructing a task

task = Task.new(lambda { 1 + 1 })
worker = Thread.new(task, &:call)
task.value # => 2

Defined Under Namespace

Classes: CancelledError, Error, InvariantError

Constant Summary

Transitions =

This constant is part of a private API. You should avoid using this constant if possible, as it may be removed or be changed in the future.

Used for the internal task state machine.

{
  idle: { executing: true, cancelled: true },
  executing: { error: true, value: true },
}
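
The table reads as "from state → allowed next states". As a rough illustration of how such a table can drive a state check, here is a minimal sketch. `TRANSITIONS` is a local copy of the hash above, and `transition_allowed?` is a hypothetical helper that is not part of Performer; the library's own private `set` method is not shown on this page and may work differently.

```ruby
# Local copy of the documented table (an assumption: the real constant is
# Performer::Task::Transitions and is private API).
TRANSITIONS = {
  idle: { executing: true, cancelled: true },
  executing: { error: true, value: true },
}.freeze

# Hypothetical helper: true when the table allows moving from `from` to `to`.
# States with no entry (:error, :value, :cancelled) are terminal.
def transition_allowed?(from, to)
  TRANSITIONS.fetch(from, {}).key?(to)
end

transition_allowed?(:idle, :executing)      # => true
transition_allowed?(:executing, :cancelled) # => false
transition_allowed?(:value, :executing)     # => false
```

Read this way, the table encodes the notes on #call and #cancel: a task can only start or be cancelled from :idle, and a finished task never changes state again.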

Constructor Details

#initialize(callable) ⇒ Task

Create a new Task from a callable object.

Parameters:

  • callable

    any object that responds to #call, such as a proc

# File 'lib/performer/task.rb', line 37

def initialize(callable)
  @callable = callable

  @value_mutex = Mutex.new
  @value_cond = Performer::ConditionVariable.new

  @value = nil
  @value_type = :idle
end

Instance Method Details

#call(*args, &block) ⇒ Task

Note:

A task can only be called once.

Note:

A task cannot be called after it has been cancelled.

Note:

A task swallows standard errors during execution, but all other errors are propagated.

Execute the task. Arguments and block are passed on to the callable.

When execution finishes, all threads waiting on #value are woken up with the result.

Returns:

  • (Task)

    self

# File 'lib/performer/task.rb', line 56

def call(*args, &block)
  set(:executing) { nil }

  begin
    value = @callable.call(*args, &block)
  rescue => ex
    set(:error) { ex }
  rescue Exception => ex
    set(:error) { ex }
    raise ex
  else
    set(:value) { value }
  end

  self
end
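
The two rescue clauses are what make a task "swallow" standard errors while propagating everything else. The sketch below reproduces only that rescue structure in isolation; `run_and_classify` is a hypothetical helper written for this illustration, not a Performer method, and it returns a tagged pair instead of storing the outcome on a task.

```ruby
# Hypothetical helper mirroring the rescue structure of #call.
def run_and_classify(callable)
  [:value, callable.call]
rescue => ex        # StandardError and its subclasses: captured, not re-raised
  [:error, ex]
rescue Exception    # anything else (ScriptError, Interrupt, ...): re-raised
  raise
end

# ArgumentError is a StandardError, so it is captured.
kind, result = run_and_classify(-> { Integer("oops") })
kind         # => :error
result.class # => ArgumentError

# NotImplementedError descends from ScriptError, so it escapes to the caller.
escaped = nil
begin
  run_and_classify(-> { raise NotImplementedError, "boom" })
rescue NotImplementedError => ex
  escaped = ex
end
escaped.message # => "boom"
```

In the real method the non-standard error is also recorded via `set(:error)` before being re-raised, so threads blocked in #value are still woken up.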

#cancel(message = "task was cancelled") ⇒ Object

Note:

This cannot be done while the task is executing.

Note:

This cannot be done if the task has finished executing.

Cancel the task. All threads waiting on #value are woken up with a CancelledError.

Parameters:

  • message (String) (defaults to: "task was cancelled")

    for the cancellation error



# File 'lib/performer/task.rb', line 79

def cancel(message = "task was cancelled")
  set(:cancelled) { CancelledError.new(message) }
end
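
To make the wake-up behaviour concrete without depending on the gem, here is a minimal stand-in built on the standard library's Mutex and ConditionVariable. `MiniTask` is hypothetical and models only the :idle → :cancelled transition; in particular, raising a plain RuntimeError on a second cancel is an assumption, since the error the real Task raises for an invalid transition is not shown on this page.

```ruby
# Hypothetical stand-in for Performer::Task, covering cancellation only.
class MiniTask
  CancelledError = Class.new(StandardError)

  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @state = :idle
    @value = nil
  end

  # Moves the task from :idle to :cancelled and wakes every waiter.
  def cancel(message = "task was cancelled")
    @mutex.synchronize do
      raise "cannot cancel a task that is #{@state}" unless @state == :idle
      @state = :cancelled
      @value = CancelledError.new(message)
      @cond.broadcast
    end
  end

  # Blocks while the task is idle, then raises the stored cancellation error.
  def value
    @mutex.synchronize do
      @cond.wait(@mutex) while @state == :idle
      raise @value
    end
  end
end

task = MiniTask.new

# Three threads block on the value; each one captures the error message.
waiters = 3.times.map do
  Thread.new do
    begin
      task.value
    rescue MiniTask::CancelledError => ex
      ex.message
    end
  end
end

sleep 0.05 # give the waiters a moment to block
task.cancel("no longer needed")
messages = waiters.map(&:value)
messages # => ["no longer needed", "no longer needed", "no longer needed"]
```

Using `broadcast` rather than `signal` is what ensures that every waiting thread, not just one, observes the cancellation.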

#value(timeout = nil) { ... } ⇒ Object

Retrieve the value of the task. If the task is not finished, this will block.

Examples:

waiting with a timeout and a block

task.value(1) { raise MyOwnError, "Timed out after 1s" }

Parameters:

  • timeout (Integer, nil) (defaults to: nil)

    how long to wait for the value, in seconds, before timing out; nil waits forever

Yields:

  • if block given, yields instead of raising an error on timeout

Raises:

  • (TimeoutError)

    if the timeout was reached and no block was given



# File 'lib/performer/task.rb', line 91

def value(timeout = nil)
  unless done?
    @value_mutex.synchronize do
      @value_cond.wait_until(@value_mutex, timeout) { done? }
    end
  end

  if value?
    return @value
  elsif error? or cancelled?
    raise @value
  elsif block_given?
    yield
  else
    raise TimeoutError, "retrieving value timed out after #{timeout}s"
  end
end
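
`Performer::ConditionVariable#wait_until` is not shown on this page. As a sketch of the general technique only, the following hypothetical helper implements "wait until a predicate holds or a timeout elapses" on top of the standard library's `ConditionVariable#wait`. The name `wait_until_with_timeout` and its return value are assumptions made for this example.

```ruby
# Hypothetical helper, not part of Performer. The caller must hold `mutex`.
# Returns true once the block returns true, or false if `timeout` seconds
# pass first. A nil timeout waits indefinitely.
def wait_until_with_timeout(cond, mutex, timeout = nil)
  now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = timeout && now.call + timeout

  until yield
    remaining = deadline && deadline - now.call
    return false if remaining && remaining <= 0
    cond.wait(mutex, remaining) # may wake early, so the predicate is rechecked
  end
  true
end

mutex = Mutex.new
cond = ConditionVariable.new
result = nil

# Producer publishes a value shortly after the consumer starts waiting.
producer = Thread.new do
  sleep 0.05
  mutex.synchronize do
    result = 2
    cond.broadcast
  end
end

got_it = mutex.synchronize do
  wait_until_with_timeout(cond, mutex, 1) { !result.nil? }
end

# A predicate that never becomes true runs into the timeout instead.
timed_out = !mutex.synchronize do
  wait_until_with_timeout(cond, mutex, 0.05) { false }
end

producer.join
```

Rechecking the predicate in a loop against a fixed deadline keeps the total wait bounded even when the condition variable wakes up more than once.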