Module: Concurrent::Observable

Included in:
Agent, IVar, TimerTask
Defined in:
lib/concurrent/observable.rb

Overview

The [observer pattern](en.wikipedia.org/wiki/Observer_pattern) is one of the most useful design pattern.

The workflow is very simple:

  • an ‘observer` can register itself to a `subject` via a callback

  • many ‘observers` can be registered to the same `subject`

  • the ‘subject` notifies all registered observers when its status changes

  • an ‘observer` can deregister itself when is no more interested to receive event notifications

In a single threaded environment the whole pattern is very easy: the ‘subject` can use a simple data structure to manage all its subscribed `observer`s and every `observer` can react directly to every event without caring about synchronization.

In a multi threaded environment things are more complex. The ‘subject` must synchronize the access to its data structure and to do so currently we’re using two specialized ObserverSet: CopyOnWriteObserverSet and CopyOnNotifyObserverSet.

When implementing and ‘observer` there’s a very important rule to remember: **there are no guarantees about the thread that will execute the callback**

Let’s take this example “‘ class Observer

def initialize
  @count = 0
end

def update
  @count += 1
end

end

obs = Observer.new [obj1, obj2, obj3, obj4].each { |o| o.add_observer(obs) } # execute [obj1, obj2, obj3, obj4] “‘

‘obs` is wrong because the variable `@count` can be accessed by different threads at the same time, so it should be synchronized (using either a Mutex or an AtomicFixum)

Instance Method Summary collapse

Instance Method Details

#add_observer(*args, &block) ⇒ Object

Returns the added observer.

Returns:

  • (Object)

    the added observer



42
43
44
# File 'lib/concurrent/observable.rb', line 42

def add_observer(*args, &block)
  observers.add_observer(*args, &block)
end

#count_observersInteger

Returns the observers count.

Returns:

  • (Integer)

    the observers count



65
66
67
# File 'lib/concurrent/observable.rb', line 65

def count_observers
  observers.count_observers
end

#delete_observer(*args) ⇒ Object

Returns the deleted observer.

Returns:

  • (Object)

    the deleted observer



54
55
56
# File 'lib/concurrent/observable.rb', line 54

def delete_observer(*args)
  observers.delete_observer(*args)
end

#delete_observersObservable

Returns self.

Returns:



59
60
61
62
# File 'lib/concurrent/observable.rb', line 59

def delete_observers
  observers.delete_observers
  self
end

#with_observer(*args, &block) ⇒ Observable

as #add_observer but it can be used for chaining

Returns:



48
49
50
51
# File 'lib/concurrent/observable.rb', line 48

def with_observer(*args, &block)
  add_observer *args, &block
  self
end