Class: SPing::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/session.rb

Overview

Models or represents a session with a peer.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, socket, madebyme) ⇒ Session

Creates a new session

Parameters:

  • host (#to_s)

    Host of the peer

  • port (#to_i)

    Port of the peer

  • socket (UDPSocket)

    UDP socket via which the pings and the UDP handshake are to be sent.

  • madebyme (TrueClass, FalseClass)

    Indicates whether we have initiated the session.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/session.rb', line 43

def initialize(host, port, socket, madebyme)
  # Assign the passed parameters to instance variables.
  @host = host.to_s
  @port = port.to_i
  @socket = socket
  @madebyme = madebyme

  # Initialize assignment of other instance variables
  @created = Time.now
  @double_activated = nil

  @tcp_handshake_complete = false
  @udp_handshake_complete = false

  @last_acks = SPing::LastAcks.new
end

Instance Attribute Details

#createdTime (readonly)

Time when the session was created. This is necessary so that the session GC knows when a non-initialized session can be deleted.

Returns:

  • (Time)


14
15
16
# File 'lib/session.rb', line 14

def created
  @created
end

#last_rxTime (readonly)

Time when the last ping packet was received by the peer. This is important so that the session GC knows when a session is considered inactive and can be deleted.

Returns:

  • (Time)


18
19
20
# File 'lib/session.rb', line 18

def last_rx
  @last_rx
end

#madebymeTrueClass, FalseClass (readonly)

Indicates whether the session has been initialized. true if we have initiated it. false if the peer has initiated it.

Returns:

  • (TrueClass, FalseClass)


22
23
24
# File 'lib/session.rb', line 22

def madebyme
  @madebyme
end

#session_idInteger (readonly)

The session ID in the range from 0 to 2**32 - 1.

Returns:

  • (Integer)


31
32
33
# File 'lib/session.rb', line 31

def session_id
  @session_id
end

#tcp_handshake_completeTrueClass, FalseClass (readonly)

Indicates whether a TCP handshake has been carried out (successfully).

Returns:

  • (TrueClass, FalseClass)


25
26
27
# File 'lib/session.rb', line 25

def tcp_handshake_complete
  @tcp_handshake_complete
end

#udp_handshake_completeTrueClass, FalseClass (readonly)

Indicates whether a UDP handshake has been carried out (successfully).

Returns:

  • (TrueClass, FalseClass)


28
29
30
# File 'lib/session.rb', line 28

def udp_handshake_complete
  @udp_handshake_complete
end

Instance Method Details

#do_tcp_handshake1(socket, session_id) ⇒ Object

Initiates a TCP handshake.

Parameters:

  • socket (TCPSocket)

    Socket via which the handshake is to be sent (and responses are to be expected).

  • session_id (Integer)

    Session ID which is to be sent to the peer and which is to be used for the current session.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/session.rb', line 65

def do_tcp_handshake1(socket, session_id)
  # Send the banner
  socket.write "sping-0.3-https://codeberg.org/mark22k/sping\r\n"
  # and make sure that it is no longer in the send buffer.
  socket.flush

  # See if the peer invites us to create a session with them.
  invite = socket.readpartial 6
  unless invite.chomp == 'INVITE'
    socket.write "I_DONT_UNDERSTAND\r\n"
    raise TCPHandshakeError, 'Peer didn\'t invite us.'
  end

  # Send the session ID
  @session_id = session_id
  socket.write "#{session_id}\r\n"
  # and make sure that it is no longer in the send buffer.
  socket.flush

  # This means that the TCP handshake is successful. The socket can be closed
  # and the corresponding instance variable can be set.
  socket.close

  @tcp_handshake_complete = true
end

#do_tcp_handshake2Object

Receives a TCP handshake from a peer. The peer specified when the session is created is consulted for this purpose.



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/session.rb', line 93

def do_tcp_handshake2
  # Establish a connection and receive the banner.
  socket = TCPSocket.new @host, @port

  # If the banner is too large, close the socket and throw an error.
  # The handshake was not successful.
  banner = socket.readpartial 9001
  if banner.length > 9000
    socket.close
    raise TCPHandshakeError, 'Host banner too big'
  end

  # If the banner does not match the SPing service, close the socket and
  # throw an error. The handshake was not successful.
  unless banner.start_with? 'sping-0.3-'
    socket.close
    raise TCPHandshakeError, 'Host banner not sping or unsupported version of sping.'
  end

  @remote_version = banner.chomp
  $logger.info "Peer uses the following program version: #{@remote_version.dump}"

  # Invite the peer to start a session with us.
  socket.write "INVITE\r\n"
  socket.flush

  # If the session ID is too long or none was received, close the socket and throw an error.
  invite_buf = socket.readpartial 32
  if invite_buf.length > 31 || invite_buf.empty?
    socket.close
    raise TCPHandshakeError, 'Invite banner wrong size'
  end

  @session_id = invite_buf.chomp.to_i

  socket.close

  @tcp_handshake_complete = true
end

#handle_ping(packet, rxtime, peeraddr) ⇒ Object

Handler that receives and processes a receiving ping packet or time message. The current statistics are output.

Parameters:

  • packet (Hash)
  • rxtime (Time)
  • peeraddr (#to_s)

Raises:



234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
# File 'lib/session.rb', line 234

def handle_ping(packet, rxtime, peeraddr)
  if !(packet.keys - %w[M Y E I T A S]).empty? ||
     # M, Y are already checked in handle_packet from session manager
     !packet['E'].is_a?(Integer) ||
     !packet['I'].is_a?(Integer) ||
     !packet['T'].is_a?(Time) ||
     !packet['A'].is_a?(Array)
    raise InvalidPacketError, 'The peer has sent an invalid message. The ping packet is incorrectly coded.'
  end

  raise SignalizedError, "Package displays an error message: #{packet['E']} Processing aborted." if packet['E'] != 0

  id = packet['I']
  txtime = packet['T']
  remote_last_acks = packet['A']

  if remote_last_acks.length != 32
    raise InvalidMessageError, 'The peer has sent an invalid message. It does not contain any 32-Acks.'
  end

  remote_last_acks.each do |block_ack|
    if !(block_ack.keys - %w[R U X]).empty? ||
       !block_ack['R'].is_a?(Integer) ||
       !block_ack['U'].is_a?(Time) ||
       !block_ack['X'].is_a?(Time)
      raise InvalidMessageError, 'The peer has sent an invalid message. An Ack is formatted invalid.'
    end
  end

  ack = {
    'R' => id,
    'U' => txtime,
    'X' => rxtime
  }

  @last_acks.add_ack(ack)
  @last_rx = rxtime

  # This is the A
  # The peer is B
  # TX => Time from A to B
  # RX => Time from B to A

  # Remove zero acks
  remote_last_acks.reject! { |block_ack| block_ack['R'].zero? }

  # Sort by TX time
  remote_last_acks.sort_by! { |block_ack| block_ack['U'] }

  newest_remote_ack = remote_last_acks.last.to_h

  # Calculate loss
  tx_loss = 0
  rx_loss = 0
  exchanges = 0

  last_local_acks = @last_acks.acks

  # A calculation can only take place when enough data is available, i.e. when:
  # - We have sent 32 time messages (this is the case after 32 seconds)
  # - The peer has sent 32 time messages (this is the case after 32 seconds)
  # - If there is a time message (this is the case when we are in this function)
  if @double_activated && (Time.now - @double_activated).to_i > 32
    # We have enough data
    exchanges = 32
    remote_acks = remote_last_acks.map { |ack| ack['R'] }
    local_acks = last_local_acks.map { |ack| ack['R'] }

    tip_id = (Time.now.to_i % 255) + 1
    starting = tip_id - 32

    last_ids = if tip_id > 32
                 (starting..tip_id).to_a
               else
                 ((255 + starting)..255).to_a + (1..tip_id).to_a
               end

    last_ids[0...-1].each do |id|
      tx_loss += 1 unless remote_acks.include? id
    end

    last_ids[1..].each do |id|
      rx_loss += 1 unless local_acks.include? id
    end
  end

  tx_latency = ((newest_remote_ack['X'].to_f - newest_remote_ack['U'].to_f) * 1000.0).round(6)
  rx_latency = ((rxtime - txtime) * 1000.0).round(6)
  puts "[#{peeraddr}] RX: #{rx_latency}ms TX: #{tx_latency}ms [Loss RX: #{rx_loss}/#{exchanges} | Loss TX: #{tx_loss}/#{exchanges}]"
end

#pingObject

Sends a ping message to the peer.



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/session.rb', line 208

def ping
  current_id = (Time.now.to_i % 255) + 1
  data = {
    'Y' => 't'.ord,
    'M' => 11_181,
    'S' => @session_id,
    'I' => current_id,
    'T' => Time.now,
    'E' => 0,
    'A' => @last_acks.acks
  }.to_msgpack

  $logger.debug "Send ping to #{@host} port #{@port}."
  @socket.send data, 0, @host, @port
end

#send_udp_handshakeObject

Send a single UDP handshake



160
161
162
163
164
165
166
167
168
169
170
# File 'lib/session.rb', line 160

def send_udp_handshake
  packet = {
    'Y' => 'h'.ord,
    'M' => 11_181,
    'V' => 3,
    'S' => @session_id
  }.to_msgpack

  $logger.debug "Send UDP handshake to #{@host} port #{@port}."
  @socket.send packet, 0, @host, @port
end

#set_endpoint(host, port) ⇒ Object

Sets the endpoint consisting of the host and port of the remote end. This is done each time a new packet is received and ensures that the current endpoint is always available.

Parameters:

  • host (#to_s)
  • port (#to_i)


137
138
139
140
# File 'lib/session.rb', line 137

def set_endpoint(host, port)
  @host = host.to_s
  @port = port.to_i
end

#start_pingerObject

Starts a thread which sends ping or time messages to the peer at one-second intervals.



188
189
190
191
192
193
194
195
196
197
# File 'lib/session.rb', line 188

def start_pinger
  raise 'Pinger is already running.' if @pinger

  @pinger = Thread.new do
    loop do
      ping
      sleep 1
    end
  end
end

#start_udp_handshake_sender(send_interval = 1) ⇒ Object

Starts a thread which sends the UDP handshake at regular intervals.

Parameters:

  • send_interval (#to_i) (defaults to: 1)

    The interval at which the UDP handshakes are to be sent.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/session.rb', line 144

def start_udp_handshake_sender(send_interval = 1)
  raise 'UDP handshake sender is already running.' if @udp_handshake_sender

  @udp_handshake_sender = Thread.new(send_interval.to_i) do |th_send_interval|
    loop do
      begin
        send_udp_handshake
        sleep th_send_interval
      rescue SocketError => e
        $logger.warn "Failed to send ping: #{e.message}"
      end
    end
  end
end

#stopObject

Stops all threads associated with the session. This means that the session to the peer is as good as dead.



225
226
227
228
# File 'lib/session.rb', line 225

def stop
  @pinger&.kill
  @udp_handshake_sender&.kill
end

#stop_pingerObject

Stops the thread, which sends ping or time messages to the peer at regular intervals.



200
201
202
203
204
205
# File 'lib/session.rb', line 200

def stop_pinger
  raise 'Pinger sender is not running and therefore cannot be terminated.' unless @pinger

  @pinger.kill
  @pinger = nil
end

#stop_udp_handshake_senderObject

Stop the UDP handshake sender. UDP handshakes are then no longer sent at any interval.



173
174
175
176
177
178
# File 'lib/session.rb', line 173

def stop_udp_handshake_sender
  raise 'UDP handshake sender is not running and therefore cannot be terminated.' unless @udp_handshake_sender

  @udp_handshake_sender.kill
  @udp_handshake_sender = nil
end

#udp_handshake_recivedObject

Informs the session that a UDP handshake has been received.



181
182
183
184
185
# File 'lib/session.rb', line 181

def udp_handshake_recived
  stop_udp_handshake_sender if @madebyme
  @udp_handshake_complete = true
  @double_activated = Time.now
end