Class: Webtube
- Inherits: Object
  - Object
  - Webtube
- Defined in: lib/webtube.rb, lib/webtube/vital-statistics.rb
Defined Under Namespace
Classes: AbortReceiveLoop, BadlyEncodedText, BrokenFrame, ConnectionNotAlive, FragmentedControlFrame, Frame, Location, MaskedFrameToClient, MissingContinuationFrame, ProtocolError, Request, UnexpectedContinuationFrame, UnknownOpcode, UnknownReservedBit, UnmaskedFrameToServer, Vital_Statistics, WebSocketDeclined, WebSocketUpgradeFailed, WebSocketVersionMismatch
Constant Summary
- OPCODE_CONTINUATION = 0x0
  Not all the possible 16 values are defined by the standard.
- OPCODE_TEXT = 0x1
- OPCODE_BINARY = 0x2
- OPCODE_CLOSE = 0x8
- OPCODE_PING = 0x9
- OPCODE_PONG = 0xA
- VERSION = '1.1.0'
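As a rough illustration of how the opcode values listed above split into data and control opcodes, here is a minimal sketch. The values are copied into a local hash so that it runs without the gem, and `control_opcode?` is a hypothetical helper, not part of Webtube.

```ruby
# Local copies of the opcode values from the constant summary,
# so that this sketch runs without the webtube gem installed.
OPCODES = {
  continuation: 0x0,
  text: 0x1,
  binary: 0x2,
  close: 0x8,
  ping: 0x9,
  pong: 0xA,
}.freeze

# Hypothetical helper: RFC 6455 reserves opcodes 0x8..0xF for
# control frames; the lower half is for data frames.
def control_opcode? opcode
  (0x8 .. 0xF).include? opcode
end

# Names of the defined opcodes that denote control frames.
control_names = OPCODES.select{|name, code| control_opcode? code}.keys
```

Only six of the sixteen possible four-bit values appear here, which is what the note about the standard refers to.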
Instance Attribute Summary
- #allow_opcodes ⇒ Object
  Returns the value of attribute allow_opcodes.
- #allow_rsv_bits ⇒ Object
  Returns the value of attribute allow_rsv_bits.
- #context ⇒ Object
  Returns the value of attribute context.
- #header ⇒ Object
  The following three slots are not used by the [[Webtube]] infrastructure.
- #session ⇒ Object
  [[accept_webtube]] saves the request object here.
- #url ⇒ Object (readonly)
  Returns the value of attribute url.
Class Method Summary
- .connect(url, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], http_header: {}, ssl_verify_mode: OpenSSL::SSL::VERIFY_PEER, ssl_cert_store: nil, tcp_connect_timeout: nil, tcp_nodelay: true, close_socket: true, on_http_request: nil, on_http_response: nil, on_ssl_handshake: nil, on_tcp_connect: nil) ⇒ Object
  Attempts to set up a [[WebSocket]] connection to the given [[url]].
- .rejection(response, expected_accept) ⇒ Object
  Checks whether the given [[Net::HTTPResponse]] represents a valid WebSocket upgrade acceptance.
Instance Method Summary
- #close(status_code = 1000, explanation = "") ⇒ Object
  Closes the connection, thus preventing further transmission.
- #hash ⇒ Object
  The application may want to store many Webtube instances in a hash or a set.
- #initialize(socket, serverp, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], close_socket: true) ⇒ Webtube (constructor)
  A new instance of Webtube.
- #inspect ⇒ Object
- #run(listener) ⇒ Object
  Run a loop to read all the messages and control frames coming in via this WebSocket, and hand events to the given [[listener]].
- #send_message(message, opcode = Webtube::OPCODE_TEXT) ⇒ Object
  Send a given message payload to the other party, using the given opcode.
Constructor Details
#initialize(socket, serverp, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], close_socket: true) ⇒ Webtube
# File 'lib/webtube.rb', line 37
def initialize socket,
    serverp,
        # If true, we will expect incoming data masked and
        # will not mask outgoing data.  If false, we will
        # expect incoming data unmasked and will mask outgoing
        # data.
    allow_rsv_bits: 0,
    allow_opcodes: [Webtube::OPCODE_TEXT],
    close_socket: true
  super()
  @socket = socket
  @serverp = serverp
  @allow_rsv_bits = allow_rsv_bits
  @allow_opcodes = allow_opcodes
  @close_socket = close_socket
  @defrag_buffer = []
  @alive = true
  @send_mutex = Mutex.new
      # Guards message sending, so that fragmented messages
      # won't get interleaved, and the [[@alive]] flag.
  @run_mutex = Mutex.new
      # Guards the main read loop.
  @receiving_frame = false
      # Are we currently receiving a frame for the
      # [[Webtube#run]] main loop?
  @reception_interrupt_mutex = Mutex.new
      # guards [[@receiving_frame]]
  return
end
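The masking direction selected by [[serverp]] follows RFC 6455: frames sent by a client are XOR-masked with a four-byte key, frames sent by a server are not. The following is a minimal sketch of that transform only; `apply_mask` is a hypothetical helper introduced for illustration and is not part of the API documented here.

```ruby
# Hypothetical helper: XOR every payload byte with the masking
# key byte at the same position modulo 4, as RFC 6455 prescribes.
# Applying the same key twice restores the original payload.
def apply_mask payload, mask_key
  key = mask_key.bytes
  payload.bytes.each_with_index.map{|byte, i| byte ^ key[i % 4]}.
      pack('C*')
end

# Masking key and text taken from the masked "Hello" example in
# RFC 6455.
mask_key = [0x37, 0xFA, 0x21, 0x3D].pack('C*')
masked = apply_mask 'Hello', mask_key
unmasked = apply_mask masked, mask_key
```

Because the operation is its own inverse, one helper is enough for both the sending and the receiving side.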
Instance Attribute Details
#allow_opcodes ⇒ Object
Returns the value of attribute allow_opcodes.
# File 'lib/webtube.rb', line 20
def allow_opcodes
  @allow_opcodes
end
#allow_rsv_bits ⇒ Object
Returns the value of attribute allow_rsv_bits.
# File 'lib/webtube.rb', line 19
def allow_rsv_bits
  @allow_rsv_bits
end
#context ⇒ Object
Returns the value of attribute context.
# File 'lib/webtube.rb', line 35
def context
  @context
end
#header ⇒ Object
The following three slots are not used by the [[Webtube]] infrastructure. They have been defined purely so that application code can easily associate data it finds significant with [[Webtube]] instances.
# File 'lib/webtube.rb', line 32
def header
  @header
end
#session ⇒ Object
[[accept_webtube]] saves the request object here.
# File 'lib/webtube.rb', line 34
def session
  @session
end
#url ⇒ Object (readonly)
Returns the value of attribute url.
# File 'lib/webtube.rb', line 22
def url
  @url
end
Class Method Details
.connect(url, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], http_header: {}, ssl_verify_mode: OpenSSL::SSL::VERIFY_PEER, ssl_cert_store: nil, tcp_connect_timeout: nil, tcp_nodelay: true, close_socket: true, on_http_request: nil, on_http_response: nil, on_ssl_handshake: nil, on_tcp_connect: nil) ⇒ Object
Attempts to set up a [[WebSocket]] connection to the given [[url]]. Returns the [[Webtube]] instance if successful, or raises an appropriate [[Webtube::WebSocketUpgradeFailed]].
# File 'lib/webtube.rb', line 358
def self::connect url,
    allow_rsv_bits: 0,
    allow_opcodes: [Webtube::OPCODE_TEXT],
    http_header: {},
    ssl_verify_mode: OpenSSL::SSL::VERIFY_PEER,
    ssl_cert_store: nil,
        # or an [[OpenSSL::X509::Store]] instance
    tcp_connect_timeout: nil, # or number of seconds
    tcp_nodelay: true,
    close_socket: true,
    on_http_request: nil,
    on_http_response: nil,
    on_ssl_handshake: nil,
    on_tcp_connect: nil
  loc = Webtube::Location.new url
  socket = if tcp_connect_timeout.nil? then
    TCPSocket.new loc.host, loc.port
  else
    Timeout.timeout tcp_connect_timeout, Net::OpenTimeout do
      TCPSocket.new loc.host, loc.port
    end
  end
  if tcp_nodelay then
    socket.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
  end
  on_tcp_connect.call socket if on_tcp_connect
  if loc.ssl? then
    # construct an SSL context
    if ssl_cert_store.nil? then
      ssl_cert_store = OpenSSL::X509::Store.new
      ssl_cert_store.set_default_paths
    end
    ssl_context = OpenSSL::SSL::SSLContext.new
    ssl_context.cert_store = ssl_cert_store
    ssl_context.verify_mode = ssl_verify_mode
    # wrap the socket
    socket = OpenSSL::SSL::SSLSocket.new socket, ssl_context
    socket.sync_close = true
    socket.hostname = loc.host # Server Name Indication
    socket.connect # perform SSL handshake
    socket.post_connection_check loc.host
    on_ssl_handshake.call socket if on_ssl_handshake
  end
  socket = Net::BufferedIO.new socket

  # transmit the request
  req = Webtube::Request.new loc, http_header
  composed_request = req.to_s
  socket.write composed_request
  on_http_request.call composed_request if on_http_request

  # wait for response
  response = Net::HTTPResponse.read_new socket
  if on_http_response then
    # reconstitute the response as a string
    #
    # (XXX: this loses some diagnostically useful bits, but
    # [[Net::HTTPResponse::read_new]] just doesn't preserve
    # the pristine original)
    s = "#{response.code} #{response.message}\r\n"
    response.each_header do |k, v|
      s << "#{k}: #{v}\r\n"
    end
    s << "\r\n"
    on_http_response.call s
  end

  # Check that the server is seeing us now
  # FIXME: ensure that the socket will be closed in case of
  # exception
  d = rejection response, req.expected_accept
  raise Webtube::WebSocketDeclined.new(d) \
      if d

  # Can the server speak our protocol version?
  unless (response['Sec-WebSocket-Version'] || '13').
      strip.split(/\s*,\s*/).include? '13' then
    raise Webtube::WebSocketVersionMismatch.new(
        "Sec-WebSocket-Version negotiation failed")
  end

  # The connection has been set up.  Now we can instantiate
  # [[Webtube]].
  wt = Webtube.new socket, false,
      allow_rsv_bits: allow_rsv_bits,
      allow_opcodes: allow_opcodes,
      close_socket: close_socket
  wt.instance_variable_set :@url, loc.to_s
  return wt
end
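The version negotiation step in the listing treats a missing [[Sec-WebSocket-Version]] response header as version 13 and otherwise looks for 13 in a comma-separated list. A minimal sketch of that check in isolation, assuming a hypothetical helper name `speaks_version_13?` that does not exist in Webtube:

```ruby
# Hypothetical helper mirroring the expression used in the
# listing: a missing header counts as '13'; otherwise the value
# is split on commas (with optional surrounding whitespace) and
# must contain '13'.
def speaks_version_13? header_value
  (header_value || '13').strip.split(/\s*,\s*/).include? '13'
end

missing_header = speaks_version_13? nil
multiple_versions = speaks_version_13? ' 8, 13 '
old_versions_only = speaks_version_13? '8, 7'
```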
.rejection(response, expected_accept) ⇒ Object
Checks whether the given [[Net::HTTPResponse]] represents a valid WebSocket upgrade acceptance. Returns [[nil]] if so, or a human-readable string explaining the issue if not. [[expected_accept]] is the value [[Sec-WebSocket-Accept]] is expected to hold, generated from the [[Sec-WebSocket-Key]] header field.
# File 'lib/webtube.rb', line 460
def self::rejection response, expected_accept
  unless response.code == '101' then
    return "the HTTP response code was not 101"
  end
  unless (response['Connection'] || '').downcase ==
      'upgrade' then
    return "the HTTP response did not say " +
        "'Connection: upgrade'"
  end
  unless (response['Upgrade'] || '').downcase ==
      'websocket' then
    return "the HTTP response did not say " +
        "'Upgrade: websocket'"
  end
  unless (response['Sec-WebSocket-Accept'] || '') ==
      expected_accept then
    return "the HTTP response did not say " +
        "'Sec-WebSocket-Accept: #{expected_accept}'"
  end
  return nil
end
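For context on where an [[expected_accept]] value comes from, RFC 6455 defines it as the Base64-encoded SHA-1 digest of the [[Sec-WebSocket-Key]] value concatenated with a fixed GUID. The sketch below shows that derivation; `expected_accept_for` is a hypothetical helper, and how Webtube computes the value internally is not shown on this page.

```ruby
require 'digest/sha1'

# GUID fixed by RFC 6455 for the opening handshake.
WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

# Hypothetical helper: derive the Sec-WebSocket-Accept value a
# conforming server should send back for the given key.
def expected_accept_for key
  Digest::SHA1.base64digest(key + WEBSOCKET_GUID)
end

# Sample key from the handshake example in RFC 6455.
accept = expected_accept_for 'dGhlIHNhbXBsZSBub25jZQ=='
```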
Instance Method Details
#close(status_code = 1000, explanation = "") ⇒ Object
Closes the connection, thus preventing further transmission.
If [[status_code]] is supplied, it will be passed to the other side in the [[OPCODE_CLOSE]] frame. The default is 1000, which indicates normal closure. Sending a status code can be explicitly suppressed by passing [[nil]] instead of an integer; an empty close frame will then be sent. Due to the way a close frame’s payload is structured, this will also suppress delivery of [[explanation]], even if non-empty.
Note that RFC 6455 requires the explanation to be encoded in UTF-8. Accordingly, this method will re-encode it unless it is already in UTF-8.
# File 'lib/webtube.rb', line 312
def close status_code = 1000, explanation = ""
  # prepare the payload for the close frame
  payload = ""
  if status_code then
    payload = [status_code].pack('n')
    if explanation then
      payload << explanation.encode(Encoding::UTF_8)
    end
  end
  # let the other side know we're closing
  send_message payload, OPCODE_CLOSE
  # break the main reception loop
  @send_mutex.synchronize do
    @alive = false
  end
  # if waiting for a frame (or parsing one), interrupt it
  @reception_interrupt_mutex.synchronize do
    @thread.raise AbortReceiveLoop.new if @receiving_frame
  end
  @socket.close if @close_socket
  return
end
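The close frame payload described above is a big-endian 16-bit status code followed by the UTF-8 explanation, which is why suppressing the code also suppresses the text. A minimal sketch of building and parsing such a payload, using hypothetical helpers `close_payload` and `parse_close_payload` that are not part of Webtube:

```ruby
# Hypothetical helper: build a close frame payload. A nil status
# code yields an empty payload, so the explanation is dropped too.
def close_payload status_code = 1000, explanation = ""
  return "".b if status_code.nil?
  [status_code].pack('n') + explanation.encode(Encoding::UTF_8).b
end

# Hypothetical helper: split a payload back into status code and
# explanation. Payloads shorter than two bytes carry no code.
def parse_close_payload payload
  return [nil, ""] if payload.bytesize < 2
  code, = payload.unpack 'n'
  text = payload.byteslice(2 .. -1).force_encoding(Encoding::UTF_8)
  [code, text]
end

normal = close_payload 1000, "bye"
suppressed = close_payload nil, "never sent"
parsed = parse_close_payload normal
```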
#hash ⇒ Object
The application may want to store many Webtube instances in a hash or a set. In order to facilitate this, we’ll need [[hash]] and [[eql?]]. The latter is already adequately implemented by [[Object]], which compares by identity; in order to ensure the former hashes by identity as well, we’ll override it.
# File 'lib/webtube.rb', line 628
def hash
  return object_id
end
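The effect of hashing by identity can be seen with a stand-in class. This is only a sketch: `FakeTube` is a hypothetical class that copies the [[hash]] override and nothing else, so it runs without the gem or a socket.

```ruby
require 'set'

# Hypothetical stand-in for a Webtube instance; only the
# identity-hashing behaviour is modelled.
class FakeTube
  def hash
    return object_id
  end
end

first = FakeTube.new
second = FakeTube.new

# Adding the same instance twice keeps a single entry, while two
# distinct instances are kept apart.
tubes = Set.new
tubes << first << second << first
```

A server could keep such a set of live connections and remove an instance from it in its [[onclose]] handler.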
#inspect ⇒ Object
# File 'lib/webtube.rb', line 335
def inspect
  s = "#<Webtube@0x%0x" % object_id
  s << " " << (@server ? 'from' : 'to')
  unless @url.nil? then
    s << " " << @url
  else
    # [[@socket]] is a [[Net::BufferedIO]] instance, so
    # [[@socket.io]] is either a plain socket or an SSL
    # wrapper
    af, port, hostname = @socket.io.peeraddr
    s << " %s:%i" % [hostname, port]
  end
  s << " @allow_rsv_bits=%s" % @allow_rsv_bits.inspect \
      unless @allow_rsv_bits.nil?
  s << " @allow_opcodes=%s" % @allow_opcodes.inspect \
      unless @allow_opcodes.nil?
  s << ">"
  return s
end
#run(listener) ⇒ Object
Run a loop to read all the messages and control frames coming in via this WebSocket, and hand events to the given [[listener]]. The listener can implement the following methods:
- onopen(webtube) will be called as soon as the channel is set up.
- onmessage(webtube, message_body, opcode) will be called with each arriving data message once it has been defragmented. The data will be passed to it as a [[String]], encoded in [[UTF-8]] for [[OPCODE_TEXT]] messages and in [[ASCII-8BIT]] for all the other message opcodes.
- oncontrolframe(webtube, frame) will be called upon receipt of a control frame whose opcode is listed in the [[allow_opcodes]] parameter of this [[Webtube]] instance. The frame is represented by an instance of [[Webtube::Frame]]. Note that [[Webtube]] handles connection closures ([[OPCODE_CLOSE]]) and ponging all the pings ([[OPCODE_PING]]) automatically.
- onping(webtube, frame) will be called upon receipt of an [[OPCODE_PING]] frame. [[Webtube]] will take care of ponging all the pings, but the listener may want to process such an event for statistical information.
- onpong(webtube, frame) will be called upon receipt of an [[OPCODE_PONG]] frame.
- onclose(webtube) will be called upon closure of the connection, for any reason.
- onannoyedclose(webtube, frame) will be called upon receipt of an [[OPCODE_CLOSE]] frame with an explicit status code other than 1000. This typically indicates that the other side is annoyed, so the listener may want to log the condition for debugging or further analysis. Normally, once the handler returns, [[Webtube]] will respond with a close frame of the same status code and close the connection, but the handler may call [[Webtube#close]] to request a closure with a different status code or without one.
- onexception(webtube, exception) will be called if an unhandled exception is raised during the [[Webtube]]’s lifecycle, including all of the listener event handlers. It may log the exception but should return normally so that the [[Webtube]] can issue a proper close frame for the other end and invoke the [[onclose]] handler, after which the exception will be raised again so the caller of [[Webtube#run]] will have a chance to handle it.
Before calling any of the handlers, [[respond_to?]] will be used to check whether the listener implements it.
If an exception occurs during processing, it (that is, the [[Exception]] instance) may specify a status code to be passed to the other end via the [[OPCODE_CLOSE]] frame by implementing the [[websocket_close_status_code]] method returning the code as an integer. The default code, used if the exception does not specify one, is 1011 ‘unexpected condition’. An exception may explicitly suppress sending any code by having [[websocket_close_status_code]] return [[nil]] instead of an integer.
# File 'lib/webtube.rb', line 134
def run listener
  @run_mutex.synchronize do
    @thread = Thread.current
    begin
      listener.onopen self if listener.respond_to? :onopen
      while @send_mutex.synchronize{@alive} do
        begin
          @reception_interrupt_mutex.synchronize do
            @receiving_frame = true
          end
          frame = Webtube::Frame.read_from_socket @socket
        ensure
          @reception_interrupt_mutex.synchronize do
            @receiving_frame = false
          end
        end
        unless (frame.rsv & ~@allow_rsv_bits) == 0 then
          raise Webtube::UnknownReservedBit.new(frame: frame)
        end
        if @serverp then
          unless frame.masked?
            raise Webtube::UnmaskedFrameToServer.new(
                frame: frame)
          end
        else
          unless !frame.masked? then
            raise Webtube::MaskedFrameToClient.new(
                frame: frame)
          end
        end
        if !frame.control_frame? then
          # data frame
          if frame.opcode != Webtube::OPCODE_CONTINUATION then
            # initial frame
            unless @allow_opcodes.include? frame.opcode then
              raise Webtube::UnknownOpcode.new(frame: frame)
            end
            unless @defrag_buffer.empty? then
              raise Webtube::MissingContinuationFrame.new
            end
          else
            # continuation frame
            if @defrag_buffer.empty? then
              raise Webtube::UnexpectedContinuationFrame.new(
                  frame: frame)
            end
          end
          @defrag_buffer.push frame
          if frame.fin? then
            opcode = @defrag_buffer.first.opcode
            data = @defrag_buffer.map(&:payload).join ''
            @defrag_buffer = []
            if opcode == Webtube::OPCODE_TEXT then
              # text messages must be encoded in UTF-8, per
              # RFC 6455
              data.force_encoding Encoding::UTF_8
              unless data.valid_encoding? then
                data.force_encoding Encoding::ASCII_8BIT
                raise Webtube::BadlyEncodedText.new(
                    data: data)
              end
            end
            listener.onmessage self, data, opcode \
                if listener.respond_to? :onmessage
          end
        elsif (0x08 .. 0x0F).include? frame.opcode then
          # control frame
          unless frame.fin? then
            raise Webtube::FragmentedControlFrame.new(
                frame: frame)
          end
          case frame.opcode
          when Webtube::OPCODE_CLOSE then
            message = frame.payload
            if message.length >= 2 then
              status_code, = message.unpack 'n'
              unless status_code == 1000 then
                listener.onannoyedclose self, frame \
                    if listener.respond_to? :onannoyedclose
              end
            else
              status_code = 1000
            end
            close status_code
          when Webtube::OPCODE_PING then
            listener.onping self, frame \
                if listener.respond_to? :onping
            send_message frame.payload, Webtube::OPCODE_PONG
          when Webtube::OPCODE_PONG then
            listener.onpong self, frame \
                if listener.respond_to? :onpong
          else
            unless @allow_opcodes.include? frame.opcode then
              raise Webtube::UnknownOpcode.new(frame: frame)
            end
          end
          listener.oncontrolframe self, frame \
              if @allow_opcodes.include?(frame.opcode) and
                  listener.respond_to?(:oncontrolframe)
        else
          raise 'assertion failed'
        end
      end
    rescue AbortReceiveLoop
      # we're out of the loop now, so nothing further to do
    rescue Exception => e
      status_code = if e.respond_to? :websocket_close_status_code then
        e.websocket_close_status_code
      else
        1011 # 'unexpected condition'
      end
      listener.onexception self, e \
          if listener.respond_to? :onexception
      begin
        close status_code
      rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
        # ignore, we have a bigger exception to handle
      end
      raise e
    ensure
      @thread = nil
      listener.onclose self \
          if listener.respond_to? :onclose
    end
  end
  return
end
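To make the listener contract more concrete, here is a minimal sketch of a listener that implements only some of the handlers, together with an exception class that chooses its own close status code. `EchoLogger`, `PolicyViolation`, `dispatch` and `close_status_for` are all hypothetical names. The two helper methods only imitate the [[respond_to?]] checks described for [[Webtube#run]], so the sketch runs without the gem or a live connection.

```ruby
# Hypothetical listener: implements only three of the handlers.
class EchoLogger
  attr_reader :log

  def initialize
    @log = []
  end

  def onopen webtube
    @log << [:open]
  end

  def onmessage webtube, message_body, opcode
    @log << [:message, message_body, opcode]
  end

  def onclose webtube
    @log << [:close]
  end
end

# Hypothetical exception that selects its own close status code.
class PolicyViolation < StandardError
  def websocket_close_status_code
    1008 # 'policy violation' in RFC 6455
  end
end

# Imitates the dispatch rule: a handler is called only if the
# listener implements it.
def dispatch listener, event, *args
  listener.__send__ event, *args if listener.respond_to? event
end

# Imitates the status code selection, with 1011 as the default.
def close_status_for exception
  if exception.respond_to? :websocket_close_status_code then
    exception.websocket_close_status_code
  else
    1011
  end
end

listener = EchoLogger.new
dispatch listener, :onopen, nil
dispatch listener, :onmessage, nil, "hello", 0x1
dispatch listener, :onping, nil, nil   # not implemented, so skipped
dispatch listener, :onclose, nil
```

With the gem loaded, such an object would be handed to a connected instance as `webtube.run EchoLogger.new`.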
#send_message(message, opcode = Webtube::OPCODE_TEXT) ⇒ Object
Send a given message payload to the other party, using the given opcode. By default, the [[opcode]] is [[Webtube::OPCODE_TEXT]]. Re-encodes the payload if given in a non-UTF-8 encoding and [[opcode == Webtube::OPCODE_TEXT]].
# File 'lib/webtube.rb', line 268
def send_message message, opcode = Webtube::OPCODE_TEXT
  if opcode == Webtube::OPCODE_TEXT and
      message.encoding != Encoding::UTF_8 then
    message = message.encode Encoding::UTF_8
  end
  @send_mutex.synchronize do
    raise 'WebSocket connection no longer live' unless @alive
    # In order to ensure that the local kernel will treat our
    # (data) frames atomically during the [[write]] syscall,
    # we'll want to ensure that the frame size does not exceed
    # 512 bytes -- the minimum permitted size for
    # [[PIPE_BUF]].  At this frame size, the header size is up
    # to four bytes for unmasked or eight bytes for masked
    # frames.
    #
    # (FIXME: in retrospect, that seems like an unpractical
    # consideration.  We should probably use path MTU
    # instead.)
    Webtube::Frame.(
        message: message,
        opcode: opcode,
        masked: !@serverp,
        max_frame_body_size: 512 - (!@serverp ? 8 : 4)) do |frame|
      @socket.write frame.header + frame.body
    end
  end
  return
end
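The frame size budget in the listing works out to 504 body bytes per frame for a client (masked, header of up to eight bytes) and 508 for a server (unmasked, up to four). The sketch below reproduces that arithmetic and shows how a payload might be cut into chunks of that size. `max_frame_body_size` and `split_payload` are hypothetical helpers, and the actual fragmentation logic inside [[Webtube::Frame]] is not shown on this page and may differ.

```ruby
# Body size budget per frame, using the same expression as the
# listing: 512 bytes minus the largest header for that direction.
def max_frame_body_size serverp
  512 - (!serverp ? 8 : 4)
end

# Hypothetical splitter: cut the payload into byte chunks no
# longer than the limit. An empty payload yields one empty chunk
# (an assumption of this sketch).
def split_payload payload, limit
  bytes = payload.b
  return [bytes] if bytes.empty?
  (0 ... bytes.bytesize).step(limit).map do |offset|
    bytes.byteslice offset, limit
  end
end

payload = "x" * 1200
client_chunks = split_payload payload, max_frame_body_size(false)
chunk_sizes = client_chunks.map(&:bytesize)
```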