Class: Evinrude::Network::Connection

Inherits:
Object
  • Object
show all
Includes:
LoggingHelpers, Protocol
Defined in:
lib/evinrude/network/connection.rb

Defined Under Namespace

Classes: ConnectionError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket:, logger:, metrics:, keys:) ⇒ Connection

Returns a new instance of Connection.



31
32
33
34
35
36
37
38
39
40
# File 'lib/evinrude/network/connection.rb', line 31

def initialize(socket:, logger:, metrics:, keys:)
	@socket, @logger, @metrics = socket, logger, metrics

	@keys = keys.map { |k| Digest::SHA256.digest(k) }

	@sem = Async::Semaphore.new

	@peer_address = @socket.remote_address.ip_address
	@peer_port    = @socket.remote_address.ip_port
end

Instance Attribute Details

#peer_addressObject (readonly)

Returns the value of attribute peer_address.



15
16
17
# File 'lib/evinrude/network/connection.rb', line 15

def peer_address
  @peer_address
end

#peer_portObject (readonly)

Returns the value of attribute peer_port.



15
16
17
# File 'lib/evinrude/network/connection.rb', line 15

def peer_port
  @peer_port
end

Class Method Details

.connect(address:, port:, keys:, logger:, metrics:) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/evinrude/network/connection.rb', line 17

def self.connect(address:, port:, keys:, logger:, metrics:)
	backoff = Evinrude::Backoff.new

	begin
		sock = Async::IO::Endpoint.tcp(address, port).connect
	rescue Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ETIMEDOUT => ex
		logger.info("Evinrude::Network::Connection.connect") { "Could not connect to #{address}:#{port}: #{ex.class}" }
		raise ConnectionError,
		      "#{ex.class}"
	end

	new(socket: sock, logger: logger, metrics: metrics, keys: keys)
end

Instance Method Details

#closeObject



99
100
101
# File 'lib/evinrude/network/connection.rb', line 99

def close
	@socket.close
end

#each_messageObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/evinrude/network/connection.rb', line 77

def each_message
	begin
		loop do
			@sem.acquire do
				yield read_message
			end
		end
	rescue Async::Wrapper::Cancelled
		# This is fine
		nil
	rescue Evinrude::Error, SystemCallError, IOError => ex
		# This is... not so fine, but there's not much we can do about it
		log_exception(ex) { "Reading message" }
		@socket.close
		nil
	end
end

#inspectObject



103
104
105
106
107
108
109
# File 'lib/evinrude/network/connection.rb', line 103

def inspect
	"#<#{self.class}:0x#{object_id.to_s(16)} " +
		instance_variables.map do |iv|
			next nil if %i{@logger @metrics @socket}.include?(iv)
			"#{iv}=#{instance_variable_get(iv).inspect}"
		end.compact.join(" ")
end

#peer_infoObject



42
43
44
# File 'lib/evinrude/network/connection.rb', line 42

def peer_info
	"#{peer_address}:#{peer_port}"
end

#rpc(msg) ⇒ Object



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/evinrude/network/connection.rb', line 46

def rpc(msg)
	@metrics.rpc.measure(target: peer_info) do |labels|
		begin
			@sem.acquire do
				logger.debug(logloc) { "Sending RPC request #{msg.inspect} to #{peer_info}" }
				begin
					@socket.write(frame(box(msg.to_yaml)))
				rescue Errno::EPIPE, IOError, Errno::ECONNRESET => ex
					logger.debug(logloc) { "Failed to send RPC request to #{peer_info}: #{ex.message} (#{ex.class})" }
					labels[:result] = ex.class.to_s
					return nil
				end

				logger.debug(logloc) { "Request sent; now we wait" }

				begin
					read_message
				rescue Protocol::VersionError, Errno::ECONNRESET, Errno::EPIPE, IOError => ex
					logger.debug(logloc) { "I/O exception #{ex.class} while reading RPC reply" }
					labels[:result] = ex.class.to_s
					@socket.close
					nil
				end
			end.tap { labels[:result] = "success" }
		rescue Async::Wrapper::Cancelled
			labels[:result] = "cancelled"
			nil
		end
	end
end

#send_reply(msg) ⇒ Object



95
96
97
# File 'lib/evinrude/network/connection.rb', line 95

def send_reply(msg)
	@socket.write(frame(box(msg.to_yaml)))
end