Class: MessageChannel::Druby::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/message_channel/druby.rb

Instance Method Summary collapse

Constructor Details

#initialize(host: @@host, port: @@port) ⇒ Agent

Returns a new instance of Agent.



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/message_channel/druby.rb', line 64

def initialize( host: @@host, port: @@port )
  @@host  =  host
  @@port  =  port
  @uri  =  "druby://#{host}:#{port}"
 
  if  !defined?( @@Broker )  ||  @@Broker.nil?
    @@Broker  =  Broker.new
    DRb.start_service( @uri, @@Broker )    rescue nil
  end
 
  @drb  =  DRbObject.new_with_uri( @uri )
  @queue_ids  =  {}
end

Instance Method Details

#listen_each(pattern, &block) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/message_channel/druby.rb', line 82

def listen_each( pattern, &block )
  queue_id  =  @drb.subscribe( pattern )
  @queue_ids[pattern]  =  queue_id
  while  true
    topic, message  =  * @drb.wait( queue_id )
    break    if  topic.nil?
    block.call( topic, message )
  end
rescue => error
  nil
ensure
  @drb.unsubscribe( queue_id )
  @queue_ids.delete( pattern )    rescue  nil
end

#listen_once(pattern) ⇒ Object



78
79
80
# File 'lib/message_channel/druby.rb', line 78

def listen_once( pattern )
  topic, message  =  * @drb.fetch( pattern )
end

#notify(topic, message) ⇒ Object



104
105
106
# File 'lib/message_channel/druby.rb', line 104

def notify( topic, message )
  @drb.publish( topic, message )
end

#unlisten(pattern) ⇒ Object



97
98
99
100
101
102
# File 'lib/message_channel/druby.rb', line 97

def unlisten( pattern )
  if ( queue_id  =  @queue_ids[pattern] )
    @drb.unsubscribe( queue_id )
    @queue_ids.delete( pattern )    rescue  nil
  end
end