Class: DEVp2p::ConnectionMonitor
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - DEVp2p::ConnectionMonitor
- Includes:
- Concurrent::Async
- Defined in:
- lib/devp2p/connection_monitor.rb
Overview
Monitors the connection by sending pings and checking pongs.
Instance Method Summary collapse
-
#initialize(proto) ⇒ ConnectionMonitor
constructor
A new instance of ConnectionMonitor.
- #latency(num_samples = @max_samples) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(proto) ⇒ ConnectionMonitor
Returns a new instance of ConnectionMonitor.
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/devp2p/connection_monitor.rb', line 11
##
# Builds a monitor for +proto+ and hooks it into the protocol's callbacks.
#
# A pong callback records the time of the response and prepends the elapsed
# time since the last request to the sample list (capped at @max_samples
# entries). A hello callback starts the monitor.
#
# @param proto [P2PProtocol] the protocol whose connection is monitored
# @raise [ArgumentError] if +proto+ is not a P2PProtocol
def initialize(proto)
  @proto = proto
  logger.debug "init"
  raise ArgumentError, 'protocol must be P2PProtocol' unless proto.is_a?(P2PProtocol)

  created_at = Time.now
  @samples = []
  @last_request = created_at
  @last_response = created_at
  @ping_interval = 15
  @response_delay_threshold = 120
  @max_samples = 1000

  # Newest sample goes to the front; the oldest is dropped once the cap is exceeded.
  on_pong = lambda do |_proto, **_data|
    @last_response = Time.now
    @samples.unshift(@last_response - @last_request)
    @samples.pop if @samples.size > @max_samples
  end
  @proto.receive_pong_callbacks.push(on_pong)

  owner = self
  # FIXME: sleep 1 to make sure ConnectionMonitor start after connection of
  # other protocols like ETHProtocol
  on_hello = lambda do |_proto, **_kwargs|
    sleep 1
    owner.start
  end
  @proto.receive_hello_callbacks.push(on_hello)
end
Instance Method Details
#latency(num_samples = @max_samples) ⇒ Object
37 38 39 40 41 |
# File 'lib/devp2p/connection_monitor.rb', line 37
##
# Mean latency over the most recent samples.
#
# @param num_samples [Integer] upper bound on how many of the most recent
#   samples to average; clamped to the number of samples actually recorded
# @return [Numeric] the mean of the considered samples, or 1 when there is
#   nothing to average (no samples yet, or a non-positive +num_samples+)
def latency(num_samples = @max_samples)
  num_samples = [num_samples, @samples.size].min
  return 1 unless num_samples > 0

  # Samples are prepended as they arrive, so the head of the array is the
  # most recent window. Divide by the window size to get a mean instead of
  # a sum that grows with the number of samples.
  @samples.first(num_samples).reduce(0, :+).fdiv(num_samples)
end
#start ⇒ Object
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/devp2p/connection_monitor.rb', line 43
##
# Sends an initial ping and schedules the periodic health check.
#
# On every tick of the timer task (interval: @ping_interval) the current
# latency is logged. If the last response is older than the last request by
# more than @response_delay_threshold, the peer is reported as unresponsive
# and the protocol is asked to stop; otherwise another ping is sent.
#
# @return [Object, nil] the result of executing the timer task, or nil when
#   an error was rescued (the error and the top of its backtrace are printed)
def start
  logger.debug 'started', monitor: self

  # One place for the "log, ping, remember when" sequence used both for the
  # initial ping and for every subsequent tick.
  ping = lambda do
    logger.debug 'pinging', monitor: self
    @proto.async.send_ping
    @last_request = Time.now
  end
  ping.call

  @task = Concurrent::TimerTask.new(execution_interval: @ping_interval) do
    logger.debug('latency', peer: @proto, latency: ("%.3f" % latency))

    if @last_request - @last_response > @response_delay_threshold
      logger.debug "unresponsive_peer", monitor: self
      @proto.peer.async.report_error 'not responding to ping'
      @proto.async.stop
      # The protocol is being stopped: do not send it another ping.
      next
    end

    ping.call
  end
  @task.execute
rescue => e
  puts e
  puts e.backtrace[0, 10].join("\n")
end
#stop ⇒ Object
69 70 71 72 73 |
# File 'lib/devp2p/connection_monitor.rb', line 69
##
# Shuts down the periodic ping task, if there is one, and forgets it.
#
# Safe to call when the monitor was never started or has already been
# stopped: in that case only the log line is emitted.
#
# @return [nil]
def stop
  logger.debug 'stopped', monitor: self
  # @task is only set by #start, so it may still be nil here.
  @task.shutdown if @task
  @task = nil
end