Class: Adaptation::Mom::DrubySubscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/adaptation/druby_subscriber.rb

Instance Method Summary collapse

Constructor Details

#initialize(subscriber_uri, mom_uri, topics) ⇒ DrubySubscriber

constructor, called from the subscribe command



11
12
13
14
15
16
# File 'lib/adaptation/druby_subscriber.rb', line 11

def initialize subscriber_uri, mom_uri, topics
  @subscriber_uri = subscriber_uri
  @mom_uri = mom_uri
  @topics = topics
  @messages = []
end

Instance Method Details

#send_message(message, topic) ⇒ Object

method to receive messages, called from the mom



19
20
21
22
23
24
25
26
27
28
# File 'lib/adaptation/druby_subscriber.rb', line 19

def send_message message, topic 
  # Insert message into messages buffer, and awake
  # message processor (@sleeper) if paused
  puts "-----------------------------------"
  puts "Received message in topic: #{topic}"
  puts "#{message}"
  puts "-----------------------------------"
  @messages << {:message => message, :topic => topic}
  @sleeper.run if @sleeper.stop?
end

#startObject



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/adaptation/druby_subscriber.rb', line 36

def start
	begin
	  # try to start the subscriber service,
	  # using the uri specified in config/mom.yml
    DRb.start_service(@subscriber_uri, self)
    
	  # subscribe that uri to the mom
	  mom = DRbObject.new(nil, @mom_uri)
    mom.subscribe @subscriber_uri
	rescue Exception => e
    # desired uri already in use
    puts "Couldn't start subscriber at #{@subscriber_uri}. Address already in use?"
    return
	end	
 
  @sleeper = Thread.new{
    loop do 

      # process all messages
      while !@messages.empty?
        @messages.each do |message|         
          if ( (@topics.include?(message[:topic])) or (@topics.include?("all")) )
            system("ruby public/dispatch.rb '#{message[:message]}'") 
          end
          @messages.delete message
        end
      end

      # go to sleep
      Thread.stop
    
    end
  }

  @sleeper.join
  Drb.thread.join

end

#subscription_result(subscribed) ⇒ Object



30
31
32
33
34
# File 'lib/adaptation/druby_subscriber.rb', line 30

def subscription_result subscribed
  if subscribed
    puts "Subscribed to mom (#{@mom_uri}). Listening at #{@subscriber_uri}"
  end
end