Class: Adaptation::Mom::Mom

Inherits:
Object show all
Defined in:
lib/adaptation/mom.rb

Instance Method Summary collapse

Constructor Details

#initialize(mom_uri) ⇒ Mom

Returns a new instance of Mom.



10
11
12
13
# File 'lib/adaptation/mom.rb', line 10

def initialize mom_uri
  @mom_uri = mom_uri
  @messages = []
end

Instance Method Details

#listObject



70
71
72
73
74
75
76
# File 'lib/adaptation/mom.rb', line 70

def list
  puts "MOM subscriptions:"
  get_subscribers.each do |s|
    puts "  #{s}"
  end
  return
end

#publish(message, topic) ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/adaptation/mom.rb', line 25

def publish message, topic
 # Insert message into messages buffer, and awake
 # deliverer process (@sleeper) if paused  
  puts "-----------------------------------"
  puts "Received message in topic: #{topic}"
  puts "#{message}"
  puts "-----------------------------------"
  @messages << [message, topic]
  @sleeper.run if @sleeper.stop?
end

#startObject



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/adaptation/mom.rb', line 36

def start
  DRb.start_service(@mom_uri, self)
  puts "MOM started. Listening at #{@mom_uri}"
     
  @sleeper = Thread.new{
    loop do 

      # deliver all messages
      while !@messages.empty?
        @messages.each do |message|
          get_subscribers.each do |uri|
            begin
              puts "Calling #{uri}"
              DRb.start_service
              oapdaemon = DRbObject.new(nil, uri)
              oapdaemon.send_message message[0], message[1]
            rescue
              puts "Couldn't send message to subscriber: #{uri}"
            end
          end
          @messages.delete message
        end
      end

      # go to sleep
      Thread.stop
    
    end
  }
   
  @sleeper.join
  DRb.thread.join # Don't exit just yet
end

#subscribe(drb_uri) ⇒ Object



15
16
17
18
19
20
21
22
23
# File 'lib/adaptation/mom.rb', line 15

def subscribe drb_uri
  unless get_subscribers.include?(drb_uri)
    add_subscriber drb_uri
    puts "Added new subscriber: #{drb_uri}"
  end

  oapdaemon = DRbObject.new(nil, drb_uri)
  oapdaemon.subscription_result(true)
end