Class: Volt::MessageBus::PeerToPeer

Inherits:
BaseMessageBus show all
Includes:
Eventable
Defined in:
lib/volt/server/message_bus/peer_to_peer.rb

Constant Summary collapse

DEAD_TIME =

How long without an update before we mark an instance as dead (in seconds)

20

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Eventable

#on, #remove_listener, #trigger!

Constructor Details

#initialize(volt_app) ⇒ PeerToPeer

Returns a new instance of PeerToPeer.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 60

def initialize(volt_app)
  if Volt::DataStore.fetch.connected?
    # Generate a guid
    @server_id = SecureRandom.uuid
    # The PeerConnection's to peers
    @peer_connections = {}
    # The server id's for each peer we're connected to
    @peer_server_ids = {}

    @page = volt_app.page

    setup_peer_server
    start_tracker

    Thread.new do
      sleep 1

      connect_to_peers
    end
  else
    Volt.logger.error('Unable to connect to the database.  Volt will still run, but the message bus requires a database connection to setup connections between nodes, so the message bus has been disabled.  This means updates will not be propigated between instances (server, console, runners, etc...)')
  end
end

Instance Attribute Details

#pageObject (readonly)

Returns the value of attribute page.



58
59
60
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 58

def page
  @page
end

#server_idObject (readonly)

Returns the value of attribute server_id.



58
59
60
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 58

def server_id
  @server_id
end

Instance Method Details

#add_peer_connection(peer_connection) ⇒ Object



140
141
142
143
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 140

def add_peer_connection(peer_connection)
  @peer_connections[peer_connection] = true
  @peer_server_ids[peer_connection.peer_server_id] = true
end

#connect_to_peersObject



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 120

def connect_to_peers
  peers.each do |peer|
    # Start connecting to all at the same time.  Since most will connect or
    # timeout, this is the desired behaviour.
    Thread.new do
      # sometimes we get nil peers for some reason
      if peer
        peer_connection = PeerConnection.connect_to(self, peer._ips, peer._port)

        if peer_connection
          add_peer_connection(peer_connection)
        else
          # remove if not alive anymore.
          still_alive?(peer._server_id)
        end
      end
    end
  end
end

#handle_message(message) ⇒ Object

Called when a message comes in



151
152
153
154
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 151

def handle_message(message)
  channel_name, message = message.split('|', 2)
  trigger!(channel_name, message)
end

#peersObject

Return an array of peer records.



114
115
116
117
118
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 114

def peers
  instances = @page.store._active_volt_instances

  instances.where(server_id: {'$ne' => @server_id}).all.sync
end

#publish(channel, message) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 100

def publish(channel, message)
  full_msg = "#{channel}|#{message}"
  @peer_connections.keys.each do |peer|
    begin
      # Queue message on each peer
      peer.publish(full_msg)
    rescue IOError => e
      # Connection to peer lost
      Volt.logger.warn("Message bus connection to peer lost: #{e}")
    end
  end
end

#remove_duplicate_connectionsObject

We only want one connection between two instances, this loops through each connection



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 158

def remove_duplicate_connections
  peer_server_ids = {}

  # remove any we are connected to twice
  @peer_connections.keys.each do |peer|
    peer_id = peer.peer_server_id

    if peer_id
      # peer is connected

      if peer_server_ids[peer_id]
        # Peer is already connected somewhere else, remove connection
        peer.disconnect

        # remove the connection
        @peer_connections.delete(peer)
      else
        # Mark that we are connected
        peer_server_ids[peer_id] = true
      end
    end
  end
end

#remove_peer_connection(peer_connection) ⇒ Object



145
146
147
148
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 145

def remove_peer_connection(peer_connection)
  @peer_connections.delete(peer_connection)
  @peer_server_ids.delete(peer_connection.peer_server_id)
end

#setup_peer_serverObject

The peer server maintains a socket other instances can connect to.



85
86
87
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 85

def setup_peer_server
  @peer_server = PeerServer.new(self)
end

#start_trackerObject

The tracker updates the socket ip’s and port and a timestamp into the database every minute. If the timestamp is more than 2 minutes old, an instance is marked as “dead” and removed.



92
93
94
95
96
97
98
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 92

def start_tracker
  @server_tracker = ServerTracker.new(page, @server_id, @peer_server.port)

  # Do the initial registration, and wait until its done before connecting
  # to peers.
  @server_tracker.register()
end

#still_alive?(peer_server_id) ⇒ Boolean

Returns true if the server is still reporting as alive.

Returns:



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/volt/server/message_bus/peer_to_peer.rb', line 183

def still_alive?(peer_server_id)
  # Unable to write to the socket, retry until the instance is no
  # longer marking its self as active in the database
  peer_table = @page.store._active_volt_instances
  peer = peer_table.where(server_id: peer_server_id).first.sync
  if peer
    # Found the peer, retry if it has reported in in the last 2
    # minutes.
    if peer._time > (Time.now.to_i - DEAD_TIME)
      # Peer reported in less than 2 minutes ago
      return true
    else
      # Delete the entry
      peer.destroy
    end
  end

  false
end