Class: Evinrude

Inherits:
Object
  • Object
show all
Includes:
LoggingHelpers
Defined in:
lib/evinrude.rb,
lib/evinrude/log.rb,
lib/evinrude/peer.rb,
lib/evinrude/backoff.rb,
lib/evinrude/message.rb,
lib/evinrude/metrics.rb,
lib/evinrude/network.rb,
lib/evinrude/resolver.rb,
lib/evinrude/snapshot.rb,
lib/evinrude/log_entry.rb,
lib/evinrude/node_info.rb,
lib/evinrude/state_machine.rb,
lib/evinrude/log_entry/null.rb,
lib/evinrude/logging_helpers.rb,
lib/evinrude/network/protocol.rb,
lib/evinrude/message/join_reply.rb,
lib/evinrude/message/read_reply.rb,
lib/evinrude/message/vote_reply.rb,
lib/evinrude/network/connection.rb,
lib/evinrude/message/join_request.rb,
lib/evinrude/message/read_request.rb,
lib/evinrude/message/vote_request.rb,
lib/evinrude/cluster_configuration.rb,
lib/evinrude/message/command_reply.rb,
lib/evinrude/state_machine/register.rb,
lib/evinrude/message/command_request.rb,
lib/evinrude/config_change_queue_entry.rb,
lib/evinrude/message/node_removal_reply.rb,
lib/evinrude/message/append_entries_reply.rb,
lib/evinrude/message/node_removal_request.rb,
lib/evinrude/message/append_entries_request.rb,
lib/evinrude/message/install_snapshot_reply.rb,
lib/evinrude/log_entry/cluster_configuration.rb,
lib/evinrude/log_entry/state_machine_command.rb,
lib/evinrude/message/install_snapshot_request.rb,
lib/evinrude/config_change_queue_entry/add_node.rb,
lib/evinrude/config_change_queue_entry/remove_node.rb

Defined Under Namespace

Modules: LoggingHelpers

Classes: Backoff, ClusterConfiguration, ConfigChangeQueueEntry, Error, Log, LogEntry, Message, Metrics, Network, NoLeaderError, NodeExpiredError, NodeInfo, Peer, Resolver, Snapshot, StateMachine

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(join_hints: [], shared_keys:, state_machine: Evinrude::StateMachine::Register, logger: Logger.new("/dev/null"), node_name: nil, storage_dir: nil, heartbeat_interval: 0.25, heartbeat_timeout: 1..2, listen: {}, advertise: {}, metrics_registry: Prometheus::Client::Registry.new) ⇒ Evinrude

Returns a new instance of Evinrude.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/evinrude.rb', line 22

# Build a node object and restore any persisted snapshot. Nothing is started
# here: the mode is left at :init and no network object is created.
#
# @param join_hints stored in @join_hints; not otherwise used by the constructor
# @param shared_keys stored in @keys
# @param state_machine [Class] instantiated with `.new`, or with
#   `.new(snapshot: ...)` when a snapshot was found on disk
# @param logger logger used by this node and handed to the log and cluster config
# @param node_name name for this node; ignored when a snapshot supplies one,
#   and replaced by a random UUID when nil
# @param storage_dir [String, Pathname, nil] directory that may hold
#   `snapshot.yaml`; created if missing (one level only, via Pathname#mkdir)
# @param heartbeat_interval stored as given (presumably seconds; confirm against users)
# @param heartbeat_timeout stored as given (presumably a range of seconds; confirm against users)
# @param listen [Hash] may carry :address and :port; missing entries default
#   to "::" and 0. The caller's hash is copied, never modified.
# @param advertise [Hash] stored in @advertise as given
# @param metrics_registry registry handed to Evinrude::Metrics.new
# @raise [ArgumentError] if storage_dir exists but is not a directory
def initialize(join_hints: [], shared_keys:, state_machine: Evinrude::StateMachine::Register,
               logger: Logger.new("/dev/null"), node_name: nil, storage_dir: nil,
               heartbeat_interval: 0.25, heartbeat_timeout: 1..2,
               listen: {}, advertise: {}, metrics_registry: Prometheus::Client::Registry.new)
  @join_hints, @keys, @logger, @heartbeat_interval, @heartbeat_timeout = join_hints, shared_keys, logger, heartbeat_interval, heartbeat_timeout

  @metrics = Evinrude::Metrics.new(metrics_registry)

  # Defaults are filled into a private copy so the caller's hash is never
  # mutated, and a frozen hash is accepted.
  @listen, @advertise = listen.dup, advertise
  @listen[:address] ||= "::"
  @listen[:port]    ||= 0

  if storage_dir
    @storage_dir = Pathname.new(storage_dir)
  end

  snapshot = if @storage_dir
    if !@storage_dir.exist?
      @storage_dir.mkdir
    end

    if !@storage_dir.directory?
      raise ArgumentError, "Storage directory #{@storage_dir} isn't *actually* a directory"
    end

    snapshot_file = @storage_dir.join("snapshot.yaml")

    if snapshot_file.exist?
      @metrics.snapshot_file_size.set(snapshot_file.stat.size)

      # The snapshot is a full Ruby object (its methods are called below), so
      # it needs an unrestricted load. On Psych >= 4 plain `load_file` is a
      # safe load and rejects such objects.
      # SECURITY: this deserialises arbitrary objects; the storage directory
      # must only be writable by trusted parties.
      if YAML.respond_to?(:unsafe_load_file)
        YAML.unsafe_load_file(snapshot_file)
      else
        YAML.load_file(snapshot_file)
      end
    end
  end

  @state_machine_class = state_machine

  if snapshot
    # A persisted identity takes precedence over the node_name argument.
    @node_name        = snapshot.node_name
    @state_machine    = @state_machine_class.new(snapshot: snapshot.state)
    @last_command_ids = snapshot.last_command_ids
  else
    @node_name        = node_name || SecureRandom.uuid
    @state_machine    = @state_machine_class.new
    @last_command_ids = {}
  end

  @sm_mutex      = Mutex.new

  if snapshot
    @config        = snapshot.cluster_config
    @config_index  = snapshot.cluster_config_index
    # The restored configuration is given this instance's metrics and logger.
    @config.metrics = @metrics
    @config.logger  = logger
  else
    @config       = Evinrude::ClusterConfiguration.new(logger: logger, metrics: @metrics)
    @config_index = 0
  end

  @last_append  = Time.at(0)
  @current_term = 0
  @voted_for    = nil
  @mode         = :init

  @metrics.term.set(0)

  if snapshot
    logger.debug(logloc) { "Configuring log from snapshot; snapshot_last_term=#{snapshot.last_term} snapshot_last_index=#{snapshot.last_index}" }
    @log = Evinrude::Log.new(snapshot_last_term: snapshot.last_term, snapshot_last_index: snapshot.last_index, logger: logger)
  else
    @log = Evinrude::Log.new(logger: logger)
  end

  if snapshot
    logger.debug(logloc) { "Setting commit_index to #{snapshot.last_index} from snapshot" }
    @commit_index = snapshot.last_index
  else
    @commit_index = 0
  end

  @metrics.commit_index.set(@commit_index)

  # Peers are created on first lookup: the key is a node-info object whose
  # address/port are used to open the connection. @network is read when the
  # lookup happens, not here; this constructor does not assign it.
  @peers = Hash.new do |h, k|
    peer_conn = @network.connect(address: k.address, port: k.port)

    h[k] = Peer.new(metrics: @metrics, conn: peer_conn, node_info: k, next_index: @log.last_index + 1)
  end

  @config_change_queue = []
  @config_change_request_in_progress = nil
  @cc_sem = Async::Semaphore.new
end

Instance Attribute Details

#node_name ⇒ Object (readonly)

Returns the value of attribute node_name.



20
21
22
# File 'lib/evinrude.rb', line 20

# Reader for this node's name.
#
# @return the value the constructor stored in @node_name: the name taken
#   from a loaded snapshot, else the `node_name` argument, else a random UUID
def node_name
  @node_name
end

Instance Method Details

#address ⇒ Object



208
209
210
# File 'lib/evinrude.rb', line 208

# The address the network layer advertises.
#
# @return the network's `advertised_address`, or nil while @network is unset
def address
  return if @network.nil?

  @network.advertised_address
end

#candidate? ⇒ Boolean

Returns:

  • (Boolean)


228
229
230
# File 'lib/evinrude.rb', line 228

# Whether this node's mode is currently :candidate.
#
# @return [Boolean]
def candidate?
  @mode == :candidate
end

#command(s) ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/evinrude.rb', line 115

# Send a command to the leader, retrying until a reply reports success.
# The whole call is timed by the `command_execution` metric.
#
# @param s the command payload, forwarded unchanged in every request
# @return [true] once a reply with a truthy `success` has been received
def command(s)
  @metrics.command_execution.measure do
    submission = Async(logger: logger) do |task|
      # Generated once, outside the retry loop, so every attempt carries the
      # same ID (presumably so a repeated attempt can be recognised; confirm
      # against the leader-side handling).
      request_id = SecureRandom.uuid

      loop do
        request = Message::CommandRequest.new(command: s, id: request_id, node_name: @node_name)

        break true if rpc_to_leader(request, task).success
      end
    end

    submission.result
  end
end

#expired? ⇒ Boolean

Returns:

  • (Boolean)


236
237
238
# File 'lib/evinrude.rb', line 236

# Whether this node's heartbeat deadline has passed.
#
# A leader never counts as expired, and neither does a node whose
# @heartbeat_timeout_time has not been set.
#
# @return [Boolean]
def expired?
  return false if leader?

  deadline = @heartbeat_timeout_time
  return false unless deadline

  !!(deadline < Time.now)
end

#follower? ⇒ Boolean

Returns:

  • (Boolean)


224
225
226
# File 'lib/evinrude.rb', line 224

# Whether this node's mode is currently :follower.
#
# @return [Boolean]
def follower?
  @mode == :follower
end

#init? ⇒ Boolean

Returns:

  • (Boolean)


232
233
234
# File 'lib/evinrude.rb', line 232

# Whether this node's mode is still :init, the value the constructor assigns.
#
# @return [Boolean]
def init?
  @mode == :init
end

#leader? ⇒ Boolean

Returns:

  • (Boolean)


220
221
222
# File 'lib/evinrude.rb', line 220

# Whether this node's mode is currently :leader.
#
# @return [Boolean]
def leader?
  @mode == :leader
end

#node_info ⇒ Object



240
241
242
243
244
245
246
# File 'lib/evinrude.rb', line 240

# This node's identity record, built from its advertised address and port
# plus its name. The record is built on first use and then reused.
#
# @return [Evinrude::NodeInfo]
# @raise [RuntimeError] if called while @network is nil
def node_info
  raise RuntimeError, "Cannot determine node info until the network is up" if @network.nil?

  return @node_info if @node_info

  @node_info = Evinrude::NodeInfo.new(address: address, port: port, name: @node_name)
end

#nodes ⇒ Object



216
217
218
# File 'lib/evinrude.rb', line 216

# The nodes listed in this node's current cluster configuration.
#
# @return whatever `@config.nodes` returns (presumably a collection of
#   node-info objects; confirm against Evinrude::ClusterConfiguration)
def nodes
  @config.nodes
end

#port ⇒ Object



212
213
214
# File 'lib/evinrude.rb', line 212

# The port the network layer advertises.
#
# @return the network's `advertised_port`, or nil while @network is unset
def port
  return if @network.nil?

  @network.advertised_port
end

#remove_node(node_info, unsafe: false) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/evinrude.rb', line 186

# Remove a node from the cluster configuration.
#
# With `unsafe: true` the node is removed from this node's own configuration
# straight away (`force: true`) and the leader is not contacted. Otherwise a
# removal request is sent to the leader and retried until a reply reports
# success; that path is timed by the `remove_node` metric.
#
# @param node_info the node to remove
# @param unsafe [Boolean] skip the leader and edit the local configuration only
# @return the result of `@config.remove_node` on the unsafe path, `true` otherwise
def remove_node(node_info, unsafe: false)
  if unsafe
    logger.warn(logloc) { "Unsafely removing node #{node_info.inspect} from the local configuration" }

    return @config.remove_node(node_info, force: true)
  end

  @metrics.remove_node.measure do
    removal = Async(logger: logger) do |task|
      loop do
        logger.debug(logloc) { "(in #{@node_name}) Requesting removal of #{node_info.inspect}" }

        # `unsafe` is always falsy on this path.
        request = Evinrude::Message::NodeRemovalRequest.new(node_info: node_info, unsafe: unsafe)

        break true if rpc_to_leader(request, task).success
      end
    end

    removal.result
  end
end

#run ⇒ Object



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/evinrude.rb', line 157

# Start the node: record start-up metrics, load the on-disk log when a
# storage directory is configured, bring up the network, start processing
# RPC requests in a child task, then join or create a cluster.
#
# Any StandardError raised along the way is logged as "Fatal error" and
# re-raised.
#
# @return whatever `.return` yields on the object `Async` hands back
def run
  logger.info(logloc) { "Evinrude node #{@node_name} starting up" }

  @metrics.start_time.set(Time.now.to_f)

  # The gauge records whether a disk load is attempted; it is set first.
  @metrics.log_loaded_from_disk.set(@storage_dir ? 1 : 0)
  load_log_from_disk if @storage_dir

  root_task = Async do |task|
    @async_task = task

    network = Network.new(keys: @keys, logger: logger, metrics: @metrics, listen: @listen, advertise: @advertise)
    @network = network.start

    logger.info(logloc) { "Node #{@node_name} listening on #{address}:#{port}" }

    @metrics.info.set(
      1,
      labels: {
        node_name: @node_name,
        listen_address: @network.listen_address,
        listen_port: @network.listen_port,
        advertise_address: address,
        advertise_port: port
      }
    )

    task.async { process_rpc_requests }

    join_or_create_cluster
  end

  # NOTE(review): the other methods in this class read an Async task's value
  # with `.result`; confirm that `.return` exists on the object returned here.
  root_task.return
rescue => ex
  log_exception(ex) { "Fatal error" }
  raise
end

#state ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/evinrude.rb', line 131

# Read the state machine's current state, confirmed by the leader.
#
# Each attempt copies the state and the commit index under the state-machine
# mutex, then sends a ReadRequest carrying that commit index to the leader.
# The copy is returned once a reply reports success; otherwise a fresh copy
# is taken and the request repeated. The whole call is timed by the
# `read_state` metric.
#
# @return a deep copy of the state machine's `current_state`
def state
  @metrics.read_state.measure do
    Async(logger: logger) do |task|
      loop do
        state_object = nil
        commit_index = nil

        @sm_mutex.synchronize do
          # Disturbingly, a YAML round-trip appears to be one of the best
          # available ways to make a guaranteed deep copy of an arbitrary
          # object. The YAML is produced right here, so an unrestricted load
          # is appropriate; on Psych >= 4 plain `YAML.load` is a safe load
          # that rejects arbitrary classes and aliases.
          serialized = @state_machine.current_state.to_yaml
          state_object = YAML.respond_to?(:unsafe_load) ? YAML.unsafe_load(serialized) : YAML.load(serialized)
          commit_index = @commit_index
        end

        logger.debug(logloc) { "(in #{@node_name}) Checking if #{state_object.inspect} at commit_index=#{commit_index} is the most up-to-date state" }

        reply = rpc_to_leader(Evinrude::Message::ReadRequest.new(commit_index: commit_index), task)

        if reply.success
          break state_object
        end
      end
    end.result
  end
end