Class: Concurrent::Future

Inherits:
IVar
  • Object
Defined in:
lib/concurrent-ruby/concurrent/future.rb

Overview

## Copy Options

Object references in Ruby are mutable. This can lead to serious problems when the Concern::Obligation#value of an object is a mutable reference, which is always the case unless the value is a `Fixnum`, `Symbol`, or similar “primitive” data type. Each instance can be configured with a few options that help protect the program from potentially dangerous operations. Each of these options can be set when the object instance is created:

  • `:dup_on_deref` When true the object will call the `#dup` method on the `value` object every time the `#value` method is called (default: false)

  • `:freeze_on_deref` When true the object will call the `#freeze` method on the `value` object every time the `#value` method is called (default: false)

  • `:copy_on_deref` When given a `Proc` object the `Proc` will be run every time the `#value` method is called. The `Proc` will be given the current `value` as its only argument and the result returned by the block will be the return value of the `#value` call. When `nil` this option will be ignored (default: nil)

When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:

  • `:copy_on_deref`

  • `:dup_on_deref`

  • `:freeze_on_deref`

Because of this ordering there is no need to `#freeze` an object created by a provided `:copy_on_deref` block. Simply set `:freeze_on_deref` to `true`. Setting both `:dup_on_deref` to `true` and `:freeze_on_deref` to `true` is as close to the behavior of a “pure” functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.
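The ordering above can be illustrated with a small plain-Ruby model. This is only a sketch of the documented order of operations, not the library's code: the `deref` helper and its keyword arguments are hypothetical names chosen to mirror the option names.

```ruby
# Toy model of the documented deref order; NOT the library's implementation.
# `deref` is a hypothetical helper whose keywords mirror the option names.
def deref(value, copy_on_deref: nil, dup_on_deref: false, freeze_on_deref: false)
  value = copy_on_deref.call(value) if copy_on_deref  # 1. custom copy proc
  value = value.dup if dup_on_deref                   # 2. shallow copy
  value = value.freeze if freeze_on_deref             # 3. freeze what is handed out
  value
end

internal = +'mutable state' # unary plus yields an unfrozen String

# Caller receives a frozen copy; the stored object itself is left alone.
safe = deref(internal, dup_on_deref: true, freeze_on_deref: true)

# The proc's result is frozen afterwards, so the proc need not freeze it.
upcased = deref(internal, copy_on_deref: ->(v) { v.upcase }, freeze_on_deref: true)
```

In this model `safe` is a distinct, frozen object while `internal` stays unfrozen, which is the protection the combination of `:dup_on_deref` and `:freeze_on_deref` is meant to give.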


Methods inherited from IVar

#add_observer, #fail, #try_set

Methods included from Concern::Observable

#add_observer, #count_observers, #delete_observer, #delete_observers, #with_observer

Methods included from Concern::Obligation

#complete?, #exception, #fulfilled?, #incomplete?, #pending?, #reason, #rejected?, #state, #unscheduled?, #value, #value!, #wait, #wait!

Methods included from Concern::Dereferenceable

#value

Constructor Details

#initialize(opts = {}) { ... } ⇒ Future

Create a new `Future` in the `:unscheduled` state.

Parameters:

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given `Executor` instance. Three special values are also supported: `:io` returns the global pool for long, blocking (IO) tasks, `:fast` returns the global pool for short, fast operations, and `:immediate` returns the global `ImmediateExecutor` object.

  • :dup_on_deref (Boolean) — default: false

    Call `#dup` before returning the data from Concern::Obligation#value

  • :freeze_on_deref (Boolean) — default: false

    Call `#freeze` before returning the data from Concern::Obligation#value

  • :copy_on_deref (Proc) — default: nil

    When calling the Concern::Obligation#value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

  • :args (object, Array)

    zero or more arguments to be passed to the task block on execution

Yields:

  • the asynchronous operation to perform

Raises:

  • (ArgumentError)

    if no block is given



# File 'lib/concurrent-ruby/concurrent/future.rb', line 33

def initialize(opts = {}, &block)
  raise ArgumentError.new('no block given') unless block_given?
  super(NULL, opts.merge(__task_from_block__: block), &nil)
end

Class Method Details

.execute(opts = {}) { ... } ⇒ Future

Create a new `Future` object with the given block, execute it, and return the `:pending` object.

Examples:

future = Concurrent::Future.execute{ sleep(1); 42 }
future.state #=> :pending

Parameters:

  • opts (Hash) (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given `Executor` instance. Three special values are also supported: `:io` returns the global pool for long, blocking (IO) tasks, `:fast` returns the global pool for short, fast operations, and `:immediate` returns the global `ImmediateExecutor` object.

  • :dup_on_deref (Boolean) — default: false

    Call `#dup` before returning the data from Concern::Obligation#value

  • :freeze_on_deref (Boolean) — default: false

    Call `#freeze` before returning the data from Concern::Obligation#value

  • :copy_on_deref (Proc) — default: nil

    When calling the Concern::Obligation#value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

  • :args (object, Array)

    zero or more arguments to be passed to the task block on execution

Yields:

  • the asynchronous operation to perform

Returns:

  • (Future)

    the newly created `Future` in the `:pending` state

Raises:

  • (ArgumentError)

    if no block is given



# File 'lib/concurrent-ruby/concurrent/future.rb', line 77

def self.execute(opts = {}, &block)
  Future.new(opts, &block).execute
end

Instance Method Details

#cancel ⇒ Boolean

Attempt to cancel the operation if it has not already been processed. The operation can only be cancelled while it is still `:pending`. It cannot be cancelled once it has begun processing or has completed.

Returns:

  • (Boolean)

    true if the operation was successfully cancelled, else false



# File 'lib/concurrent-ruby/concurrent/future.rb', line 99

def cancel
  if compare_and_set_state(:cancelled, :pending)
    complete(false, nil, CancelledOperationError.new)
    true
  else
    false
  end
end
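The "only while still pending" rule comes down to an atomic compare-and-set on the state. The following plain-Ruby toy sketches that guard in isolation. It is a simplified model, not the library's implementation: `ToyOperation`, its `start` method, and the `:processing` state name are assumptions made for illustration.

```ruby
# Toy model of the compare-and-set guard behind #cancel.
# `ToyOperation` and `start` are hypothetical; this is not library code.
class ToyOperation
  attr_reader :state

  def initialize
    @state = :pending
    @lock = Mutex.new
  end

  # Atomically switch to `next_state`, but only if the state is `expected`.
  def compare_and_set_state(next_state, expected)
    @lock.synchronize do
      return false unless @state == expected

      @state = next_state
      true
    end
  end

  # What a worker would do when it picks the task up.
  def start
    compare_and_set_state(:processing, :pending)
  end

  # Succeeds only if no worker has started the task yet.
  def cancel
    compare_and_set_state(:cancelled, :pending)
  end
end

queued = ToyOperation.new
queued_cancelled = queued.cancel    # still pending, so the cancel wins

running = ToyOperation.new
running.start                       # a worker got there first
running_cancelled = running.cancel  # too late, the state is left alone
```

Because both transitions start from `:pending` and go through the same lock, exactly one of "start" and "cancel" can win for a given operation in this model.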

#cancelled? ⇒ Boolean

Has the operation been successfully cancelled?

Returns:

  • (Boolean)


# File 'lib/concurrent-ruby/concurrent/future.rb', line 111

def cancelled?
  state == :cancelled
end

#execute ⇒ Future

Execute an `:unscheduled` `Future`. Immediately sets the state to `:pending` and passes the block to a new thread/thread pool for eventual execution. Does nothing if the `Future` is in any state other than `:unscheduled`.

Examples:

Instantiate and execute in separate steps

future = Concurrent::Future.new{ sleep(1); 42 }
future.state #=> :unscheduled
future.execute
future.state #=> :pending

Instantiate and execute in one line

future = Concurrent::Future.new{ sleep(1); 42 }.execute
future.state #=> :pending

Returns:

  • (Future)

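    a reference to `self`; as the source listing for this method shows, `nil` is returned when the `Future` was not `:unscheduled`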



# File 'lib/concurrent-ruby/concurrent/future.rb', line 53

def execute
  if compare_and_set_state(:pending, :unscheduled)
    @executor.post{ safe_execute(@task, @args) }
    self
  end
end

#set(value = NULL) { ... } ⇒ IVar

Set the `IVar` to a value and wake or notify all threads waiting on it.

Parameters:

  • value (Object) (defaults to: NULL)

    the value to store in the `IVar`

Yields:

  • A block operation to use for setting the value

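Returns:

  • (IVar)

    self

Raises:

  • (ArgumentError)

    unless exactly one of a value or a block is given

  • (MultipleAssignmentError)

    if the `Future` is no longer `:unscheduled`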



# File 'lib/concurrent-ruby/concurrent/future.rb', line 82

def set(value = NULL, &block)
  check_for_block_or_value!(block_given?, value)
  synchronize do
    if @state != :unscheduled
      raise MultipleAssignmentError
    else
      @task = block || Proc.new { value }
    end
  end
  execute
end

#wait_or_cancel(timeout) ⇒ Boolean

Wait the given number of seconds for the operation to complete. On timeout attempt to cancel the operation.

Parameters:

  • timeout (Numeric)

    the maximum time in seconds to wait.

Returns:

  • (Boolean)

    true if the operation completed before the timeout else false



# File 'lib/concurrent-ruby/concurrent/future.rb', line 121

def wait_or_cancel(timeout)
  wait(timeout)
  if complete?
    true
  else
    cancel
    false
  end
end