Class: Volt::MessageBus::PeerToPeer

Inherits:
BaseMessageBus show all
Includes:
Eventable
Defined in:
lib/volt/server/message_bus/peer_to_peer.rb

Constant Summary collapse

DEAD_TIME =

How long without an update before we mark an instance as dead (in seconds)

20

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Eventable

#on, #remove_listener, #trigger!

Constructor Details

#initialize(volt_app) ⇒ PeerToPeer

Returns a new instance of PeerToPeer.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 60

# Builds the peer-to-peer message bus for +volt_app+.
#
# When the data store reports a live connection, the bus generates its own
# server id, starts the peer server and tracker, and then connects to the
# other registered instances on a background thread.
#
# When the data store is not connected, an error is logged and the bus is
# left disabled: no server id, no peer server, no tracker, no connections.
def initialize(volt_app)
  @volt_app = volt_app

  # Create the connection bookkeeping unconditionally.  #publish and
  # #disconnect! iterate @peer_connections, so leaving it nil on the
  # "no database" path would make them raise NoMethodError even though Volt
  # is meant to keep running with the bus disabled.
  # The PeerConnection's to peers
  @peer_connections = {}
  # The server id's for each peer we're connected to
  @peer_server_ids = {}

  if Volt::DataStore.fetch.connected?
    # Generate a guid
    @server_id = SecureRandom.uuid

    # The tracker reads the peer server's port, so the peer server has to be
    # set up first.
    setup_peer_server
    start_tracker

    @peer_connection_threads = []

    # Connect in the background so construction is not held up by peers.
    @connect_thread = Thread.new do
      connect_to_peers
    end
  else
    Volt.logger.error('Unable to connect to the database.  Currently Volt requires running mongodb for a few things to work.  Volt will still run, but the message bus requires a database connection to setup connections between nodes, so the message bus has been disabled.  Also, the store collection can not be used without a database.  This means updates will not be propagated between instances (server, console, runners, etc...)')
  end
end

Instance Attribute Details

#server_idObject (readonly)

Returns the value of attribute server_id.



58
59
60
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 58

# Returns this bus's @server_id.  #initialize assigns it a SecureRandom.uuid
# only when the data store is connected.
def server_id
  @server_id
end

Instance Method Details

#add_peer_connection(peer_connection) ⇒ Object



138
139
140
141
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 138

# Registers +peer_connection+ with the bus: the connection itself is recorded
# in @peer_connections and its peer's server id in @peer_server_ids.  Both
# hashes are used as sets (every value is +true+).
def add_peer_connection(peer_connection)
  @peer_connections.store(peer_connection, true)
  @peer_server_ids.store(peer_connection.peer_server_id, true)
end

#connect_to_peersObject



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 120

# Opens a PeerConnection to every record returned by #peers and registers it
# through #add_peer_connection.
#
# Connections are all started in one pass rather than one after another,
# since most will either connect or time out.
def connect_to_peers
  peers.each do |peer|
    # sometimes we get nil peers for some reason
    next unless peer

    connection = PeerConnection.new(nil, peer._ips, peer._port, self, false, peer._server_id)
    add_peer_connection(connection)
  end
end

#disconnect!Object

Disconnects from every peer, waiting for each connection to disconnect.



133
134
135
136
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 133

# Calls disconnect! on every tracked peer connection, one after another.
def disconnect!
  # Work from a snapshot of the keys rather than the live hash.
  @peer_connections.keys.each { |connection| connection.disconnect! }
end

#handle_message(message) ⇒ Object

Called when a message comes in



149
150
151
152
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 149

# Called when a message comes in.  The wire format is "<channel>|<payload>";
# only the first "|" separates the two, so the payload may itself contain "|".
# The payload is then triggered as an event on the channel name.
def handle_message(message)
  channel_name, payload = message.split('|', 2)
  trigger!(channel_name, payload)
end

#peersObject

Return an array of peer records.



114
115
116
117
118
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 114

# Returns the records in the store's _active_volt_instances collection whose
# server_id differs from this bus's own @server_id, i.e. every registered
# instance except this one.
def peers
  @volt_app.store._active_volt_instances
           .where(server_id: { '$ne' => @server_id })
           .all
           .sync
end

#publish(channel, message) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 100

# Sends +message+ on +channel+ to every tracked peer connection.
#
# The two are joined into a single "<channel>|<message>" string, the format
# #handle_message splits on the receiving side.  An IOError from one peer is
# logged as a warning and does not stop delivery to the remaining peers.
def publish(channel, message)
  payload = "#{channel}|#{message}"

  @peer_connections.keys.each do |connection|
    begin
      # Queue message on each peer
      connection.publish(payload)
    rescue IOError => error
      # Connection to peer lost
      Volt.logger.warn("Message bus connection to peer lost: #{error}")
    end
  end
end

#remove_duplicate_connectionsObject

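We only want one connection between two instances. This loops through each connection and, when a peer is found to be connected more than once, disconnects and removes the extra connections.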



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 156

# Ensures there is at most one tracked connection per peer server id.
#
# Walks the tracked connections in hash order.  The first connection seen for
# a given peer server id is kept; any later connection reporting the same id
# is disconnected and dropped from @peer_connections.  Connections that have
# no peer server id yet are left alone.
def remove_duplicate_connections
  seen_ids = {}

  # Iterate over a snapshot of the keys, since entries are deleted below.
  @peer_connections.keys.each do |connection|
    id = connection.peer_server_id

    # No id means the peer is not connected yet; nothing to compare.
    next unless id

    unless seen_ids[id]
      # First connection for this peer, keep it
      seen_ids[id] = true
      next
    end

    # Peer is already connected somewhere else, drop this connection
    connection.disconnect!
    @peer_connections.delete(connection)
  end
end

#remove_peer_connection(peer_connection) ⇒ Object



143
144
145
146
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 143

# Forgets +peer_connection+: drops it from @peer_connections and drops its
# peer's server id from @peer_server_ids.
def remove_peer_connection(peer_connection)
  @peer_connections.delete(peer_connection)

  departed_id = peer_connection.peer_server_id
  @peer_server_ids.delete(departed_id)
end

#setup_peer_serverObject

The peer server maintains a socket other instances can connect to.



85
86
87
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 85

# Creates the PeerServer for this bus (passing the bus itself) and keeps it in
# @peer_server.  #initialize calls this before #start_tracker, which reads
# @peer_server.port.
def setup_peer_server
  @peer_server = PeerServer.new(self)
end

#start_trackerObject

The tracker periodically writes this instance’s socket IPs, port, and a timestamp into the database. If an instance’s timestamp is more than DEAD_TIME (20 seconds) old, the instance is treated as “dead” and removed.



92
93
94
95
96
97
98
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 92

# Creates the ServerTracker for this instance, built from the app, this bus's
# server id and the peer server's port, and performs its initial registration.
#
# The registration call is made synchronously here so that it is done before
# the bus goes on to connect to peers.
def start_tracker
  tracker = ServerTracker.new(@volt_app, @server_id, @peer_server.port)
  @server_tracker = tracker

  tracker.register
end

#still_alive?(peer_server_id) ⇒ Boolean

Returns true if the server is still reporting as alive.

Returns:



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 181

# Returns true if the instance identified by +peer_server_id+ is still
# reporting as alive in the database.
#
# Looks up the instance's record by server_id.  The peer counts as alive when
# the record's _time is within the last DEAD_TIME seconds.  A record that
# exists but is older than that is destroyed, and false is returned.  A
# missing record also yields false.
#
# NOTE(review): the collection is read here as `active_volt_instances`, while
# #peers reads `_active_volt_instances` -- confirm both spellings resolve to
# the same collection.
def still_alive?(peer_server_id)
  record = @volt_app.store.active_volt_instances.where(server_id: peer_server_id).first.sync

  # Nothing registered under this id any more
  return false unless record

  # Reported in recently enough to still count as alive
  return true if record._time > Time.now.to_i - DEAD_TIME

  # Stale registration: delete the entry
  record.destroy
  false
end