Class: Protobuf::Rpc::Zmq::Server

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/protobuf/rpc/servers/zmq/server.rb

Constant Summary collapse

# Baseline settings; #initialize merges the caller-supplied options over these.
DEFAULT_OPTIONS =
{
  # Read by #beacon_interval, which clamps the value to at least 1.
  :beacon_interval => 5,
  # Beacons are only sent when this is truthy and the server is not brokerless.
  :broadcast_beacons => false,
  # When truthy (and beacons are enabled), a flatline is sent instead of a
  # heartbeat while every worker reports busy.
  :broadcast_busy => false,
  # Truthy selects the inproc:// scheme in #backend_uri; otherwise tcp:// is used.
  :zmq_inproc => true,
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#log_signature, #resolve_ip, #zmq_error_check

Methods included from Logging

initialize_logger, #log_exception, #log_signature, #logger, #sign_message

Constructor Details

#initialize(options) ⇒ Server

Returns a new instance of Server.



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 24

# Builds a server from +options+ layered over DEFAULT_OPTIONS, starts with an
# empty worker list, then calls init_zmq_context, init_beacon_socket (only
# when broadcast_beacons? is truthy) and init_shutdown_pipe.
#
# If any step raises a StandardError, #teardown is invoked to release whatever
# was already set up and the original error is re-raised to the caller.
def initialize(options)
  begin
    @options = DEFAULT_OPTIONS.merge(options)
    @workers = []

    init_zmq_context
    init_beacon_socket if broadcast_beacons?
    init_shutdown_pipe
  rescue StandardError
    teardown
    raise
  end
end

Instance Attribute Details

#options ⇒ Object

Returns the value of attribute options.



21
22
23
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 21

# Returns the option hash stored in @options, which #initialize builds by
# merging the caller's options over DEFAULT_OPTIONS.
def options
  @options
end

#workers ⇒ Object

Returns the value of attribute workers.



21
22
23
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 21

# Returns the array held in @workers (initialised empty in #initialize).
# Its elements are treated as threads elsewhere in this class:
# #reap_dead_workers calls alive?/join on them and the busy checks read
# their :busy entry.
def workers
  @workers
end

#zmq_context ⇒ Object (readonly)

Returns the value of attribute zmq_context.



22
23
24
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 22

# Returns the object held in @zmq_context; #teardown calls terminate on it.
# NOTE(review): the assignment is not visible here — presumably done by
# init_zmq_context; confirm.
def zmq_context
  @zmq_context
end

Instance Method Details

#add_worker ⇒ Object



36
37
38
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 36

# Raises the target worker count by one and returns the new target.
# Only the number changes here; #start_missing_workers is what compares the
# target against the current worker list.
def add_worker
  raised_target = total_workers + 1
  @total_workers = raised_target
end

#all_workers_busy? ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 40

# True when no worker has a falsy :busy entry. An empty worker list therefore
# also counts as "all busy".
def all_workers_busy?
  workers.none? { |worker| !worker[:busy] }
end

#backend_port ⇒ Object



44
45
46
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 44

# Port used in #backend_uri: the :worker_port option when it is truthy,
# otherwise the frontend port plus one.
def backend_port
  configured = options[:worker_port]
  return configured if configured

  frontend_port + 1
end

#backend_uri ⇒ Object



48
49
50
51
52
53
54
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 48

# Returns "<scheme>://<backend_ip>:<backend_port>", where the scheme is
# "inproc" when inproc? is true and "tcp" otherwise.
def backend_uri
  scheme = inproc? ? "inproc" : "tcp"
  "#{scheme}://#{backend_ip}:#{backend_port}"
end

#beacon_interval ⇒ Object



56
57
58
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 56

# Returns the :beacon_interval option coerced with to_i and never lower than 1.
# #next_beacon adds this value to an epoch-seconds timestamp.
def beacon_interval
  interval = options[:beacon_interval].to_i
  interval < 1 ? 1 : interval
end

#beacon_ip ⇒ Object



60
61
62
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 60

# Host part of #beacon_uri. Always the literal "255.255.255.255"
# (the IPv4 limited-broadcast address); it is not configurable here.
def beacon_ip
  "255.255.255.255"
end

#beacon_port ⇒ Object



64
65
66
67
68
69
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 64

# Port part of #beacon_uri, memoised in @beacon_port.
# Uses the :beacon_port option when that key is present, otherwise
# ::Protobuf::Rpc::ServiceDirectory.port; either value is coerced with to_i.
def beacon_port
  return @beacon_port if @beacon_port

  directory_port = ::Protobuf::Rpc::ServiceDirectory.port
  @beacon_port = options.fetch(:beacon_port, directory_port).to_i
end

#beacon_uri ⇒ Object



71
72
73
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 71

# Returns the beacon destination as "udp://<beacon_ip>:<beacon_port>".
# In the visible code it is only used in the heartbeat debug message.
def beacon_uri
  format("udp://%s:%s", beacon_ip, beacon_port)
end

#broadcast_beacons? ⇒ Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 75

# Whether beacons should be sent: never for a brokerless server, otherwise
# whatever the :broadcast_beacons option holds (returned as-is, so it may be
# nil rather than false).
def broadcast_beacons?
  return false if brokerless?

  options[:broadcast_beacons]
end

#broadcast_busy? ⇒ Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 79

# Whether busy state should be broadcast: requires beacons to be enabled and
# then yields the :broadcast_busy option as-is. When beacons are off, the
# falsy result of broadcast_beacons? is returned unchanged.
def broadcast_busy?
  beacons_enabled = broadcast_beacons?
  return beacons_enabled unless beacons_enabled

  options[:broadcast_busy]
end

#broadcast_flatline ⇒ Object



83
84
85
86
87
88
89
90
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 83

# Encodes a FLATLINE beacon carrying this server's #to_proto description and
# sends it on @beacon_socket with flags 0. Returns whatever the socket's
# send returns.
def broadcast_flatline
  payload = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE,
    :server => to_proto,
  ).encode

  @beacon_socket.send(payload, 0)
end

#broadcast_heartbeat ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 92

# Records the current epoch second in @last_beacon (which drives
# #next_beacon), then encodes a HEARTBEAT beacon carrying #to_proto, sends it
# on @beacon_socket with flags 0 and emits a debug log line.
def broadcast_heartbeat
  @last_beacon = Time.now.to_i

  payload = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT,
    :server => to_proto,
  ).encode
  @beacon_socket.send(payload, 0)

  logger.debug { sign_message("sent heartbeat to #{beacon_uri}") }
end

#broadcast_heartbeat? ⇒ Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 105

# True-ish when the next beacon is due (current epoch second has reached
# #next_beacon) and beacons are enabled. Returns false while the beacon is
# not yet due, otherwise the raw result of broadcast_beacons?.
def broadcast_heartbeat?
  return false if Time.now.to_i < next_beacon

  broadcast_beacons?
end

#brokerless? ⇒ Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 109

# Strict boolean form of the :workers_only option. A brokerless server never
# starts or joins the broker and never sends beacons.
def brokerless?
  options[:workers_only] ? true : false
end

#busy_worker_count ⇒ Object



113
114
115
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 113

# Number of workers whose :busy entry is truthy.
def busy_worker_count
  workers.count { |worker| worker[:busy] }
end

#frontend_ip ⇒ Object (also known as: backend_ip)



117
118
119
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 117

# Address for the frontend (and, via the backend_ip alias, the backend)
# endpoint: the :host option passed through resolve_ip, computed once and
# cached in @frontend_ip.
def frontend_ip
  return @frontend_ip if @frontend_ip

  @frontend_ip = resolve_ip(options[:host])
end

#frontend_port ⇒ Object



122
123
124
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 122

# Returns the :port option exactly as supplied (no coercion or default here).
# #backend_port adds 1 to it and #to_proto stringifies it.
def frontend_port
  options[:port]
end

#frontend_uri ⇒ Object



126
127
128
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 126

# Returns the frontend endpoint as "tcp://<frontend_ip>:<frontend_port>".
def frontend_uri
  host = frontend_ip
  port = frontend_port
  "tcp://#{host}:#{port}"
end

#inproc? ⇒ Boolean

Returns:

  • (Boolean)


130
131
132
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 130

# Strict boolean form of the :zmq_inproc option; decides the scheme used by
# #backend_uri.
def inproc?
  options[:zmq_inproc] ? true : false
end

#maintenance_timeout ⇒ Object



134
135
136
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 134

# Whole seconds from now until #next_maintenance. The result is zero or
# negative once maintenance is overdue; #timeout clamps it from below.
def maintenance_timeout
  deadline = next_maintenance
  deadline - Time.now.to_i
end

#minimum_timeout ⇒ Object



145
146
147
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 145

# Lower bound (0.1) that #timeout applies to the maintenance timeout. The
# resulting value is handed to IO.select in #wait_for_shutdown_signal, so the
# unit is seconds.
def minimum_timeout
  0.1
end

#next_beacon ⇒ Object



149
150
151
152
153
154
155
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 149

# Epoch second at which the next beacon is due: 0 (i.e. immediately) until a
# heartbeat has been sent, afterwards the last beacon time plus
# #beacon_interval.
def next_beacon
  return 0 if @last_beacon.nil?

  @last_beacon + beacon_interval
end

#next_maintenance ⇒ Object



138
139
140
141
142
143
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 138

# Epoch second of the earliest pending maintenance task: the next reaping,
# or the next beacon if beacons are enabled and it comes sooner.
def next_maintenance
  reaping_due = next_reaping
  return reaping_due unless broadcast_beacons?

  [reaping_due, next_beacon].min
end

#next_reaping ⇒ Object



157
158
159
160
161
162
163
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 157

# Epoch second at which dead workers should next be reaped: 0 (i.e.
# immediately) until the first reaping, afterwards the last reaping time plus
# #reaping_interval.
def next_reaping
  return 0 if @last_reaping.nil?

  @last_reaping + reaping_interval
end

#reap_dead_workers ⇒ Object



165
166
167
168
169
170
171
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 165

# Stamps @last_reaping with the current epoch second, then prunes @workers in
# place: live workers stay, finished ones are joined and dropped.
# Returns the (mutated) @workers array.
def reap_dead_workers
  @last_reaping = Time.now.to_i

  @workers.keep_if do |worker|
    next true if worker.alive?

    # Join the finished thread before discarding it.
    worker.join
    false
  end
end

#reap_dead_workers? ⇒ Boolean

Returns:

  • (Boolean)


173
174
175
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 173

# True once the current epoch second has reached #next_reaping.
def reap_dead_workers?
  !(Time.now.to_i < next_reaping)
end

#reaping_interval ⇒ Object



177
178
179
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 177

# Fixed spacing between reaping passes. #next_reaping adds it to
# @last_reaping, which holds epoch seconds, so the unit is seconds.
def reaping_interval
  5
end

#run ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 181

# Runs the server until a shutdown is signalled, then winds it down.
#
# Sequence: mark the server running, yield to the optional startup block,
# block in #wait_for_shutdown_signal, send a final flatline beacon when
# beacons are enabled, wait for every worker to be reaped, and join the
# broker thread unless the server is brokerless. #teardown always runs
# afterwards, even when one of these steps raises.
#
# Yields once, with no arguments, before entering the wait loop.
def run
  @running = true
  yield if block_given? # runs on startup
  wait_for_shutdown_signal
  broadcast_flatline if broadcast_beacons?
  # Keep reaping until the worker list is empty.
  Thread.pass until reap_dead_workers.empty?
  # #wait_for_shutdown_signal checks for the shutdown signal before it calls
  # start_broker, so a stop issued before the first pass leaves
  # @broker_thread unset (presumably start_broker assigns it — not visible
  # here). Skip the join in that case instead of raising NoMethodError on nil.
  @broker_thread.join unless brokerless? || @broker_thread.nil?
ensure
  @running = false
  teardown
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


193
194
195
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 193

# Strict boolean view of @running, which #run sets to true on entry and both
# #run (on exit) and #stop reset to false.
def running?
  @running ? true : false
end

#start_missing_workers ⇒ Object



197
198
199
200
201
202
203
204
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 197

# Tops the worker list up to #total_workers by calling start_worker once per
# missing worker, then logs at debug level. Does nothing (and returns nil)
# when the list is already at or above the target.
def start_missing_workers
  shortfall = total_workers - @workers.size
  return unless shortfall > 0

  shortfall.times { start_worker }
  logger.debug { sign_message("#{total_workers} workers started") }
end

#stop ⇒ Object



206
207
208
209
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 206

# Requests shutdown: clears the running flag, then writes a single byte to
# @shutdown_w.
# NOTE(review): presumably @shutdown_w is the write end of the pipe whose read
# end (@shutdown_r) #wait_for_shutdown_signal selects on — init_shutdown_pipe
# is not visible here; confirm.
def stop
  @running = false
  @shutdown_w.write('.')
end

#teardown ⇒ Object



211
212
213
214
215
216
217
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 211

# Releases the server's resources and clears its timing state.
#
# Closes both shutdown pipe ends and the beacon socket, terminates the ZMQ
# context, then resets @last_reaping, @last_beacon and @timeout to nil.
# Every release goes through try, so handles that were never created (nil)
# are skipped — this is what lets #initialize call it after a partial setup.
def teardown
  [@shutdown_r, @shutdown_w, @beacon_socket].each { |handle| handle.try(:close) }
  @zmq_context.try(:terminate)

  @timeout = nil
  @last_beacon = nil
  @last_reaping = nil
end

#timeout ⇒ Object



219
220
221
222
223
224
225
226
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 219

# Timeout for the next IO.select in #wait_for_shutdown_signal, cached in
# @timeout. The first call after construction or #teardown returns 0 so the
# loop does its first maintenance pass immediately; later calls return the
# time until the next maintenance task, never less than #minimum_timeout.
def timeout
  @timeout = @timeout.nil? ? 0 : [minimum_timeout, maintenance_timeout].max
end

#to_proto ⇒ Object



232
233
234
235
236
237
238
239
240
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 232

# Returns this server's DynamicDiscovery::Server message, built on first use
# and cached in @proto. It carries the uuid, the frontend address, the
# frontend port as a string, a ttl of 1.5 beacon intervals (rounded up) and
# the list from Service.implemented_services.
def to_proto
  return @proto if @proto

  @proto = ::Protobuf::Rpc::DynamicDiscovery::Server.new(
    :uuid => uuid,
    :address => frontend_ip,
    :port => frontend_port.to_s,
    :ttl => (beacon_interval * 1.5).ceil,
    :services => ::Protobuf::Rpc::Service.implemented_services,
  )
end

#total_workers ⇒ Object



228
229
230
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 228

# Target number of workers, cached in @total_workers: the :threads option
# coerced with to_i, never lower than 1. #add_worker raises the cached value.
def total_workers
  return @total_workers if @total_workers

  requested = @options[:threads].to_i
  @total_workers = requested < 1 ? 1 : requested
end

#uuid ⇒ Object



242
243
244
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 242

# Identifier placed in #to_proto: a SecureRandom.uuid generated on first use
# and cached in @uuid, so it stays the same for the life of this instance.
def uuid
  return @uuid if @uuid

  @uuid = SecureRandom.uuid
end

#wait_for_shutdown_signal ⇒ Object



246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 246

# Main maintenance loop. Each pass waits up to #timeout seconds for
# @shutdown_r to become readable and returns as soon as it is. Otherwise it
# starts the broker (unless brokerless), reaps dead workers when due,
# replaces missing workers and, when a beacon is due, sends either a flatline
# (busy broadcasting is on and every worker is busy) or a heartbeat.
def wait_for_shutdown_signal
  loop do
    shutdown_requested = IO.select([@shutdown_r], nil, nil, timeout)
    break if shutdown_requested

    start_broker unless brokerless?
    reap_dead_workers if reap_dead_workers?
    start_missing_workers

    if broadcast_heartbeat?
      saturated = broadcast_busy? && all_workers_busy?
      saturated ? broadcast_flatline : broadcast_heartbeat
    end
  end
end