Module: PryRemoteEm::Broker
- Includes:
- Proto
- Defined in:
- lib/pry-remote-em/broker.rb
Constant Summary
Constants included
from Proto
Proto::PREAMBLE, Proto::PREAMBLE_LEN, Proto::SEPERATOR, Proto::SEPERATOR_LEN
Class Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Proto
#receive_auth, #receive_banner, #receive_completion, #receive_data, #receive_json, #receive_msg, #receive_msg_bcast, #receive_prompt, #receive_raw, #receive_shell_cmd, #receive_shell_data, #receive_shell_result, #receive_shell_sig, #receive_start_tls, #receive_unknown, #send_auth, #send_banner, #send_completion, #send_data, #send_heatbeat, #send_json, #send_msg, #send_msg_bcast, #send_prompt, #send_proxy_connection, #send_raw, #send_register_server, #send_server_list, #send_shell_cmd, #send_shell_result, #send_start_tls, #send_unregister_server
Class Attribute Details
Returns the value of attribute host.
9
10
11
|
# File 'lib/pry-remote-em/broker.rb', line 9
def host
@host
end
|
.listening ⇒ Object
Also known as:
listening?
Returns the value of attribute listening.
9
10
11
|
# File 'lib/pry-remote-em/broker.rb', line 9
def listening
@listening
end
|
Returns the value of attribute port.
9
10
11
|
# File 'lib/pry-remote-em/broker.rb', line 9
def port
@port
end
|
Class Method Details
.connected? ⇒ Boolean
100
101
102
|
# File 'lib/pry-remote-em/broker.rb', line 100
def connected?
@connected
end
|
.expand_url(url) ⇒ Object
75
76
77
78
79
|
# File 'lib/pry-remote-em/broker.rb', line 75
def expand_url(url)
return Array(url) if (u = URI.parse(url)).host != '0.0.0.0'
Socket.ip_address_list.select { |a| a.ipv4? }
.map(&:ip_address).map{|i| u.clone.tap{|mu| mu.host = i } }
end
|
96
97
98
|
# File 'lib/pry-remote-em/broker.rb', line 96
def hbeats
@hbeats ||= {}
end
|
54
55
56
57
|
# File 'lib/pry-remote-em/broker.rb', line 54
def log
return opts[:logger] if opts[:logger]
@log ||= Logger.new(STDERR)
end
|
50
51
52
|
# File 'lib/pry-remote-em/broker.rb', line 50
def opts
@opts ||= {}
end
|
.register(url, name = 'unknown') ⇒ Object
63
64
65
66
67
|
# File 'lib/pry-remote-em/broker.rb', line 63
def register(url, name = 'unknown')
expand_url(url).each do |u|
client { |c| c.send_register_server(u, name) }
end
end
|
38
39
40
41
42
43
44
45
46
47
48
|
# File 'lib/pry-remote-em/broker.rb', line 38
def restart
log.info("[pry-remote-em broker] restarting on pryem://#{host}:#{port}")
@waiting = nil
@client = nil
run(@host, @port, @opts) do
PryRemoteEm.servers.each do |url, (sig, name)|
next unless EM.get_sockname(sig)
register(url, name)
end
end
end
|
.run(host = ENV['PRYEMBROKER'] || DEF_BROKERHOST, port = ENV['PRYEMBROKERPORT'] || DEF_BROKERPORT, opts = {:tls => false}) ⇒ Object
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
# File 'lib/pry-remote-em/broker.rb', line 12
def run(host = ENV['PRYEMBROKER'] || DEF_BROKERHOST, port = ENV['PRYEMBROKERPORT'] || DEF_BROKERPORT, opts = {:tls => false})
raise "root permission required for port below 1024 (#{port})" if port < 1024 && Process.euid != 0
@host = host
@port = port
opts = opts.dup
opts[:tls] = false
@opts = opts
begin
EM.start_server(host, port, PryRemoteEm::Broker, opts) do |broker|
end
log.info("[pry-remote-em broker] listening on #{opts[:tls] ? 'pryems' : 'pryem'}://#{host}:#{port}")
@listening = true
rescue => e
if (e.message.include?('port is in use') || e.message.include?('no acceptor'))
else
raise e
end
end
client { |c| yield self } if block_given?
end
|
59
60
61
|
# File 'lib/pry-remote-em/broker.rb', line 59
def servers
@servers ||= {}
end
|
92
93
94
|
# File 'lib/pry-remote-em/broker.rb', line 92
def timers
@timers ||= {}
end
|
.unregister(url) ⇒ Object
69
70
71
72
73
|
# File 'lib/pry-remote-em/broker.rb', line 69
def unregister(url)
expand_url(url).each do |u|
client { |c| c.send_unregister_server(u) }
end
end
|
.watch_heartbeats(url) ⇒ Object
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/pry-remote-em/broker.rb', line 81
def watch_heartbeats(url)
return if timers[url]
timers[url] = EM::PeriodicTimer.new(20) do
if !hbeats[url] || (Time.new - hbeats[url]) > 20
servers.delete(url)
timers[url].cancel
timers.delete(url)
end
end
end
|
Instance Method Details
#initialize(opts = {:tls => false}, &blk) ⇒ Object
159
160
161
|
# File 'lib/pry-remote-em/broker.rb', line 159
def initialize(opts = {:tls => false}, &blk)
@opts = opts
end
|
163
164
165
|
# File 'lib/pry-remote-em/broker.rb', line 163
def log
Broker.log
end
|
180
181
182
183
184
185
|
# File 'lib/pry-remote-em/broker.rb', line 180
def peer_ip
return @peer_ip if @peer_ip
return "" if get_peername.nil?
@peer_port, @peer_ip = Socket.unpack_sockaddr_in(get_peername)
@peer_ip
end
|
#peer_port ⇒ Object
187
188
189
190
191
192
|
# File 'lib/pry-remote-em/broker.rb', line 187
def peer_port
return @peer_port if @peer_port
return "" if get_peername.nil?
@peer_port, @peer_ip = Socket.unpack_sockaddr_in(get_peername)
@peer_port
end
|
#post_init ⇒ Object
167
168
169
170
171
172
|
# File 'lib/pry-remote-em/broker.rb', line 167
def post_init
port, ip = Socket.unpack_sockaddr_in(get_peername)
log.info("[pry-remote-em broker] received client connection from #{ip}:#{port}")
send_banner("PryRemoteEm #{VERSION} #{@opts[:tls] ? 'pryems' : 'pryem'}")
@opts[:tls] ? start_tls : send_server_list(Broker.servers)
end
|
#receive_proxy_connection(url) ⇒ Object
153
154
155
156
157
|
# File 'lib/pry-remote-em/broker.rb', line 153
def receive_proxy_connection(url)
log.info("[pry-remote-em broker] proxying to #{url}")
url = URI.parse(url)
EM.connect(url.host, url.port, Client::Proxy, self)
end
|
#receive_register_server(url, name) ⇒ Object
136
137
138
139
140
141
142
143
144
|
# File 'lib/pry-remote-em/broker.rb', line 136
def receive_register_server(url, name)
url = URI.parse(url)
url.host = peer_ip if ['0.0.0.0', 'localhost', '127.0.0.1'].include?(url.host)
log.info("[pry-remote-em broker] registered #{url} - #{name.inspect}") unless Broker.servers[url] == name
Broker.servers[url] = name
Broker.hbeats[url] = Time.new
Broker.watch_heartbeats(url)
name
end
|
#receive_server_list ⇒ Object
132
133
134
|
# File 'lib/pry-remote-em/broker.rb', line 132
def receive_server_list
send_server_list(Broker.servers)
end
|
#receive_unregister_server(url) ⇒ Object
146
147
148
149
150
151
|
# File 'lib/pry-remote-em/broker.rb', line 146
def receive_unregister_server(url)
url = URI.parse(url)
url.host = peer_ip if ['0.0.0.0', 'localhost', '127.0.0.1'].include?(url.host)
log.warn("[pry-remote-em broker] unregister #{url}")
Broker.servers.delete(url)
end
|
#ssl_handshake_completed ⇒ Object
194
195
196
197
|
# File 'lib/pry-remote-em/broker.rb', line 194
def ssl_handshake_completed
log.info("[pry-remote-em broker] TLS connection established (#{peer_ip}:#{peer_port})")
send_server_list(Broker.servers)
end
|
#start_tls ⇒ Object
174
175
176
177
178
|
# File 'lib/pry-remote-em/broker.rb', line 174
def start_tls
log.debug("[pry-remote-em broker] starting TLS (#{peer_ip}:#{peer_port})")
send_start_tls
super(@opts[:tls].is_a?(Hash) ? @opts[:tls] : {})
end
|