Class: Vines::Stream

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
Log
Defined in:
lib/vines/stream.rb,
lib/vines/stream/http.rb,
lib/vines/stream/sasl.rb,
lib/vines/stream/state.rb,
lib/vines/stream/client.rb,
lib/vines/stream/parser.rb,
lib/vines/stream/server.rb,
lib/vines/stream/component.rb,
lib/vines/stream/http/auth.rb,
lib/vines/stream/http/bind.rb,
lib/vines/stream/client/tls.rb,
lib/vines/stream/http/ready.rb,
lib/vines/stream/http/start.rb,
lib/vines/stream/client/auth.rb,
lib/vines/stream/client/bind.rb,
lib/vines/stream/server/auth.rb,
lib/vines/stream/client/ready.rb,
lib/vines/stream/client/start.rb,
lib/vines/stream/http/request.rb,
lib/vines/stream/http/session.rb,
lib/vines/stream/server/ready.rb,
lib/vines/stream/server/start.rb,
lib/vines/stream/client/closed.rb,
lib/vines/stream/http/sessions.rb,
lib/vines/stream/client/session.rb,
lib/vines/stream/component/ready.rb,
lib/vines/stream/component/start.rb,
lib/vines/stream/http/bind_restart.rb,
lib/vines/stream/server/auth_method.rb,
lib/vines/stream/client/auth_restart.rb,
lib/vines/stream/client/bind_restart.rb,
lib/vines/stream/component/handshake.rb,
lib/vines/stream/server/auth_restart.rb,
lib/vines/stream/server/final_restart.rb,
lib/vines/stream/server/outbound/auth.rb,
lib/vines/stream/server/outbound/start.rb,
lib/vines/stream/server/outbound/tls_result.rb,
lib/vines/stream/server/outbound/auth_restart.rb,
lib/vines/stream/server/outbound/auth_external.rb,
lib/vines/stream/server/outbound/authoritative.rb,
lib/vines/stream/server/outbound/final_restart.rb,
lib/vines/stream/server/outbound/final_features.rb,
lib/vines/stream/server/outbound/auth_dialback_result.rb,
lib/vines/stream/server/outbound/auth_external_result.rb

Overview

The base class for various XMPP streams (c2s, s2s, component, http), containing behavior common to all streams like rate limiting, stanza parsing, and stream error handling.

Direct Known Subclasses

Client, Component, Server

Defined Under Namespace

Classes: Client, Component, Http, Parser, SASL, Server, State

Constant Summary collapse

ERROR =
'error'.freeze
STREAM =
'stream'.freeze
PAD =
20

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Log

#log, set_log_file

Constructor Details

#initialize(config) ⇒ Stream

Returns a new instance of Stream.



17
18
19
# File 'lib/vines/stream.rb', line 17

def initialize(config)
  @config = config
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



14
15
16
# File 'lib/vines/stream.rb', line 14

def config
  @config
end

#domainObject (readonly)

Returns the value of attribute domain.



14
15
16
# File 'lib/vines/stream.rb', line 14

def domain
  @domain
end

#idObject (readonly)

Returns the value of attribute id.



14
15
16
# File 'lib/vines/stream.rb', line 14

def id
  @id
end

#stateObject (readonly)

Returns the value of attribute state.



14
15
16
# File 'lib/vines/stream.rb', line 14

def state
  @state
end

#userObject

Returns the value of attribute user.



15
16
17
# File 'lib/vines/stream.rb', line 15

def user
  @user
end

Instance Method Details

#advance(state) ⇒ Object

Advance the stream’s state machine to the new state. XML nodes received by the stream will be passed to this state’s ‘node` method.

state - The Stream::State to process the stanzas next.

Returns the new Stream::State.



176
177
178
# File 'lib/vines/stream.rb', line 176

def advance(state)
  @state = state
end

#available_resources(*jid) ⇒ Object



116
117
118
# File 'lib/vines/stream.rb', line 116

def available_resources(*jid)
  router.available_resources(*jid, user.jid)
end

#cert_domain_matches?(domain) ⇒ Boolean

Returns:

  • (Boolean)


135
136
137
# File 'lib/vines/stream.rb', line 135

def cert_domain_matches?(domain)
  @store.domain?(get_peer_cert, domain)
end

#close_connection(after_writing = false) ⇒ Object

Advance the state machine into the ‘Closed` state so any remaining queued nodes are not processed while we’re waiting for EM to actually close the connection.

Returns nothing.



57
58
59
60
61
# File 'lib/vines/stream.rb', line 57

def close_connection(after_writing=false)
  super
  @closed = true
  advance(Client::Closed.new(self))
end

#connected_resources(jid) ⇒ Object



112
113
114
# File 'lib/vines/stream.rb', line 112

def connected_resources(jid)
  router.connected_resources(jid, user.jid)
end

#create_parserObject

Initialize a new XML parser for this connection. This is called when the stream is first connected as well as for stream restarts during negotiation. Subclasses can override this method to provide a different type of parser (e.g. HTTP).

Returns nothing.



44
45
46
47
48
49
50
# File 'lib/vines/stream.rb', line 44

def create_parser
  @parser = Parser.new.tap do |parser|
    parser.stream_open {|node| @nodes.push(node) }
    parser.stream_close { close_connection }
    parser.stanza {|node| @nodes.push(node) }
  end
end

#encryptObject



152
153
154
155
# File 'lib/vines/stream.rb', line 152

def encrypt
  cert, key = @store.files_for_domain(domain)
  start_tls(cert_chain_file: cert, private_key_file: key, verify_peer: true)
end

#encrypt?Boolean

Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.

Returns:

  • (Boolean)


159
160
161
# File 'lib/vines/stream.rb', line 159

def encrypt?
  !@store.files_for_domain(domain).nil?
end

#error(e) ⇒ Object

Stream level errors close the stream while stanza and SASL errors are written to the client and leave the stream open. All exceptions should pass through this method for consistent handling.

e - The StandardError, usually XmppError, that occurred.

Returns nothing.



187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/vines/stream.rb', line 187

def error(e)
  case e
  when SaslError, StanzaError
    write(e.to_xml)
  when StreamError
    send_stream_error(e)
    close_stream
  else
    log.error(e)
    send_stream_error(StreamErrors::InternalServerError.new)
    close_stream
  end
end

#interested_resources(*jid) ⇒ Object



120
121
122
# File 'lib/vines/stream.rb', line 120

def interested_resources(*jid)
  router.interested_resources(*jid, user.jid)
end

#post_initObject

Initialize the stream after its connection to the server has completed. EventMachine calls this method when an incoming connection is accepted into the event loop.

Returns nothing.



26
27
28
29
30
31
32
33
34
35
36
# File 'lib/vines/stream.rb', line 26

def post_init
  @remote_addr, @local_addr = addresses
  @user, @closed, @stanza_size = nil, false, 0
  @bucket = TokenBucket.new(100, 10)
  @store = Store.new(@config.certs)
  @nodes = EM::Queue.new
  process_node_queue
  create_parser
  log.info { "%s %21s -> %s" %
    ['Stream connected:'.ljust(PAD), @remote_addr, @local_addr] }
end

#receive_data(data) ⇒ Object

Read bytes off the stream and feed them into the XML parser. EventMachine is responsible for calling this method on its event loop as connections become readable.

data - The byte String sent to the server from the client, hopefully XML.

Returns nothing.



70
71
72
73
74
75
76
77
78
# File 'lib/vines/stream.rb', line 70

def receive_data(data)
  return if @closed
  @stanza_size += data.bytesize
  if @stanza_size < max_stanza_size
    @parser << data rescue error(StreamErrors::NotWellFormed.new)
  else
    error(StreamErrors::PolicyViolation.new('max stanza size reached'))
  end
end

#resetObject

Reset the connection’s XML parser when a new <stream:stream> header is received.

Returns nothing.



84
85
86
# File 'lib/vines/stream.rb', line 84

def reset
  create_parser
end

#routerObject



201
202
203
# File 'lib/vines/stream.rb', line 201

def router
  @config.router
end

#ssl_verify_peer(pem) ⇒ Object



124
125
126
127
128
129
130
131
132
133
# File 'lib/vines/stream.rb', line 124

def ssl_verify_peer(pem)
  # Skip verifying if user accept self-signed certificates
  return true if self.vhost.accept_self_signed?
  # EM is supposed to close the connection when this returns false,
  # but it only does that for inbound connections, not when we
  # make a connection to another server.
  @store.trusted?(pem).tap do |trusted|
    close_connection unless trusted
  end
end

#storage(domain = nil) ⇒ Object

Returns the storage system for the domain. If no domain is given, the stream’s storage mechanism is returned.



90
91
92
# File 'lib/vines/stream.rb', line 90

def storage(domain=nil)
  @config.storage(domain || self.domain)
end

#unbindObject



163
164
165
166
167
168
# File 'lib/vines/stream.rb', line 163

def unbind
  router.delete(self)
  log.info { "%s %21s -> %s" %
    ['Stream disconnected:'.ljust(PAD), @remote_addr, @local_addr] }
  log.info { "Streams connected: #{router.size}" }
end

#update_user_streams(user) ⇒ Object

Reload the user’s information into their active connections. Call this after storage.save_user() to sync the new user state with their other connections.

user - The User whose connection info needs refreshing.

Returns nothing.



106
107
108
109
110
# File 'lib/vines/stream.rb', line 106

def update_user_streams(user)
  connected_resources(user.jid.bare).each do |stream|
    stream.user.update_from(user)
  end
end

#vhostObject

Returns the Config::Host virtual host for the stream’s domain.



95
96
97
# File 'lib/vines/stream.rb', line 95

def vhost
  @config.vhost(domain)
end

#write(data) ⇒ Object

Send the data over the wire to this client.

data - The XML String or XML::Node to write to the socket.

Returns nothing.



144
145
146
147
148
149
150
# File 'lib/vines/stream.rb', line 144

def write(data)
  log_node(data, :out)
  if data.respond_to?(:to_xml)
    data = data.to_xml(:indent => 0)
  end
  send_data(data)
end