Class: Concurrent::IVar

Inherits:
Object
Includes:
Obligation, Observable
Defined in:
lib/concurrent/ivar.rb

Overview

An IVar is like a future that you can assign. Where a future is a value that is being computed and that you can wait on, an IVar is a value that is waiting to be assigned and that you can wait on. IVars are single-assignment and deterministic.

A future can then be expressed as an asynchronous computation that assigns an IVar, which makes the IVar the primitive on which futures and dataflow are built.

An IVar is a single-element container that is normally created empty, and can only be set once. The I in IVar stands for immutable. Reading an IVar normally blocks until it is set. It is safe to set and read an IVar from different threads.

If you want a parallel task to set the value of an IVar, you want a Future. If you want to create a graph of parallel tasks that are each executed when the values they depend on are ready, you want dataflow. IVar is generally a low-level primitive.

See Also:

Examples:

Create, set and get an IVar

ivar = Concurrent::IVar.new
ivar.set 14
ivar.value #=> 14
ivar.set 2 # would now be an error
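
The behaviour described in the overview (write once, block readers until the write happens) can be sketched with nothing but the Ruby standard library. The class below is a hypothetical TinyIVar built on Mutex and ConditionVariable; it is an illustration of the idea, not the Concurrent::IVar implementation, and the error it raises on a second assignment is a plain RuntimeError chosen for the sketch.

```ruby
# Hypothetical TinyIVar: a stdlib-only sketch of a write-once, blocking container.
# It is NOT the Concurrent::IVar implementation.
class TinyIVar
  UNSET = Object.new.freeze # sentinel, so that nil can be stored as a real value

  def initialize
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
    @value = UNSET
  end

  # Store the value once; a second call raises.
  def set(value)
    @mutex.synchronize do
      raise 'multiple assignment' unless @value.equal?(UNSET)
      @value = value
      @cond.broadcast # wake every thread blocked in #value
    end
    self
  end

  # Block until a value has been stored, then return it.
  def value
    @mutex.synchronize do
      @cond.wait(@mutex) while @value.equal?(UNSET)
      @value
    end
  end
end

ivar   = TinyIVar.new
reader = Thread.new { ivar.value } # blocks until the writer runs
ivar.set(14)
result = reader.value              # Thread#value joins and returns the block's result

second_set_failed =
  begin
    ivar.set(2)
    false
  rescue RuntimeError
    true
  end
```

The reader thread may start before or after the call to set; either way it ends up with the same value, which is the sense in which a single-assignment variable is deterministic.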

Direct Known Subclasses

Channel::Probe, Future, ScheduledTask

Constructor Details

#initialize(value = NO_VALUE, opts = {}) ⇒ IVar

Create a new IVar. With no initial value the IVar starts in the :pending state; if an initial value is given, it is set immediately.

Parameters:

  • value (Object) (defaults to: NO_VALUE)

    the initial value

  • opts (Hash) (defaults to: {})

    the options defining dereference behavior

Options Hash (opts):

  • :dup_on_deref (Boolean) — default: false

    call #dup before returning the data

  • :freeze_on_deref (Boolean) — default: false

    call #freeze before returning the data

  • :copy_on_deref (Proc) — default: nil

    call the given Proc passing the internal value and returning the value returned from the proc



# File 'lib/concurrent/ivar.rb', line 49

def initialize(value = NO_VALUE, opts = {})
  init_obligation
  self.observers = CopyOnWriteObserverSet.new
  set_deref_options(opts)

  if value == NO_VALUE
    @state = :pending
  else
    set(value)
  end
end
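
The constructor above uses a sentinel default (NO_VALUE) so that "no argument" can be told apart from an explicit nil. The sketch below shows the same pattern in isolation; NOT_GIVEN and PendingBox are hypothetical names, and the :fulfilled state after an initial value is an assumption suggested by #fulfilled? rather than something the constructor source states.

```ruby
# Hypothetical names throughout; this is not the library's code.
NOT_GIVEN = Object.new.freeze # unique sentinel, distinct from every real value

class PendingBox
  attr_reader :state, :value

  def initialize(value = NOT_GIVEN)
    if value.equal?(NOT_GIVEN)
      @state = :pending   # nothing was passed: stay empty
      @value = nil
    else
      @state = :fulfilled # something was passed, even nil or false
      @value = value
    end
  end
end

empty    = PendingBox.new
with_nil = PendingBox.new(nil)
```

Here empty stays pending, while with_nil counts as already assigned even though its value is nil.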

Instance Attribute Details

#observers ⇒ Object (protected) Originally defined in module Observable

Returns the value of attribute observers

Instance Method Details

#add_observer(observer = nil, func = :update, &block) ⇒ Object

Add an observer on this object that will receive notification on update.

Upon completion the IVar will notify all observers in a thread-safe way. The func method of the observer will be called with three arguments: the Time at which the IVar was completed, the final value (or nil on rejection), and the final reason (or nil on fulfillment). If the IVar is already complete when the observer is added, the observer is notified immediately instead of being registered.

Parameters:

  • observer (Object) (defaults to: nil)

    the object that will be notified of changes

  • func (Symbol) (defaults to: :update)

    symbol naming the method to call when this Observable has changes

Raises:

  • (ArgumentError)

    if both an observer and a block are given


# File 'lib/concurrent/ivar.rb', line 70

def add_observer(observer = nil, func = :update, &block)
  raise ArgumentError.new('cannot provide both an observer and a block') if observer && block
  direct_notification = false

  if block
    observer = block
    func = :call
  end

  mutex.synchronize do
    if event.set?
      direct_notification = true
    else
      observers.add_observer(observer, func)
    end
  end

  observer.send(func, Time.now, self.value, reason) if direct_notification
  observer
end
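
The "register, or notify directly if already complete" logic in the source above can be tried out in a stdlib-only sketch. ObservedCell is a hypothetical class that mirrors only that decision; it accepts blocks only, has no rejection path, and is not the library's Observable implementation.

```ruby
# Hypothetical ObservedCell: stdlib-only sketch of "register or notify immediately".
class ObservedCell
  def initialize
    @mutex     = Mutex.new
    @observers = []
    @done      = false
    @value     = nil
  end

  def add_observer(&block)
    notify_now = false
    @mutex.synchronize do
      if @done
        notify_now = true      # too late to register: notify directly
      else
        @observers << block
      end
    end
    # Call outside the lock so a slow observer cannot block other threads.
    block.call(Time.now, @value, nil) if notify_now
    block
  end

  def set(value)
    pending = @mutex.synchronize do
      @value = value
      @done  = true
      taken = @observers
      @observers = []          # each observer is notified at most once
      taken
    end
    time = Time.now
    pending.each { |observer| observer.call(time, value, nil) }
    self
  end
end

seen = []
cell = ObservedCell.new
cell.add_observer { |_time, value, _reason| seen << [:early, value] }
cell.set(42)
cell.add_observer { |_time, value, _reason| seen << [:late, value] }
```

Both observers end up in seen exactly once: the first through the registered list, the second through the direct-notification branch.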

#completed? ⇒ Boolean Originally defined in module Obligation

Returns:

  • (Boolean)

#count_observers ⇒ Integer Originally defined in module Observable

Returns the observers count

Returns:

  • (Integer)

    the observers count

#delete_observer(*args) ⇒ Object Originally defined in module Observable

Returns the deleted observer

Returns:

  • (Object)

    the deleted observer

#delete_observers ⇒ Observable Originally defined in module Observable

Returns self

Returns:

  • (Observable)

    self

#exception(*args) ⇒ Object Originally defined in module Obligation

Examples:

Allows an Obligation to be raised

rejected_ivar = Concurrent::IVar.new.fail
raise rejected_ivar
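
The example above relies on a general Ruby rule: Kernel#raise accepts any object that responds to #exception and raises whatever Exception that method returns. A minimal sketch of the rule, using a hypothetical Rejected class in place of a rejected IVar:

```ruby
# Hypothetical Rejected class: any object with an #exception method can be given to raise.
class Rejected
  def initialize(reason)
    @reason = reason
  end

  # Kernel#raise calls this and raises the Exception it returns.
  def exception(*_args)
    @reason
  end
end

rejected = Rejected.new(ArgumentError.new('bad input'))

message =
  begin
    raise rejected
  rescue ArgumentError => e
    e.message
  end
```

The rescue clause matches on the class of the stored reason, not on the class of the object passed to raise.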

#fail(reason = StandardError.new) ⇒ Object

Set the IVar to failed due to some error and wake or notify all threads waiting on it.

Parameters:

  • reason (Object) (defaults to: StandardError.new)

    for the failure

Raises:



# File 'lib/concurrent/ivar.rb', line 103

def fail(reason = StandardError.new)
  complete(false, nil, reason)
end

#fulfilled? ⇒ Boolean Also known as: realized? Originally defined in module Obligation

Has the obligation been fulfilled?

Returns:

  • (Boolean)

#incomplete? ⇒ Boolean Originally defined in module Obligation

Returns:

  • (Boolean)

#init_mutex ⇒ Object (protected) Originally defined in module Dereferenceable

Note:

This method must be called from within the constructor of the including class.

Initializes the internal Mutex.

See Also:

#mutex ⇒ Mutex (protected) Originally defined in module Dereferenceable

A mutex lock used for synchronizing thread-safe operations. Methods defined by Dereferenceable are synchronized using the Mutex returned from this method. Operations performed by the including class that operate on the @value instance variable should be locked with this Mutex.

Returns:

  • (Mutex)

    the synchronization object

#no_error!(timeout = nil) ⇒ Obligation Originally defined in module Obligation

Wait until the Obligation is complete (see #completed?), raising #reason if it was rejected.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait

Returns:

Raises:

  • (Exception)

    raises #reason when the Obligation is #rejected?

#pending? ⇒ Boolean Originally defined in module Obligation

Is obligation completion still pending?

Returns:

  • (Boolean)

#reason ⇒ Object Originally defined in module Obligation

#rejected? ⇒ Boolean Originally defined in module Obligation

Has the obligation been rejected?

Returns:

  • (Boolean)

#set(value) ⇒ Object

Set the IVar to a value and wake or notify all threads waiting on it.

Parameters:

  • value (Object)

    the value to store in the IVar

Raises:



# File 'lib/concurrent/ivar.rb', line 95

def set(value)
  complete(true, value, nil)
end

#set_deref_options(opts = {}) ⇒ Object (protected) Originally defined in module Dereferenceable

Note:

Most classes that include this module will call #set_deref_options from within the constructor, thus allowing these options to be set at object creation.

Set the options which define the operations #value performs before returning data to the caller (dereferencing).

Parameters:

  • opts (Hash) (defaults to: {})

    the options defining dereference behavior.

Options Hash (opts):

  • :dup_on_deref (Boolean) — default: false

    call #dup before returning the data

  • :freeze_on_deref (Boolean) — default: false

    call #freeze before returning the data

  • :copy_on_deref (Proc) — default: nil

    call the given Proc passing the internal value and returning the value returned from the proc
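
To make the three options concrete, here is a small stand-alone sketch of what dereferencing with them could look like. The deref helper is hypothetical, and the order in which it applies the options (copy, then dup, then freeze) is an assumption of this sketch, not something stated on this page.

```ruby
# Hypothetical helper: applies the three documented options to a stored value.
# Assumed order: copy, then dup, then freeze.
def deref(stored, opts = {})
  return nil if stored.nil?

  result = stored
  result = opts[:copy_on_deref].call(result) if opts[:copy_on_deref]
  result = result.dup    if opts[:dup_on_deref]
  result = result.freeze if opts[:freeze_on_deref]
  result
end

stored = +'hello' # unary plus gives an unfrozen String

plain  = deref(stored)
copied = deref(stored, dup_on_deref: true)
frozen = deref(stored, dup_on_deref: true, freeze_on_deref: true)
upper  = deref(stored, copy_on_deref: ->(v) { v.upcase })
```

With no options the caller gets the stored object itself, so mutating it would mutate shared state; the options trade a copy or a freeze for protection against that.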

#state ⇒ Object Originally defined in module Obligation

#unscheduled? ⇒ Boolean Originally defined in module Obligation

Is the obligation still unscheduled?

Returns:

  • (Boolean)

#value(timeout = nil) ⇒ Object Originally defined in module Obligation

Returns the dereferenced value (see Dereferenceable#deref)

Returns:

  • (Object)

    see Dereferenceable#deref

#value!(timeout = nil) ⇒ Object Originally defined in module Obligation

Returns the dereferenced value (see Dereferenceable#deref), raising #reason if the Obligation was rejected

Returns:

  • (Object)

    see Dereferenceable#deref

Raises:

  • (Exception)

    raises #reason when the Obligation is #rejected?
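
The difference between #value and #value! only shows up on rejection. The sketch below models just that difference with a hypothetical Outcome struct; it ignores timeouts and blocking, and it assumes (as the observer documentation suggests) that the value of a rejected obligation is nil.

```ruby
# Hypothetical Outcome: an already completed result, either fulfilled or rejected.
Outcome = Struct.new(:state, :raw_value, :reason) do
  def rejected?
    state == :rejected
  end

  # Non-raising read: nil when rejected.
  def value
    rejected? ? nil : raw_value
  end

  # Raising read: re-raises the stored reason when rejected.
  def value!
    raise reason if rejected?

    raw_value
  end
end

ok  = Outcome.new(:fulfilled, 14, nil)
bad = Outcome.new(:rejected, nil, IOError.new('disk gone'))

raised =
  begin
    bad.value!
  rescue IOError => e
    e.message
  end
```

A caller using the non-raising form has to check #rejected? or #reason separately, because nil alone does not distinguish a rejection from a fulfilled nil.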

#value=(val) ⇒ Object (protected) Originally defined in module Dereferenceable

Set the internal value of this object

Parameters:

  • val (Object)

    the new value

#wait(timeout = nil) ⇒ Obligation Originally defined in module Obligation

Wait until the Obligation is complete (see #completed?).

Parameters:

  • timeout (Numeric) (defaults to: nil)

    the maximum time in seconds to wait

Returns:

#with_observer(*args, &block) ⇒ Observable Originally defined in module Observable

Same as #add_observer, but returns self so that calls can be chained.

Returns: