Class: CarbonMU::EdgeRouter

Inherits:
Object
  • Object
show all
Includes:
Celluloid::IO, Celluloid::Logger, Celluloid::ZMQ
Defined in:
lib/carbonmu/edge_router/edge_router.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ EdgeRouter

Returns a new instance of EdgeRouter.



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/carbonmu/edge_router/edge_router.rb', line 18

# Boots the edge router: builds the telnet receptor, opens the IPC read
# socket, starts the asynchronous IPC read loop and then spawns the server.
#
# @param host forwarded unchanged to TelnetReceptor.new
# @param port forwarded unchanged to TelnetReceptor.new
def initialize(host, port)
  info "*** Starting CarbonMU edge router."
  @receptors = TelnetReceptor.new(host, port)
  @connections = []

  # The reader is created before the server is spawned because start_server
  # passes the reader's port number to the server on its command line.
  @ipc_reader = ReadSocket.new
  ipc_port = @ipc_reader.port_number
  info "*** Edge router waiting for IPC on port #{ipc_port}"

  # Start consuming IPC messages first, then launch the server process.
  async.run
  start_server
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



16
17
18
# File 'lib/carbonmu/edge_router/edge_router.rb', line 16

# Reader for the router's list of tracked connections.
#
# @return [Array] the array created in #initialize; #add_connection appends
#   to it and #remove_connection deletes from it
def connections
  @connections
end

#receptors ⇒ Object (readonly)

Returns the value of attribute receptors.



16
17
18
# File 'lib/carbonmu/edge_router/edge_router.rb', line 16

# Reader for the receptor created in #initialize.
#
# @return [TelnetReceptor] the single TelnetReceptor built from the host and
#   port given to the constructor (the name is plural, but one object is stored)
def receptors
  @receptors
end

Instance Method Details

#add_connection(connection) ⇒ Object



41
42
43
44
# File 'lib/carbonmu/edge_router/edge_router.rb', line 41

# Starts tracking a connection and announces it to the server.
#
# @param connection [#id] the connection to track; its id is what
#   #send_connect_to_server reports to the server
# @return [Object] whatever #send_connect_to_server returns
def add_connection(connection)
  @connections.push(connection)
  send_connect_to_server(connection)
end

#handle_server_message(input) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/carbonmu/edge_router/edge_router.rb', line 86

# Decodes one serialized IPC message from the server and dispatches on its op.
#
# Supported ops:
# * :started  - records the new server via #handle_server_started
# * :write    - writes message.output to the connection whose id matches
#               message.connection_id
# * :reboot   - restarts the server via #reboot_server
# * :retrieve_existing_connections - re-announces every tracked connection
#
# @param input the serialized message, as accepted by IPCMessage.unserialize
# @raise [ArgumentError] if the message carries any other op
def handle_server_message(input)
  message = IPCMessage.unserialize(input)
  debug "EDGE ROUTER RECEIVE: #{message}" if CarbonMU.configuration.log_ipc_traffic
  case message.op
  when :started
    handle_server_started(message.pid, message.port)
  when :write
    # TODO look for efficiency here (still a linear scan, but find stops at
    # the first match instead of building an intermediate array).
    conn = @connections.find { |candidate| candidate.id == message.connection_id }
    if conn
      conn.write(message.output)
    else
      # The connection may already have been removed (see #remove_connection)
      # by the time the server's output arrives; drop it instead of raising
      # NoMethodError on nil.
      warn "Dropping write for unknown connection #{message.connection_id}."
    end
  when :reboot
    reboot_server
  when :retrieve_existing_connections
    info 'Sending connections to server...'
    @connections.each { |connection| send_connect_to_server(connection) }
  else
    raise ArgumentError, "Unsupported operation '#{message.op}' received from Server."
  end
end

#handle_server_started(pid, port) ⇒ Object



34
35
36
37
38
39
# File 'lib/carbonmu/edge_router/edge_router.rb', line 34

# Records a freshly started server: remembers its pid, detaches it, and opens
# the IPC write socket on the port the server reported.
#
# NOTE(review): pid comes from the server's :started message rather than from
# spawn's return value - confirm it refers to a child of this process.
# NOTE(review): any previous @ipc_writer is replaced without being closed -
# confirm whether WriteSocket needs explicit cleanup.
#
# @param pid  process id reported by the server
# @param port port passed to WriteSocket.new
# @return the newly created WriteSocket
def handle_server_started(pid, port)
  if CarbonMU.configuration.log_ipc_traffic
    debug "*** Edge router received server IPC start. Pid #{pid}, port #{port}."
  end

  @current_server_pid = pid
  Process.detach(pid)

  @ipc_writer = WriteSocket.new(port)
end

#reboot_server ⇒ Object



80
81
82
83
84
# File 'lib/carbonmu/edge_router/edge_router.rb', line 80

# Terminates the current server process (if any) and spawns a replacement.
#
# A missing pid or an already-exited server must not stop the reboot:
# otherwise Process.kill would raise and start_server would never run,
# leaving the router without a server.
#
# @return whatever #start_server returns
def reboot_server
  warn "Reboot triggered!"
  if @current_server_pid
    begin
      Process.kill("TERM", @current_server_pid)
    rescue Errno::ESRCH
      # The old server is already gone; nothing left to terminate.
    end
  end
  start_server
end

#remove_connection(connection) ⇒ Object



46
47
48
49
# File 'lib/carbonmu/edge_router/edge_router.rb', line 46

# Stops tracking a connection and tells the server it has disconnected.
# The disconnect is sent whether or not the connection was being tracked.
#
# @param connection [#id] the connection to forget
# @return [Object] whatever #send_disconnect_to_server returns
def remove_connection(connection)
  @connections.delete_if { |tracked| tracked == connection }
  send_disconnect_to_server(connection)
end

#run ⇒ Object



56
57
58
59
60
# File 'lib/carbonmu/edge_router/edge_router.rb', line 56

# IPC receive loop: repeatedly reads one message from the IPC reader and hands
# it to #handle_server_message asynchronously. The loop has no exit condition
# of its own.
def run
  loop { async.handle_server_message(@ipc_reader.read) }
end

#send_command_to_server(input, connection_id) ⇒ Object



70
71
72
# File 'lib/carbonmu/edge_router/edge_router.rb', line 70

# Sends a :command message to the server.
#
# @param input          value sent under the :command key
# @param connection_id  value sent under the :connection_id key
# @return [Object] whatever #send_message_to_server returns
def send_command_to_server(input, connection_id)
  payload = { command: input, connection_id: connection_id }
  send_message_to_server(:command, payload)
end

#send_connect_to_server(connection) ⇒ Object



62
63
64
# File 'lib/carbonmu/edge_router/edge_router.rb', line 62

# Sends a :connect message carrying the connection's id to the server.
#
# @param connection [#id] the connection being announced
# @return [Object] whatever #send_message_to_server returns
def send_connect_to_server(connection)
  payload = { connection_id: connection.id }
  send_message_to_server(:connect, payload)
end

#send_disconnect_to_server(connection) ⇒ Object



66
67
68
# File 'lib/carbonmu/edge_router/edge_router.rb', line 66

# Sends a :disconnect message carrying the connection's id to the server.
#
# @param connection [#id] the connection that went away
# @return [Object] whatever #send_message_to_server returns
def send_disconnect_to_server(connection)
  payload = { connection_id: connection.id }
  send_message_to_server(:disconnect, payload)
end

#send_message_to_server(op, params = {}) ⇒ Object



74
75
76
77
78
# File 'lib/carbonmu/edge_router/edge_router.rb', line 74

# Wraps op and params in an IPCMessage and sends its serialized form to the
# server over the IPC write socket.
#
# @ipc_writer is only assigned in #handle_server_started, while the receptor
# is created in #initialize before the server is spawned. Until the server
# has reported in there is nothing to send on, and calling `send` on nil
# would be a dynamic method dispatch on nil with the serialized message as
# the method name. In that window the message is logged and dropped; tracked
# connections are re-announced when the server sends
# :retrieve_existing_connections.
#
# @param op     [Symbol] operation name for the message
# @param params [Hash]   message parameters
# @return the result of the writer's send, or nil if the message was dropped
def send_message_to_server(op, params = {})
  message = IPCMessage.new(op, params)
  unless @ipc_writer
    warn "EDGE ROUTER DROP (no server IPC connection yet): #{message}"
    return
  end

  debug "EDGE ROUTER SEND: #{message}" if CarbonMU.configuration.log_ipc_traffic
  @ipc_writer.send message.serialize
end

#shutdown ⇒ Object



51
52
53
54
# File 'lib/carbonmu/edge_router/edge_router.rb', line 51

# Shuts the router down by sending TERM to the current server process.
#
# Safe to call when no server has reported in yet (no pid recorded) or when
# the server has already exited; in both cases there is nothing to terminate
# and nil is returned instead of raising.
#
# @return [Integer, nil] Process.kill's result, or nil if nothing was signalled
def shutdown
  # TODO Tell all receptors and connections to quit.
  return unless @current_server_pid

  begin
    Process.kill("TERM", @current_server_pid)
  rescue Errno::ESRCH
    # Server process is already gone.
    nil
  end
end

#start_server ⇒ Object



30
31
32
# File 'lib/carbonmu/edge_router/edge_router.rb', line 30

# Spawns the server as a separate process, passing the IPC reader's port
# number on the command line.
#
# @return the value returned by spawn
def start_server
  ipc_port = @ipc_reader.port_number
  spawn("bundle exec carbonmu start_server_only #{ipc_port}")
end