Module: PryRemoteEm::Broker

Includes:
Proto
Defined in:
lib/pry-remote-em/broker.rb

Constant Summary

Constants included from Proto

Proto::PREAMBLE, Proto::PREAMBLE_LEN, Proto::SEPERATOR, Proto::SEPERATOR_LEN

Class Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Proto

#receive_auth, #receive_banner, #receive_completion, #receive_data, #receive_json, #receive_msg, #receive_msg_bcast, #receive_prompt, #receive_raw, #receive_shell_cmd, #receive_shell_data, #receive_shell_result, #receive_shell_sig, #receive_start_tls, #receive_unknown, #send_auth, #send_banner, #send_completion, #send_data, #send_heatbeat, #send_json, #send_msg, #send_msg_bcast, #send_prompt, #send_proxy_connection, #send_raw, #send_register_server, #send_server_list, #send_shell_cmd, #send_shell_result, #send_start_tls, #send_unregister_server

Class Attribute Details

.host ⇒ Object (readonly)

Returns the value of attribute host.



9
10
11
# File 'lib/pry-remote-em/broker.rb', line 9

# Reader for the broker host.
#
# @return [Object, nil] the value stored in +@host+, or nil when it has not been set
def host
  defined?(@host) ? @host : nil
end

.listening ⇒ Object (readonly) Also known as: listening?

Returns the value of attribute listening.



9
10
11
# File 'lib/pry-remote-em/broker.rb', line 9

# Reader for the listening flag.
#
# @return [Object, nil] the value stored in +@listening+, or nil when it has not been set
def listening
  @listening if defined?(@listening)
end

.port ⇒ Object (readonly)

Returns the value of attribute port.



9
10
11
# File 'lib/pry-remote-em/broker.rb', line 9

# Reader for the broker port.
#
# @return [Object, nil] the value stored in +@port+, or nil when it has not been set
def port
  defined?(@port) ? @port : nil
end

Class Method Details

.connected? ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/pry-remote-em/broker.rb', line 100

# Reports the stored connection flag.
#
# @return [Object, nil] the value stored in +@connected+ (returned as-is, not
#   coerced), or nil when it has not been set
def connected?
  @connected if defined?(@connected)
end

.expand_url(url) ⇒ Object



75
76
77
78
79
# File 'lib/pry-remote-em/broker.rb', line 75

# Expands a URL whose host is the wildcard address into one URL per local
# IPv4 address.
#
# @param url [String] the URL to expand; it is parsed with +URI.parse+
# @return [Array] +Array(url)+ when the parsed host is not '0.0.0.0';
#   otherwise one clone of the parsed URI per address reported by
#   +Socket.ip_address_list+ that is IPv4, with the host replaced
def expand_url(url)
  parsed = URI.parse(url)
  return Array(url) unless parsed.host == '0.0.0.0'

  local_ipv4 = Socket.ip_address_list.select(&:ipv4?).map(&:ip_address)
  local_ipv4.map do |address|
    copy = parsed.clone
    copy.host = address
    copy
  end
end

.hbeats ⇒ Object



96
97
98
# File 'lib/pry-remote-em/broker.rb', line 96

# Lazily created heartbeat table.
#
# @return [Hash] the hash stored in +@hbeats+, created empty on first access
def hbeats
  @hbeats = {} unless @hbeats
  @hbeats
end

.log ⇒ Object



54
55
56
57
# File 'lib/pry-remote-em/broker.rb', line 54

# Logger used by the broker.
#
# @return [Object] +opts[:logger]+ when one is configured; otherwise a
#   memoized +Logger+ writing to STDERR
def log
  opts[:logger] || (@log ||= Logger.new(STDERR))
end

.opts ⇒ Object



50
51
52
# File 'lib/pry-remote-em/broker.rb', line 50

# Lazily created options hash.
#
# @return [Hash] the hash stored in +@opts+, created empty on first access
def opts
  return @opts if @opts

  @opts = {}
end

.register(url, name = 'unknown') ⇒ Object



63
64
65
66
67
# File 'lib/pry-remote-em/broker.rb', line 63

# Sends a register-server message for every URL that +url+ expands to.
#
# @param url [String] server URL, passed through +expand_url+
# @param name [String] server name sent with each registration
# @return [Array] the list returned by +expand_url+
def register(url, name = 'unknown')
  targets = expand_url(url)
  targets.each do |target|
    client do |connection|
      connection.send_register_server(target, name)
    end
  end
end

.restart ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
# File 'lib/pry-remote-em/broker.rb', line 38

# Restarts the broker on the stored host/port/options and re-registers the
# local servers.
#
# Clears +@waiting+ and +@client+, calls +run+ with the stored settings, and
# inside the block given to +run+ registers every entry of
# +PryRemoteEm.servers+ for which +EM.get_sockname+ returns a truthy value.
#
# @return [Object] whatever +run+ returns
def restart
  log.info("[pry-remote-em broker] restarting on pryem://#{host}:#{port}")
  @waiting = @client = nil
  run(@host, @port, @opts) do
    PryRemoteEm.servers.each do |url, (sig, name)|
      register(url, name) if EM.get_sockname(sig)
    end
  end
end

.run(host = ENV['PRYEMBROKER'] || DEF_BROKERHOST, port = ENV['PRYEMBROKERPORT'] || DEF_BROKERPORT, opts = {:tls => false}) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/pry-remote-em/broker.rb', line 12

# Starts the broker server on host:port and, when a block is given, yields
# this object from inside a +client+ block.
#
# @param host [String] address to listen on; defaults to ENV['PRYEMBROKER']
#   or DEF_BROKERHOST
# @param port [Integer, String] port to listen on; defaults to
#   ENV['PRYEMBROKERPORT'] or DEF_BROKERPORT. Values that come from the
#   environment are Strings, so the port is coerced with Integer().
# @param opts [Hash] options handed to the server; :tls is always forced to
#   false on a private copy (the caller's hash is not modified)
# @raise [RuntimeError] when the port is below 1024 and the effective user is
#   not root
# @raise [ArgumentError, TypeError] when +port+ cannot be converted by Integer()
# @return [Object, nil] the result of the +client+ call when a block is given,
#   otherwise nil
def run(host = ENV['PRYEMBROKER'] || DEF_BROKERHOST, port = ENV['PRYEMBROKERPORT'] || DEF_BROKERPORT, opts = {:tls => false})
  # ENV supplies the port as a String; comparing a String with 1024 raises,
  # so normalise to an Integer before the privileged-port check.
  port = Integer(port)
  raise "root permission required for port below 1024 (#{port})" if port < 1024 && Process.euid != 0
  @host      = host
  @port      = port
  # Brokers cannot use SSL directly. If they do then when a proxy request to an SSL server is received
  # the client and server will not be able to negotiate a SSL session. The proxied traffic can be SSL
  # encrypted, but the SSL session will be between the client and the server.
  opts       = opts.dup
  opts[:tls] = false
  @opts      = opts
  begin
    EM.start_server(host, port, PryRemoteEm::Broker, opts)
    log.info("[pry-remote-em broker] listening on #{opts[:tls] ? 'pryems' : 'pryem'}://#{host}:#{port}")
    @listening = true
  rescue => e
    # EM 1.0.0.beta4's message tells us the port is in use; 0.12.10 just says, 'no acceptor'.
    # Either one means a broker is already listening on host:port, which is not
    # treated as an error here; anything else is re-raised unchanged.
    raise unless e.message.include?('port is in use') || e.message.include?('no acceptor')
  end
  client { |c| yield self } if block_given?
end

.servers ⇒ Object



59
60
61
# File 'lib/pry-remote-em/broker.rb', line 59

# Lazily created table of registered servers.
#
# @return [Hash] the hash stored in +@servers+, created empty on first access
def servers
  @servers || (@servers = {})
end

.timers ⇒ Object



92
93
94
# File 'lib/pry-remote-em/broker.rb', line 92

# Lazily created table of heartbeat timers.
#
# @return [Hash] the hash stored in +@timers+, created empty on first access
def timers
  @timers = {} unless @timers
  @timers
end

.unregister(url) ⇒ Object



69
70
71
72
73
# File 'lib/pry-remote-em/broker.rb', line 69

# Sends an unregister-server message for every URL that +url+ expands to.
#
# @param url [String] server URL, passed through +expand_url+
# @return [Array] the list returned by +expand_url+
def unregister(url)
  targets = expand_url(url)
  targets.each do |target|
    client do |connection|
      connection.send_unregister_server(target)
    end
  end
end

.watch_heartbeats(url) ⇒ Object



81
82
83
84
85
86
87
88
89
90
# File 'lib/pry-remote-em/broker.rb', line 81

# Starts a 20-second periodic timer that expires +url+ when its heartbeats
# stop. Does nothing when a timer for +url+ already exists.
#
# On each tick, if no heartbeat is recorded for +url+ or the last one is more
# than 20 seconds old, the server entry, its heartbeat timestamp and the timer
# itself are removed and the timer is cancelled.
#
# @param url [Object] key used in +servers+, +hbeats+ and +timers+
# @return [EM::PeriodicTimer, nil] the new timer, or nil when one was already
#   registered for +url+
def watch_heartbeats(url)
  return if timers[url]
  timers[url] = EM::PeriodicTimer.new(20) do
    last_beat = hbeats[url]
    if !last_beat || (Time.new - last_beat) > 20
      servers.delete(url)
      # Drop the stale timestamp too; otherwise hbeats keeps one entry for
      # every server that ever expired.
      hbeats.delete(url)
      expired = timers.delete(url)
      expired.cancel if expired
    end
  end
end

Instance Method Details

#initialize(opts = {:tls => false}, &blk) ⇒ Object



159
160
161
# File 'lib/pry-remote-em/broker.rb', line 159

# Stores the connection options.
#
# @param opts [Hash] options kept in +@opts+; defaults to {:tls => false}
# @param blk [Proc] accepted for interface compatibility; not used here
def initialize(opts = {:tls => false}, &blk)
  @opts = opts
end

#log ⇒ Object



163
164
165
# File 'lib/pry-remote-em/broker.rb', line 163

# Logger for this connection; delegates to the module-level +Broker.log+.
#
# @return [Object] whatever +Broker.log+ returns
def log
  Broker.log
end

#peer_ip ⇒ Object



180
181
182
183
184
185
# File 'lib/pry-remote-em/broker.rb', line 180

# IP address of the connected peer.
#
# The value is cached in +@peer_ip+; the first successful lookup also fills
# +@peer_port+.
#
# @return [String] the peer IP, or "" when +get_peername+ returns nil
def peer_ip
  unless @peer_ip
    return "" if get_peername.nil?

    @peer_port, @peer_ip = Socket.unpack_sockaddr_in(get_peername)
  end
  @peer_ip
end

#peer_port ⇒ Object



187
188
189
190
191
192
# File 'lib/pry-remote-em/broker.rb', line 187

# Port of the connected peer.
#
# The value is cached in +@peer_port+; the first successful lookup also fills
# +@peer_ip+.
#
# @return [Integer, String] the peer port, or "" when +get_peername+ returns nil
def peer_port
  unless @peer_port
    return "" if get_peername.nil?

    @peer_port, @peer_ip = Socket.unpack_sockaddr_in(get_peername)
  end
  @peer_port
end

#post_init ⇒ Object



167
168
169
170
171
172
# File 'lib/pry-remote-em/broker.rb', line 167

# Connection callback: logs the peer, sends the banner, then either starts TLS
# or sends the server list.
#
# The peer address is read through +peer_ip+ / +peer_port+, which return ""
# when +get_peername+ is nil, so a peer that is already gone no longer raises
# from unpacking a nil sockaddr.
#
# @return [Object] the result of +start_tls+ when +@opts[:tls]+ is truthy,
#   otherwise the result of +send_server_list+
def post_init
  log.info("[pry-remote-em broker] received client connection from #{peer_ip}:#{peer_port}")
  send_banner("PryRemoteEm #{VERSION} #{@opts[:tls] ? 'pryems' : 'pryem'}")
  @opts[:tls] ? start_tls : send_server_list(Broker.servers)
end

#receive_proxy_connection(url) ⇒ Object



153
154
155
156
157
# File 'lib/pry-remote-em/broker.rb', line 153

# Handles a proxy request by opening a connection to the requested URL.
#
# NOTE(review): +url+ is connected to as given; nothing here checks that it
# refers to a registered server. Confirm that is intended.
#
# @param url [String] target URL; its host and port are passed to +EM.connect+
# @return [Object] whatever +EM.connect+ returns
def receive_proxy_connection(url)
  log.info("[pry-remote-em broker] proxying to #{url}")
  target = URI.parse(url)
  EM.connect(target.host, target.port, Client::Proxy, self)
end

#receive_register_server(url, name) ⇒ Object



136
137
138
139
140
141
142
143
144
# File 'lib/pry-remote-em/broker.rb', line 136

# Records a server registration and refreshes its heartbeat.
#
# When the URL's host is '0.0.0.0', 'localhost' or '127.0.0.1' it is replaced
# with +peer_ip+. The registration is logged unless the same URL is already
# stored with the same name.
#
# @param url [String] server URL, parsed with +URI.parse+
# @param name [Object] server name stored under the parsed URL
# @return [Object] +name+
def receive_register_server(url, name)
  uri = URI.parse(url)
  case uri.host
  when '0.0.0.0', 'localhost', '127.0.0.1'
    uri.host = peer_ip
  end
  already_known = Broker.servers[uri] == name
  log.info("[pry-remote-em broker] registered #{uri} - #{name.inspect}") unless already_known
  Broker.servers[uri] = name
  Broker.hbeats[uri] = Time.new
  Broker.watch_heartbeats(uri)
  name
end

#receive_server_list ⇒ Object



132
133
134
# File 'lib/pry-remote-em/broker.rb', line 132

# Replies to a server-list request with the broker's current servers.
#
# @return [Object] whatever +send_server_list+ returns
def receive_server_list
  known_servers = Broker.servers
  send_server_list(known_servers)
end

#receive_unregister_server(url) ⇒ Object



146
147
148
149
150
151
# File 'lib/pry-remote-em/broker.rb', line 146

# Removes a server registration.
#
# When the URL's host is '0.0.0.0', 'localhost' or '127.0.0.1' it is replaced
# with +peer_ip+ before the lookup.
#
# @param url [String] server URL, parsed with +URI.parse+
# @return [Object, nil] the value removed from +Broker.servers+, or nil when
#   the URL was not registered
def receive_unregister_server(url)
  uri = URI.parse(url)
  uri.host = peer_ip if %w[0.0.0.0 localhost 127.0.0.1].include?(uri.host)
  log.warn("[pry-remote-em broker] unregister #{uri}")
  Broker.servers.delete(uri)
end

#ssl_handshake_completed ⇒ Object



194
195
196
197
# File 'lib/pry-remote-em/broker.rb', line 194

# Callback run after the TLS handshake: logs the peer and sends the server list.
#
# @return [Object] whatever +send_server_list+ returns
def ssl_handshake_completed
  remote = "#{peer_ip}:#{peer_port}"
  log.info("[pry-remote-em broker] TLS connection established (#{remote})")
  send_server_list(Broker.servers)
end

#start_tls ⇒ Object



174
175
176
177
178
# File 'lib/pry-remote-em/broker.rb', line 174

# Announces TLS to the peer and then starts it via the inherited +start_tls+.
#
# The inherited method receives +@opts[:tls]+ when that value is a Hash, and an
# empty Hash otherwise.
#
# @return [Object] whatever the inherited +start_tls+ returns
def start_tls
  log.debug("[pry-remote-em broker] starting TLS (#{peer_ip}:#{peer_port})")
  send_start_tls
  tls_options = @opts[:tls].is_a?(Hash) ? @opts[:tls] : {}
  super(tls_options)
end