Class: DEVp2p::PeerManager

Inherits:
WiredService show all
Defined in:
lib/devp2p/peer_manager.rb

Overview

connection strategy

for service which requires peers
  while peers.size > min_num_peers
    gen random id
    resolve closest node address
    [ideally know their services]
    connect closest node

Defined Under Namespace

Classes: ServiceListener

Instance Attribute Summary

Attributes inherited from WiredService

#wire_protocol

Attributes inherited from Service

#app

Instance Method Summary collapse

Methods inherited from WiredService

#on_wire_protocol_start, #on_wire_protocol_stop

Methods inherited from Service

register_with_app, #to_s

Methods included from Configurable

#add_config

Constructor Details

#initialize(app) ⇒ PeerManager



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/devp2p/peer_manager.rb', line 68

def initialize(app)
  super(app)

  logger.info "PeerManager init"

  @peers = []
  @excluded = []
  @errors = @config[:log_disconnects] ? PeerErrors.new : PeerErrorsBase.new

  @wire_protocol = P2PProtocol

  # setup nodeid based on privkey
  unless @config[:p2p].has_key?(:id)
    @config[:node][:id] = Crypto.privtopub Utils.decode_hex(@config[:node][:privkey_hex])
  end

  @connect_timeout = 2.0
  @connect_loop_delay = 0.5
  @discovery_delay = 0.5

  @host = @config[:p2p][:listen_host]
  @port = @config[:p2p][:listen_port]

  @stopped = false
end

Instance Method Details

#add(peer) ⇒ Object



123
124
125
# File 'lib/devp2p/peer_manager.rb', line 123

def add(peer)
  @peers.push peer
end

#add_error(*args) ⇒ Object



216
217
218
# File 'lib/devp2p/peer_manager.rb', line 216

def add_error(*args)
  @errors.add *args
end

#broadcast(protocol, command_name, args = [], kwargs = {}, num_peers = nil, exclude_peers = []) ⇒ Object

Raises:

  • (ArgumentError)


157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/devp2p/peer_manager.rb', line 157

def broadcast(protocol, command_name, args=[], kwargs={}, num_peers=nil, exclude_peers=[])
  logger.debug "broadcasting", protocol: protocol, command: command_name, num_peers: num_peers, exclude_peers: exclude_peers.map(&:to_s)
  raise ArgumentError, 'invalid num_peers' unless num_peers.nil? || num_peers > 0

  peers_with_proto = @peers.select {|p| p.protocols.include?(protocol) && !exclude_peers.include?(p) }
  if peers_with_proto.empty?
    logger.debug "no peers with protocol found", protos: @peers.select {|p| p.protocols }
  end

  num_peers ||= peers_with_proto.size
  peers_with_proto.sample([num_peers, peers_with_proto.size].min).each do |peer|
    logger.debug "broadcasting to", proto: peer.protocols[protocol]

    args.push kwargs
    peer.protocols[protocol].send "send_#{command_name}", *args

    peer.safe_to_read.wait
    logger.debug "broadcasting done", ts: Time.now
  end
end

#connect(host, port, remote_pubkey) ⇒ Object

Connect to address (a 2-tuple [host, port]) and return the socket object.

Passing the optional timeout parameter will set the timeout.



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/devp2p/peer_manager.rb', line 183

def connect(host, port, remote_pubkey)
  socket = create_connection host, port, @connect_timeout
  logger.debug "connecting to", peer: socket.peeraddr

  start_peer socket, remote_pubkey
  true
rescue Errno::ETIMEDOUT
  address = "#{host}:#{port}"
  logger.debug "connection timeout", address: address, timeout: @connect_timeout
  @errors.add address, 'connection timeout'
  false
rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ECONNREFUSED
  address = "#{host}:#{port}"
  logger.debug "connection error #{$!}"
  @errors.add address, "connection error #{$!}"
  false
rescue
  address = "#{host}:#{port}"
  logger.debug $!
  @errors.add address, "connection error #{$!}"
  false
end

#delete(peer) ⇒ Object



127
128
129
# File 'lib/devp2p/peer_manager.rb', line 127

def delete(peer)
  @peers.delete peer
end

#exclude(peer) ⇒ Object



131
132
133
134
# File 'lib/devp2p/peer_manager.rb', line 131

def exclude(peer)
  @excluded.push peer.remote_pubkey
  peer.async.stop
end

#handle_connection(socket) ⇒ Object



220
221
222
223
224
225
226
227
228
# File 'lib/devp2p/peer_manager.rb', line 220

def handle_connection(socket)
  _, port, host = socket.peeraddr
  logger.debug "incoming connection", host: host, port: port

  start_peer socket
rescue EOFError
  logger.debug "connection disconnected", host: host, port: port
  socket.close
end

#num_peersObject



206
207
208
209
210
211
212
213
214
# File 'lib/devp2p/peer_manager.rb', line 206

def num_peers
  active = @peers.select {|p| !p.stopped? }

  if @peers.size != active.size
    logger.error "stopped peers in peers list", inlist: @peers.size, active: active.size
  end

  active.size
end

#on_hello_received(proto, version, client_version_string, capabilities, listen_port, remote_pubkey) ⇒ Object



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/devp2p/peer_manager.rb', line 136

def on_hello_received(proto, version, client_version_string, capabilities, listen_port, remote_pubkey)
  logger.debug 'hello_received', listen_port: listen_port, peer: proto.peer, num_peers: @peers.size

  if @peers.size > @config[:p2p][:max_peers]
    logger.debug "too many peers", max: @config[:p2p][:max_peers]
    proto.send_disconnect proto.class::Disconnect::Reason[:too_many_peers]
    return false
  end
  if @peers.select {|p| p != proto.peer }.include?(remote_pubkey)
    logger.debug "connected to that node already"
    proto.send_disconnect proto.class::Disconnect::Reason[:useless_peer]
    return false
  end

  return true
end

#startObject



94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/devp2p/peer_manager.rb', line 94

def start
  logger.info "starting peermanager"

  logger.info "starting tcp listener", host: @host, port: @port
  @server = TCPServer.new @host, @port

  @service_listener = ServiceListener.new self, @server
  @service_listener.async.start

  @discovery_loop = Thread.new do
    sleep 0.1
    discovery_loop
  end
end

#stopObject



109
110
111
112
113
114
115
116
117
# File 'lib/devp2p/peer_manager.rb', line 109

def stop
  logger.info "stopping peermanager"

  @server.close if @server
  @peers.each(&:stop)
  @discovery_loop.kill

  @stopped = true
end

#stopped?Boolean



119
120
121
# File 'lib/devp2p/peer_manager.rb', line 119

def stopped?
  @stopped
end

#wired_servicesObject



153
154
155
# File 'lib/devp2p/peer_manager.rb', line 153

def wired_services
  app.services.values.select {|s| s.is_a?(WiredService) }
end