Class: Concurrent::IVar

Inherits:
Object
  • Object
show all
Includes:
Obligation
Defined in:
lib/concurrent/ivar.rb

Direct Known Subclasses

Future

Constant Summary collapse

NO_VALUE =
Object.new

Instance Method Summary collapse

Methods included from Obligation

#completed?, #fulfilled?, #incomplete?, #pending?, #reason, #rejected?, #state, #unscheduled?, #value

Methods included from Dereferenceable

#init_mutex, #mutex, #set_deref_options, #value

Constructor Details

#initialize(value = NO_VALUE, opts = {}) ⇒ IVar

Create a new IVar. When no initial value is given the IVar starts in the :pending state; when an initial value is given it is set immediately.

Parameters:

  • value (Object) (defaults to: NO_VALUE)

    the initial value

  • opts (Hash) (defaults to: {})

    the options controlling how the value is dereferenced (see the options below)

Options Hash (opts):

  • :dup_on_deref (Boolean) — default: false

    call #dup before returning the data

  • :freeze_on_deref (Boolean) — default: false

    call #freeze before returning the data

  • :copy_on_deref (Proc) — default: nil

    call the given Proc passing the internal value and returning the value returned from the proc



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/concurrent/ivar.rb', line 23

# Create a new IVar, optionally already holding a value.
#
# When no value is supplied the IVar is left in the +:pending+ state;
# otherwise the value is handed straight to #set.
#
# @param value [Object] the initial value; omit it to create a pending IVar
# @param opts [Hash] options forwarded unchanged to #set_deref_options
def initialize(value = NO_VALUE, opts = {})
  init_obligation
  @observers = CopyOnWriteObserverSet.new
  set_deref_options(opts)

  # Compare by identity. `value == NO_VALUE` would dispatch to the caller's
  # own #==, which may be overridden to return true (or to raise) when given
  # the sentinel, wrongly leaving the IVar pending or aborting construction.
  if NO_VALUE.equal?(value)
    @state = :pending
  else
    set(value)
  end
end

Instance Method Details

#add_observer(observer, func = :update) ⇒ Object

Add an observer on this object that will receive notification on update.

Upon completion the IVar will notify all observers in a thread-safe way. The func method of the observer will be called with three arguments: the Time at which the IVar completed the asynchronous operation, the final value (or nil on rejection), and the final reason (or nil on fulfillment).

Parameters:

  • observer (Object)

    the object that will be notified of changes

  • func (Symbol) (defaults to: :update)

    symbol naming the method to call when this Observable has changes



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/concurrent/ivar.rb', line 44

# Register +observer+ for notification, or notify it right away when the
# event has already been set.
#
# @param observer [Object] the object to notify
# @param func [Symbol] name of the method invoked on the observer
# @return [Symbol] +func+, as given
def add_observer(observer, func = :update)
  notify_now = false

  # Decide under the lock, so the check and the registration happen together.
  mutex.synchronize do
    notify_now = event.set?
    @observers.add_observer(observer, func) unless notify_now
  end

  # The direct callback runs outside the lock.
  if notify_now
    observer.send(func, Time.now, self.value, reason)
  end

  func
end

#complete(success, value, reason) ⇒ Object



67
68
69
70
71
72
73
74
75
76
# File 'lib/concurrent/ivar.rb', line 67

# Record the outcome, set the event, and notify the registered observers.
#
# @param success [Boolean] passed through to set_state
# @param value [Object] passed through to set_state
# @param reason [Object] passed through to set_state and on to observers
# @raise [MultipleAssignmentError] if @state is already :fulfilled or :rejected
# @return [Object] whatever notify_and_delete_observers returns
def complete(success, value, reason)
  mutex.synchronize do
    case @state
    when :fulfilled, :rejected
      raise MultipleAssignmentError.new('multiple assignment')
    end

    set_state(success, value, reason)
    event.set
  end

  completed_at = Time.now
  # `self.value` goes through the reader method, while `reason` is the
  # method argument as given.
  @observers.notify_and_delete_observers { [completed_at, self.value, reason] }
end

#fail(reason = nil) ⇒ Object



63
64
65
# File 'lib/concurrent/ivar.rb', line 63

# Complete as a failure: delegates to #complete with success false and a nil
# value. Note that this name shadows Kernel#fail on instances.
#
# @param reason [Object] forwarded to #complete
# @return [Object] whatever #complete returns
def fail(reason = nil)
  success = false
  value = nil
  complete(success, value, reason)
end

#set(value) ⇒ Object



59
60
61
# File 'lib/concurrent/ivar.rb', line 59

# Complete as a success: delegates to #complete with success true and a nil
# reason.
#
# @param value [Object] forwarded to #complete
# @return [Object] whatever #complete returns
def set(value)
  success = true
  reason = nil
  complete(success, value, reason)
end