Class: Vines::Stream
- Inherits:
-
EventMachine::Connection
- Object
- EventMachine::Connection
- Vines::Stream
- Includes:
- Log
- Defined in:
- lib/vines/stream.rb,
lib/vines/stream/http.rb,
lib/vines/stream/sasl.rb,
lib/vines/stream/state.rb,
lib/vines/stream/client.rb,
lib/vines/stream/parser.rb,
lib/vines/stream/server.rb,
lib/vines/stream/component.rb,
lib/vines/stream/http/auth.rb,
lib/vines/stream/http/bind.rb,
lib/vines/stream/client/tls.rb,
lib/vines/stream/http/ready.rb,
lib/vines/stream/http/start.rb,
lib/vines/stream/client/auth.rb,
lib/vines/stream/client/bind.rb,
lib/vines/stream/server/auth.rb,
lib/vines/stream/client/ready.rb,
lib/vines/stream/client/start.rb,
lib/vines/stream/http/request.rb,
lib/vines/stream/http/session.rb,
lib/vines/stream/server/ready.rb,
lib/vines/stream/server/start.rb,
lib/vines/stream/client/closed.rb,
lib/vines/stream/http/sessions.rb,
lib/vines/stream/client/session.rb,
lib/vines/stream/component/ready.rb,
lib/vines/stream/component/start.rb,
lib/vines/stream/http/bind_restart.rb,
lib/vines/stream/server/auth_method.rb,
lib/vines/stream/client/auth_restart.rb,
lib/vines/stream/client/bind_restart.rb,
lib/vines/stream/component/handshake.rb,
lib/vines/stream/server/auth_restart.rb,
lib/vines/stream/server/final_restart.rb,
lib/vines/stream/server/outbound/auth.rb,
lib/vines/stream/server/outbound/start.rb,
lib/vines/stream/server/outbound/tls_result.rb,
lib/vines/stream/server/outbound/auth_restart.rb,
lib/vines/stream/server/outbound/auth_external.rb,
lib/vines/stream/server/outbound/authoritative.rb,
lib/vines/stream/server/outbound/final_restart.rb,
lib/vines/stream/server/outbound/final_features.rb,
lib/vines/stream/server/outbound/auth_dialback_result.rb,
lib/vines/stream/server/outbound/auth_external_result.rb
Overview
The base class for various XMPP streams (c2s, s2s, component, http), containing behavior common to all streams like rate limiting, stanza parsing, and stream error handling.
Defined Under Namespace
Classes: Client, Component, Http, Parser, SASL, Server, State
Constant Summary collapse
- ERROR =
'error'.freeze
- STREAM =
'stream'.freeze
- PAD =
20
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#domain ⇒ Object
readonly
Returns the value of attribute domain.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#state ⇒ Object
readonly
Returns the value of attribute state.
-
#user ⇒ Object
Returns the value of attribute user.
Instance Method Summary collapse
-
#advance(state) ⇒ Object
Advance the stream’s state machine to the new state.
- #available_resources(*jid) ⇒ Object
- #cert_domain_matches?(domain) ⇒ Boolean
-
#close_connection(after_writing = false) ⇒ Object
Advance the state machine into the ‘Closed` state so any remaining queued nodes are not processed while we’re waiting for EM to actually close the connection.
- #connected_resources(jid) ⇒ Object
-
#create_parser ⇒ Object
Initialize a new XML parser for this connection.
- #encrypt ⇒ Object
-
#encrypt? ⇒ Boolean
Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.
-
#error(e) ⇒ Object
Stream level errors close the stream while stanza and SASL errors are written to the client and leave the stream open.
-
#initialize(config) ⇒ Stream
constructor
A new instance of Stream.
- #interested_resources(*jid) ⇒ Object
-
#post_init ⇒ Object
Initialize the stream after its connection to the server has completed.
-
#receive_data(data) ⇒ Object
Read bytes off the stream and feed them into the XML parser.
-
#reset ⇒ Object
Reset the connection’s XML parser when a new <stream:stream> header is received.
- #router ⇒ Object
- #ssl_verify_peer(pem) ⇒ Object
-
#storage(domain = nil) ⇒ Object
Returns the storage system for the domain.
- #unbind ⇒ Object
-
#update_user_streams(user) ⇒ Object
Reload the user’s information into their active connections.
-
#vhost ⇒ Object
Returns the Config::Host virtual host for the stream’s domain.
-
#write(data) ⇒ Object
Send the data over the wire to this client.
Methods included from Log
Constructor Details
#initialize(config) ⇒ Stream
Returns a new instance of Stream.
17 18 19 |
# File 'lib/vines/stream.rb', line 17 def initialize(config) @config = config end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
14 15 16 |
# File 'lib/vines/stream.rb', line 14 def config @config end |
#domain ⇒ Object (readonly)
Returns the value of attribute domain.
14 15 16 |
# File 'lib/vines/stream.rb', line 14 def domain @domain end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
14 15 16 |
# File 'lib/vines/stream.rb', line 14 def id @id end |
#state ⇒ Object (readonly)
Returns the value of attribute state.
14 15 16 |
# File 'lib/vines/stream.rb', line 14 def state @state end |
#user ⇒ Object
Returns the value of attribute user.
15 16 17 |
# File 'lib/vines/stream.rb', line 15 def user @user end |
Instance Method Details
#advance(state) ⇒ Object
Advance the stream’s state machine to the new state. XML nodes received by the stream will be passed to this state’s ‘node` method.
state - The Stream::State to process the stanzas next.
Returns the new Stream::State.
176 177 178 |
# File 'lib/vines/stream.rb', line 176 def advance(state) @state = state end |
#available_resources(*jid) ⇒ Object
116 117 118 |
# File 'lib/vines/stream.rb', line 116 def available_resources(*jid) router.available_resources(*jid, user.jid) end |
#cert_domain_matches?(domain) ⇒ Boolean
135 136 137 |
# File 'lib/vines/stream.rb', line 135 def cert_domain_matches?(domain) @store.domain?(get_peer_cert, domain) end |
#close_connection(after_writing = false) ⇒ Object
Advance the state machine into the ‘Closed` state so any remaining queued nodes are not processed while we’re waiting for EM to actually close the connection.
Returns nothing.
57 58 59 60 61 |
# File 'lib/vines/stream.rb', line 57 def close_connection(after_writing=false) super @closed = true advance(Client::Closed.new(self)) end |
#connected_resources(jid) ⇒ Object
112 113 114 |
# File 'lib/vines/stream.rb', line 112 def connected_resources(jid) router.connected_resources(jid, user.jid) end |
#create_parser ⇒ Object
Initialize a new XML parser for this connection. This is called when the stream is first connected as well as for stream restarts during negotiation. Subclasses can override this method to provide a different type of parser (e.g. HTTP).
Returns nothing.
44 45 46 47 48 49 50 |
# File 'lib/vines/stream.rb', line 44 def create_parser @parser = Parser.new.tap do |parser| parser.stream_open {|node| @nodes.push(node) } parser.stream_close { close_connection } parser.stanza {|node| @nodes.push(node) } end end |
#encrypt ⇒ Object
152 153 154 155 |
# File 'lib/vines/stream.rb', line 152 def encrypt cert, key = @store.files_for_domain(domain) start_tls(cert_chain_file: cert, private_key_file: key, verify_peer: true) end |
#encrypt? ⇒ Boolean
Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.
159 160 161 |
# File 'lib/vines/stream.rb', line 159 def encrypt? !@store.files_for_domain(domain).nil? end |
#error(e) ⇒ Object
Stream level errors close the stream while stanza and SASL errors are written to the client and leave the stream open. All exceptions should pass through this method for consistent handling.
e - The StandardError, usually XmppError, that occurred.
Returns nothing.
187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/vines/stream.rb', line 187 def error(e) case e when SaslError, StanzaError write(e.to_xml) when StreamError send_stream_error(e) close_stream else log.error(e) send_stream_error(StreamErrors::InternalServerError.new) close_stream end end |
#interested_resources(*jid) ⇒ Object
120 121 122 |
# File 'lib/vines/stream.rb', line 120 def interested_resources(*jid) router.interested_resources(*jid, user.jid) end |
#post_init ⇒ Object
Initialize the stream after its connection to the server has completed. EventMachine calls this method when an incoming connection is accepted into the event loop.
Returns nothing.
26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/vines/stream.rb', line 26 def post_init @remote_addr, @local_addr = addresses @user, @closed, @stanza_size = nil, false, 0 @bucket = TokenBucket.new(100, 10) @store = Store.new(@config.certs) @nodes = EM::Queue.new process_node_queue create_parser log.info { "%s %21s -> %s" % ['Stream connected:'.ljust(PAD), @remote_addr, @local_addr] } end |
#receive_data(data) ⇒ Object
Read bytes off the stream and feed them into the XML parser. EventMachine is responsible for calling this method on its event loop as connections become readable.
data - The byte String sent to the server from the client, hopefully XML.
Returns nothing.
70 71 72 73 74 75 76 77 78 |
# File 'lib/vines/stream.rb', line 70 def receive_data(data) return if @closed @stanza_size += data.bytesize if @stanza_size < max_stanza_size @parser << data rescue error(StreamErrors::NotWellFormed.new) else error(StreamErrors::PolicyViolation.new('max stanza size reached')) end end |
#reset ⇒ Object
Reset the connection’s XML parser when a new <stream:stream> header is received.
Returns nothing.
84 85 86 |
# File 'lib/vines/stream.rb', line 84 def reset create_parser end |
#router ⇒ Object
201 202 203 |
# File 'lib/vines/stream.rb', line 201 def router @config.router end |
#ssl_verify_peer(pem) ⇒ Object
124 125 126 127 128 129 130 131 132 133 |
# File 'lib/vines/stream.rb', line 124 def ssl_verify_peer(pem) # Skip verifying if user accept self-signed certificates return true if self.vhost.accept_self_signed? # EM is supposed to close the connection when this returns false, # but it only does that for inbound connections, not when we # make a connection to another server. @store.trusted?(pem).tap do |trusted| close_connection unless trusted end end |
#storage(domain = nil) ⇒ Object
Returns the storage system for the domain. If no domain is given, the stream’s storage mechanism is returned.
90 91 92 |
# File 'lib/vines/stream.rb', line 90 def storage(domain=nil) @config.storage(domain || self.domain) end |
#unbind ⇒ Object
163 164 165 166 167 168 |
# File 'lib/vines/stream.rb', line 163 def unbind router.delete(self) log.info { "%s %21s -> %s" % ['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] } log.info { "Streams connected: #{router.size}" } end |
#update_user_streams(user) ⇒ Object
Reload the user’s information into their active connections. Call this after storage.save_user() to sync the new user state with their other connections.
user - The User whose connection info needs refreshing.
Returns nothing.
106 107 108 109 110 |
# File 'lib/vines/stream.rb', line 106 def update_user_streams(user) connected_resources(user.jid.).each do |stream| stream.user.update_from(user) end end |
#vhost ⇒ Object
Returns the Config::Host virtual host for the stream’s domain.
95 96 97 |
# File 'lib/vines/stream.rb', line 95 def vhost @config.vhost(domain) end |
#write(data) ⇒ Object
Send the data over the wire to this client.
data - The XML String or XML::Node to write to the socket.
Returns nothing.
144 145 146 147 148 149 150 |
# File 'lib/vines/stream.rb', line 144 def write(data) log_node(data, :out) if data.respond_to?(:to_xml) data = data.to_xml(:indent => 0) end send_data(data) end |