Class: Volt::MessageBus::PeerToPeer
- Inherits:
- BaseMessageBus (Object > BaseMessageBus > Volt::MessageBus::PeerToPeer)
- Includes:
- Eventable
- Defined in:
- lib/volt/server/message_bus/peer_to_peer.rb
Constant Summary
- DEAD_TIME = 20
How long without an update before we mark an instance as dead (in seconds).
Instance Attribute Summary
- #page ⇒ Object (readonly)
Returns the value of attribute page.
- #server_id ⇒ Object (readonly)
Returns the value of attribute server_id.
Instance Method Summary
- #add_peer_connection(peer_connection) ⇒ Object
- #connect_to_peers ⇒ Object
- #handle_message(message) ⇒ Object
Called when a message comes in.
- #initialize(volt_app) ⇒ PeerToPeer (constructor)
A new instance of PeerToPeer.
- #peers ⇒ Object
Return an array of peer records.
- #publish(channel, message) ⇒ Object
- #remove_duplicate_connections ⇒ Object
We only want one connection between any two instances; this loops through each connection and drops duplicates.
- #remove_peer_connection(peer_connection) ⇒ Object
- #setup_peer_server ⇒ Object
The peer server maintains a socket other instances can connect to.
- #start_tracker ⇒ Object
The tracker periodically writes the instance's IPs, port, and a timestamp to the database.
- #still_alive?(peer_server_id) ⇒ Boolean
Returns true if the server is still reporting as alive.
Methods included from Eventable
#on, #remove_listener, #trigger!
Constructor Details
#initialize(volt_app) ⇒ PeerToPeer
Returns a new instance of PeerToPeer.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 60
    def initialize(volt_app)
      if Volt::DataStore.fetch.connected?
        # Generate a guid
        @server_id = SecureRandom.uuid

        # The PeerConnections to peers
        @peer_connections = {}

        # The server ids for each peer we're connected to
        @peer_server_ids = {}

        @page = volt_app.page

        setup_peer_server
        start_tracker

        Thread.new do
          sleep 1
          connect_to_peers
        end
      else
        Volt.logger.error('Unable to connect to the database. Volt will still run, but the message bus requires a database connection to setup connections between nodes, so the message bus has been disabled. This means updates will not be propagated between instances (server, console, runners, etc...)')
      end
    end
Instance Attribute Details
#page ⇒ Object (readonly)
Returns the value of attribute page.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 58
    def page
      @page
    end
#server_id ⇒ Object (readonly)
Returns the value of attribute server_id.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 58
    def server_id
      @server_id
    end
Instance Method Details
#add_peer_connection(peer_connection) ⇒ Object
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 140
    def add_peer_connection(peer_connection)
      @peer_connections[peer_connection] = true
      @peer_server_ids[peer_connection.peer_server_id] = true
    end
#connect_to_peers ⇒ Object
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 120
    def connect_to_peers
      peers.each do |peer|
        # Start connecting to all at the same time. Since most will connect or
        # timeout, this is the desired behaviour.
        Thread.new do
          # sometimes we get nil peers for some reason
          if peer
            peer_connection = PeerConnection.connect_to(self, peer._ips, peer._port)

            if peer_connection
              add_peer_connection(peer_connection)
            else
              # remove if not alive anymore.
              still_alive?(peer._server_id)
            end
          end
        end
      end
    end
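The thread-per-peer pattern above can be sketched in isolation. This is a minimal illustration, not Volt's code: connect_all and the connector lambda are hypothetical stand-ins for the loop and for PeerConnection.connect_to, and, unlike #connect_to_peers, the sketch joins the threads so the results can be inspected.

```ruby
# Hypothetical helper: try every peer in its own thread and collect the
# outcome (a connection object, or nil when the attempt failed).
def connect_all(peers, connector)
  results = {}
  lock = Mutex.new

  # compact drops nil entries, mirroring the `if peer` guard above.
  threads = peers.compact.map do |peer|
    Thread.new do
      connection = connector.call(peer)
      # Hash writes from several threads are serialized through the mutex.
      lock.synchronize { results[peer] = connection }
    end
  end

  threads.each(&:join)
  results
end

# Stand-in connector: the peer named 'down' never answers.
connector = ->(peer) { peer == 'down' ? nil : "conn-to-#{peer}" }
results = connect_all(['a', nil, 'down'], connector)
```

Starting all attempts at once means one slow or dead peer only costs its own timeout rather than delaying every other connection.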
#handle_message(message) ⇒ Object
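Called when a message comes in. The incoming string is split into a channel name and a payload on the first '|'.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 151
    def handle_message(message)
      channel_name, message = message.split('|', 2)
      trigger!(channel_name, message)
    end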
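The "channel|message" framing shared by #publish and #handle_message can be tried on its own. The helper names encode_frame and decode_frame below are hypothetical and exist only for this sketch.

```ruby
# Hypothetical helpers mirroring the framing used by the message bus.
def encode_frame(channel, message)
  "#{channel}|#{message}"
end

def decode_frame(frame)
  # A limit of 2 splits on the first '|' only, so the payload may itself
  # contain '|' characters.
  frame.split('|', 2)
end

channel, payload = decode_frame(encode_frame('chat', 'a|b|c'))
puts channel # chat
puts payload # a|b|c
```

The reverse does not hold: a channel name containing '|' would be cut at its first '|', so channel names presumably need to avoid that character.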
#peers ⇒ Object
Return an array of peer records.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 114
    def peers
      instances = @page.store._active_volt_instances

      instances.where(server_id: {'$ne' => @server_id}).all.sync
    end
#publish(channel, message) ⇒ Object
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 100
    def publish(channel, message)
      full_msg = "#{channel}|#{message}"

      @peer_connections.keys.each do |peer|
        begin
          # Queue message on each peer
          peer.publish(full_msg)
        rescue IOError => e
          # Connection to peer lost
          Volt.logger.warn("Message bus connection to peer lost: #{e}")
        end
      end
    end
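The per-peer rescue is what keeps one lost connection from stopping delivery to the rest. A minimal sketch of that behaviour follows; FakePeer and fan_out are hypothetical names, and FakePeer models only a #publish method rather than the real PeerConnection.

```ruby
# Hypothetical stand-in for a peer connection.
class FakePeer
  attr_reader :received

  def initialize(broken: false)
    @broken = broken
    @received = []
  end

  def publish(frame)
    raise IOError, 'socket closed' if @broken

    @received << frame
  end
end

# Send one frame to every peer and return the peers that failed,
# instead of letting the first IOError abort the loop.
def fan_out(peers, channel, message)
  frame = "#{channel}|#{message}"
  failed = []

  peers.each do |peer|
    begin
      peer.publish(frame)
    rescue IOError
      failed << peer
    end
  end

  failed
end

broken = FakePeer.new(broken: true)
healthy = FakePeer.new
failed = fan_out([broken, healthy], 'chat', 'hello')
```

Here the broken peer is listed first on purpose: the healthy peer still receives the frame.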
#remove_duplicate_connections ⇒ Object
We only want one connection between any two instances. This loops through each connection and disconnects any connection to a peer that is already connected.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 158
    def remove_duplicate_connections
      peer_server_ids = {}

      # remove any we are connected to twice
      @peer_connections.keys.each do |peer|
        peer_id = peer.peer_server_id

        if peer_id
          # peer is connected

          if peer_server_ids[peer_id]
            # Peer is already connected somewhere else, remove connection
            peer.disconnect

            # remove the connection
            @peer_connections.delete(peer)
          else
            # Mark that we are connected
            peer_server_ids[peer_id] = true
          end
        end
      end
    end
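The de-duplication pass can be exercised with stub objects. The sketch below is an illustration under assumptions: FakeConnection and dedupe_connections are hypothetical names, and the stub models only #peer_server_id and #disconnect.

```ruby
# Hypothetical stand-in for a peer connection. A plain class is used so
# that hash keys compare by object identity.
class FakeConnection
  attr_reader :peer_server_id

  def initialize(peer_server_id)
    @peer_server_id = peer_server_id
    @open = true
  end

  def disconnect
    @open = false
  end

  def open?
    @open
  end
end

# Keep the first connection seen for each server id and drop later ones.
def dedupe_connections(connections)
  seen = {}

  # keys returns a copy, so deleting from the hash while looping is safe.
  connections.keys.each do |conn|
    id = conn.peer_server_id
    next unless id # no server id yet, so nothing to compare against

    if seen[id]
      conn.disconnect
      connections.delete(conn)
    else
      seen[id] = true
    end
  end

  connections
end

first = FakeConnection.new('s1')
other = FakeConnection.new('s2')
duplicate = FakeConnection.new('s1')
pending = FakeConnection.new(nil)

connections = { first => true, other => true, duplicate => true, pending => true }
dedupe_connections(connections)
```

Connections that have not reported a server id are left alone, matching the `if peer_id` guard in the listing.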
#remove_peer_connection(peer_connection) ⇒ Object
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 145
    def remove_peer_connection(peer_connection)
      @peer_connections.delete(peer_connection)
      @peer_server_ids.delete(peer_connection.peer_server_id)
    end
#setup_peer_server ⇒ Object
The peer server maintains a socket other instances can connect to.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 85
    def setup_peer_server
      @peer_server = PeerServer.new(self)
    end
#start_tracker ⇒ Object
The tracker periodically writes the instance's IPs, port, and a timestamp to the database. If the timestamp is older than DEAD_TIME (20 seconds), the instance is treated as “dead” and its record is removed.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 92
    def start_tracker
      @server_tracker = ServerTracker.new(page, @server_id, @peer_server.port)

      # Do the initial registration, and wait until it's done before connecting
      # to peers.
      @server_tracker.register()
    end
#still_alive?(peer_server_id) ⇒ Boolean
Returns true if the server is still reporting as alive.
    # File 'lib/volt/server/message_bus/peer_to_peer.rb', line 183
    def still_alive?(peer_server_id)
      # Unable to write to the socket, retry until the instance is no
      # longer marking itself as active in the database
      peer_table = @page.store._active_volt_instances
      peer = peer_table.where(server_id: peer_server_id).first.sync

      if peer
        # Found the peer, retry if it has reported in within the last
        # DEAD_TIME seconds.
        if peer._time > (Time.now.to_i - DEAD_TIME)
          # Peer reported in recently enough to count as alive
          return true
        else
          # Delete the entry
          peer.destroy
        end
      end

      false
    end
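The timestamp comparison at the heart of #still_alive? can be checked without a database. In this sketch, alive? and its now: and dead_time: keywords are hypothetical; dead_time defaults to 20 to match DEAD_TIME.

```ruby
# Hypothetical helper: last_seen and now are Unix timestamps in seconds.
# A peer counts as alive only if it reported in strictly less than
# dead_time seconds ago.
def alive?(last_seen, now: Time.now.to_i, dead_time: 20)
  last_seen > (now - dead_time)
end

puts alive?(100, now: 110) # true
puts alive?(100, now: 120) # false
```

Because the comparison is strict, a peer whose last report is exactly dead_time seconds old is already considered dead.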