Class: DEVp2p::ConnectionMonitor

Inherits:
Object
  • Object
show all
Includes:
Concurrent::Async
Defined in:
lib/devp2p/connection_monitor.rb

Overview

Monitors the connection by periodically sending pings and checking for pongs.

Instance Method Summary collapse

Constructor Details

#initialize(proto) ⇒ ConnectionMonitor

Returns a new instance of ConnectionMonitor.

Raises:

  • (ArgumentError)


11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/devp2p/connection_monitor.rb', line 11

# Builds a monitor bound to +proto+ and registers its callbacks on it.
#
# A pong callback stores the time elapsed between the last request and the
# pong as a latency sample (newest first, capped at @max_samples). A hello
# callback waits one second and then starts the monitor.
#
# @param proto [P2PProtocol] protocol whose connection is monitored
# @raise [ArgumentError] if +proto+ is not a P2PProtocol
def initialize(proto)
  @proto = proto

  logger.debug "init"
  unless proto.is_a?(P2PProtocol)
    raise ArgumentError, 'protocol must be P2PProtocol'
  end

  created_at = Time.now
  @samples = []
  @last_request = created_at
  @last_response = created_at

  @ping_interval = 15
  @response_delay_threshold = 120
  @max_samples = 1000

  # Record a round-trip sample whenever a pong arrives; drop the oldest
  # sample once the buffer exceeds @max_samples.
  on_pong = lambda do |_proto, **_data|
    @last_response = Time.now
    @samples.unshift(@last_response - @last_request)
    @samples.pop if @samples.size > @max_samples
  end
  @proto.receive_pong_callbacks.push(on_pong)

  # FIXME: sleep 1 to make sure ConnectionMonitor start after connection of
  # other protocols like ETHProtocol
  on_hello = lambda do |_peer, **_opts|
    sleep 1
    start
  end
  @proto.receive_hello_callbacks.push(on_hello)
end

Instance Method Details

#latency(num_samples = @max_samples) ⇒ Object



37
38
39
40
41
# File 'lib/devp2p/connection_monitor.rb', line 37

# Average round-trip latency over the most recent samples.
#
# Each sample is a difference between two Time values, so the result is in
# seconds.
#
# @param num_samples [Integer] maximum number of newest samples to average
#   (defaults to @max_samples); clamped to the number of samples available
# @return [Numeric] mean of the selected samples, or 1 when there is nothing
#   to average
def latency(num_samples=@max_samples)
  num_samples = [num_samples, @samples.size].min
  return 1 unless num_samples > 0

  # Samples are unshifted onto the front of @samples, so the head of the array
  # is the newest window. Divide the sum by the window size to get the mean;
  # returning the raw sum would grow with the number of samples.
  @samples.first(num_samples).reduce(0.0, :+) / num_samples
end

#start ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/devp2p/connection_monitor.rb', line 43

# Sends an initial ping and schedules a recurring timer task that, every
# @ping_interval, logs the current latency, checks peer responsiveness and
# sends the next ping.
#
# When the last request time is more than @response_delay_threshold after the
# last response, the peer is reported as not responding, the protocol is
# stopped, and the timer task shuts itself down so that no further pings or
# duplicate error reports are issued.
#
# Any error raised while setting up the task is printed (message plus the
# first ten backtrace lines) instead of being propagated.
#
# @return [Object] the result of executing the timer task, or nil if setup
#   failed
def start
  logger.debug 'started', monitor: self

  logger.debug 'pinging', monitor: self
  @proto.async.send_ping
  # `now` tracks the time of the most recent ping request across ticks.
  now = @last_request = Time.now

  @task = Concurrent::TimerTask.new(execution_interval: @ping_interval) do |task|
    logger.debug('latency', peer: @proto, latency: ("%.3f" % latency))

    if now - @last_response > @response_delay_threshold
      logger.debug "unresponsive_peer", monitor: self
      @proto.peer.async.report_error 'not responding to ping'
      @proto.async.stop
      # Stop monitoring here: the protocol is being torn down, so pinging it
      # again or re-reporting the same error on every tick is wrong.
      task.shutdown
      next
    end

    logger.debug 'pinging', monitor: self
    @proto.async.send_ping
    now = @last_request = Time.now
  end
  @task.execute
rescue => e
  puts e
  puts e.backtrace[0, 10].join("\n")
end

#stop ⇒ Object



69
70
71
72
73
# File 'lib/devp2p/connection_monitor.rb', line 69

# Shuts down the ping timer task, if one is running, and forgets it.
#
# Safe to call before #start has created the task or more than once: with no
# task present only the log line is emitted.
#
# @return [nil]
def stop
  logger.debug 'stopped', monitor: self
  # @task is nil until #start runs (which happens one second after hello) and
  # again after a previous #stop, so guard the shutdown.
  @task.shutdown if @task
  @task = nil
end