Class: Protobuf::Rpc::Zmq::Server
- Inherits:
-
Object
- Object
- Protobuf::Rpc::Zmq::Server
show all
- Includes:
- Util
- Defined in:
- lib/protobuf/rpc/servers/zmq/server.rb
Constant Summary
collapse
- DEFAULT_OPTIONS =
{
:beacon_interval => 5,
:broadcast_beacons => false,
:broadcast_busy => false,
:zmq_inproc => true,
}
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#log_signature, #resolve_ip, #zmq_error_check
included, #log_exception, #log_signature, #sign_message
Constructor Details
#initialize(options) ⇒ Server
Returns a new instance of Server.
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 24
# Builds a server from the caller's options layered over DEFAULT_OPTIONS and
# then runs the setup helpers: init_zmq_context, init_beacon_socket (only when
# broadcast_beacons? is truthy) and init_shutdown_pipe.
#
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
# @raise [StandardError] any setup failure, re-raised after #teardown has run
def initialize(options)
  begin
    @options = DEFAULT_OPTIONS.merge(options)
    @workers = []
    init_zmq_context
    if broadcast_beacons?
      init_beacon_socket
    end
    init_shutdown_pipe
  rescue StandardError
    # Clean up whatever was set up before the failure, then surface the error.
    teardown
    raise
  end
end
|
Instance Attribute Details
#options ⇒ Object
Returns the value of attribute options.
21
22
23
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 21
# Reader for @options, which #initialize sets to DEFAULT_OPTIONS merged with
# the caller-supplied options.
def options
@options
end
|
#workers ⇒ Object
Returns the value of attribute workers.
21
22
23
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 21
# Reader for @workers, the array #initialize creates empty and
# #reap_dead_workers prunes in place.
def workers
@workers
end
|
#zmq_context ⇒ Object
Returns the value of attribute zmq_context.
22
23
24
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 22
# Reader for @zmq_context.
# NOTE(review): presumably assigned by init_zmq_context, which is not visible
# here; #teardown calls terminate on it and does not reset the variable.
def zmq_context
@zmq_context
end
|
Instance Method Details
#add_worker ⇒ Object
36
37
38
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 36
# Raises the target worker count by one and returns the new target.
# Only the target changes here; no worker is started by this method.
def add_worker
  current_target = total_workers
  @total_workers = current_target + 1
end
|
#all_workers_busy? ⇒ Boolean
40
41
42
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 40
# True when no worker lacks a truthy :busy entry (also true for an empty list).
#
# @return [Boolean]
def all_workers_busy?
  workers.none? { |worker| !worker[:busy] }
end
|
#backend_port ⇒ Object
44
45
46
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 44
# Port for the backend endpoint: the :worker_port option when it is set,
# otherwise the port right after frontend_port.
def backend_port
  configured = options[:worker_port]
  return configured if configured

  frontend_port + 1
end
|
#backend_uri ⇒ Object
48
49
50
51
52
53
54
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 48
# URI of the backend endpoint. The scheme is "inproc" when inproc? is true
# and "tcp" otherwise; host and port come from backend_ip and backend_port.
#
# @return [String]
def backend_uri
  scheme = inproc? ? "inproc" : "tcp"
  "#{scheme}://#{backend_ip}:#{backend_port}"
end
|
#beacon_interval ⇒ Object
56
57
58
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 56
# The :beacon_interval option coerced with to_i and floored at 1.
# The result is added to a Time.now.to_i stamp in next_beacon, so it is
# effectively a number of seconds.
#
# @return [Integer]
def beacon_interval
  requested = options[:beacon_interval].to_i
  requested < 1 ? 1 : requested
end
|
#beacon_ip ⇒ Object
60
61
62
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 60
# Host part of beacon_uri: the fixed IPv4 limited-broadcast address.
def beacon_ip
"255.255.255.255"
end
|
#beacon_port ⇒ Object
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 64
# Port used in beacon_uri, memoized in @beacon_port after the first call.
# Takes the :beacon_port option when it is truthy, otherwise
# ::Protobuf::Rpc::ServiceDirectory.port; either value is coerced with to_i.
#
# @return [Integer]
def beacon_port
  @beacon_port ||= (options[:beacon_port] || ::Protobuf::Rpc::ServiceDirectory.port).to_i
end
|
#beacon_uri ⇒ Object
76
77
78
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 76
# UDP URI built from beacon_ip and beacon_port. It appears here only in the
# heartbeat log message.
#
# @return [String]
def beacon_uri
  host = beacon_ip
  port = beacon_port
  "udp://#{host}:#{port}"
end
|
#broadcast_beacons? ⇒ Boolean
80
81
82
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 80
# Whether this server should send beacons. Always false when brokerless;
# otherwise the raw :broadcast_beacons option, which may be nil or any
# truthy value rather than a strict boolean.
def broadcast_beacons?
  return false if brokerless?

  options[:broadcast_beacons]
end
|
#broadcast_flatline ⇒ Object
#broadcast_heartbeat ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 93
# Records the current time in @last_beacon, encodes a HEARTBEAT beacon that
# carries this server's to_proto value, sends it on @beacon_socket with
# flags 0, and logs the send at debug level.
def broadcast_heartbeat
  @last_beacon = Time.now.to_i
  payload = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT,
    :server => to_proto
  ).encode
  @beacon_socket.send(payload, 0)
  log_debug { sign_message("sent heartbeat to #{beacon_uri}") }
end
|
#broadcast_heartbeat? ⇒ Boolean
106
107
108
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 106
# Whether a heartbeat is due. False until the current time reaches
# next_beacon; after that, whatever broadcast_beacons? returns.
def broadcast_heartbeat?
  return false if Time.now.to_i < next_beacon

  broadcast_beacons?
end
|
#brokerless? ⇒ Boolean
110
111
112
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 110
# True when the :workers_only option is truthy. In #run this means no broker
# is started or joined.
#
# @return [Boolean]
def brokerless?
  options[:workers_only] ? true : false
end
|
#busy_worker_count ⇒ Object
114
115
116
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 114
# Number of workers whose :busy entry is truthy.
#
# @return [Integer]
def busy_worker_count
  workers.count { |worker| worker[:busy] }
end
|
#frontend_ip ⇒ Object
Also known as:
backend_ip
118
119
120
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 118
# IP for the frontend endpoint: resolve_ip applied to the :host option,
# memoized in @frontend_ip after the first successful lookup.
def frontend_ip
  return @frontend_ip if @frontend_ip

  @frontend_ip = resolve_ip(options[:host])
end
|
#frontend_port ⇒ Object
123
124
125
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 123
# Port for the frontend endpoint, read straight from the :port option.
# NOTE(review): DEFAULT_OPTIONS as listed has no :port entry, so this is nil
# unless the caller supplies one — and backend_port would then fail on nil + 1.
def frontend_port
options[:port]
end
|
#frontend_uri ⇒ Object
127
128
129
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 127
# TCP URI of the frontend endpoint, built from frontend_ip and frontend_port.
#
# @return [String]
def frontend_uri
  host = frontend_ip
  port = frontend_port
  "tcp://#{host}:#{port}"
end
|
#inproc? ⇒ Boolean
131
132
133
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 131
# True when the :zmq_inproc option is truthy. backend_uri uses this to pick
# the inproc scheme over tcp.
#
# @return [Boolean]
def inproc?
  options[:zmq_inproc] ? true : false
end
|
#maintenance_timeout ⇒ Object
135
136
137
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 135
# Seconds from now until next_maintenance. The result is zero or negative
# once that moment has passed.
#
# @return [Integer]
def maintenance_timeout
  due_at = next_maintenance
  due_at - Time.now.to_i
end
|
#minimum_timeout ⇒ Object
146
147
148
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 146
# Lower bound that #timeout applies to the computed maintenance timeout.
# The value ends up as the IO.select timeout, so it is in seconds.
def minimum_timeout
0.1
end
|
#next_beacon ⇒ Object
150
151
152
153
154
155
156
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 150
# Epoch second at which the next beacon is due: 0 (immediately) when no
# beacon has been sent yet, otherwise the last send time plus beacon_interval.
#
# @return [Integer]
def next_beacon
  return 0 if @last_beacon.nil?

  @last_beacon + beacon_interval
end
|
#next_maintenance ⇒ Object
139
140
141
142
143
144
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 139
# Earliest upcoming maintenance moment. This is next_reaping alone when
# beacons are off, otherwise the sooner of next_reaping and next_beacon.
#
# @return [Integer]
def next_maintenance
  reaping_due = next_reaping
  return reaping_due unless broadcast_beacons?

  [reaping_due, next_beacon].min
end
|
#next_reaping ⇒ Object
158
159
160
161
162
163
164
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 158
# Epoch second at which dead workers should next be reaped: 0 (immediately)
# before the first reaping, otherwise the last reaping plus reaping_interval.
#
# @return [Integer]
def next_reaping
  return 0 if @last_reaping.nil?

  @last_reaping + reaping_interval
end
|
#reap_dead_workers ⇒ Object
166
167
168
169
170
171
172
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 166
# Stamps @last_reaping with the current time, then prunes @workers in place.
# Live workers are kept; dead ones are joined and dropped.
#
# @return [Array] the same @workers array, now holding only live workers
def reap_dead_workers
  @last_reaping = Time.now.to_i
  @workers.keep_if do |worker|
    next true if worker.alive?

    # Join the finished worker before discarding it.
    worker.join
    false
  end
end
|
#reap_dead_workers? ⇒ Boolean
174
175
176
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 174
# True once the current time has reached next_reaping.
#
# @return [Boolean]
def reap_dead_workers?
  now = Time.now.to_i
  now >= next_reaping
end
|
#reaping_interval ⇒ Object
178
179
180
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 178
# Gap between reaping passes. next_reaping adds it to a Time.now.to_i stamp,
# so the unit is seconds.
def reaping_interval
5
end
|
#run ⇒ Object
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 182
# Runs the server until a shutdown signal arrives, then winds it down.
#
# Sequence: mark the server running, start the broker (unless brokerless)
# and any missing workers, yield once to the optional startup block, block
# in wait_for_shutdown_signal, broadcast a flatline when beacons are enabled,
# spin until every worker has been reaped, and join the broker thread.
# The ensure clause always clears the running flag and calls teardown.
#
# @yield called once after startup, before waiting for the shutdown signal
def run
  @running = true
  start_broker unless brokerless?
  start_missing_workers
  # These must be two statements. Fused on one line they parse as
  # block_given?(wait_for_shutdown_signal), which never yields and raises
  # ArgumentError once the wait returns.
  yield if block_given?
  wait_for_shutdown_signal
  broadcast_flatline if broadcast_beacons?
  # reap_dead_workers returns the pruned worker list; loop until it is empty.
  Thread.pass until reap_dead_workers.empty?
  @broker.join unless brokerless?
ensure
  @running = false
  teardown
end
|
#running? ⇒ Boolean
198
199
200
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 198
# True while @running is truthy. #run sets it on entry; #stop and #run's
# ensure clause clear it.
#
# @return [Boolean]
def running?
  @running ? true : false
end
|
#start_missing_workers ⇒ Object
202
203
204
205
206
207
208
209
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 202
# Starts enough workers to bring @workers up to total_workers and logs the
# target count at debug level. Does nothing and returns nil when the pool
# is already at or above the target.
def start_missing_workers
  shortfall = total_workers - @workers.size
  return unless shortfall > 0

  shortfall.times { start_worker }
  log_debug { sign_message("#{total_workers} workers started") }
end
|
#stop ⇒ Object
211
212
213
214
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 211
# Requests shutdown: clears the running flag and writes one byte to @shutdown_w.
# NOTE(review): presumably @shutdown_w is the write end of a pipe whose read end
# (@shutdown_r) wait_for_shutdown_signal selects on; init_shutdown_pipe is not
# visible here — confirm.
def stop
@running = false
@shutdown_w.write('.')
end
|
#teardown ⇒ Object
216
217
218
219
220
221
222
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 216
# Releases server resources. Closes the shutdown pipe ends and the beacon
# socket, terminates the ZMQ context, and resets the maintenance bookkeeping
# so a later run starts fresh. `try` makes each step a no-op for anything
# that was never created.
def teardown
  [@shutdown_r, @shutdown_w, @beacon_socket].each do |closable|
    closable.try(:close)
  end
  @zmq_context.try(:terminate)
  @last_reaping = nil
  @last_beacon = nil
  @timeout = nil
end
|
#timeout ⇒ Object
228
229
230
231
232
233
234
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 228
# Timeout handed to IO.select in wait_for_shutdown_signal. The first call
# after construction or teardown returns 0, so the first maintenance pass
# runs immediately. Later calls return maintenance_timeout floored at
# minimum_timeout. The value is also cached in @timeout.
def timeout
  @timeout = @timeout.nil? ? 0 : [minimum_timeout, maintenance_timeout].max
end
|
#total_workers ⇒ Object
224
225
226
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 224
# Target number of workers, memoized in @total_workers. Initially this is
# the :threads option coerced with to_i and floored at 1; #add_worker can
# raise it later.
#
# @return [Integer]
def total_workers
  return @total_workers if @total_workers

  requested = @options[:threads].to_i
  @total_workers = requested < 1 ? 1 : requested
end
|
#uuid ⇒ Object
246
247
248
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 246
# Random UUID identifying this server instance, generated on first use and
# cached in @uuid.
#
# @return [String]
def uuid
  return @uuid if @uuid

  @uuid = SecureRandom.uuid
end
|
#wait_for_shutdown_signal ⇒ Object
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
|
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 250
# Main maintenance loop. Blocks in IO.select on @shutdown_r for at most
# `timeout` seconds and returns as soon as the pipe becomes readable.
# Each time the select times out it:
#   * reaps dead workers and starts replacements, when reaping is due
#   * sends a beacon when one is due: a flatline if every worker is busy and
#     the :broadcast_busy option is set, otherwise a heartbeat
def wait_for_shutdown_signal
  loop do
    shutdown_requested = IO.select([@shutdown_r], nil, nil, timeout)
    break if shutdown_requested

    if reap_dead_workers?
      reap_dead_workers
      start_missing_workers
    end

    next unless broadcast_heartbeat?

    saturated = all_workers_busy? && options[:broadcast_busy]
    saturated ? broadcast_flatline : broadcast_heartbeat
  end
end
|