Class: MessageChannel::Observer

Inherits:
Object
  • Object
show all
Defined in:
lib/message_channel/observer.rb

Defined Under Namespace

Classes: Agent

Instance Method Summary collapse

Constructor Details

#initialize(**options) ⇒ Observer

Returns a new instance of Observer.



17
18
19
20
21
22
23
# File 'lib/message_channel/observer.rb', line 17

def initialize( **options )
  @asyncs  =  {}
  @awaits  =  {}
  @queues  =  {}
  @@Agent  ||=  Agent.new
  @@Agent.add_observer( self, :action ) 
end

Instance Method Details

#action(topic, message) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/message_channel/observer.rb', line 25

def action( topic, message )
  items  =  JSON.parse( message, symbolize_names: true )
  @asyncs.keys.each do |pattern|
    if File.fnmatch( pattern, topic, File::FNM_PATHNAME )
      if  action  =  @asyncs[pattern]
        action.call( topic, items )
      end
    end
  end
  @awaits.keys.each do |queue|
    @awaits[queue].each do |pattern|
      if File.fnmatch( pattern, topic, File::FNM_PATHNAME )
        queue.push( [topic, items] )
      end
    end
  end
end

#listen(*patterns, &block) ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/message_channel/observer.rb', line 57

def listen( *patterns, &block )
  if block.nil?
    listen_once( *patterns )
  else
    listen_each( *patterns ) do |topic, items|
      block.call( topic, items )
    end
  end
end

#listen_each(*patterns, &block) ⇒ Object



51
52
53
54
55
# File 'lib/message_channel/observer.rb', line 51

def listen_each( *patterns, &block )
  patterns.each do |pattern|
    @asyncs[pattern]  =  block
  end
end

#listen_once(*patterns) ⇒ Object



43
44
45
46
47
48
49
# File 'lib/message_channel/observer.rb', line 43

def listen_once( *patterns )
  queue  =  Queue.new
  @awaits[queue]  =  patterns
  topic, items  =  * queue.pop
  @awaits.delete( queue )    rescue nil
  [topic, items]
end

#notify(topic, **items) ⇒ Object



75
76
77
# File 'lib/message_channel/observer.rb', line 75

def notify( topic, **items )
  @@Agent.notify( topic, items.to_json )
end

#unlisten(**patterns) ⇒ Object



67
68
69
70
71
72
73
# File 'lib/message_channel/observer.rb', line 67

def unlisten( **patterns )
  patterns.each do |pattern|
    if  action  =  @asyncs[pattern]
      @asyncs.delete( pattern )
    end
  end
end