Class: Lanet::Mesh
- Inherits:
-
Object
- Object
- Lanet::Mesh
- Defined in:
- lib/lanet/mesh.rb
Defined Under Namespace
Classes: Error
Constant Summary collapse
- DEFAULT_TTL =
10- DEFAULT_MESH_PORT =
5050- DEFAULT_DISCOVERY_INTERVAL =
seconds
60- DEFAULT_MESSAGE_EXPIRY =
10 minutes
600- DEFAULT_CONNECTION_TIMEOUT =
3x discovery interval
180- MESSAGE_TYPES =
{ discovery: "DISCOVERY", discovery_response: "DISCOVERY_RESPONSE", message: "MESSAGE", route: "ROUTE_INFO" }.freeze
Instance Attribute Summary collapse
-
#connections ⇒ Object
readonly
Returns the value of attribute connections.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#message_cache ⇒ Object
readonly
Returns the value of attribute message_cache.
-
#node_id ⇒ Object
readonly
Returns the value of attribute node_id.
Instance Method Summary collapse
- #broadcast_mesh_message(mesh_message) ⇒ Object
- #healthy? ⇒ Boolean
-
#initialize(port = DEFAULT_MESH_PORT, max_hops = DEFAULT_TTL, discovery_interval = DEFAULT_DISCOVERY_INTERVAL, message_expiry = DEFAULT_MESSAGE_EXPIRY, logger = nil) ⇒ Mesh
constructor
A new instance of Mesh.
- #send_message(target_id, message, encryption_key = nil, private_key = nil) ⇒ Object
- #start ⇒ Object
- #stats ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(port = DEFAULT_MESH_PORT, max_hops = DEFAULT_TTL, discovery_interval = DEFAULT_DISCOVERY_INTERVAL, message_expiry = DEFAULT_MESSAGE_EXPIRY, logger = nil) ⇒ Mesh
Returns a new instance of Mesh.
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/lanet/mesh.rb', line 28 def initialize(port = DEFAULT_MESH_PORT, max_hops = DEFAULT_TTL, discovery_interval = DEFAULT_DISCOVERY_INTERVAL, = DEFAULT_MESSAGE_EXPIRY, logger = nil) @port = port @max_hops = max_hops @discovery_interval = discovery_interval @message_expiry = @connection_timeout = @discovery_interval * 3 @node_id = SecureRandom.uuid @connections = {} @routes = {} @message_cache = Set.new @message_timestamps = {} @processed_message_count = 0 @mutex = Mutex.new @logger = logger || Logger.new($stdout) @logger.level = Logger::INFO # Setup communication channels @sender = Lanet::Sender.new(port) @receiver = nil # For handling data storage @storage_path = File.join(Dir.home, ".lanet", "mesh") FileUtils.mkdir_p(@storage_path) unless Dir.exist?(@storage_path) end |
Instance Attribute Details
#connections ⇒ Object (readonly)
Returns the value of attribute connections.
26 27 28 |
# File 'lib/lanet/mesh.rb', line 26 def connections @connections end |
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
26 27 28 |
# File 'lib/lanet/mesh.rb', line 26 def logger @logger end |
#message_cache ⇒ Object (readonly)
Returns the value of attribute message_cache.
26 27 28 |
# File 'lib/lanet/mesh.rb', line 26 def @message_cache end |
#node_id ⇒ Object (readonly)
Returns the value of attribute node_id.
26 27 28 |
# File 'lib/lanet/mesh.rb', line 26 def node_id @node_id end |
Instance Method Details
#broadcast_mesh_message(mesh_message) ⇒ Object
141 142 143 144 145 146 147 |
# File 'lib/lanet/mesh.rb', line 141 def () @connections.each do |id, info| next if id == [:origin] @sender.send_to(info[:ip], .to_json) end end |
#healthy? ⇒ Boolean
149 150 151 152 153 154 155 |
# File 'lib/lanet/mesh.rb', line 149 def healthy? @running && @receiver_thread&.alive? && @discovery_thread&.alive? && @monitor_thread&.alive? && @cache_pruning_thread&.alive? end |
#send_message(target_id, message, encryption_key = nil, private_key = nil) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/lanet/mesh.rb', line 83 def (target_id, , encryption_key = nil, private_key = nil) unless @connections.key?(target_id) || @routes.key?(target_id) @logger.debug("No known route to #{target_id}, performing discovery") perform_discovery # Replace sleep with timeout-based approach discovery_timeout = Time.now.to_i + 2 until @connections.key?(target_id) || @routes.key?(target_id) || Time.now.to_i > discovery_timeout sleep 0.1 # Short sleep to avoid CPU spinning end unless @connections.key?(target_id) || @routes.key?(target_id) @logger.error("No route to node #{target_id}") raise Error, "No route to node #{target_id}" end end = SecureRandom.uuid # Prevent message loops by adding to cache @mutex.synchronize do @message_cache.add() @message_timestamps[] = Time.now.to_i end # Prepare the mesh message container encrypted_content = encryption_key ? Encryptor.(, encryption_key, private_key) : = ( MESSAGE_TYPES[:message], id: , target: target_id, content: encrypted_content, hops: 0 ) # Direct connection if @connections.key?(target_id) @logger.debug("Sending direct message to #{target_id}") @sender.send_to(@connections[target_id][:ip], .to_json) return end # Route through intermediate node if @routes.key?(target_id) next_hop = @routes[target_id][:next_hop] if @connections.key?(next_hop) @logger.debug("Sending message to #{target_id} via #{next_hop}") @sender.send_to(@connections[next_hop][:ip], .to_json) return end end # Broadcast as last resort @logger.debug("Broadcasting message to find route to #{target_id}") () end |
#start ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/lanet/mesh.rb', line 56 def start return if @running @running = true start_receiver start_discovery_service start_monitoring start_cache_pruning @logger.info("Mesh node #{@node_id} started on port #{@port}") load_state end |
#stats ⇒ Object
157 158 159 160 161 162 163 164 165 |
# File 'lib/lanet/mesh.rb', line 157 def stats { node_id: @node_id, connections: @connections.size, routes: @routes.size, message_cache_size: @message_cache.size, processed_messages: @processed_message_count } end |
#stop ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/lanet/mesh.rb', line 69 def stop return unless @running @logger.info("Stopping mesh node #{@node_id}") @running = false [@discovery_thread, @receiver_thread, @monitor_thread, @cache_pruning_thread].each do |thread| thread&.exit end save_state @logger.info("Mesh node stopped") end |