Class: MessageChannel::Druby

Inherits:
Object
  • Object
show all
Defined in:
lib/message_channel/druby.rb

Defined Under Namespace

Classes: Agent, Broker

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host: nil, port: nil) ⇒ Druby

Returns a new instance of Druby.



110
111
112
113
114
115
# File 'lib/message_channel/druby.rb', line 110

# Sets up the channel endpoint and the agent used to talk to it.
#
# @param host [String, nil] endpoint host; falls back to "127.0.0.1" when omitted
# @param port [#to_i, nil] endpoint port; falls back to 8787 and is coerced with +to_i+
def initialize( host: nil, port: nil )
  address  =  host || "127.0.0.1"
  number   =  ( port || 8787 ).to_i
  @host, @port  =  address, number
  @agent  =  Agent.new( host: address, port: number )
  # Listener threads started by #listen_each, keyed by pattern.
  @threads  =  Hash.new
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



108
109
110
# File 'lib/message_channel/druby.rb', line 108

# Reader for the +@host+ instance variable.
#
# @return [Object] the current value of +@host+
def host; @host; end

#port ⇒ Object (readonly)

Returns the value of attribute port.



108
109
110
# File 'lib/message_channel/druby.rb', line 108

# Reader for the +@port+ instance variable.
#
# @return [Object] the current value of +@port+
def port; @port; end

Instance Method Details

#listen(*patterns, &block) ⇒ Object



158
159
160
161
162
163
164
165
166
# File 'lib/message_channel/druby.rb', line 158

# Dispatches to #listen_once or #listen_each depending on whether a block
# is supplied.
#
# @param patterns [Array] patterns forwarded unchanged to the chosen method
# @yield [topic, items] forwarded from #listen_each when a block is given
# @return [Object] whatever the chosen method returns
def listen( *patterns, &block )
  # Without a block there is nothing to call back into.
  return listen_once( *patterns )  unless block

  listen_each( *patterns ) { |topic, items| block.call( topic, items ) }
end

#listen_each(*patterns, &block) ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/message_channel/druby.rb', line 143

# Starts one thread per pattern. Each thread asks +@agent+ to listen on its
# pattern, parses every received message as JSON (symbolized keys) and
# hands the result to +block+.
#
# @param patterns [Array] patterns to listen on; each gets its own thread
# @yield [topic, items] the topic and the parsed message
# @return [Array] the +patterns+ array
def listen_each( *patterns, &block )
  patterns.each do |pattern|
    worker  =  Thread.start(pattern) do |subject|
      begin
        @agent.listen_each( subject ) do |topic, payload|
          block.call( topic, JSON.parse( payload, symbolize_names: true ) )
        end
      rescue StandardError
        # Errors end this listener quietly; the thread's value is nil.
        nil
      end
    end
    # Registered under the pattern; a later thread for the same pattern
    # replaces this entry.
    @threads[pattern]  =  worker
  end
end

#listen_once(*patterns) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/message_channel/druby.rb', line 117

# Waits for the first message on any of +patterns+ and returns it.
#
# One thread per pattern is started; each creates its own Agent for this
# channel's host and port, waits for a single message, parses it as JSON
# (symbolized keys) and reports back through a queue. The first successful
# report wins and all threads are killed before returning.
#
# @param patterns [Array] patterns to wait on
# @return [Array, nil] +[topic, items]+ for the first message, or +nil+ when
#   no patterns were given, every listener failed, or an error occurred
def listen_once( *patterns )
  # Nothing to wait for; popping the queue would block forever.
  return nil  if patterns.empty?

  queue  =  Queue.new
  threads  =  patterns.map do |pattern|
    Thread.start(pattern) do |subject|
      begin
        # Same endpoint as the agent built in the constructor.
        agent  =  Agent.new( host: @host, port: @port )
        topic, message  =  * agent.listen_once( subject )
        items  =  JSON.parse( message, symbolize_names: true )
        queue.push( [topic, items] )
      rescue StandardError
        # Report the failure so the waiter does not block on this thread.
        queue.push( nil )
      end
    end
  end

  # Every thread reports exactly once: a result or a nil failure marker.
  result  =  nil
  patterns.size.times do
    result  =  queue.pop
    break  if result
  end
  result
rescue StandardError
  nil
ensure
  # Stop listeners that are still waiting (duplicate patterns included).
  threads.each( &:kill )  if threads
end

#notify(topic, **items) ⇒ Object



175
176
177
# File 'lib/message_channel/druby.rb', line 175

# Serializes +items+ to JSON and passes it to +@agent.notify+ under +topic+.
#
# @param topic [Object] topic handed to the agent unchanged
# @param items [Hash] keyword arguments that make up the message body
# @return [Object] whatever +@agent.notify+ returns
def notify( topic, **items )
  payload  =  items.to_json
  @agent.notify( topic, payload )
end

#unlisten(**patterns) ⇒ Object



168
169
170
171
172
173
# File 'lib/message_channel/druby.rb', line 168

# Stops listening on each of +patterns+: tells +@agent+ to unlisten and
# drops the pattern's entry from the thread registry.
#
# Patterns are taken as positional arguments, like the listen methods. The
# previous +**patterns+ signature rejected positional patterns with an
# ArgumentError and iterated over key/value pairs.
#
# NOTE(review): the registered thread is only removed from +@threads+, not
# stopped here; presumably +@agent.unlisten+ ends the agent-side loop —
# confirm against Agent.
#
# @param patterns [Array] patterns to stop listening on
# @return [Array] the +patterns+ array
def unlisten( *patterns )
  patterns.each do |pattern|
    @agent.unlisten( pattern )
    @threads.delete( pattern )
  end
end