Class: Concurrent::Future

Inherits:
IVar
  • Object
show all
Includes:
Obligation, UsesGlobalThreadPool
Defined in:
lib/concurrent/future.rb

Overview

A Future represents a promise to complete an action at some time in the future. The action is atomic and permanent. The idea behind a future is to send an operation for asynchronous completion, do other stuff, then return and retrieve the result of the async operation at a later time.

A Future has four possible states: :unscheduled, :pending, :rejected, or :fulfilled. When a Future is created its state is set to :unscheduled. Once the #execute method is called the state becomes :pending and will remain in that state until processing is complete. A completed Future is either :rejected, indicating that an exception was thrown during processing, or :fulfilled, indicating success. If a Future is :fulfilled its value will be updated to reflect the result of the operation. If :rejected the reason will be updated with a reference to the thrown exception. The predicate methods #unscheduled?, #pending?, #rejected?, and fulfilled? can be called at any time to obtain the state of the Future, as can the #state method, which returns a symbol.

Retrieving the value of a Future is done through the #value (alias: #deref) method. Obtaining the value of a Future is a potentially blocking operation. When a Future is :rejected a call to #value will return nil immediately. When a Future is :fulfilled a call to #value will immediately return the current value. When a Future is :pending a call to #value will block until the Future is either :rejected or :fulfilled. A timeout value can be passed to #value to limit how long the call will block. If nil the call will block indefinitely. If 0 the call will not block. Any other integer or float value will indicate the maximum number of seconds to block.

The Future class also includes the behavior of the Ruby standard library Observable module, but does so in a thread-safe way. On fulfillment or rejection all observers will be notified according to the normal Observable behavior. The observer callback function will be called with three parameters: the Time of fulfillment/rejection, the final value, and the final reason. Observers added after fulfillment/rejection will still be notified as normal.

Constant Summary

Constants inherited from IVar

IVar::NO_VALUE

Class Method Summary collapse

Instance Method Summary collapse

Methods included from UsesGlobalThreadPool

included

Methods included from Obligation

#completed?, #fulfilled?, #incomplete?, #pending?, #reason, #rejected?, #state, #unscheduled?, #value

Methods included from Dereferenceable

#init_mutex, #mutex, #set_deref_options, #value

Methods inherited from IVar

#add_observer, #complete, #fail, #set

Constructor Details

#initialize(opts = {}) { ... } ⇒ Future

Create a new Future in the :unscheduled state.

Parameters:

  • (defaults to: {})

    the options to create a message with

Options Hash (opts):

  • :dup_on_deref (String) — default: false

    call #dup before returning the data

  • :freeze_on_deref (String) — default: false

    call #freeze before returning the data

  • :copy_on_deref (String) — default: nil

    call the given Proc passing the internal value and returning the value returned from the proc

Yields:

  • the asynchronous operation to perform

Raises:

  • if no block is given



57
58
59
60
61
62
# File 'lib/concurrent/future.rb', line 57

def initialize(opts = {}, &block)
  raise ArgumentError.new('no block given') unless block_given?
  super(IVar::NO_VALUE, opts)
  @state = :unscheduled
  @task = block
end

Class Method Details

.execute(opts = {}) { ... } ⇒ Future

Create a new Future object with the given block, execute it, and return the :pending object.

Examples:

future = Concurrent::Future.execute{ sleep(1); 42 }
future.state #=> :pending

Parameters:

  • (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :dup_on_deref (String) — default: false

    call #dup before returning the data

  • :freeze_on_deref (String) — default: false

    call #freeze before returning the data

  • :copy_on_deref (String) — default: nil

    call the given Proc passing the internal value and returning the value returned from the proc

Yields:

  • the asynchronous operation to perform

Returns:

  • the newly created Future in the :pending state

Raises:

  • if no block is given

Since:

  • 0.5.0



107
108
109
# File 'lib/concurrent/future.rb', line 107

def self.execute(opts = {}, &block)
  return Future.new(opts, &block).execute
end

Instance Method Details

#executeFuture

Execute an :unscheduled Future. Immediately sets the state to :pending and passes the block to a new thread/thread pool for eventual execution. Does nothing if the Future is in any state other than :unscheduled.

Examples:

Instance and execute in separate steps

future = Concurrent::Future.new{ sleep(1); 42 }
future.state #=> :unscheduled
future.execute
future.state #=> :pending

Instance and execute in one line

future = Concurrent::Future.new{ sleep(1); 42 }.execute
future.state #=> :pending

Returns:

  • a reference to self

Since:

  • 0.5.0



81
82
83
84
85
86
# File 'lib/concurrent/future.rb', line 81

def execute
  if compare_and_set_state(:pending, :unscheduled)
    Future.thread_pool.post { work }
    self
  end
end