Class: Concurrent::IVar

Inherits:
Synchronization::LockableObject
  • Object
show all
Includes:
Concern::Obligation, Concern::Observable
Defined in:
lib/concurrent/ivar.rb

Overview

An ‘IVar` is like a future that you can assign. As a future is a value that is being computed that you can wait on, an `IVar` is a value that is waiting to be assigned, that you can wait on. `IVars` are single assignment and deterministic.

Then, express futures as an asynchronous computation that assigns an ‘IVar`. The `IVar` becomes the primitive on which [futures](Future) and [dataflow](Dataflow) are built.

An ‘IVar` is a single-element container that is normally created empty, and can only be set once. The I in `IVar` stands for immutable. Reading an `IVar` normally blocks until it is set. It is safe to set and read an `IVar` from different threads.

If you want to have some parallel task set the value in an ‘IVar`, you want a `Future`. If you want to create a graph of parallel tasks all executed when the values they depend on are ready you want `dataflow`. `IVar` is generally a low-level primitive.

## Examples

Create, set and get an ‘IVar`

“‘ruby ivar = Concurrent::IVar.new ivar.set 14 ivar.value #=> 14 ivar.set 2 # would now be an error “`

## See Also

  1. For the theory: Arvind, R. Nikhil, and K. Pingali. [I-Structures: Data structures for parallel computing](dl.acm.org/citation.cfm?id=69562). In Proceedings of Workshop on Graph Reduction, 1986.

  2. For recent application: [DataDrivenFuture in Habanero Java from Rice](www.cs.rice.edu/~vs3/hjlib/doc/edu/rice/hj/api/HjDataDrivenFuture.html).

Direct Known Subclasses

Future, Promise, ScheduledTask

Instance Method Summary collapse

Methods included from Concern::Observable

#count_observers, #delete_observer, #delete_observers, #with_observer

Methods included from Concern::Obligation

#complete?, #exception, #fulfilled?, #incomplete?, #pending?, #reason, #rejected?, #state, #unscheduled?, #value, #value!, #wait, #wait!

Methods included from Concern::Dereferenceable

#value

Constructor Details

#initialize(value = NULL, opts = {}, &block) ⇒ IVar

Create a new ‘IVar` in the `:pending` state with the (optional) initial value.

Parameters:

  • value (Object) (defaults to: NULL)

    the initial value

  • opts (Hash) (defaults to: {})

    the options to create a message with

Options Hash (opts):

  • :dup_on_deref (String) — default: false

    call ‘#dup` before returning the data

  • :freeze_on_deref (String) — default: false

    call ‘#freeze` before returning the data

  • :copy_on_deref (String) — default: nil

    call the given ‘Proc` passing the internal value and returning the value returned from the proc



61
62
63
64
65
66
67
# File 'lib/concurrent/ivar.rb', line 61

def initialize(value = NULL, opts = {}, &block)
  if value != NULL && block_given?
    raise ArgumentError.new('provide only a value or a block')
  end
  super(&nil)
  synchronize { ns_initialize(value, opts, &block) }
end

Instance Method Details

#add_observer(observer = nil, func = :update, &block) ⇒ Object

Add an observer on this object that will receive notification on update.

Upon completion the ‘IVar` will notify all observers in a thread-safe way. The `func` method of the observer will be called with three arguments: the `Time` at which the `Future` completed the asynchronous operation, the final `value` (or `nil` on rejection), and the final `reason` (or `nil` on fulfillment).

Parameters:

  • observer (Object) (defaults to: nil)

    the object that will be notified of changes

  • func (Symbol) (defaults to: :update)

    symbol naming the method to call when this ‘Observable` has changes`

Raises:

  • (ArgumentError)


80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/concurrent/ivar.rb', line 80

def add_observer(observer = nil, func = :update, &block)
  raise ArgumentError.new('cannot provide both an observer and a block') if observer && block
  direct_notification = false

  if block
    observer = block
    func = :call
  end

  synchronize do
    if event.set?
      direct_notification = true
    else
      observers.add_observer(observer, func)
    end
  end

  observer.send(func, Time.now, self.value, reason) if direct_notification
  observer
end

#fail(reason = StandardError.new) ⇒ IVar

Set the ‘IVar` to failed due to some error and wake or notify all threads waiting on it.

Parameters:

  • reason (Object) (defaults to: StandardError.new)

    for the failure

Returns:

Raises:



134
135
136
# File 'lib/concurrent/ivar.rb', line 134

def fail(reason = StandardError.new)
  complete(false, nil, reason)
end

#set(value = NULL) { ... } ⇒ IVar

Set the ‘IVar` to a value and wake or notify all threads waiting on it.

Parameters:

  • value (Object) (defaults to: NULL)

    the value to store in the ‘IVar`

Yields:

  • A block operation to use for setting the value

Returns:

Raises:



112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/concurrent/ivar.rb', line 112

def set(value = NULL)
  check_for_block_or_value!(block_given?, value)
  raise MultipleAssignmentError unless compare_and_set_state(:processing, :pending)

  begin
    value = yield if block_given?
    complete_without_notification(true, value, nil)
  rescue => ex
    complete_without_notification(false, nil, ex)
  end

  notify_observers(self.value, reason)
  self
end

#try_set(value = NULL) { ... } ⇒ Boolean

Attempt to set the ‘IVar` with the given value or block. Return a boolean indicating the success or failure of the set operation.

Parameters:

  • value (Object) (defaults to: NULL)

    the value to store in the ‘IVar`

Yields:

  • A block operation to use for setting the value

Returns:

  • (Boolean)

    true if the value was set else false

Raises:



144
145
146
147
148
149
# File 'lib/concurrent/ivar.rb', line 144

def try_set(value = NULL, &block)
  set(value, &block)
  true
rescue MultipleAssignmentError
  false
end