Module: RSMP::Supervisor::Modules::Connection
- Included in:
- RSMP::Supervisor
- Defined in:
- lib/rsmp/node/supervisor/modules/connection.rb
Overview
Handles incoming connections from sites
Instance Method Summary collapse
- #accept?(_socket, _info) ⇒ Boolean
- #accept_connection(socket, info) ⇒ Object
- #authorize_ip(ip) ⇒ Object
- #build_proxy_settings(socket, info) ⇒ Object
- #check_max_sites ⇒ Object
- #close(socket, info) ⇒ Object
- #format_ip_and_port(info) ⇒ Object
- #handle_connection(socket) ⇒ Object
- #peek_version_message(protocol) ⇒ Object
- #reject_connection(_socket, info) ⇒ Object
- #retrieve_site_id(protocol) ⇒ Object
- #setup_proxy(proxy, settings, id) ⇒ Object
- #validate_and_start_proxy(proxy, protocol) ⇒ Object
Instance Method Details
#accept?(_socket, _info) ⇒ Boolean
27 28 29 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 27 def accept?(_socket, _info) true end |
#accept_connection(socket, info) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 105 def accept_connection(socket, info) log "Site connected from #{format_ip_and_port(info)}", ip: info[:ip], port: info[:port], level: :info, timestamp: Clock.now info[:ip] settings = build_proxy_settings(socket, info) id = retrieve_site_id(settings[:protocol]) proxy = setup_proxy(find_site(id), settings, id) validate_and_start_proxy(proxy, settings[:protocol]) ensure site_ids_changed stop if @supervisor_settings['one_shot'] end |
#authorize_ip(ip) ⇒ Object
39 40 41 42 43 44 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 39 def (ip) return if @supervisor_settings['ips'] == 'all' return if @supervisor_settings['ips'].include? ip raise ConnectionError, 'guest ip not allowed' end |
#build_proxy_settings(socket, info) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 60 def build_proxy_settings(socket, info) stream = IO::Stream::Buffered.new(socket) protocol = RSMP::Protocol.new stream { supervisor: self, ip: info[:ip], port: info[:port], task: @task, collect: @collect, socket: socket, stream: stream, protocol: protocol, info: info, logger: @logger, archive: @archive } end |
#check_max_sites ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 46 def check_max_sites max = @supervisor_settings['max_sites'] return unless max return unless @proxies.size >= max raise ConnectionError, "maximum of #{max} sites already connected" end |
#close(socket, info) ⇒ Object
128 129 130 131 132 133 134 135 136 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 128 def close(socket, info) if info log "Connection to #{format_ip_and_port(info)} closed", ip: info[:ip], level: :info, timestamp: Clock.now else log 'Connection closed', level: :info, timestamp: Clock.now end socket.close end |
#format_ip_and_port(info) ⇒ Object
31 32 33 34 35 36 37 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 31 def format_ip_and_port(info) if @logger.settings['hide_ip_and_port'] '********' else "#{info[:ip]}:#{info[:port]}" end end |
#handle_connection(socket) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 6 def handle_connection(socket) remote_port = socket.remote_address.ip_port remote_hostname = socket.remote_address.ip_address remote_ip = socket.remote_address.ip_address info = { ip: remote_ip, port: remote_port, hostname: remote_hostname, now: Clock.now } if accept? socket, info accept_connection socket, info else reject_connection socket, info end rescue ConnectionError, HandshakeError => e log "Rejected connection from #{remote_ip}:#{remote_port}, #{e}", level: :warning distribute_error e rescue StandardError => e log "Connection: #{e}", exception: e, level: :error distribute_error e, level: :internal ensure close socket, info end |
#peek_version_message(protocol) ⇒ Object
54 55 56 57 58 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 54 def (protocol) json = protocol.peek_line attributes = Message.parse_attributes json Message.build attributes, json end |
#reject_connection(_socket, info) ⇒ Object
124 125 126 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 124 def reject_connection(_socket, info) log 'Site rejected', ip: info[:ip], level: :info end |
#retrieve_site_id(protocol) ⇒ Object
79 80 81 82 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 79 def retrieve_site_id(protocol) = protocol .attribute('siteId').first['sId'] end |
#setup_proxy(proxy, settings, id) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 84 def setup_proxy(proxy, settings, id) if proxy raise ConnectionError, "Site #{id} alredy connected from port #{proxy.port}" if proxy.connected? proxy.revive settings else check_max_sites proxy = build_proxy settings.merge(site_id: id) @proxies.push proxy end proxy end |
#validate_and_start_proxy(proxy, protocol) ⇒ Object
97 98 99 100 101 102 103 |
# File 'lib/rsmp/node/supervisor/modules/connection.rb', line 97 def validate_and_start_proxy(proxy, protocol) proxy.setup_site_settings proxy.check_core_version (protocol) log "Validating using core version #{proxy.core_version}", level: :debug proxy.start proxy.wait end |