Class: Protobuf::Rpc::Socket::Server
- Inherits:
-
Object
- Object
- Protobuf::Rpc::Socket::Server
show all
- Includes:
- Logging
- Defined in:
- lib/protobuf/rpc/servers/socket/server.rb
Constant Summary
collapse
- AUTO_COLLECT_TIMEOUT =
5
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Logging
initialize_logger, #log_exception, #logger, #sign_message
Constructor Details
#initialize(options) ⇒ Server
Returns a new instance of Server.
23
24
25
26
27
28
29
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 23
def initialize(options)
self.running = false
self.host = options.fetch(:host)
self.port = options.fetch(:port)
self.backlog = options.fetch(:backlog, 100)
self.threshold = options.fetch(:threshold, 100)
end
|
Instance Attribute Details
#running ⇒ Object
Also known as:
running?
Returns the value of attribute running.
20
21
22
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 20
def running
@running
end
|
Instance Method Details
#cleanup? ⇒ Boolean
39
40
41
42
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 39
def cleanup?
!threads.empty? && threads.size % threshold == 0
end
|
#cleanup_threads ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 44
def cleanup_threads
logger.debug { sign_message("Thread cleanup - #{threads.size} - start") }
threads.delete_if do |hash|
unless (thread = hash.fetch(:thread)).alive?
thread.join
working.delete(hash.fetch(:socket))
end
end
logger.debug { sign_message("Thread cleanup - #{threads.size} - complete") }
end
|
#log_signature ⇒ Object
57
58
59
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 57
def log_signature
@_log_signature ||= "[server-#{self.class.name}]"
end
|
#new_worker(socket) ⇒ Object
61
62
63
64
65
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 61
def new_worker(socket)
Thread.new(socket) do |sock|
::Protobuf::Rpc::Socket::Worker.new(sock, &:close)
end
end
|
#run ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 67
def run
logger.debug { sign_message("Run") }
server = ::TCPServer.new(host, port)
fail "The server was unable to start properly." if server.closed?
begin
server.listen(backlog)
listen_fds = [server]
self.running = true
while running?
logger.debug { sign_message("Waiting for connections") }
ready_cnxns = begin
IO.select(listen_fds, [], [], AUTO_COLLECT_TIMEOUT)
rescue IOError
nil
end
if ready_cnxns
ready_cnxns.first.each do |client|
case
when !running?
when client == server
logger.debug { sign_message("Accepted new connection") }
client, _sockaddr = server.accept
listen_fds << client
else
unless working.include?(client)
working << listen_fds.delete(client)
logger.debug { sign_message("Working") }
threads << { :thread => new_worker(client), :socket => client }
cleanup_threads if cleanup?
end
end
end
elsif threads.size > 1
cleanup_threads
end
end
ensure
server.close
end
end
|
#stop ⇒ Object
115
116
117
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 115
def stop
self.running = false
end
|
#threads ⇒ Object
31
32
33
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 31
def threads
@threads ||= []
end
|
#working ⇒ Object
35
36
37
|
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 35
def working
@working ||= Set.new
end
|