Class: MessageChannel::Druby::Agent
- Inherits:
-
Object
- Object
- MessageChannel::Druby::Agent
- Defined in:
- lib/message_channel/druby.rb
Instance Method Summary
-
#initialize(host: @@host, port: @@port) ⇒ Agent
constructor
A new instance of Agent.
- #listen_each(pattern, &block) ⇒ Object
- #listen_once(pattern) ⇒ Object
- #notify(topic, message) ⇒ Object
- #unlisten(pattern) ⇒ Object
Constructor Details
#initialize(host: @@host, port: @@port) ⇒ Agent
Returns a new instance of Agent.
63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/message_channel/druby.rb', line 63

# Builds an agent that talks to the broker at druby://host:port.
#
# The given host and port are stored back into the class-level defaults, so
# agents created afterwards without arguments reuse the same endpoint.
#
# The first time no class-level broker exists (undefined or nil), a Broker is
# created and a DRb service is started for it on the URI. A failure to start
# the service is ignored (presumably because another process already serves
# that URI -- confirm).
#
# @param host [String] broker host name; defaults to the last host used
# @param port [Integer, String] broker port; defaults to the last port used
# @return [Agent] a new instance of Agent
def initialize( host: @@host, port: @@port )
  @@host, @@port = host, port
  @uri = "druby://#{host}:#{port}"

  broker_missing = !defined?( @@Broker ) || @@Broker.nil?
  if broker_missing
    @@Broker = Broker.new
    begin
      DRb.start_service( @uri, @@Broker )
    rescue StandardError
      # Best effort: carry on as a pure client when the service cannot start.
      nil
    end
  end

  @drb = DRbObject.new_with_uri( @uri )
  # Maps each subscribed pattern to the queue id returned by the broker.
  @queue_ids = {}
end
Instance Method Details
#listen_each(pattern, &block) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/message_channel/druby.rb', line 81

# Subscribes to +pattern+ and calls +block+ with (topic, message) for every
# item returned by the broker's +wait+, until +wait+ yields a nil topic.
#
# Any StandardError raised while subscribing, waiting, or inside +block+ ends
# the loop and is swallowed. On exit the queue is always unsubscribed and the
# local pattern => queue id entry is always dropped.
#
# @param pattern [Object] subscription pattern passed to the broker
# @yieldparam topic [Object] topic of the received item
# @yieldparam message [Object] payload of the received item
# @return [nil]
# @raise [StandardError] whatever the final +unsubscribe+ call raises
def listen_each( pattern, &block )
  queue_id = @drb.subscribe( pattern )
  @queue_ids[pattern] = queue_id
  while true
    topic, message = * @drb.wait( queue_id )
    break if topic.nil?
    block.call( topic, message )
  end
rescue StandardError
  nil
ensure
  begin
    @drb.unsubscribe( queue_id )
  ensure
    # Drop the local entry even when the remote unsubscribe raised, so no
    # stale queue id is left behind for #unlisten.
    @queue_ids.delete( pattern ) rescue nil
  end
end
#listen_once(pattern) ⇒ Object
77 78 79 |
# File 'lib/message_channel/druby.rb', line 77

# Asks the broker's +fetch+ for +pattern+ and returns the result as an Array.
#
# A nil result becomes an empty Array. The elements are presumably
# [topic, message], mirroring #notify -- confirm against Broker#fetch.
#
# @param pattern [Object] pattern passed to the broker
# @return [Array] the fetched value, splatted into an Array
def listen_once( pattern )
  [ *@drb.fetch( pattern ) ]
end
#notify(topic, message) ⇒ Object
103 104 105 |
# File 'lib/message_channel/druby.rb', line 103

# Publishes +message+ under +topic+ by calling +publish+ on the broker proxy.
#
# @param topic [Object] topic to publish under
# @param message [Object] payload to deliver
# @return [Object] whatever the broker's +publish+ returns
def notify( topic, message )
  @drb.publish( topic, message )
end
#unlisten(pattern) ⇒ Object
96 97 98 99 100 101 |
# File 'lib/message_channel/druby.rb', line 96

# Cancels the subscription recorded for +pattern+, if there is one.
#
# Looks up the queue id stored for +pattern+, unsubscribes it on the broker
# proxy, then removes the local entry. Does nothing when the pattern has no
# recorded queue id.
#
# @param pattern [Object] pattern previously registered in the local table
# @return [Object, nil] the removed queue id, or nil when nothing was recorded
def unlisten( pattern )
  queue_id = @queue_ids[pattern]
  return unless queue_id

  @drb.unsubscribe( queue_id )
  @queue_ids.delete( pattern )
end