Class: Volt::MessageBus::PeerConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/volt/server/message_bus/peer_to_peer/peer_connection.rb

Constant Summary collapse

CONNECT_TIMEOUT =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket, ips, port, message_bus, server = false, peer_server_id = nil) ⇒ PeerConnection

Returns a new instance of PeerConnection.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 17

# Stores the connection settings and starts the background worker thread.
#
# socket         - an already-open socket, or nil/false; when falsy the
#                  worker thread calls reconnect! to obtain one.
# ips, port      - kept in @ips / @port (presumably consumed by reconnect!,
#                  which is not visible here — confirm).
# message_bus    - provides this process's server_id and is kept for later
#                  callbacks.
# server         - truthy when this end waits for the peer's announcement
#                  (see announce).
# peer_server_id - the peer's id if already known; announce overwrites it.
def initialize(socket, ips, port, message_bus, server=false, peer_server_id=nil)
  @socket = socket
  @ips = ips
  @port = port
  @server = server
  @peer_server_id = peer_server_id
  @message_bus = message_bus
  @server_id = message_bus.server_id
  @reconnect_mutex = Mutex.new
  # Bounded queue of outgoing messages, drained by run_worker.
  @message_queue = SizedQueue.new(500)

  # The encoder handles things like formatting and encryption
  @message_encoder = MessageEncoder.new

  @worker_thread = Thread.new do
    # When no socket was handed in (the PeerConnection was created from the
    # active_volt_instances collection), reconnect! sets up @socket.
    next unless @socket || reconnect!

    # announce also checks that we did not connect to ourselves.
    next unless announce

    # Set up the listen thread: incoming messages are read on their own
    # thread while this thread sends queued messages.
    @listen_thread = Thread.new { listen }

    run_worker
  end
end

Instance Attribute Details

#peer_server_idObject (readonly)

The server id for the connected server



15
16
17
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 15

# Returns the server id recorded for the peer on the other end of this
# connection (read-only accessor for @peer_server_id).
def peer_server_id
  @peer_server_id
end

#socketObject (readonly)

The socket used to communicate with the connected server



15
16
17
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 15

# Returns the socket this connection reads from and writes to
# (read-only accessor for @socket).
def socket
  @socket
end

Instance Method Details

#announceObject

Tells the other side of the connection our server_id. If we connected to ourselves, closes the connection.



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 54

# Exchanges server ids with the peer over @socket.
#
# When @server is truthy this end waits for the peer's id and then replies
# with its own; otherwise it sends its own id first and then reads the
# peer's. The received id is stored in @peer_server_id.
#
# Returns true when the exchange succeeded and the peer is a different
# server. Returns false (after calling disconnect!) when the exchange
# failed, no peer id was received, or the peer turned out to be this very
# server.
def announce
  failed = false
  begin
    if @server
      # Wait for announcement
      @peer_server_id = @message_encoder.receive_message(@socket)
      @message_encoder.send_message(@socket, @server_id)
    else
      # Announce
      @message_encoder.send_message(@socket, @server_id)
      @peer_server_id = @message_encoder.receive_message(@socket)
    end
  rescue Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EPIPE, IOError
    # Same socket failures that listen and run_worker handle; without this
    # a reset during the handshake would escape and kill the calling thread.
    failed = true
  end

  # Make sure we aren't already connected
  @message_bus.remove_duplicate_connections

  # A nil id means the peer went away before announcing itself (listen
  # treats nil from receive_message as end of stream). Also never keep a
  # connection to ourselves.
  if failed || @peer_server_id.nil? || @peer_server_id == @server_id
    # Close the connection
    disconnect!
    return false
  end

  # Success
  true
end

#disconnect!Object

Close the socket, kill listener thread, wait for worker thread to send all messages, and remove from message_bus’s peer_connections.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 86

# Shuts this connection down.
#
# In order: marks the connection as disconnected, queues the :QUIT sentinel
# for the worker, closes the socket (ignoring any error), kills the listen
# thread if one exists, waits for the worker thread unless we are running on
# it, and finally asks the message bus to drop this connection.
#
# Returns whatever @message_bus.remove_peer_connection returns.
def disconnect!
  @disconnected = true

  # Sentinel that makes run_worker leave its loop.
  @message_queue << :QUIT

  begin
    @socket.close
  rescue StandardError
    # Ignore close error, since we may not be connected
  end

  listener = @listen_thread
  listener.kill if listener
  # @worker_thread.kill

  # Wait for the worker to publish all messages; joining the current thread
  # would raise, so skip the join when called from the worker itself.
  unless Thread.current == @worker_thread
    @worker_thread.join if @worker_thread
  end

  @message_bus.remove_peer_connection(self)
end

#listenObject



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 126

# Receive loop: reads messages from @socket and hands each one to the
# message bus until the connection is finished.
#
# The inner loop ends when receive_message returns nil/false, when
# @disconnected has been set, or when one of the rescued socket errors is
# raised. After that, a non-server connection that was not deliberately
# disconnected tries reconnect! and resumes reading on success; in every
# other case the method returns.
def listen
  loop do
    begin
      while (incoming = @message_encoder.receive_message(@socket))
        # A message that arrives after disconnect! is dropped.
        break if @disconnected
        @message_bus.handle_message(incoming)
      end

      # Falling out of the while means we got nil from the socket.
    rescue Errno::ECONNRESET, Errno::ENETUNREACH, Errno::EPIPE, IOError
      # Treated the same as a dropped connection; handled below.
    end

    # Deliberate disconnects and server-side connections never reconnect.
    break if @disconnected || @server

    # Connection was dropped, try to reconnect; give up if that fails.
    break unless reconnect!
  end
end

#publish(message) ⇒ Object



104
105
106
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 104

# Queues message for delivery to the peer; run_worker pops it from
# @message_queue and sends it over the socket.
#
# Returns the queue.
def publish(message)
  @message_queue << message
end

#run_workerObject



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/volt/server/message_bus/peer_to_peer/peer_connection.rb', line 108

# Send loop: pops messages from @message_queue and writes each one to
# @socket through the message encoder.
#
# The loop ends when the :QUIT sentinel (or a nil/false entry) is popped, or
# when a send fails and reconnect! cannot restore the connection. After a
# successful reconnect! the failed message is sent again.
def run_worker
  while (message = @message_queue.pop)
    break if message == :QUIT

    begin
      @message_encoder.send_message(@socket, message)
      # 'Error: closed stream' comes in sometimes
    rescue Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::ENETUNREACH, Errno::EPIPE, IOError
      # ECONNRESET is included to match listen: a peer reset seen while
      # writing should trigger a reconnect instead of killing the worker.
      retry if reconnect!

      # Unable to reconnect, die
      break
    end
  end
end