Class: Pione::DRbPatch::ReplyReader

Inherits:
Object
  • Object
show all
Defined in:
lib/pione/patch/drb-patch.rb

Instance Method Summary collapse

Constructor Details

#initializeReplyReader

Returns a new instance of ReplyReader.



24
25
26
27
# File 'lib/pione/patch/drb-patch.rb', line 24

def initialize
  @watcher_lock = Mutex.new
  @watchers = Set.new
end

Instance Method Details

#add_watcher(watcher) ⇒ Object

Makes reader thread for receiving unordered replies.



56
57
58
59
60
# File 'lib/pione/patch/drb-patch.rb', line 56

def add_watcher(watcher)
  @watcher_lock.synchronize do
    @watchers << watcher
  end
end

#remove_watcher(watcher) ⇒ Object

Remove the request reader thread watcher.



63
64
65
66
67
# File 'lib/pione/patch/drb-patch.rb', line 63

def remove_watcher(watcher)
  @watcher_lock.synchronize do
    @watchers.delete_if {|th| th == watcher}
  end
end

#start(protocol) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/pione/patch/drb-patch.rb', line 29

def start(protocol)
  @thread ||= Thread.new do
    begin
      # loop for receiving reply and waiting the result
      while true
        # receive a replay
        req_id, succ, result = protocol.recv_reply
        # register it to waiter table
        DRbPatch.waiter_table.push(req_id, [succ, result])
      end
    rescue => e
      @watcher_lock.synchronize do
        # pass the exception to watchers
        @watchers.each do |watcher|
          Log::Debug.communication("connection error happened in receiving reply.")
          Log::Debug.communication(e)
          watcher.raise(ReplyReaderError.new(e)) if watcher.alive?
        end

        # remove dead watchers
        @watchers.delete_if {|watcher| not(watcher.alive?)}
      end
    end
  end
end