Class: Vines::Stream

Inherits:
EventMachine::Connection
  • Object
show all
Includes:
Log
Defined in:
lib/vines/stream.rb,
lib/vines/stream/http.rb,
lib/vines/stream/sasl.rb,
lib/vines/stream/state.rb,
lib/vines/stream/client.rb,
lib/vines/stream/parser.rb,
lib/vines/stream/server.rb,
lib/vines/stream/component.rb,
lib/vines/stream/http/auth.rb,
lib/vines/stream/http/bind.rb,
lib/vines/stream/client/tls.rb,
lib/vines/stream/http/ready.rb,
lib/vines/stream/http/start.rb,
lib/vines/stream/server/tls.rb,
lib/vines/stream/client/auth.rb,
lib/vines/stream/client/bind.rb,
lib/vines/stream/server/auth.rb,
lib/vines/stream/client/ready.rb,
lib/vines/stream/client/start.rb,
lib/vines/stream/http/request.rb,
lib/vines/stream/http/session.rb,
lib/vines/stream/server/ready.rb,
lib/vines/stream/server/start.rb,
lib/vines/stream/client/closed.rb,
lib/vines/stream/http/sessions.rb,
lib/vines/stream/client/session.rb,
lib/vines/stream/component/ready.rb,
lib/vines/stream/component/start.rb,
lib/vines/stream/http/bind_restart.rb,
lib/vines/stream/client/auth_restart.rb,
lib/vines/stream/client/bind_restart.rb,
lib/vines/stream/component/handshake.rb,
lib/vines/stream/server/auth_restart.rb,
lib/vines/stream/server/outbound/tls.rb,
lib/vines/stream/server/final_restart.rb,
lib/vines/stream/server/outbound/auth.rb,
lib/vines/stream/server/outbound/start.rb,
lib/vines/stream/server/outbound/tls_result.rb,
lib/vines/stream/server/outbound/auth_result.rb,
lib/vines/stream/server/outbound/auth_restart.rb,
lib/vines/stream/server/outbound/final_restart.rb,
lib/vines/stream/server/outbound/final_features.rb

Overview

The base class for various XMPP streams (c2s, s2s, component, http), containing behavior common to all streams like rate limiting, stanza parsing, and stream error handling.

Direct Known Subclasses

Client, Component, Server

Defined Under Namespace

Classes: Client, Component, Http, Parser, SASL, Server, State

Constant Summary collapse

# NOTE(review): ERROR is not referenced by any method shown on this page;
# presumably it is used by the stream state classes — confirm.
ERROR =
'error'.freeze
# Width to which the 'Stream connected:' / 'Stream disconnected:' log labels
# are left-justified in post_init and unbind.
PAD =
20

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Log

#log

Constructor Details

#initialize(config) ⇒ Stream

Returns a new instance of Stream.



16
17
18
# File 'lib/vines/stream.rb', line 16

# Stores the given configuration object in @config. The other methods on
# this class read the router, storage, virtual hosts and certificate
# location from it.
def initialize(config)
  @config = config
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



13
14
15
# File 'lib/vines/stream.rb', line 13

# Returns the configuration object that was passed to #initialize.
def config
  @config
end

#domain ⇒ Object (readonly)

Returns the value of attribute domain.



13
14
15
# File 'lib/vines/stream.rb', line 13

# Returns @domain. Nothing in the code shown here assigns @domain, so it is
# nil until some other part of the stream sets it.
# NOTE(review): presumably set during stream negotiation — confirm.
def domain
  @domain
end

#user ⇒ Object

Returns the value of attribute user.



14
15
16
# File 'lib/vines/stream.rb', line 14

# Returns @user. post_init resets it to nil for every new connection.
# NOTE(review): the attribute is not marked readonly, so a writer is
# presumably defined alongside it — confirm.
def user
  @user
end

Instance Method Details

#advance(state) ⇒ Object

Advance the stream’s state machine to the new state. XML nodes received by the stream will be passed to this state’s node method.



143
144
145
# File 'lib/vines/stream.rb', line 143

# Replaces the stream's current state object (@state) with +state+ and
# returns it. close_connection uses this to switch to Client::Closed.
def advance(state)
  @state = state
end

#available_resources(*jid) ⇒ Object



93
94
95
# File 'lib/vines/stream.rb', line 93

# Asks the router for the available resources of the given JIDs. This
# stream's own user JID is always appended as the last argument.
def available_resources(*jid)
  lookup_args = jid + [user.jid]
  router.available_resources(*lookup_args)
end

#cert_domain_matches?(domain) ⇒ Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/vines/stream.rb', line 110

# Fetches the peer certificate via get_peer_cert and asks the certificate
# store whether it matches +domain+. Returns whatever @store.domain? returns.
def cert_domain_matches?(domain)
  peer_cert = get_peer_cert
  @store.domain?(peer_cert, domain)
end

#close_connection(after_writing = false) ⇒ Object

Advance the state machine into the Closed state so any remaining queued nodes are not processed while we’re waiting for EM to actually close the connection.



47
48
49
50
51
# File 'lib/vines/stream.rb', line 47

# Closes the connection through the superclass, marks the stream as closed
# so receive_data ignores further input, and moves the state machine to
# Client::Closed. Returns the result of #advance.
def close_connection(after_writing=false)
  super(after_writing)
  @closed = true
  closed_state = Client::Closed.new(self)
  advance(closed_state)
end

#connected_resources(jid) ⇒ Object



89
90
91
# File 'lib/vines/stream.rb', line 89

# Asks the router for the connected resources of +jid+, passing this
# stream's own user JID as the second argument.
def connected_resources(jid)
  own_jid = user.jid
  router.connected_resources(jid, own_jid)
end

#create_parser ⇒ Object

Initialize a new XML parser for this connection. This is called when the stream is first connected as well as for stream restarts during negotiation. Subclasses can override this method to provide a different type of parser (e.g. HTTP).



36
37
38
39
40
41
42
# File 'lib/vines/stream.rb', line 36

# Builds a fresh Parser, wires its callbacks, stores it in @parser and
# returns it:
#   * stream_open and stanza nodes are pushed onto the @nodes queue
#   * stream_close closes the connection
def create_parser
  parser = Parser.new
  enqueue = proc {|node| @nodes.push(node) }
  parser.stream_open(&enqueue)
  parser.stream_close { close_connection }
  parser.stanza(&enqueue)
  @parser = parser
end

#encrypt ⇒ Object



123
124
125
126
# File 'lib/vines/stream.rb', line 123

# Starts TLS on this connection using the certificate and private key files
# the store reports for the stream's domain, with peer verification on.
# If the store returns nil, both file options are passed as nil; callers
# can check #encrypt? first.
def encrypt
  cert_file, key_file = @store.files_for_domain(domain)
  start_tls(
    cert_chain_file: cert_file,
    private_key_file: key_file,
    verify_peer: true
  )
end

#encrypt? ⇒ Boolean

Returns true if the TLS certificate and private key files for this domain exist and can be used to encrypt this stream.

Returns:

  • (Boolean)


130
131
132
# File 'lib/vines/stream.rb', line 130

# True when the store returns something other than nil for this stream's
# domain, i.e. TLS files are available for #encrypt.
def encrypt?
  tls_files = @store.files_for_domain(domain)
  !tls_files.nil?
end

#error(e) ⇒ Object

Stream level errors close the stream while stanza and SASL errors are written to the client and leave the stream open. All exceptions should pass through this method for consistent handling.



150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/vines/stream.rb', line 150

# Central error handler for the stream.
#
# SaslError and StanzaError are written back to the peer and the stream
# stays open. Anything else ends the stream: a StreamError is sent as-is,
# while any other exception is logged and reported to the peer as an
# InternalServerError.
def error(e)
  case e
  when SaslError, StanzaError
    # Recoverable: report it and keep the stream open.
    return write(e.to_xml)
  when StreamError
    fatal = e
  else
    log.error(e)
    fatal = StreamErrors::InternalServerError.new
  end
  send_stream_error(fatal)
  close_stream
end

#interested_resources(*jid) ⇒ Object



97
98
99
# File 'lib/vines/stream.rb', line 97

# Asks the router for the interested resources of the given JIDs. This
# stream's own user JID is always appended as the last argument.
def interested_resources(*jid)
  lookup_args = jid + [user.jid]
  router.interested_resources(*lookup_args)
end

#post_init ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/vines/stream.rb', line 20

# Sets up per-connection state: records the remote and local addresses,
# clears the user / closed flag / stanza byte counter, creates the token
# bucket, certificate store and node queue, starts node processing, builds
# the XML parser, and logs the new connection.
def post_init
  @remote_addr, @local_addr = addresses
  @user = nil
  @closed = false
  @stanza_size = 0
  @bucket = TokenBucket.new(100, 10)
  @store = Store.new(@config.certs)
  @nodes = EM::Queue.new
  process_node_queue
  create_parser
  log.info do
    label = 'Stream connected:'.ljust(PAD)
    "%s %21s -> %s" % [label, @remote_addr, @local_addr]
  end
end

#receive_data(data) ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/vines/stream.rb', line 53

# Feeds incoming bytes to the XML parser.
#
# Input is ignored once the stream is closed. The running byte count in
# @stanza_size is increased by the size of +data+; when it is no longer
# below max_stanza_size a PolicyViolation stream error is raised through
# #error. Any StandardError from the parser is reported as NotWellFormed.
def receive_data(data)
  return if @closed

  @stanza_size += data.bytesize
  unless @stanza_size < max_stanza_size
    return error(StreamErrors::PolicyViolation.new('max stanza size reached'))
  end

  begin
    @parser << data
  rescue StandardError
    error(StreamErrors::NotWellFormed.new)
  end
end

#reset ⇒ Object

Reset the connection’s XML parser when a new <stream:stream> header is received.



65
66
67
# File 'lib/vines/stream.rb', line 65

# Discards the current XML parser by calling create_parser, which builds a
# new Parser and stores it in @parser. Returns create_parser's result.
def reset
  create_parser
end

#router ⇒ Object



164
165
166
# File 'lib/vines/stream.rb', line 164

# Returns the router held by the configuration object. The resource lookup
# methods and unbind on this class all go through it.
def router
  @config.router
end

#ssl_verify_peer(pem) ⇒ Object



101
102
103
104
105
106
107
108
# File 'lib/vines/stream.rb', line 101

# Asks the certificate store whether +pem+ is trusted and returns its
# answer unchanged.
def ssl_verify_peer(pem)
  trusted = @store.trusted?(pem)
  # Returning false should make EM drop the connection, but it only does
  # so for inbound connections, not for connections we open to another
  # server, so close explicitly.
  close_connection unless trusted
  trusted
end

#storage(domain = nil) ⇒ Object

Returns the storage system for the domain. If no domain is given, the stream’s storage mechanism is returned.



71
72
73
# File 'lib/vines/stream.rb', line 71

# Returns the configured storage for +domain+. When +domain+ is nil (or
# false) the stream's own domain is used instead.
def storage(domain=nil)
  target = domain || self.domain
  @config.storage(target)
end

#unbind ⇒ Object



134
135
136
137
138
139
# File 'lib/vines/stream.rb', line 134

# Removes this stream from the router, then logs the disconnect and the
# number of streams the router still holds.
def unbind
  router.delete(self)
  log.info do
    label = 'Stream disconnected:'.ljust(PAD)
    "%s %21s -> %s" % [label, @remote_addr, @local_addr]
  end
  log.info do
    "Streams connected: #{router.size}"
  end
end

#update_user_streams(user) ⇒ Object

Reload the user’s information into their active connections. Call this after storage.save_user() to sync the new user state with their other connections.



83
84
85
86
87
# File 'lib/vines/stream.rb', line 83

# Copies +user+'s state into the user object of every stream returned by
# connected_resources for the user's bare JID. Returns that collection.
def update_user_streams(user)
  bare_jid = user.jid.bare
  streams = connected_resources(bare_jid)
  streams.each {|stream| stream.user.update_from(user) }
end

#vhost ⇒ Object

Returns the Vines::Config::Host virtual host for the stream’s domain.



76
77
78
# File 'lib/vines/stream.rb', line 76

# Looks up the virtual host for this stream's domain in the configuration.
def vhost
  host_name = domain
  @config.vhost(host_name)
end

#write(data) ⇒ Object

Send the data over the wire to this client.



115
116
117
118
119
120
121
# File 'lib/vines/stream.rb', line 115

# Logs the outgoing data, then sends it over the connection. Objects that
# respond to to_xml are serialized without indentation first; anything
# else is sent unchanged. Returns the result of send_data.
def write(data)
  log_node(data, :out)
  payload = data.respond_to?(:to_xml) ? data.to_xml(:indent => 0) : data
  send_data(payload)
end