Class: AsyncEmitter
- Inherits: Object
  - Object
    - AsyncEmitter
- Defined in: lib/async_emitter.rb
Overview
The AsyncEmitter class provides a mechanism for asynchronous communication in Ruby programs. Each instance delivers notifications for events identified by a token, which can be any object that is valid as a Hash key. Multiple listeners can be registered for each token, and a listener can be registered for one or many events. The listeners for a token can be released.
Where more than one listener is registered for an event, they are notified in the order in which they were registered.
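To make the keying and ordering rules concrete, here is a minimal synchronous sketch. ListenerRegistry, add and notify are hypothetical names chosen for illustration; they are not part of AsyncEmitter, and the sketch leaves out threading entirely.

```ruby
# Hypothetical stand-in (not the AsyncEmitter source): a synchronous registry
# that models only token keying and listener ordering.
class ListenerRegistry
  def initialize
    @listeners = Hash.new { |hash, token| hash[token] = [] }
  end

  # Append a listener; any object usable as a Hash key can be the token.
  def add(token, listener)
    @listeners[token] << listener
    self
  end

  # Call each listener for the token in registration order.
  def notify(token, data)
    @listeners[token].each { |listener| listener.call(data) }
  end
end

calls = []
registry = ListenerRegistry.new
registry.add :data, lambda { |d| calls << "first: #{d}" }
registry.add :data, lambda { |d| calls << "second: #{d}" }
registry.add ["job", 42], lambda { |d| calls << "job: #{d}" }

registry.notify :data, "payload"
registry.notify ["job", 42], "done"
puts calls
# first: payload
# second: payload
# job: done
```

An Array works as a token here because Ruby hashes Arrays by content, so a fresh ["job", 42] literal finds the same entry.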
Example
emitter = AsyncEmitter.new
emitter.on :error, lambda { |e| puts "Error: #{e}" }
emitter.on :data, lambda { |data| puts "Data: #{data}" }
begin
  # get_data_from_somewhere is a placeholder for the caller's own data source
  data = get_data_from_somewhere
  emitter.emit :data, data
rescue StandardError => e
  emitter.emit :error, e
end
Instance Method Summary
- #emit(token, data) ⇒ Object
  Send notification of an event.
- #initialize ⇒ AsyncEmitter (constructor)
  A new instance of AsyncEmitter.
- #on(token, p, once_only = false) ⇒ Object
  Register for notification.
- #once(token, p) ⇒ Object
  Register for a single notification. A convenience, self-documenting shorthand for: on token, p, true.
- #release(token) ⇒ Object
  Remove notification for an event.
- #release_all ⇒ Object
  Remove all notifications.
Constructor Details
#initialize ⇒ AsyncEmitter
Returns a new instance of AsyncEmitter.
# File 'lib/async_emitter.rb', line 30
def initialize
  @emissions = {}
end
Instance Method Details
#emit(token, data) ⇒ Object
Send notification of an event
# File 'lib/async_emitter.rb', line 87
def emit (token, data)
  @emissions[token][:semaphore] ||= Mutex.new
  @emissions[token][:cv] ||= ConditionVariable.new
  @emissions[token][:data] ||= []
  @emissions[token][:semaphore].synchronize do
    @emissions[token][:data].push data
    @emissions[token][:cv].signal
  end
end
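One consequence visible in the listing: #emit indexes @emissions[token] without creating it. With the empty Hash that #initialize sets up, a token that has never been passed to #on resolves to nil, so the first lookup appears to raise. The sketch below reproduces that lookup on a plain Hash. unguarded_lookup and slot_for are hypothetical helper names, not part of AsyncEmitter.

```ruby
# Hypothetical helpers operating on a plain Hash shaped like @emissions.

# Mirrors the lookup style used in #emit: assumes the slot already exists.
def unguarded_lookup(emissions, token)
  emissions[token][:data]
end

# Mirrors the first line of #on: creates the slot when it is missing.
def slot_for(emissions, token)
  emissions[token] ||= {}
end

emissions = {}                      # same starting state as #initialize
begin
  unguarded_lookup(emissions, :data)
rescue NoMethodError => e
  puts "unregistered token: #{e.class}"
end

slot_for(emissions, :data)[:data] ||= []
puts unguarded_lookup(emissions, :data).inspect
# unregistered token: NoMethodError
# []
```

In practice this suggests registering a listener with #on before the first #emit for a given token.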
#on(token, p, once_only = false) ⇒ Object
Register for notification
# File 'lib/async_emitter.rb', line 42
def on (token, p, once_only=false)
  @emissions[token] ||= {}
  @emissions[token][:p] ||= []
  @emissions[token][:data] ||= []
  @emissions[token][:semaphore] ||= Mutex.new
  @emissions[token][:cv] ||= ConditionVariable.new
  @emissions[token][:p].push Hash[:p => p, :o => once_only]
  @emissions[token][:thread] ||= Thread.new do
    @emissions[token][:active] = true
    while @emissions[token][:active]
      @emissions[token][:semaphore].synchronize do
        self.post_data_for token
        @emissions[token][:cv].wait @emissions[token][:semaphore]
        if @emissions[token][:active]
          self.post_data_for token
        end
      end
    end
  end
end
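The listing pairs a Mutex with a ConditionVariable so that one worker thread per token can sleep until #emit signals new data. The following is a simplified sketch of that hand-off, not the library's code: MiniWorker, post and stop are hypothetical names, there is a single queue instead of one per token, and shutdown uses a flag plus join rather than Thread.kill.

```ruby
# Hypothetical MiniWorker: one mutex, one condition variable, one thread.
class MiniWorker
  def initialize(&handler)
    @handler = handler
    @pending = []
    @active  = true
    @lock    = Mutex.new
    @cv      = ConditionVariable.new
    @thread  = Thread.new { run }
  end

  # Producer side: queue the item and wake the worker.
  def post(item)
    @lock.synchronize do
      @pending.push item
      @cv.signal
    end
  end

  # Ask the worker to finish once the queue is drained, then wait for it.
  def stop
    @lock.synchronize do
      @active = false
      @cv.signal
    end
    @thread.join
  end

  private

  def run
    @lock.synchronize do
      loop do
        # Deliver everything queued so far, in arrival order.
        @handler.call(@pending.shift) until @pending.empty?
        break unless @active
        # Releases the lock while sleeping and reacquires it on wake-up.
        @cv.wait(@lock)
      end
    end
  end
end

seen = []
worker = MiniWorker.new { |item| seen << item }
worker.post :a
worker.post :b
worker.stop
p seen   # [:a, :b]
```

Draining the queue before every wait means an item posted before the worker first runs is still delivered, even though its signal had no sleeper to wake. The handler runs while the lock is held, so a handler that calls post on the same worker would hit Ruby's recursive-locking error; a fuller design would copy the pending items and call handlers outside the lock.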
#once(token, p) ⇒ Object
Register for a single notification. A convenience, self-documenting shorthand for: on token, p, true
# File 'lib/async_emitter.rb', line 76
def once (token, p)
  self.on token, p, true
end
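#on stores each listener as a Hash of the form {:p => p, :o => once_only}. The delivery method it calls, post_data_for, is not shown on this page, so the sketch below is only an assumption about how the :o flag might be honoured: call every listener, then drop the once-only entries. deliver is a hypothetical name.

```ruby
# Hypothetical delivery step; the real post_data_for is not shown on this page.
# `listeners` is an Array of {:p => callable, :o => once_only} entries.
def deliver(listeners, data)
  listeners.each { |entry| entry[:p].call(data) }
  listeners.reject! { |entry| entry[:o] }   # drop once-only entries after their first call
  listeners
end

log = []
listeners = [
  { :p => lambda { |d| log << "always #{d}" }, :o => false },
  { :p => lambda { |d| log << "once #{d}" },   :o => true }
]

deliver(listeners, 1)
deliver(listeners, 2)
puts log
# always 1
# once 1
# always 2
```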
#release(token) ⇒ Object
Remove notification for an event
# File 'lib/async_emitter.rb', line 105
def release (token)
  @emissions[token][:active] = false
  Thread.kill @emissions[token][:thread]
end
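As listed, #release kills the worker thread but leaves the Thread object in the token's slot. Since #on creates the thread with ||=, a later #on for the same token would appear to reuse the dead thread instead of starting a new one. The sketch below shows that interaction on a plain Hash. start_worker and release_worker are hypothetical helpers, and the join is added only so the demo can observe the result.

```ruby
# Hypothetical helpers on a plain Hash shaped like one @emissions[token] slot.

# Same `||=` idiom as #on: only creates a thread when the slot has none.
def start_worker(slot)
  slot[:thread] ||= Thread.new { sleep }   # placeholder body: idles until killed
end

# Same two steps as #release, plus a join so the demo can observe the result.
def release_worker(slot)
  slot[:active] = false
  Thread.kill slot[:thread]
  slot[:thread].join
end

slot = {}
first = start_worker(slot)
release_worker(slot)
puts first.alive?                          # false
puts start_worker(slot).equal?(first)      # true: the dead thread is reused

slot.delete(:thread)                       # forgetting it allows a fresh start
fresh = start_worker(slot)
puts fresh.equal?(first)                   # false
Thread.kill fresh
```

Under that reading, a token that has been released would not deliver again unless its slot is cleared first.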
#release_all ⇒ Object
Remove all notifications
# File 'lib/async_emitter.rb', line 113
def release_all
  @emissions.each do |key, value|
    value[:active] = false
    Thread.kill value[:thread]
  end
end