Class: MessageChannel::Druby::Agent
- Inherits:
-
Object
- Object
- MessageChannel::Druby::Agent
- Defined in:
- lib/message_channel/druby.rb
Instance Method Summary collapse
-
#initialize(host: @@host, port: @@port) ⇒ Agent
constructor
A new instance of Agent.
- #listen_each(pattern, &block) ⇒ Object
- #listen_once(pattern) ⇒ Object
- #notify(topic, message) ⇒ Object
- #unlisten(pattern) ⇒ Object
Constructor Details
#initialize(host: @@host, port: @@port) ⇒ Agent
Returns a new instance of Agent.
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/message_channel/druby.rb', line 64
#
# Creates an agent bound to the dRuby endpoint druby://host:port.
#
# The given host and port are stored in class variables, so they also become
# the defaults for subsequent calls. When no broker has been recorded in
# @@Broker yet, a new Broker is created and an attempt is made to serve it
# at the endpoint URI.
#
# @param host [String] host part of the dRuby URI (defaults to the class-level value)
# @param port [Integer, String] port part of the dRuby URI (defaults to the class-level value)
def initialize(host: @@host, port: @@port)
  @@host = host
  @@port = port
  @uri = "druby://#{host}:#{port}"

  broker_present = defined?(@@Broker) && !@@Broker.nil?
  unless broker_present
    @@Broker = Broker.new
    begin
      DRb.start_service(@uri, @@Broker)
    rescue StandardError
      # Failure to start the service is ignored on purpose.
      # NOTE(review): presumably this lets the agent act as a pure client
      # when the URI is already being served elsewhere; confirm.
      nil
    end
  end

  # Every agent talks to the broker through a remote reference, even when
  # the broker was created in this process.
  @drb = DRbObject.new_with_uri(@uri)
  # Maps a subscribed pattern to the queue id returned by the broker.
  @queue_ids = {}
end
Instance Method Details
#listen_each(pattern, &block) ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/message_channel/druby.rb', line 82
#
# Subscribes to +pattern+ and calls the block once per received item until
# the broker's wait call yields a nil topic.
#
# The queue id obtained from the broker is recorded in @queue_ids under
# +pattern+ while listening. On exit the queue is unsubscribed and the
# record is removed.
#
# Any StandardError raised while subscribing, waiting, or inside the block
# ends the listening loop silently; the method then returns nil.
#
# @param pattern [Object] subscription pattern handed to the broker
# @yieldparam topic [Object] first element of the item returned by the broker
# @yieldparam message [Object] second element of the item returned by the broker
# @return [nil]
def listen_each(pattern, &block)
  queue_id = @drb.subscribe(pattern)
  @queue_ids[pattern] = queue_id

  loop do
    topic, message = *@drb.wait(queue_id)
    # A nil topic (including a nil reply) marks the end of the stream.
    break if topic.nil?

    block.call(topic, message)
  end
  nil
rescue StandardError
  # Listening is best-effort: errors end the loop instead of reaching the caller.
  nil
ensure
  # queue_id is nil when subscribe itself failed; nothing to clean up then.
  if queue_id
    @drb.unsubscribe(queue_id)
    @queue_ids.delete(pattern)
  end
end
#listen_once(pattern) ⇒ Object
78 79 80 |
# File 'lib/message_channel/druby.rb', line 78
#
# Asks the broker for a single item matching +pattern+ and returns it as an
# array.
#
# The broker's reply is splatted, so a nil reply yields an empty array.
# NOTE(review): the reply is presumably a [topic, message] pair; confirm
# against Broker#fetch.
#
# @param pattern [Object] pattern handed to the broker's fetch call
# @return [Array] the splatted reply from the broker
def listen_once(pattern)
  [*@drb.fetch(pattern)]
end
#notify(topic, message) ⇒ Object
104 105 106 |
# File 'lib/message_channel/druby.rb', line 104
#
# Publishes +message+ under +topic+ through the broker.
#
# @param topic [Object] topic passed to the broker's publish call
# @param message [Object] payload passed to the broker's publish call
# @return [Object] whatever the broker's publish call returns
def notify(topic, message)
  @drb.publish(topic, message)
end
#unlisten(pattern) ⇒ Object
97 98 99 100 101 102 |
# File 'lib/message_channel/druby.rb', line 97
#
# Cancels the subscription recorded for +pattern+, if there is one.
#
# The queue is unsubscribed at the broker first; the local record is removed
# only after that call succeeds. Errors raised by the unsubscribe call reach
# the caller.
#
# @param pattern [Object] pattern previously recorded in @queue_ids
# @return [Object, nil] the removed queue id, or nil when +pattern+ was not recorded
def unlisten(pattern)
  queue_id = @queue_ids[pattern]
  return unless queue_id

  @drb.unsubscribe(queue_id)
  @queue_ids.delete(pattern)
end