Class: Volt::MessageBus::PeerToPeer
- Inherits:
-
BaseMessageBus
- Object
- BaseMessageBus
- Volt::MessageBus::PeerToPeer
- Includes:
- Eventable
- Defined in:
- lib/volt/server/message_bus/peer_to_peer.rb
Constant Summary collapse
- DEAD_TIME =
How long without an update before we mark an instance as dead (in seconds)
20
Instance Attribute Summary collapse
-
#server_id ⇒ Object
readonly
Returns the value of attribute server_id.
Instance Method Summary collapse
- #add_peer_connection(peer_connection) ⇒ Object
- #connect_to_peers ⇒ Object
-
#disconnect! ⇒ Object
Blocks until all peers have connected or timed out.
-
#handle_message(message) ⇒ Object
Called when a message comes in.
-
#initialize(volt_app) ⇒ PeerToPeer
constructor
A new instance of PeerToPeer.
-
#peers ⇒ Object
Return an array of peer records.
- #publish(channel, message) ⇒ Object
-
#remove_duplicate_connections ⇒ Object
We only want one connection between two instances, this loops through each connection.
- #remove_peer_connection(peer_connection) ⇒ Object
-
#setup_peer_server ⇒ Object
The peer server maintains a socket other instances can connect to.
-
#start_tracker ⇒ Object
The tracker updates the socket ip’s and port and a timestamp into the database every minute.
-
#still_alive?(peer_server_id) ⇒ Boolean
Returns true if the server is still reporting as alive.
Methods included from Eventable
#on, #remove_listener, #trigger!
Constructor Details
#initialize(volt_app) ⇒ PeerToPeer
Returns a new instance of PeerToPeer.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 60 def initialize(volt_app) @volt_app = volt_app if Volt::DataStore.fetch.connected? # Generate a guid @server_id = SecureRandom.uuid # The PeerConnection's to peers @peer_connections = {} # The server id's for each peer we're connected to @peer_server_ids = {} setup_peer_server start_tracker @peer_connection_threads = [] @connect_thread = Thread.new do connect_to_peers end else Volt.logger.error('Unable to connect to the database. Currently Volt requires running mongodb for a few things to work. Volt will still run, but the message bus requires a database connection to setup connections between nodes, so the message bus has been disabled. Also, the store collection can not be used without a database. This means updates will not be propagated between instances (server, console, runners, etc...)') end end |
Instance Attribute Details
#server_id ⇒ Object (readonly)
Returns the value of attribute server_id.
58 59 60 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 58 def server_id @server_id end |
Instance Method Details
#add_peer_connection(peer_connection) ⇒ Object
138 139 140 141 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 138 def add_peer_connection(peer_connection) @peer_connections[peer_connection] = true @peer_server_ids[peer_connection.peer_server_id] = true end |
#connect_to_peers ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 120 def connect_to_peers peers.each do |peer| # Start connecting to all at the same time. Since most will connect or # timeout, this is the desired behaviour. # sometimes we get nil peers for some reason if peer peer_connection = PeerConnection.new(nil, peer._ips, peer._port, self, false, peer._server_id) add_peer_connection(peer_connection) end end end |
#disconnect! ⇒ Object
Blocks until all peers have connected or timed out.
133 134 135 136 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 133 def disconnect! # Wait for disconnect on each @peer_connections.keys.each(&:disconnect!) end |
#handle_message(message) ⇒ Object
Called when a message comes in
149 150 151 152 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 149 def () channel_name, = .split('|', 2) trigger!(channel_name, ) end |
#peers ⇒ Object
Return an array of peer records.
114 115 116 117 118 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 114 def peers instances = @volt_app.store._active_volt_instances instances.where(server_id: {'$ne' => @server_id}).all.sync end |
#publish(channel, message) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 100 def publish(channel, ) full_msg = "#{channel}|#{}" @peer_connections.keys.each do |peer| begin # Queue message on each peer peer.publish(full_msg) rescue IOError => e # Connection to peer lost Volt.logger.warn("Message bus connection to peer lost: #{e}") end end end |
#remove_duplicate_connections ⇒ Object
We only want one connection between two instances, this loops through each connection
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 156 def remove_duplicate_connections peer_server_ids = {} # remove any we are connected to twice @peer_connections.keys.each do |peer| peer_id = peer.peer_server_id if peer_id # peer is connected if peer_server_ids[peer_id] # Peer is already connected somewhere else, remove connection peer.disconnect! # remove the connection @peer_connections.delete(peer) else # Mark that we are connected peer_server_ids[peer_id] = true end end end end |
#remove_peer_connection(peer_connection) ⇒ Object
143 144 145 146 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 143 def remove_peer_connection(peer_connection) @peer_connections.delete(peer_connection) @peer_server_ids.delete(peer_connection.peer_server_id) end |
#setup_peer_server ⇒ Object
The peer server maintains a socket other instances can connect to.
85 86 87 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 85 def setup_peer_server @peer_server = PeerServer.new(self) end |
#start_tracker ⇒ Object
The tracker updates the socket ip’s and port and a timestamp into the database every minute. If the timestamp is more than 2 minutes old, an instance is marked as “dead” and removed.
92 93 94 95 96 97 98 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 92 def start_tracker @server_tracker = ServerTracker.new(@volt_app, @server_id, @peer_server.port) # Do the initial registration, and wait until its done before connecting # to peers. @server_tracker.register() end |
#still_alive?(peer_server_id) ⇒ Boolean
Returns true if the server is still reporting as alive.
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 181 def still_alive?(peer_server_id) # Unable to write to the socket, retry until the instance is no # longer marking its self as active in the database peer_table = @volt_app.store.active_volt_instances peer = peer_table.where(server_id: peer_server_id).first.sync if peer # Found the peer, retry if it has reported in in the last 2 # minutes. if peer._time > (Time.now.to_i - DEAD_TIME) # Peer reported in less than 2 minutes ago return true else # Delete the entry peer.destroy end end false end |