Class: Evinrude

Inherits:
Object
  • Object
show all
Includes:
LoggingHelpers
Defined in:
lib/evinrude.rb,
lib/evinrude/log.rb,
lib/evinrude/peer.rb,
lib/evinrude/backoff.rb,
lib/evinrude/message.rb,
lib/evinrude/metrics.rb,
lib/evinrude/network.rb,
lib/evinrude/resolver.rb,
lib/evinrude/snapshot.rb,
lib/evinrude/log_entry.rb,
lib/evinrude/node_info.rb,
lib/evinrude/state_machine.rb,
lib/evinrude/log_entry/null.rb,
lib/evinrude/logging_helpers.rb,
lib/evinrude/network/protocol.rb,
lib/evinrude/message/join_reply.rb,
lib/evinrude/message/read_reply.rb,
lib/evinrude/message/vote_reply.rb,
lib/evinrude/network/connection.rb,
lib/evinrude/message/join_request.rb,
lib/evinrude/message/read_request.rb,
lib/evinrude/message/vote_request.rb,
lib/evinrude/cluster_configuration.rb,
lib/evinrude/message/command_reply.rb,
lib/evinrude/state_machine/register.rb,
lib/evinrude/message/command_request.rb,
lib/evinrude/config_change_queue_entry.rb,
lib/evinrude/message/node_removal_reply.rb,
lib/evinrude/message/append_entries_reply.rb,
lib/evinrude/message/node_removal_request.rb,
lib/evinrude/message/append_entries_request.rb,
lib/evinrude/message/install_snapshot_reply.rb,
lib/evinrude/log_entry/cluster_configuration.rb,
lib/evinrude/log_entry/state_machine_command.rb,
lib/evinrude/message/install_snapshot_request.rb,
lib/evinrude/config_change_queue_entry/add_node.rb,
lib/evinrude/config_change_queue_entry/remove_node.rb

Defined Under Namespace

Modules: LoggingHelpers Classes: Backoff, ClusterConfiguration, ConfigChangeQueueEntry, Error, Log, LogEntry, Message, Metrics, Network, NoLeaderError, NodeExpiredError, NodeInfo, Peer, Resolver, Snapshot, StateMachine

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(join_hints: [], shared_keys:, state_machine: Evinrude::StateMachine::Register, logger: Logger.new("/dev/null"), node_name: nil, storage_dir: nil, heartbeat_interval: 0.25, heartbeat_timeout: 1..2, listen: {}, advertise: {}, metrics_registry: Prometheus::Client::Registry.new) ⇒ Evinrude

Returns a new instance of Evinrude.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/evinrude.rb', line 22

# Build a new node. Nothing is started here; this only sets up in-memory
# state, optionally restored from `<storage_dir>/snapshot.yaml`.
#
# @param join_hints stored as-is in @join_hints
# @param shared_keys stored as-is in @keys
# @param state_machine [Class] instantiated with no arguments, or with
#   `snapshot:` when a snapshot was loaded
# @param logger logger used by this node and handed to the log and
#   cluster configuration objects
# @param node_name [String, nil] name for this node; ignored when a snapshot
#   is loaded (the snapshot's name wins), random UUID when nil
# @param storage_dir [String, Pathname, nil] directory holding the snapshot;
#   created if it does not exist
# @param heartbeat_interval stored as-is (presumably seconds -- confirm)
# @param heartbeat_timeout stored as-is (presumably a range of seconds -- confirm)
# @param listen [Hash] copied; `:address` defaults to "::" and `:port` to 0
# @param advertise [Hash] stored as-is in @advertise
# @param metrics_registry registry passed to Evinrude::Metrics.new
#
# @raise [ArgumentError] if storage_dir exists but is not a directory
def initialize(join_hints: [], shared_keys:, state_machine: Evinrude::StateMachine::Register,
               logger: Logger.new("/dev/null"), node_name: nil, storage_dir: nil,
               heartbeat_interval: 0.25, heartbeat_timeout: 1..2,
               listen: {}, advertise: {}, metrics_registry: Prometheus::Client::Registry.new)
	@join_hints, @keys, @logger, @heartbeat_interval, @heartbeat_timeout = join_hints, shared_keys, logger, heartbeat_interval, heartbeat_timeout

	@metrics = Evinrude::Metrics.new(metrics_registry)

	# Fill in defaults on a copy, so that the caller's hash is never mutated
	# and a frozen hash is accepted.
	@listen, @advertise = listen.dup, advertise
	@listen[:address] ||= "::"
	@listen[:port]    ||= 0

	@storage_dir = Pathname.new(storage_dir) if storage_dir

	snapshot = if @storage_dir
		@storage_dir.mkdir unless @storage_dir.exist?

		unless @storage_dir.directory?
			raise ArgumentError, "Storage directory #{@storage_dir} isn't *actually* a directory"
		end

		snapshot_file = @storage_dir.join("snapshot.yaml")

		if snapshot_file.exist?
			@metrics.snapshot_file_size.set(snapshot_file.stat.size)

			# The snapshot is a serialized Ruby object, not plain data.  From
			# Psych 4 onwards YAML.load_file is safe-mode and refuses to revive
			# it, so ask for the unrestricted loader where one exists.
			# SECURITY: this instantiates arbitrary classes named in the file;
			# the storage directory must only be writable by trusted parties.
			if YAML.respond_to?(:unsafe_load_file)
				YAML.unsafe_load_file(snapshot_file)
			else
				YAML.load_file(snapshot_file)
			end
		end
	end

	@state_machine_class = state_machine

	if snapshot
		@node_name        = snapshot.node_name
		@state_machine    = @state_machine_class.new(snapshot: snapshot.state)
		@last_command_ids = snapshot.last_command_ids
	else
		@node_name        = node_name || SecureRandom.uuid
		@state_machine    = @state_machine_class.new
		@last_command_ids = {}
	end

	@sm_mutex      = Mutex.new

	if snapshot
		@config        = snapshot.cluster_config
		@config_index  = snapshot.cluster_config_index
		# The restored configuration gets this node's metrics and logger
		# assigned explicitly.
		@config.metrics = @metrics
		@config.logger  = logger
	else
		@config       = Evinrude::ClusterConfiguration.new(logger: logger, metrics: @metrics)
		@config_index = 0
	end

	@last_append  = Time.at(0)
	@current_term = 0
	@voted_for    = nil
	@mode         = :init

	@metrics.term.set(0)

	if snapshot
		logger.debug(logloc) { "Configuring log from snapshot; snapshot_last_term=#{snapshot.last_term} snapshot_last_index=#{snapshot.last_index}" }
		@log = Evinrude::Log.new(snapshot_last_term: snapshot.last_term, snapshot_last_index: snapshot.last_index, logger: logger)

		logger.debug(logloc) { "Setting commit_index to #{snapshot.last_index} from snapshot" }
		@commit_index = snapshot.last_index
	else
		@log = Evinrude::Log.new(logger: logger)
		@commit_index = 0
	end

	@metrics.commit_index.set(@commit_index)

	# Peers are created the first time they are looked up.  @network is not
	# assigned in this method, so this block must not fire before it is set.
	@peers = Hash.new do |h, k|
		peer_conn = @network.connect(address: k.address, port: k.port)

		h[k] = Peer.new(metrics: @metrics, conn: peer_conn, node_info: k, next_index: @log.last_index + 1)
	end

	@config_change_queue = []
	@config_change_request_in_progress = nil
	@cc_sem = Async::Semaphore.new
end

Instance Attribute Details

#node_name ⇒ Object (readonly)

Returns the value of attribute node_name.



20
21
22
# File 'lib/evinrude.rb', line 20

# The name of this node: taken from the snapshot when one was loaded by the
# constructor, otherwise the `node_name:` argument or a random UUID.
#
# @return the value of @node_name
def node_name
  @node_name
end

Instance Method Details

#address ⇒ Object



208
209
210
# File 'lib/evinrude.rb', line 208

# The network's advertised address.
#
# @return `@network.advertised_address`, or nil while @network is unset
#   (it is assigned in #run)
def address
	@network&.advertised_address
end

#candidate? ⇒ Boolean

Returns:

  • (Boolean)


228
229
230
# File 'lib/evinrude.rb', line 228

# Whether this node's mode is currently `:candidate`.
#
# @return [Boolean]
def candidate?
	@mode == :candidate
end

#command(s) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/evinrude.rb', line 115

# Submit a command to the cluster leader, retrying until a reply reports
# success.
#
# One command ID is generated per call and reused for every attempt.  The
# whole call is timed by the `command_execution` metric.
#
# @param s the command, passed through as `command:` of the request
# @return [true] once a reply has `success` set
def command(s)
	@metrics.command_execution.measure do
		submission = Async(logger: logger) do |task|
			request_id = SecureRandom.uuid

			loop do
				request = Message::CommandRequest.new(command: s, id: request_id, node_name: @node_name)

				break true if rpc_to_leader(request, task).success
			end
		end

		submission.result
	end
end

#expired? ⇒ Boolean

Returns:

  • (Boolean)


236
237
238
# File 'lib/evinrude.rb', line 236

# Whether this node's heartbeat timeout has passed.
#
# A leader never counts as expired, and neither does a node for which no
# @heartbeat_timeout_time has been set.
#
# @return [Boolean]
def expired?
	return false if leader?
	return false unless @heartbeat_timeout_time

	@heartbeat_timeout_time < Time.now
end

#follower? ⇒ Boolean

Returns:

  • (Boolean)


224
225
226
# File 'lib/evinrude.rb', line 224

# Whether this node's mode is currently `:follower`.
#
# @return [Boolean]
def follower?
	@mode == :follower
end

#init? ⇒ Boolean

Returns:

  • (Boolean)


232
233
234
# File 'lib/evinrude.rb', line 232

# Whether this node's mode is still `:init`, the value the constructor
# assigns.
#
# @return [Boolean]
def init?
	@mode == :init
end

#leader? ⇒ Boolean

Returns:

  • (Boolean)


220
221
222
# File 'lib/evinrude.rb', line 220

# Whether this node's mode is currently `:leader`.
#
# @return [Boolean]
def leader?
	@mode == :leader
end

#node_info ⇒ Object



240
241
242
243
244
245
246
# File 'lib/evinrude.rb', line 240

# Describe this node as an Evinrude::NodeInfo built from #address, #port
# and @node_name.
#
# The object is created on first use and cached in @node_info.
#
# @return [Evinrude::NodeInfo]
# @raise [RuntimeError] if @network has not been set yet
def node_info
	raise RuntimeError, "Cannot determine node info until the network is up" if @network.nil?

	return @node_info if @node_info

	@node_info = Evinrude::NodeInfo.new(address: address, port: port, name: @node_name)
end

#nodes ⇒ Object



216
217
218
# File 'lib/evinrude.rb', line 216

# The nodes listed in this node's current cluster configuration.
#
# @return whatever `@config.nodes` returns
def nodes
	@config.nodes
end

#port ⇒ Object



212
213
214
# File 'lib/evinrude.rb', line 212

# The network's advertised port.
#
# @return `@network.advertised_port`, or nil while @network is unset
#   (it is assigned in #run)
def port
	@network&.advertised_port
end

#remove_node(node_info, unsafe: false) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/evinrude.rb', line 186

# Remove a node from the cluster.
#
# With `unsafe: true` the node is force-removed from this node's own
# configuration only, and a warning is logged.  Otherwise a removal request
# is sent to the leader and retried until a reply reports success; that
# path is timed by the `remove_node` metric.
#
# @param node_info the node to remove
# @param unsafe [Boolean] skip the leader and edit the local configuration
# @return the result of `@config.remove_node` on the unsafe path, `true`
#   otherwise
def remove_node(node_info, unsafe: false)
	if unsafe
		logger.warn(logloc) { "Unsafely removing node #{node_info.inspect} from the local configuration" }

		return @config.remove_node(node_info, force: true)
	end

	@metrics.remove_node.measure do
		removal = Async(logger: logger) do |task|
			loop do
				logger.debug(logloc) { "(in #{@node_name}) Requesting removal of #{node_info.inspect}" }

				request = Evinrude::Message::NodeRemovalRequest.new(node_info: node_info, unsafe: unsafe)

				break true if rpc_to_leader(request, task).success
			end
		end

		removal.result
	end
end

#run ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/evinrude.rb', line 157

# Start this node: load the on-disk log if a storage directory is
# configured, bring up the network, start handling RPC requests, and then
# join or create a cluster.
#
# Any StandardError raised along the way is logged as "Fatal error" and
# re-raised to the caller.
def run
	logger.info(logloc) { "Evinrude node #{@node_name} starting up" }

	@metrics.start_time.set(Time.now.to_f)

	# The metric records whether a storage directory was configured (1) or
	# not (0); the log is only loaded in the former case.
	if @storage_dir
		@metrics.log_loaded_from_disk.set(1)
		load_log_from_disk
	else
		@metrics.log_loaded_from_disk.set(0)
	end

	# Unlike the other Async calls in this class, no logger is passed here;
	# the trailing comment is the disabled variant that did pass one.
	Async do |task|  #(logger: logger) do |task|
		@async_task = task
		# From here on @network is set, so #address, #port and #node_info work.
		@network = Network.new(keys: @keys, logger: logger, metrics: @metrics, listen: @listen, advertise: @advertise).start

		logger.info(logloc) { "Node #{@node_name} listening on #{address}:#{port}" }

		@metrics.info.set(1, labels: { node_name: @node_name, listen_address: @network.listen_address, listen_port: @network.listen_port, advertise_address: address, advertise_port: port })

		# Request handling runs as a child task alongside cluster membership.
		task.async { process_rpc_requests }

		join_or_create_cluster
	# NOTE(review): #command, #state and #remove_node finish their Async
	# blocks with `.result`; confirm `.return` is the intended call here.
	end.return
rescue => ex
	log_exception(ex) { "Fatal error" }
	raise
end

#state ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/evinrude.rb', line 131

# Return a deep copy of the state machine's current state, once the leader
# has accepted a read request for the commit index the copy was taken at.
#
# The copy and the commit index are captured together under @sm_mutex.  If
# the leader's reply does not report success, a fresh copy is taken and the
# request is retried.  The whole call is timed by the `read_state` metric.
#
# @return a copy of `@state_machine.current_state`, detached from the live
#   state machine
def state
	@metrics.read_state.measure do
		Async(logger: logger) do |task|
			loop do
				state_object = nil
				commit_index = nil

				@sm_mutex.synchronize do
					# Disturbingly, a YAML round trip appears to be one of the best
					# available ways to make a guaranteed deep copy of an arbitrary
					# object.  From Psych 4 onwards YAML.load is safe-mode and rejects
					# arbitrary classes, so use the unrestricted loader where one
					# exists.  The input is YAML this process produced a line earlier,
					# not external data.
					serialized   = @state_machine.current_state.to_yaml
					state_object = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(serialized) : YAML.load(serialized)
					commit_index = @commit_index
				end

				logger.debug(logloc) { "(in #{@node_name}) Checking if #{state_object.inspect} at commit_index=#{commit_index} is the most up-to-date state" }

				reply = rpc_to_leader(Evinrude::Message::ReadRequest.new(commit_index: commit_index), task)

				if reply.success
					break state_object
				end
			end
		end.result
	end
end