Class: Protobuf::Rpc::Zmq::Server

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/protobuf/rpc/servers/zmq/server.rb

Constant Summary collapse

DEFAULT_OPTIONS =
{
  :beacon_interval => 5,
  :broadcast_beacons => false,
  :broadcast_busy => false,
  :zmq_inproc => true,
}

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#log_signature, #resolve_ip, #zmq_error_check

Methods included from Logger::LogMethods

included, #log_exception, #log_signature, #sign_message

Constructor Details

#initialize(options) ⇒ Server

Returns a new instance of Server.



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 24

def initialize(options)
  @options = DEFAULT_OPTIONS.merge(options)
  @workers = []

  init_zmq_context
  init_beacon_socket if broadcast_beacons?
  init_shutdown_pipe
rescue
  teardown
  raise
end

Instance Attribute Details

#optionsObject

Returns the value of attribute options.



21
22
23
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 21

def options
  @options
end

#workersObject

Returns the value of attribute workers.



21
22
23
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 21

def workers
  @workers
end

#zmq_contextObject (readonly)

Returns the value of attribute zmq_context.



22
23
24
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 22

def zmq_context
  @zmq_context
end

Instance Method Details

#add_workerObject



36
37
38
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 36

def add_worker
  @total_workers = total_workers + 1
end

#all_workers_busy?Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 40

def all_workers_busy?
  workers.all? { |thread| !!thread[:busy] }
end

#backend_portObject



44
45
46
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 44

def backend_port
  options[:worker_port] || frontend_port + 1
end

#backend_uriObject



48
49
50
51
52
53
54
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 48

def backend_uri
  if inproc?
    "inproc://#{backend_ip}:#{backend_port}"
  else
    "tcp://#{backend_ip}:#{backend_port}"
  end
end

#beacon_intervalObject



56
57
58
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 56

def beacon_interval
  [options[:beacon_interval].to_i, 1].max
end

#beacon_ipObject



60
61
62
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 60

def beacon_ip
  "255.255.255.255"
end

#beacon_portObject



64
65
66
67
68
69
70
71
72
73
74
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 64

def beacon_port
  unless @beacon_port
    unless port = options[:beacon_port]
      port = ::Protobuf::Rpc::ServiceDirectory.port
    end

    @beacon_port = port.to_i
  end

  @beacon_port
end

#beacon_uriObject



76
77
78
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 76

def beacon_uri
  "udp://#{beacon_ip}:#{beacon_port}"
end

#broadcast_beacons?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 80

def broadcast_beacons?
  !brokerless? && options[:broadcast_beacons]
end

#broadcast_flatlineObject



84
85
86
87
88
89
90
91
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 84

def broadcast_flatline
  flatline = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::FLATLINE,
    :server => self.to_proto
  )

  @beacon_socket.send(flatline.encode, 0)
end

#broadcast_heartbeatObject



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 93

def broadcast_heartbeat
  @last_beacon = Time.now.to_i

  heartbeat = ::Protobuf::Rpc::DynamicDiscovery::Beacon.new(
    :beacon_type => ::Protobuf::Rpc::DynamicDiscovery::BeaconType::HEARTBEAT,
    :server => self.to_proto
  )

  @beacon_socket.send(heartbeat.encode, 0)

  log_debug { sign_message("sent heartbeat to #{beacon_uri}") }
end

#broadcast_heartbeat?Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 106

def broadcast_heartbeat?
  Time.now.to_i >= next_beacon && broadcast_beacons?
end

#brokerless?Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 110

def brokerless?
  !!options[:workers_only]
end

#busy_worker_countObject



114
115
116
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 114

def busy_worker_count
  workers.count { |thread| !!thread[:busy] }
end

#frontend_ipObject Also known as: backend_ip



118
119
120
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 118

def frontend_ip
  @frontend_ip ||= resolve_ip(options[:host])
end

#frontend_portObject



123
124
125
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 123

def frontend_port
  options[:port]
end

#frontend_uriObject



127
128
129
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 127

def frontend_uri
  "tcp://#{frontend_ip}:#{frontend_port}"
end

#inproc?Boolean

Returns:

  • (Boolean)


131
132
133
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 131

def inproc?
  !!self.options[:zmq_inproc]
end

#maintenance_timeoutObject



135
136
137
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 135

def maintenance_timeout
  next_maintenance - Time.now.to_i
end

#minimum_timeoutObject



146
147
148
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 146

def minimum_timeout
  0.1
end

#next_beaconObject



150
151
152
153
154
155
156
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 150

def next_beacon
  if @last_beacon.nil?
    0
  else
    @last_beacon + beacon_interval
  end
end

#next_maintenanceObject



139
140
141
142
143
144
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 139

def next_maintenance
  cycles = [next_reaping]
  cycles << next_beacon if broadcast_beacons?

  cycles.min
end

#next_reapingObject



158
159
160
161
162
163
164
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 158

def next_reaping
  if @last_reaping.nil?
    0
  else
    @last_reaping + reaping_interval
  end
end

#reap_dead_workersObject



166
167
168
169
170
171
172
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 166

def reap_dead_workers
  @last_reaping = Time.now.to_i

  @workers.keep_if do |worker|
    worker.alive? or worker.join && false
  end
end

#reap_dead_workers?Boolean

Returns:

  • (Boolean)


174
175
176
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 174

def reap_dead_workers?
  Time.now.to_i >= next_reaping
end

#reaping_intervalObject



178
179
180
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 178

def reaping_interval
  5
end

#runObject



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 182

def run
  @running = true

  start_broker unless brokerless?
  start_missing_workers

  yield if block_given? # runs on startup
  wait_for_shutdown_signal
  broadcast_flatline if broadcast_beacons?
  Thread.pass until reap_dead_workers.empty?
  @broker.join unless brokerless?
ensure
  @running = false
  teardown
end

#running?Boolean

Returns:

  • (Boolean)


198
199
200
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 198

def running?
  !!@running
end

#start_missing_workersObject



202
203
204
205
206
207
208
209
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 202

def start_missing_workers
  missing_workers = total_workers - @workers.size

  if missing_workers > 0
    missing_workers.times { start_worker }
    log_debug { sign_message("#{total_workers} workers started") }
  end
end

#stopObject



211
212
213
214
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 211

def stop
  @running = false
  @shutdown_w.write('.')
end

#teardownObject



216
217
218
219
220
221
222
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 216

def teardown
  @shutdown_r.try(:close)
  @shutdown_w.try(:close)
  @beacon_socket.try(:close)
  @zmq_context.try(:terminate)
  @last_reaping = @last_beacon = @timeout = nil
end

#timeoutObject



228
229
230
231
232
233
234
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 228

def timeout
  if @timeout.nil?
    @timeout = 0
  else
    @timeout = [minimum_timeout, maintenance_timeout].max
  end
end

#to_protoObject



236
237
238
239
240
241
242
243
244
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 236

def to_proto
  @proto ||= ::Protobuf::Rpc::DynamicDiscovery::Server.new(
    :uuid => uuid,
    :address => frontend_ip,
    :port => frontend_port.to_s,
    :ttl => (beacon_interval * 1.5).ceil,
    :services => ::Protobuf::Rpc::Service.implemented_services
  )
end

#total_workersObject



224
225
226
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 224

def total_workers
  @total_workers ||= [@options[:threads].to_i, 1].max
end

#uuidObject



246
247
248
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 246

def uuid
  @uuid ||= SecureRandom.uuid
end

#wait_for_shutdown_signalObject



250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/protobuf/rpc/servers/zmq/server.rb', line 250

def wait_for_shutdown_signal
  loop do
    break if IO.select([@shutdown_r], nil, nil, timeout)

    if reap_dead_workers?
      reap_dead_workers
      start_missing_workers
    end

    if broadcast_heartbeat?
      if all_workers_busy? && options[:broadcast_busy]
        broadcast_flatline
      else
        broadcast_heartbeat
      end
    end

  end
end