Class: Af::TCPCommand::Server

Inherits:
Object
  • Object
show all
Includes:
Application::Proxy
Defined in:
lib/fiksu-af/tcp_command/server.rb

Defined Under Namespace

Classes: InvalidCommand, NoMoreToDo

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Application::Proxy

#af_logger, #af_name

Constructor Details

#initialize(server_hostname, server_port) ⇒ Server

Returns a new instance of Server.



13
14
15
16
17
18
19
20
# File 'lib/fiksu-af/tcp_command/server.rb', line 13

# Creates the master side of the TCP command channel and starts listening.
#
# @param server_hostname [String] host/interface handed to TCPServer.new
# @param server_port [Integer] port handed to TCPServer.new
# @raise [SystemCallError] whatever TCPServer.new raises when it cannot bind
def initialize(server_hostname, server_port)
  @server_hostname, @server_port = server_hostname, server_port
  # Listening socket; readable from the start so #serve can select on it.
  @server = TCPServer.new(@server_hostname, @server_port)

  # Work queue and bookkeeping: no items queued, no slaves connected,
  # and the server is considered to have work until told otherwise.
  @items = []
  @sessions = []
  @more_to_do = true
end

Instance Attribute Details

#items ⇒ Object

Returns the value of attribute items.



11
12
13
# File 'lib/fiksu-af/tcp_command/server.rb', line 11

# Queue of pending work items; #initialize starts it as an empty Array and
# #next_item removes entries from the front.
#
# @return [Array] the current value of @items
def items
  @items
end

#server ⇒ Object (readonly)

Returns the value of attribute server.



10
11
12
# File 'lib/fiksu-af/tcp_command/server.rb', line 10

# The listening socket created in #initialize via TCPServer.new.
#
# @return [TCPServer] the current value of @server
def server
  @server
end

#server_hostname ⇒ Object (readonly)

Returns the value of attribute server_hostname.



10
11
12
# File 'lib/fiksu-af/tcp_command/server.rb', line 10

# The hostname argument that was given to #initialize, stored unchanged.
#
# @return [Object] the current value of @server_hostname
#   (presumably a String — confirm against callers)
def server_hostname
  @server_hostname
end

#server_port ⇒ Object (readonly)

Returns the value of attribute server_port.



10
11
12
# File 'lib/fiksu-af/tcp_command/server.rb', line 10

# The port argument that was given to #initialize, stored unchanged.
#
# @return [Object] the current value of @server_port
#   (presumably an Integer — confirm against callers)
def server_port
  @server_port
end

#sessions ⇒ Object (readonly)

Returns the value of attribute sessions.



10
11
12
# File 'lib/fiksu-af/tcp_command/server.rb', line 10

# Connections currently being served; #initialize starts it as an empty
# Array and #serve appends each accepted connection to it.
#
# @return [Array] the current value of @sessions
def sessions
  @sessions
end

Instance Method Details

#_command_ready(rfd) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/fiksu-af/tcp_command/server.rb', line 55

# Handles a "ready" request: hands the next queued item to the requester.
#
# When the queue yields nothing, the server is flagged as finished (via
# #no_more_to_do!) before NoMoreToDo is raised, so later calls fail fast.
#
# @param rfd [#write] connection the item is written to, newline-terminated
# @return [Object] whatever rfd.write returns
# @raise [NoMoreToDo] when the server is already finished or the queue is empty
def _command_ready(rfd)
  raise NoMoreToDo unless more_to_do?

  item = next_item
  unless item
    no_more_to_do!
    raise NoMoreToDo
  end

  logger.detail "requesting slave process: #{item}"
  rfd.write("#{item}\n")
end

#_unknown_command(line, rfd) ⇒ Object

Raises:

  • (InvalidCommand)


51
52
53
# File 'lib/fiksu-af/tcp_command/server.rb', line 51

# Fallback used by #command_dispatcher when no handler matches a request.
#
# The exception message is the request text itself; #serve interpolates it
# into its "unknown request from slave" warning.
#
# @param line [String] the request that could not be dispatched
# @param rfd [Object] connection the request came from (unused here)
# @raise [InvalidCommand] always
def _unknown_command(line, rfd)
  # `dispatcher_command` is a local of #command_dispatcher and is not
  # visible here; referencing it raised NameError instead of InvalidCommand.
  raise InvalidCommand, line.to_s
end

#command_dispatcher(line, rfd) ⇒ Object



42
43
44
45
46
47
48
49
# File 'lib/fiksu-af/tcp_command/server.rb', line 42

# Routes one request line to its handler method.
#
# The handler name is "_command_" followed by the request text; if this
# object does not respond to that name the request goes to #_unknown_command.
#
# @param line [String] request text as returned by #command_reader
# @param rfd [Object] connection the request arrived on, passed to the handler
# @return [Object] the handler's return value
def command_dispatcher(line, rfd)
  handler = :"_command_#{line}"
  return send(handler, rfd) if respond_to?(handler)

  _unknown_command(line, rfd)
end

#command_reader(rfd) ⇒ Object



38
39
40
# File 'lib/fiksu-af/tcp_command/server.rb', line 38

# Reads a single request line from a connection.
#
# @param rfd [#readline] connection to read from
# @return [String] the line with its trailing line terminator removed
# @raise [EOFError] propagated from readline when the stream is at end of file
def command_reader(rfd)
  raw_line = rfd.readline
  raw_line.chomp
end

#logger ⇒ Object



22
23
24
# File 'lib/fiksu-af/tcp_command/server.rb', line 22

# Logger for this object, looked up through #af_logger under the name of
# the receiver's class.
#
# @return [Object] whatever af_logger returns for that name
def logger
  logger_name = self.class.name
  af_logger(logger_name)
end

#more_to_do? ⇒ Boolean

Returns:

  • (Boolean)


26
27
28
# File 'lib/fiksu-af/tcp_command/server.rb', line 26

# Whether the server still considers itself to have work to hand out.
# The flag starts as true in #initialize and is cleared by #no_more_to_do!.
#
# @return [Boolean] the current value of @more_to_do
def more_to_do?
  @more_to_do
end

#next_item ⇒ Object



34
35
36
# File 'lib/fiksu-af/tcp_command/server.rb', line 34

# Removes and returns the item at the front of the work queue.
#
# @return [Object, nil] the first element of @items, or nil when it is empty
def next_item
  @items.shift
end

#no_more_to_do! ⇒ Object



30
31
32
# File 'lib/fiksu-af/tcp_command/server.rb', line 30

# Marks the server as finished: after this call #more_to_do? returns false.
#
# @return [false] the value assigned to @more_to_do
def no_more_to_do!
  @more_to_do = false
end

#serve ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fiksu-af/tcp_command/server.rb', line 70

# Runs the master loop: accepts slave connections and answers their requests
# until the work is finished and every session has been closed.
#
# Each pass waits (IO.select, no timeout) on the listening socket plus all
# open sessions. A readable listening socket means a new slave: it is kept
# while #more_to_do? is true and closed immediately otherwise. A readable
# session has one request line read and dispatched; the session is closed
# when the work is exhausted (NoMoreToDo), the request is unknown
# (InvalidCommand), the slave hung up (EOFError), or the socket failed.
#
# @return [nil]
def serve
  while more_to_do? || !sessions.empty?
    reads = [server] + sessions
    # Sessions are also passed as the error set; with no error set IO.select
    # never reports anything in efds and the handling below is dead code.
    rfds, _wfds, efds = IO.select(reads, nil, sessions)

    unless efds.empty?
      logger.error "error: #{efds.inspect}"
      efds.each do |efd|
        # Mutate the array in place. `sessions -= ...` would create a
        # method-local variable named `sessions` (initially nil) that shadows
        # the reader for the rest of the method and raises NoMethodError.
        sessions.delete(efd)
        efd.close unless efd.closed?
      end
    end

    # Descriptors dropped above are closed, so they must not be read from.
    (rfds - efds).each do |rfd|
      logger.debug_fine "rfd: #{rfd.inspect}"

      if rfd == server
        nfd = server.accept
        if more_to_do?
          sessions << nfd
          logger.info "new slave: #{nfd.inspect}"
        else
          logger.warn "ignoring new slave: #{nfd.inspect}"
          nfd.close
        end
        next
      end

      close_rfd = false
      begin
        # TODO: keep track of which lines are processed by which slaves
        # TODO: so we can retry processing when a slave crashes
        line = command_reader(rfd)
        command_dispatcher(line, rfd)
      rescue NoMoreToDo
        logger.info "closing slave connection: #{rfd.inspect}"
        close_rfd = true
      rescue InvalidCommand => e
        logger.warn "unknown request from slave: '#{e.message}': #{rfd.inspect}"
        close_rfd = true
      rescue EOFError
        logger.warn "slave closed connection: #{rfd.inspect}"
        close_rfd = true
      rescue SystemCallError, IOError => e
        # A reset or broken connection on one slave must not stop the master.
        logger.warn "slave connection failed: #{e.class}: #{e.message}: #{rfd.inspect}"
        close_rfd = true
      end

      next unless close_rfd

      logger.info "closing connection to slave: #{rfd.inspect}"
      sessions.delete(rfd)
      rfd.close
    end
  end
end