Class: Invoker::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/invoker/reactor.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Reactor

Returns a new instance of Reactor.



4
5
6
# File 'lib/invoker/reactor.rb', line 4

# Builds a reactor that starts out with an empty list of monitored
# file descriptors.
def initialize
  @monitored_fds = []
end

Instance Attribute Details

#monitored_fds ⇒ Object

Returns the value of attribute monitored_fds.



3
4
5
# File 'lib/invoker/reactor.rb', line 3

# Returns the current value of @monitored_fds (the collection that
# add_to_monitor appends to and remove_from_monitoring deletes from).
def monitored_fds
  @monitored_fds
end

Instance Method Details

#add_to_monitor(fd) ⇒ Object



8
9
10
# File 'lib/invoker/reactor.rb', line 8

# Appends +fd+ to the list of monitored descriptors.
#
# @param fd [Object] descriptor to start monitoring
# @return [Array] the monitored descriptor list, including +fd+
def add_to_monitor(fd)
  @monitored_fds.push(fd)
end

#handle_read_event(ready_read_fds) ⇒ Object



24
25
26
27
# File 'lib/invoker/reactor.rb', line 24

# Dispatches every readable descriptor to process_read.
#
# The input is flattened and nil entries are dropped before dispatch, so
# nested arrays of descriptors are accepted.
#
# @param ready_read_fds [Array] descriptors reported as readable
# @return [Array] the flattened, nil-free list that was processed
def handle_read_event(ready_read_fds)
  ready_read_fds.flatten.compact.each do |fd|
    process_read(fd)
  end
end

#process_read(ready_fd) ⇒ Object



29
30
31
32
33
34
35
36
37
38
# File 'lib/invoker/reactor.rb', line 29

# Reads pending data from +ready_fd+ and forwards it to the worker that
# Invoker::COMMANDER associates with that descriptor.
#
# When read_data raises Invoker::Errors::ProcessTerminated, the worker's
# pipe_end is removed from monitoring and the worker is unbound.
#
# @param ready_fd [Object] descriptor reported as readable
# @return [Object] whatever receive_data (or unbind, on termination) returns
def process_read(ready_fd)
  # The lookup stays outside the begin block so errors it raises are not
  # handled by the rescue below.
  worker = Invoker::COMMANDER.get_worker_from_fd(ready_fd)
  begin
    worker.receive_data(read_data(ready_fd))
  rescue Invoker::Errors::ProcessTerminated
    remove_from_monitoring(worker.pipe_end)
    worker.unbind
  end
end

#read_data(ready_fd) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/invoker/reactor.rb', line 40

# Drains everything that can currently be read from +ready_fd+ without
# blocking, in chunks of up to 64 bytes.
#
# @param ready_fd [#read_nonblock] descriptor to read from
# @return [String] the data read so far (empty when nothing was available)
# @raise [Invoker::Errors::ProcessTerminated] when reading fails with any
#   other StandardError (for example EOFError once the writer has closed);
#   the error is built with +ready_fd+ and the data read before the failure
def read_data(ready_fd)
  sock_data = []
  begin
    while (t_data = ready_fd.read_nonblock(64))
      sock_data << t_data
    end
    # A nil chunk ends the loop; still hand back what was collected
    # instead of the nil value of the `while` expression.
    sock_data.join
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, IO::WaitReadable
    # Nothing more to read right now. IO::WaitReadable is the documented
    # umbrella for "would block" errors raised by read_nonblock.
    sock_data.join
  rescue
    raise Invoker::Errors::ProcessTerminated.new(ready_fd, sock_data.join)
  end
end

#remove_from_monitoring(fd) ⇒ Object



12
13
14
# File 'lib/invoker/reactor.rb', line 12

# Stops monitoring +fd+ by deleting it from @monitored_fds.
# Returns the result of Array#delete on that list.
def remove_from_monitoring(fd)
  @monitored_fds.delete(fd)
end

#watch_on_pipe ⇒ Object



16
17
18
19
20
21
22
# File 'lib/invoker/reactor.rb', line 16

# Polls the monitored descriptors once, waiting at most 0.05 seconds, and
# passes any that are readable to handle_read_event.
#
# @return [Object, nil] the result of handle_read_event, or nil when no
#   descriptor became readable within the timeout
def watch_on_pipe
  # Only the readable set is of interest; select yields nil on timeout.
  readable, = IO.select(monitored_fds, [], [], 0.05)
  return if readable.nil? || readable.empty?

  handle_read_event(readable)
end