Class: DEVp2p::Peer

Inherits:
Object
  • Object
show all
Includes:
Concurrent::Async
Defined in:
lib/devp2p/peer.rb

Constant Summary collapse

DUMB_REMOTE_TIMEOUT =
10.0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(peermanager, socket, remote_pubkey = nil) ⇒ Peer

Returns a new instance of Peer.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/devp2p/peer.rb', line 12

def initialize(peermanager, socket, remote_pubkey=nil)
  @peermanager = peermanager
  @socket = socket
  @config = peermanager.config

  @protocols = {}

  @stopped = false
  @hello_received = false

  _, @port, _, @ip = @socket.peeraddr
  @remote_client_version = ''
  logger.debug "peer init", peer: self

  privkey = Utils.decode_hex @config[:node][:privkey_hex]
  hello_packet = P2PProtocol.get_hello_packet hello_data

  @mux = MultiplexedSession.new privkey, hello_packet, remote_pubkey
  @remote_pubkey = remote_pubkey

  connect_service @peermanager

  # assure, we don't get messages while replies are not read
  @safe_to_read = Concurrent::Event.new
  @safe_to_read.set

  # stop peer if hello not received in DUMB_REMOTE_TIMEOUT
  Concurrent::ScheduledTask.execute(DUMB_REMOTE_TIMEOUT) { check_if_dumb_remote }
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



10
11
12
# File 'lib/devp2p/peer.rb', line 10

def config
  @config
end

#protocolsObject (readonly)

Returns the value of attribute protocols.



10
11
12
# File 'lib/devp2p/peer.rb', line 10

def protocols
  @protocols
end

#remote_client_versionObject (readonly)

Returns the value of attribute remote_client_version.



10
11
12
# File 'lib/devp2p/peer.rb', line 10

def remote_client_version
  @remote_client_version
end

#remote_pubkeyObject

if peer is responder, then the remote_pubkey will not be available before the first packet is received



72
73
74
# File 'lib/devp2p/peer.rb', line 72

def remote_pubkey
  @remote_pubkey
end

Instance Method Details

#capabilitiesObject



163
164
165
# File 'lib/devp2p/peer.rb', line 163

def capabilities
  @peermanager.wired_services.map {|s| [s.wire_protocol.name, s.wire_protocol.version] }
end

#connect_service(service) ⇒ Object

Raises:

  • (ArgumentError)


94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/devp2p/peer.rb', line 94

def connect_service(service)
  raise ArgumentError, "service must be WiredService" unless service.is_a?(WiredService)

  # create protocol instance which connects peer with service
  protocol_class = service.wire_protocol
  protocol = protocol_class.new self, service

  # register protocol
  raise PeerError, 'protocol already connected' if @protocols.has_key?(protocol_class)
  logger.debug "registering protocol", protocol: protocol.name, peer: self

  @protocols[protocol_class] = protocol
  @mux.add_protocol protocol.protocol_id

  protocol.start
end

#has_protocol?(protocol) ⇒ Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/devp2p/peer.rb', line 111

def has_protocol?(protocol)
  @protocols.has_key?(protocol)
end

#receive_hello(proto, data) ⇒ Object

Raises:

  • (ArgumentError)


115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/devp2p/peer.rb', line 115

def receive_hello(proto, data)
  version = data[:version]
  listen_port = data[:listen_port]
  capabilities = data[:capabilities]
  remote_pubkey = data[:remote_pubkey]
  client_version_string = data[:client_version_string]

  logger.info 'received hello', version: version, client_version: client_version_string, capabilities: capabilities

  raise ArgumentError, "invalid remote pubkey" unless remote_pubkey.size == 64
  raise ArgumentError, "remote pubkey mismatch" if @remote_pubkey_available && @remote_pubkey != remote_pubkey

  @hello_received = true

  # enable backwards compatibility for legacy peers
  if version < 5
    @offset_based_dispatch = true
    max_window_size = 2**32 # disable chunked transfers
  end

  # call peermanager
  agree = @peermanager.on_hello_received(proto, version, client_version_string, capabilities, listen_port, remote_pubkey)
  return unless agree

  @remote_client_version = client_version_string
  @remote_pubkey = remote_pubkey

  # register in common protocols
  logger.debug 'connecting services', services: @peermanager.wired_services
  remote_services = capabilities.map {|name, version| [name, version] }.to_h

  @peermanager.wired_services.sort_by(&:name).each do |service|
    raise PeerError, 'invalid service' unless service.is_a?(WiredService)

    proto = service.wire_protocol
    if remote_services.has_key?(proto.name)
      if remote_services[proto.name] == proto.version
        if service != @peermanager # p2p protocol already registered
          connect_service service
        end
      else
        logger.debug 'wrong version', service: proto.name, local_version: proto.version, remote_version: remote_services[proto.name]
        report_error 'wrong version'
      end
    end
  end
end

#report_error(reason) ⇒ Object



89
90
91
92
# File 'lib/devp2p/peer.rb', line 89

def report_error(reason)
  pn = "#@ip:#@port"
  @peermanager.add_error pn, reason, @remote_client_version
end

#runObject



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/devp2p/peer.rb', line 208

def run
  logger.debug "peer starting main loop"
  raise PeerError, 'connection is closed' if @socket.closed?

  @run_decoded_packets = Thread.new { run_decoded_packets }
  @run_egress_message = Thread.new { run_egress_message }

  while !stopped?
    @safe_to_read.wait

    begin
      imsg = @socket.recv(4096)
      if imsg.empty?
        logger.info "socket closed"
        stop
      end
    rescue EOFError # imsg is empty
      if @socket.closed?
        logger.info "socket closed"
        stop
      else
        imsg = ''
      end
    rescue SystemCallError => e
      logger.debug "read error", error: e, peer: self
      report_error "network error #{e}"
      if [Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ENETDOWN, Errno::EHOSTUNREACH].any? {|syserr| e.instance_of?(syserr) }
        stop
      else
        raise e
        break
      end
    end

    if !imsg.empty?
      logger.debug "read data", size: imsg.size
      @mux.add_message imsg
    end
  end
rescue RLPxSessionError, DecryptionError => e
  logger.debug "rlpx session error", peer: self, error: e
  report_error "rlpx session error"
  stop
rescue MultiplexerError => e
  logger.debug "multiplexer error", peer: self, error: e
  report_error "multiplexer error"
  stop
rescue
  logger.debug "ingress message error", peer: self, error: $!
  report_error "ingress message error"
  stop
end

#send_data(data) ⇒ Object



189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/devp2p/peer.rb', line 189

def send_data(data)
  return if data.nil? || data.empty?

  @safe_to_read.reset

  @socket.write data
  logger.debug "wrote data", size: data.size

  @safe_to_read.set
rescue Errno::ETIMEDOUT
  logger.debug "write timeout"
  report_error "write timeout"
  stop
rescue SystemCallError => e
  logger.debug "write error #{e}"
  report_error "write error #{e}"
  stop
end

#send_packet(packet) ⇒ Object

Raises:



167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/devp2p/peer.rb', line 167

def send_packet(packet)
  protocol = @protocols.values.find {|pro| pro.protocol_id == packet.protocol_id }
  raise PeerError, "no protocol found" unless protocol
  logger.debug "send packet", cmd: protocol.cmd_by_id[packet.cmd_id], protocol: protocol.name, peer: self

  # rewrite cmd_id (backwards compatibility)
  if @offset_based_dispatch
    @protocols.values.each_with_index do |proto, i|
      if packet.protocol_id > i
        packet.cmd_id += (protocol.max_cmd_id == 0 ? 0 : protocol.max_cmd_id + 1)
      end
      if packet.protocol_id == protocol.protocol_id
        protocol = proto
        break
      end
      packet.protocol_id = 0
    end
  end

  @mux.add_packet packet
end

#startObject



42
43
44
45
# File 'lib/devp2p/peer.rb', line 42

def start
  @stopped = false
  @run = Thread.new { run }
end

#stopObject



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/devp2p/peer.rb', line 47

def stop
  if !stopped?
    @stopped = true

    @protocols.each_value {|proto| proto.async.stop }
    @peermanager.async.delete self

    logger.info "peer stopped", peer: self
    @run.kill
    @run_decoded_packets.kill
    @run_egress_message.kill
  end
rescue
  puts $!
  puts $!.backtrace[0,10].join("\n")
end

#stopped?Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/devp2p/peer.rb', line 64

def stopped?
  @stopped
end

#to_sObject Also known as: inspect



81
82
83
84
85
86
# File 'lib/devp2p/peer.rb', line 81

def to_s
  pn = "#@ip:#@port"
  cv = @remote_client_version.split('/')[0,2].join('/')
  pn = "#{pn} #{cv}" unless cv.empty?
  "<Peer #{pn}>"
end