Module: RSMP::Supervisor::Modules::Connection

Included in:
RSMP::Supervisor
Defined in:
lib/rsmp/node/supervisor/modules/connection.rb

Overview

Handles incoming connections from sites

Instance Method Summary collapse

Instance Method Details

#accept?(_socket, _info) ⇒ Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 27

def accept?(_socket, _info)
  true
end

#accept_connection(socket, info) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 105

def accept_connection(socket, info)
  log "Site connected from #{format_ip_and_port(info)}",
      ip: info[:ip],
      port: info[:port],
      level: :info,
      timestamp: Clock.now

  authorize_ip info[:ip]

  settings = build_proxy_settings(socket, info)
  id = retrieve_site_id(settings[:protocol])
  proxy = setup_proxy(find_site(id), settings, id)

  validate_and_start_proxy(proxy, settings[:protocol])
ensure
  site_ids_changed
  stop if @supervisor_settings['one_shot']
end

#authorize_ip(ip) ⇒ Object

Raises:



39
40
41
42
43
44
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 39

def authorize_ip(ip)
  return if @supervisor_settings['ips'] == 'all'
  return if @supervisor_settings['ips'].include? ip

  raise ConnectionError, 'guest ip not allowed'
end

#build_proxy_settings(socket, info) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 60

def build_proxy_settings(socket, info)
  stream = IO::Stream::Buffered.new(socket)
  protocol = RSMP::Protocol.new stream

  {
    supervisor: self,
    ip: info[:ip],
    port: info[:port],
    task: @task,
    collect: @collect,
    socket: socket,
    stream: stream,
    protocol: protocol,
    info: info,
    logger: @logger,
    archive: @archive
  }
end

#check_max_sitesObject

Raises:



46
47
48
49
50
51
52
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 46

def check_max_sites
  max = @supervisor_settings['max_sites']
  return unless max
  return unless @proxies.size >= max

  raise ConnectionError, "maximum of #{max} sites already connected"
end

#close(socket, info) ⇒ Object



128
129
130
131
132
133
134
135
136
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 128

def close(socket, info)
  if info
    log "Connection to #{format_ip_and_port(info)} closed", ip: info[:ip], level: :info, timestamp: Clock.now
  else
    log 'Connection closed', level: :info, timestamp: Clock.now
  end

  socket.close
end

#format_ip_and_port(info) ⇒ Object



31
32
33
34
35
36
37
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 31

def format_ip_and_port(info)
  if @logger.settings['hide_ip_and_port']
    '********'
  else
    "#{info[:ip]}:#{info[:port]}"
  end
end

#handle_connection(socket) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 6

def handle_connection(socket)
  remote_port = socket.remote_address.ip_port
  remote_hostname = socket.remote_address.ip_address
  remote_ip = socket.remote_address.ip_address

  info = { ip: remote_ip, port: remote_port, hostname: remote_hostname, now: Clock.now }
  if accept? socket, info
    accept_connection socket, info
  else
    reject_connection socket, info
  end
rescue ConnectionError, HandshakeError => e
  log "Rejected connection from #{remote_ip}:#{remote_port}, #{e}", level: :warning
  distribute_error e
rescue StandardError => e
  log "Connection: #{e}", exception: e, level: :error
  distribute_error e, level: :internal
ensure
  close socket, info
end

#peek_version_message(protocol) ⇒ Object



54
55
56
57
58
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 54

def peek_version_message(protocol)
  json = protocol.peek_line
  attributes = Message.parse_attributes json
  Message.build attributes, json
end

#reject_connection(_socket, info) ⇒ Object



124
125
126
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 124

def reject_connection(_socket, info)
  log 'Site rejected', ip: info[:ip], level: :info
end

#retrieve_site_id(protocol) ⇒ Object



79
80
81
82
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 79

def retrieve_site_id(protocol)
  version_message = peek_version_message protocol
  version_message.attribute('siteId').first['sId']
end

#setup_proxy(proxy, settings, id) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 84

def setup_proxy(proxy, settings, id)
  if proxy
    raise ConnectionError, "Site #{id} alredy connected from port #{proxy.port}" if proxy.connected?

    proxy.revive settings
  else
    check_max_sites
    proxy = build_proxy settings.merge(site_id: id)
    @proxies.push proxy
  end
  proxy
end

#validate_and_start_proxy(proxy, protocol) ⇒ Object



97
98
99
100
101
102
103
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 97

def validate_and_start_proxy(proxy, protocol)
  proxy.setup_site_settings
  proxy.check_core_version peek_version_message(protocol)
  log "Validating using core version #{proxy.core_version}", level: :debug
  proxy.start
  proxy.wait
end