Class: Protobuf::Rpc::Socket::Server

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/protobuf/rpc/servers/socket/server.rb

Constant Summary collapse

AUTO_COLLECT_TIMEOUT =

seconds

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

initialize_logger, #log_exception, #logger, #sign_message

Constructor Details

#initialize(options) ⇒ Server

Returns a new instance of Server.



23
24
25
26
27
28
29
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 23

def initialize(options)
  self.running = false
  self.host = options.fetch(:host)
  self.port = options.fetch(:port)
  self.backlog = options.fetch(:backlog, 100)
  self.threshold = options.fetch(:threshold, 100)
end

Instance Attribute Details

#runningObject Also known as: running?

Returns the value of attribute running.



20
21
22
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 20

def running
  @running
end

Instance Method Details

#cleanup?Boolean

Returns:

  • (Boolean)


39
40
41
42
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 39

def cleanup?
  # every `threshold` connections run a cleanup routine after closing the response
  !threads.empty? && threads.size % threshold == 0
end

#cleanup_threadsObject



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 44

def cleanup_threads
  logger.debug { sign_message("Thread cleanup - #{threads.size} - start") }

  threads.delete_if do |hash|
    unless (thread = hash.fetch(:thread)).alive?
      thread.join
      working.delete(hash.fetch(:socket))
    end
  end

  logger.debug { sign_message("Thread cleanup - #{threads.size} - complete") }
end

#log_signatureObject



57
58
59
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 57

def log_signature
  @_log_signature ||= "[server-#{self.class.name}]"
end

#new_worker(socket) ⇒ Object



61
62
63
64
65
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 61

def new_worker(socket)
  Thread.new(socket) do |sock|
    ::Protobuf::Rpc::Socket::Worker.new(sock, &:close)
  end
end

#runObject



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 67

def run
  logger.debug { sign_message("Run") }

  server = ::TCPServer.new(host, port)
  fail "The server was unable to start properly." if server.closed?

  begin
    server.listen(backlog)
    listen_fds = [server]
    self.running = true

    while running?
      logger.debug { sign_message("Waiting for connections") }
      ready_cnxns = begin
        IO.select(listen_fds, [], [], AUTO_COLLECT_TIMEOUT)
      rescue IOError
        nil
      end

      if ready_cnxns
        ready_cnxns.first.each do |client|
          case
          when !running?
            # no-op
          when client == server
            logger.debug { sign_message("Accepted new connection") }
            client, _sockaddr = server.accept
            listen_fds << client
          else
            unless working.include?(client)
              working << listen_fds.delete(client)
              logger.debug { sign_message("Working") }
              threads << { :thread => new_worker(client), :socket => client }

              cleanup_threads if cleanup?
            end
          end
        end
      elsif threads.size > 1
        # Run a cleanup if select times out while waiting
        cleanup_threads
      end
    end
  ensure
    server.close
  end
end

#stopObject



115
116
117
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 115

def stop
  self.running = false
end

#threadsObject



31
32
33
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 31

def threads
  @threads ||= []
end

#workingObject



35
36
37
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 35

def working
  @working ||= Set.new
end