Class: Protobuf::Rpc::Zmq::Server
- Inherits:
-
Object
- Object
- Protobuf::Rpc::Zmq::Server
show all
- Includes:
- Util
- Defined in:
- lib/protobuf/rpc/servers/zmq/server.rb
Constant Summary
collapse
- DEFAULT_OPTIONS =
{
:beacon_interval => 5,
:broadcast_beacons => false,
:broadcast_busy => false,
:zmq_inproc => true,
}.freeze
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#log_signature, #resolve_ip, #zmq_error_check
Methods included from Logging
initialize_logger, #log_exception, #log_signature, #logger, #sign_message
Constructor Details
#initialize(options) ⇒ Server
Returns a new instance of Server.
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 24
# Builds a server from the given options layered over DEFAULT_OPTIONS.
#
# Sets up the ZMQ context, the beacon socket (only when beacons are being
# broadcast) and the shutdown pipe, in that order. If anything raises along
# the way, #teardown is called and the error is re-raised to the caller.
#
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
def initialize(options)
  @options = DEFAULT_OPTIONS.merge(options)
  @workers = Array.new
  init_zmq_context
  if broadcast_beacons?
    init_beacon_socket
  end
  init_shutdown_pipe
rescue StandardError
  # Release whatever was set up before the failure, then propagate it.
  teardown
  raise
end
|
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
21
22
23
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 21
# Reader for the server's option hash.
#
# The constructor stores DEFAULT_OPTIONS merged with the caller-supplied
# options in @options; this returns that object as-is.
#
# @return [Hash, nil] the value of @options
def options
@options
end
|
#workers ⇒ Object
Returns the value of attribute workers.
21
22
23
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 21
# Reader for the list of workers tracked by this server.
#
# The constructor initialises @workers to an empty array. Other methods in
# this class query each element with [:busy], #alive? and #join, so the
# elements are presumably Thread objects (confirm against start_worker).
#
# @return [Array, nil] the value of @workers
def workers
@workers
end
|
#zmq_context ⇒ Object
Returns the value of attribute zmq_context.
22
23
24
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 22
# Reader for the ZMQ context held by this server.
#
# Returns whatever is stored in @zmq_context; #teardown calls #terminate on
# it. The context is presumably created by init_zmq_context (not shown
# here -- confirm).
#
# @return [Object, nil] the value of @zmq_context
def zmq_context
@zmq_context
end
|
Instance Method Details
#add_worker ⇒ Object
36
37
38
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 36
# Raises the target number of workers by one.
#
# Only the target count (@total_workers) changes here; no worker is started
# by this method itself.
#
# @return [Integer] the new target worker count
def add_worker
  current_target = total_workers
  @total_workers = current_target + 1
end
|
#all_workers_busy? ⇒ Boolean
40
41
42
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 40
# Tells whether every tracked worker currently has a truthy :busy flag.
#
# An empty worker list counts as "all busy" (vacuous truth).
#
# @return [Boolean]
def all_workers_busy?
  workers.none? { |worker| !worker[:busy] }
end
|
#backend_port ⇒ Object
44
45
46
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 44
# Port used for the backend (worker-facing) endpoint.
#
# Uses the :worker_port option when it is set; otherwise falls back to the
# port immediately after the frontend port.
#
# @return [Object] the configured :worker_port, or frontend_port + 1
def backend_port
  configured_port = options[:worker_port]
  return configured_port if configured_port

  frontend_port + 1
end
|
#backend_uri ⇒ Object
48
49
50
51
52
53
54
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 48
# URI of the backend endpoint.
#
# Uses the inproc:// scheme when #inproc? is true and tcp:// otherwise; the
# address part is backend_ip:backend_port in both cases.
#
# @return [String]
def backend_uri
  return "inproc://#{backend_ip}:#{backend_port}" if inproc?

  "tcp://#{backend_ip}:#{backend_port}"
end
|
#beacon_interval ⇒ Object
56
57
58
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 56
# Interval between beacons, taken from the :beacon_interval option.
#
# The option is coerced with #to_i and clamped so the result is never below
# 1. It is added to a Time#to_i timestamp in #next_beacon, so the unit is
# seconds.
#
# @return [Integer] interval, at least 1
def beacon_interval
  configured = options[:beacon_interval].to_i
  configured < 1 ? 1 : configured
end
|
#beacon_ip ⇒ Object
60
61
62
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 60
# Address that beacons are sent to.
#
# Always the fixed string "255.255.255.255" (the IPv4 limited-broadcast
# address); it is interpolated into #beacon_uri.
#
# @return [String]
def beacon_ip
"255.255.255.255"
end
|
#beacon_port ⇒ Object
64
65
66
67
68
69
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 64
# Port that beacons are sent to.
#
# Read from the :beacon_port option, defaulting to
# ::Protobuf::Rpc::ServiceDirectory.port, and coerced with #to_i. The value
# is computed once and cached in @beacon_port.
#
# @return [Integer]
def beacon_port
  return @beacon_port if @beacon_port

  settings = options
  # The default is looked up eagerly, even when :beacon_port is present.
  directory_port = ::Protobuf::Rpc::ServiceDirectory.port
  @beacon_port = settings.fetch(:beacon_port, directory_port).to_i
end
|
#beacon_uri ⇒ Object
71
72
73
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 71
# UDP URI built from #beacon_ip and #beacon_port.
#
# @return [String] e.g. "udp://255.255.255.255:<port>"
def beacon_uri
  ip = beacon_ip
  port = beacon_port
  "udp://#{ip}:#{port}"
end
|
#broadcast_beacons? ⇒ Boolean
75
76
77
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 75
# Tells whether this server should broadcast beacons.
#
# Never true for a brokerless (workers-only) server; otherwise the value of
# the :broadcast_beacons option is returned as-is, so the result may be nil
# rather than false.
#
# @return [Object] false when brokerless, else options[:broadcast_beacons]
def broadcast_beacons?
  return false if brokerless?

  options[:broadcast_beacons]
end
|
#broadcast_busy? ⇒ Boolean
79
80
81
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 79
# Tells whether the server should announce when all workers are busy.
#
# Requires beacon broadcasting to be on; when it is, the value of the
# :broadcast_busy option is returned as-is. When it is not, the falsy result
# of #broadcast_beacons? is returned unchanged.
#
# @return [Object] falsy unless both beacons and :broadcast_busy are enabled
def broadcast_busy?
  beacons_enabled = broadcast_beacons?
  return beacons_enabled unless beacons_enabled

  options[:broadcast_busy]
end
|
#broadcast_flatline ⇒ Object
#broadcast_heartbeat ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 92
# Sends a HEARTBEAT beacon describing this server over the beacon socket.
#
# Records the send time in @last_beacon (epoch seconds) before building the
# beacon, then encodes it, writes it to @beacon_socket with flags 0, and
# logs a debug line naming the beacon URI.
def broadcast_heartbeat
  @last_beacon = Time.now.to_i

  beacon_class = ::Protobuf::Rpc::DynamicDiscovery::Beacon
  heartbeat_type = ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT
  heartbeat = beacon_class.new(:beacon_type => heartbeat_type, :server => to_proto)

  payload = heartbeat.encode
  @beacon_socket.send(payload, 0)
  logger.debug { sign_message("sent heartbeat to #{beacon_uri}") }
end
|
#broadcast_heartbeat? ⇒ Boolean
105
106
107
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 105
# Tells whether a heartbeat beacon is due right now.
#
# False until the current epoch second reaches #next_beacon; after that the
# result of #broadcast_beacons? is returned as-is.
#
# @return [Object] false when not yet due, else broadcast_beacons?
def broadcast_heartbeat?
  return false if Time.now.to_i < next_beacon

  broadcast_beacons?
end
|
#brokerless? ⇒ Boolean
109
110
111
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 109
# Tells whether the server runs without a broker, i.e. whether the
# :workers_only option is truthy.
#
# @return [Boolean]
def brokerless?
  options[:workers_only] ? true : false
end
|
#busy_worker_count ⇒ Object
113
114
115
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 113
# Counts the tracked workers whose :busy flag is truthy.
#
# @return [Integer]
def busy_worker_count
  workers.count do |worker|
    worker[:busy] ? true : false
  end
end
|
#frontend_ip ⇒ Object
Also known as:
backend_ip
117
118
119
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 117
# IP address for the frontend endpoint.
#
# Resolved from the :host option via #resolve_ip; a truthy result is cached
# in @frontend_ip and reused on later calls.
#
# @return [Object] whatever #resolve_ip returned
def frontend_ip
  return @frontend_ip if @frontend_ip

  @frontend_ip = resolve_ip(options[:host])
end
|
#frontend_port ⇒ Object
122
123
124
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 122
# Port for the frontend endpoint, read straight from the :port option.
#
# DEFAULT_OPTIONS has no :port entry, so this is nil unless the caller
# supplied one.
#
# @return [Object, nil] the value of options[:port]
def frontend_port
options[:port]
end
|
#frontend_uri ⇒ Object
126
127
128
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 126
# TCP URI of the frontend endpoint, built from #frontend_ip and
# #frontend_port.
#
# @return [String] e.g. "tcp://<ip>:<port>"
def frontend_uri
  ip = frontend_ip
  port = frontend_port
  "tcp://#{ip}:#{port}"
end
|
#inproc? ⇒ Boolean
130
131
132
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 130
# Tells whether the backend should use the inproc transport, i.e. whether
# the :zmq_inproc option is truthy.
#
# @return [Boolean]
def inproc?
  options[:zmq_inproc] ? true : false
end
|
#maintenance_timeout ⇒ Object
134
135
136
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 134
# Seconds remaining until the next maintenance cycle is due.
#
# Negative when the cycle is already overdue.
#
# @return [Integer] next_maintenance minus the current epoch second
def maintenance_timeout
  due_at = next_maintenance
  due_at - Time.now.to_i
end
|
#minimum_timeout ⇒ Object
145
146
147
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 145
# Lower bound applied by #timeout to the computed maintenance timeout.
#
# #timeout's result is passed to IO.select in #wait_for_shutdown_signal, so
# the unit is seconds.
#
# @return [Float] always 0.1
def minimum_timeout
0.1
end
|
#next_beacon ⇒ Object
149
150
151
152
153
154
155
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 149
# Epoch second at which the next beacon is due.
#
# Before any beacon has been sent (@last_beacon is nil) this is 0, which
# makes a beacon due immediately.
#
# @return [Integer]
def next_beacon
  return 0 if @last_beacon.nil?

  @last_beacon + beacon_interval
end
|
#next_maintenance ⇒ Object
138
139
140
141
142
143
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 138
# Epoch second at which the next maintenance task is due.
#
# That is the next reaping time, or the earlier of the next reaping and the
# next beacon when beacons are being broadcast.
#
# @return [Integer]
def next_maintenance
  reaping_due = next_reaping
  return reaping_due unless broadcast_beacons?

  [reaping_due, next_beacon].min
end
|
#next_reaping ⇒ Object
157
158
159
160
161
162
163
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 157
# Epoch second at which dead workers should next be reaped.
#
# Before the first reaping (@last_reaping is nil) this is 0, which makes a
# reaping due immediately.
#
# @return [Integer]
def next_reaping
  return 0 if @last_reaping.nil?

  @last_reaping + reaping_interval
end
|
#reap_dead_workers ⇒ Object
165
166
167
168
169
170
171
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 165
# Removes finished workers from @workers, joining each one as it goes.
#
# Also stamps @last_reaping with the current epoch second so that
# #next_reaping can schedule the following pass.
#
# @return [Array] @workers, now holding only the workers still alive
def reap_dead_workers
  @last_reaping = Time.now.to_i

  @workers.keep_if do |worker|
    if worker.alive?
      true
    else
      # Join the finished worker before dropping it from the list.
      worker.join
      false
    end
  end
end
|
#reap_dead_workers? ⇒ Boolean
173
174
175
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 173
# Tells whether a reaping pass is due, i.e. whether the current epoch
# second has reached #next_reaping.
#
# @return [Boolean]
def reap_dead_workers?
  now = Time.now.to_i
  now >= next_reaping
end
|
#reaping_interval ⇒ Object
177
178
179
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 177
# Spacing between reaping passes.
#
# #next_reaping adds this to @last_reaping, which is a Time#to_i timestamp,
# so the unit is seconds.
#
# @return [Integer] always 5
def reaping_interval
5
end
|
#run ⇒ Object
181
182
183
184
185
186
187
188
189
190
191
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 181
# Runs the server until a shutdown is signalled, then shuts it down.
#
# Sequence:
# 1. marks the server as running and yields to the optional block;
# 2. blocks in #wait_for_shutdown_signal;
# 3. broadcasts a flatline beacon when beacons are enabled;
# 4. keeps reaping until no workers are left, then joins the broker thread
#    unless the server is brokerless.
#
# Whatever happens, the running flag is cleared and #teardown is called.
#
# @yield optional block, invoked once before waiting for shutdown
def run
  @running = true
  # These two statements had been fused into one line, which turned the
  # wait call into an argument of block_given? and raised ArgumentError.
  yield if block_given?
  wait_for_shutdown_signal
  broadcast_flatline if broadcast_beacons?
  Thread.pass until reap_dead_workers.empty?
  @broker_thread.join unless brokerless?
ensure
  @running = false
  teardown
end
|
#running? ⇒ Boolean
193
194
195
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 193
# Tells whether the server is currently marked as running (@running is
# truthy).
#
# @return [Boolean]
def running?
  @running ? true : false
end
|
#start_missing_workers ⇒ Object
197
198
199
200
201
202
203
204
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 197
# Starts enough workers to bring the pool up to #total_workers.
#
# Does nothing when the pool is already at or above the target. Otherwise
# #start_worker is called once per missing worker, followed by a debug log
# line reporting the target count.
def start_missing_workers
  shortfall = total_workers - @workers.size
  return if shortfall <= 0

  shortfall.times { start_worker }
  logger.debug { sign_message("#{total_workers} workers started") }
end
|
#stop ⇒ Object
206
207
208
209
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 206
# Asks the server to shut down.
#
# Clears the running flag, then writes a single byte to the write end of
# the shutdown pipe. #wait_for_shutdown_signal selects on the read end
# (@shutdown_r), so this wakes the main loop and makes it exit.
#
# NOTE(review): assumes @shutdown_w is still open, i.e. #teardown has not
# run yet -- confirm callers never invoke #stop after #run has returned.
def stop
# Clear the flag first so #running? is already false when the loop wakes.
@running = false
@shutdown_w.write('.')
end
|
#teardown ⇒ Object
211
212
213
214
215
216
217
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 211
# Releases the server's resources and resets its scheduling state.
#
# Closes the shutdown pipe ends and the beacon socket, then terminates the
# ZMQ context. Everything goes through #try, so members that were never set
# up are skipped. Finally the reaping, beacon and timeout bookkeeping is
# cleared.
#
# @return [nil]
def teardown
  [@shutdown_r, @shutdown_w, @beacon_socket].each do |closable|
    closable.try(:close)
  end
  @zmq_context.try(:terminate)

  @last_reaping = nil
  @last_beacon = nil
  @timeout = nil
end
|
#timeout ⇒ Object
219
220
221
222
223
224
225
226
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 219
# Timeout for the next wait in the main loop.
#
# The first call after construction or #teardown returns 0, so the loop
# runs its maintenance immediately. Later calls return the time until the
# next maintenance cycle, but never less than #minimum_timeout. The value
# is also stored in @timeout.
#
# @return [Numeric]
def timeout
  @timeout = @timeout.nil? ? 0 : [minimum_timeout, maintenance_timeout].max
end
|
#total_workers ⇒ Object
228
229
230
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 228
# Target number of workers.
#
# Taken from the :threads option (coerced with #to_i) and never less than
# 1. The value is cached in @total_workers, which #add_worker later
# increments.
#
# @return [Integer]
def total_workers
  return @total_workers if @total_workers

  requested = @options[:threads].to_i
  @total_workers = requested < 1 ? 1 : requested
end
|
#uuid ⇒ Object
242
243
244
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 242
# Identifier for this server instance.
#
# Generated with SecureRandom.uuid on first use and cached in @uuid.
#
# @return [String]
def uuid
  return @uuid if @uuid

  @uuid = SecureRandom.uuid
end
|
#wait_for_shutdown_signal ⇒ Object
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 246
# Main loop: performs maintenance until the shutdown pipe becomes readable.
#
# Each iteration waits on @shutdown_r for up to #timeout seconds. If
# nothing arrives, it starts the broker (unless brokerless), reaps dead
# workers when due, tops up missing workers and, when a heartbeat is due,
# sends either a flatline (all workers busy and busy-broadcasting enabled)
# or a heartbeat beacon.
def wait_for_shutdown_signal
  loop do
    shutdown_requested = IO.select([@shutdown_r], nil, nil, timeout)
    break if shutdown_requested

    start_broker unless brokerless?
    reap_dead_workers if reap_dead_workers?
    start_missing_workers

    if broadcast_heartbeat?
      if broadcast_busy? && all_workers_busy?
        broadcast_flatline
      else
        broadcast_heartbeat
      end
    end
  end
end
|