Class: Webtube

Inherits:
Object
  • Object
show all
Defined in:
lib/webtube.rb,
lib/webtube/vital-statistics.rb

Defined Under Namespace

Classes: AbortReceiveLoop, BadlyEncodedText, BrokenFrame, ConnectionNotAlive, FragmentedControlFrame, Frame, Location, MaskedFrameToClient, MissingContinuationFrame, ProtocolError, Request, UnexpectedContinuationFrame, UnknownOpcode, UnknownReservedBit, UnmaskedFrameToServer, Vital_Statistics, WebSocketDeclined, WebSocketUpgradeFailed, WebSocketVersionMismatch

Constant Summary collapse

OPCODE_CONTINUATION =

Not all the possible 16 values are defined by the standard.

0x0
OPCODE_TEXT =
0x1
OPCODE_BINARY =
0x2
OPCODE_CLOSE =
0x8
OPCODE_PING =
0x9
OPCODE_PONG =
0xA
VERSION =
'1.1.0'

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(socket, serverp, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], close_socket: true) ⇒ Webtube



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/webtube.rb', line 37

# Sets up a [[Webtube]] around an already established
# [[socket]].  No I/O is performed here; only the instance's
# configuration, state and mutexes are initialised.
def initialize socket,
    serverp,
        # Truthy: incoming data is expected to be masked and
        # outgoing data is left unmasked.  Falsy: incoming
        # data is expected to be unmasked and outgoing data
        # is masked.
    allow_rsv_bits: 0,
    allow_opcodes: [Webtube::OPCODE_TEXT],
    close_socket: true
  super()

  # Configuration, exactly as handed in by the caller.
  @socket, @serverp = socket, serverp
  @allow_rsv_bits, @allow_opcodes = allow_rsv_bits, allow_opcodes
  @close_socket = close_socket

  # Synchronisation primitives.
  @send_mutex = Mutex.new
      # Serialises message sending, so that the frames of
      # fragmented messages can not interleave; also
      # protects the [[@alive]] flag.
  @run_mutex = Mutex.new
      # Serialises the main read loop.
  @reception_interrupt_mutex = Mutex.new
      # Protects [[@receiving_frame]].

  # Mutable connection state.
  @alive = true
  @defrag_buffer = []
  @receiving_frame = false
      # Is a frame currently being received on behalf of
      # the [[Webtube#run]] main loop?
  nil
end

Instance Attribute Details

#allow_opcodesObject

Returns the value of attribute allow_opcodes.



20
21
22
# File 'lib/webtube.rb', line 20

# Reader for [[@allow_opcodes]], the list of opcodes this
# instance accepts (set by the constructor).
def allow_opcodes
  return @allow_opcodes
end

#allow_rsv_bitsObject

Returns the value of attribute allow_rsv_bits.



19
20
21
# File 'lib/webtube.rb', line 19

# Reader for [[@allow_rsv_bits]], the mask of reserved bits
# that incoming frames are permitted to carry (set by the
# constructor).
def allow_rsv_bits
  return @allow_rsv_bits
end

#contextObject

Returns the value of attribute context.



35
36
37
# File 'lib/webtube.rb', line 35

# Reader for the [[@context]] slot.  The slot is not
# assigned by the constructor, so it reads as [[nil]] until
# something stores a value in it.
def context
  return @context
end

#headerObject

The following three slots are not used by the [[Webtube]] infrastructure. They have been defined purely so that application code can easily associate data it finds significant with [[Webtube]] instances.



32
33
34
# File 'lib/webtube.rb', line 32

# Reader for the [[@header]] slot.  The slot is not
# assigned by the constructor, so it reads as [[nil]] until
# something stores a value in it.
def header
  return @header
end

#sessionObject

[[accept_webtube]] saves the request object here.



34
35
36
# File 'lib/webtube.rb', line 34

# Reader for the [[@session]] slot.  The slot is not
# assigned by the constructor, so it reads as [[nil]] until
# something stores a value in it.
def session
  return @session
end

#urlObject (readonly)

Returns the value of attribute url.



22
23
24
# File 'lib/webtube.rb', line 22

# Read-only accessor for [[@url]].  [[Webtube::connect]]
# stores the location string here; the constructor itself
# leaves it unset ([[nil]]).
def url
  return @url
end

Class Method Details

.connect(url, allow_rsv_bits: 0, allow_opcodes: [Webtube::OPCODE_TEXT], http_header: {}, ssl_verify_mode: OpenSSL::SSL::VERIFY_PEER, ssl_cert_store: nil, tcp_connect_timeout: nil, tcp_nodelay: true, close_socket: true, on_http_request: nil, on_http_response: nil, on_ssl_handshake: nil, on_tcp_connect: nil) ⇒ Object

Attempts to set up a [[WebSocket]] connection to the given [[url]]. Returns the [[Webtube]] instance if successful, or raises an appropriate [[Webtube::WebSocketUpgradeFailed]].



358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
# File 'lib/webtube.rb', line 358

# Opens a TCP (and, if the location calls for it, SSL)
# connection to [[url]], performs the WebSocket upgrade
# handshake and returns a client-side [[Webtube]] wrapping
# the resulting socket.
#
# Keyword parameters:
#
# [allow_rsv_bits, allow_opcodes, close_socket]
#   passed on to [[Webtube.new]] unchanged;
# [http_header]
#   passed to [[Webtube::Request.new]] together with the
#   location;
# [ssl_verify_mode, ssl_cert_store]
#   configure the SSL context; if no store is given, a new
#   one with the default paths is created;
# [tcp_connect_timeout]
#   [[nil]] or the number of seconds after which the TCP
#   connect is aborted with [[Net::OpenTimeout]];
# [tcp_nodelay]
#   if true, [[TCP_NODELAY]] is set on the socket;
# [on_tcp_connect, on_ssl_handshake]
#   optional callables, each invoked with the socket;
# [on_http_request, on_http_response]
#   optional callables, each invoked with a string holding
#   the composed request or the reconstituted response.
#
# Raises [[Webtube::WebSocketDeclined]] if [[rejection]]
# finds the response unacceptable, and
# [[Webtube::WebSocketVersionMismatch]] if the server does
# not offer protocol version 13.  Whenever this method fails
# after the socket has been opened, the socket is closed
# before the exception propagates.
def self::connect url,
    allow_rsv_bits: 0,
    allow_opcodes: [Webtube::OPCODE_TEXT],
    http_header: {},
    ssl_verify_mode: OpenSSL::SSL::VERIFY_PEER,
    ssl_cert_store: nil,
        # or an [[OpenSSL::X509::Store]] instance
    tcp_connect_timeout: nil, # or number of seconds
    tcp_nodelay: true,
    close_socket: true,
    on_http_request: nil,
    on_http_response: nil,
    on_ssl_handshake: nil,
    on_tcp_connect: nil
  loc = Webtube::Location.new url

  socket = nil
  established = false
  begin
    socket = if tcp_connect_timeout.nil? then
      TCPSocket.new loc.host, loc.port
    else
      Timeout.timeout tcp_connect_timeout, Net::OpenTimeout do
        TCPSocket.new loc.host, loc.port
      end
    end
    if tcp_nodelay then
      socket.setsockopt Socket::IPPROTO_TCP,
          Socket::TCP_NODELAY, 1
    end
    on_tcp_connect.call socket if on_tcp_connect

    if loc.ssl? then
      # construct an SSL context
      if ssl_cert_store.nil? then
        ssl_cert_store = OpenSSL::X509::Store.new
        ssl_cert_store.set_default_paths
      end
      ssl_context = OpenSSL::SSL::SSLContext.new
      ssl_context.cert_store = ssl_cert_store
      ssl_context.verify_mode = ssl_verify_mode
      # wrap the socket
      socket = OpenSSL::SSL::SSLSocket.new socket, ssl_context
      socket.sync_close = true
          # so that closing the wrapper also closes the
          # underlying TCP socket
      socket.hostname = loc.host # Server Name Indication
      socket.connect # perform SSL handshake
      socket.post_connection_check loc.host
      on_ssl_handshake.call socket if on_ssl_handshake
    end

    socket = Net::BufferedIO.new socket

    # transmit the request
    req = Webtube::Request.new loc, http_header
    composed_request = req.to_s
    socket.write composed_request
    on_http_request.call composed_request if on_http_request

    # wait for response
    response = Net::HTTPResponse.read_new socket

    if on_http_response then
      # reconstitute the response as a string
      #
      # (XXX: this loses some diagnostically useful bits, but
      # [[Net::HTTPResponse::read_new]] just doesn't preserve
      # the pristine original)
      s = "#{response.code} #{response.message}\r\n"
      response.each_header do |k, v|
        s << "#{k}: #{v}\r\n"
      end
      s << "\r\n"
      on_http_response.call s
    end

    # Check that the server is seeing us now
    d = rejection response, req.expected_accept
    raise Webtube::WebSocketDeclined.new(d) \
        if d

    # Can the server speak our protocol version?
    unless (response['Sec-WebSocket-Version'] || '13').
        strip.split(/\s*,\s*/).include? '13' then
      raise Webtube::WebSocketVersionMismatch.new(
          "Sec-WebSocket-Version negotiation failed")
    end

    # The connection has been set up.  Now we can instantiate
    # [[Webtube]].
    wt = Webtube.new socket, false,
        allow_rsv_bits: allow_rsv_bits,
        allow_opcodes: allow_opcodes,
        close_socket: close_socket
    wt.instance_variable_set :@url, loc.to_s
    established = true
    return wt
  ensure
    # If we did not get as far as handing the socket over to
    # a [[Webtube]] instance, nobody else is going to close
    # it, so we must.  [[socket]] is whichever wrapper layer
    # was reached last.
    unless established or socket.nil? then
      begin
        socket.close
      rescue StandardError
        # best-effort cleanup; the original exception is the
        # one worth reporting
      end
    end
  end
end

.rejection(response, expected_accept) ⇒ Object

Checks whether the given [[Net::HTTPResponse]] represents a valid WebSocket upgrade acceptance. Returns [[nil]] if so, or a human-readable string explaining the issue if not.

[[expected_accept]] is the value [[Sec-WebSocket-Accept]] is expected to hold, generated from the [[Sec-WebSocket-Key]] header field.



460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# File 'lib/webtube.rb', line 460

# Checks whether [[response]] is a valid WebSocket upgrade
# acceptance.  [[response]] must respond to [[code]] (the
# HTTP status as a string) and to [[[]]] (header lookup by
# field name, [[nil]] if absent).  [[expected_accept]] is
# the value the [[Sec-WebSocket-Accept]] header field must
# hold.
#
# Returns [[nil]] if the response is acceptable, or a
# human-readable string naming the first problem found.
def self::rejection response, expected_accept
  unless response.code == '101' then
    return "the HTTP response code was not 101"
  end
  # Per RFC 6455 section 4.1, the [[Connection]] header field
  # must /contain/ the token "Upgrade" (compared
  # case-insensitively); it may legitimately list other
  # tokens as well, such as in "keep-alive, Upgrade".
  connection_tokens = (response['Connection'] || '').
      downcase.split(',').map(&:strip)
  unless connection_tokens.include? 'upgrade' then
    return "the HTTP response did not say " +
        "'Connection: upgrade'"
  end
  unless (response['Upgrade'] || '').downcase ==
      'websocket' then
    return "the HTTP response did not say " +
        "'Upgrade: websocket'"
  end
  unless (response['Sec-WebSocket-Accept'] || '') ==
      expected_accept then
    return "the HTTP response did not say " +
        "'Sec-WebSocket-Accept: #{expected_accept}'"
  end
  return nil
end

Instance Method Details

#close(status_code = 1000, explanation = "") ⇒ Object

Closes the connection, thus preventing further transmission.

If [[status_code]] is supplied, it will be passed to the other side in the [[OPCODE_CLOSE]] frame. The default is 1000, which indicates normal closure. Sending a status code can be explicitly suppressed by passing [[nil]] instead of an integer; then, an empty close frame will be sent. Due to the way a close frame’s payload is structured, this will also suppress delivery of [[explanation]], even if non-empty.

Note that RFC 6455 requires the explanation to be encoded in UTF-8. Accordingly, this method will re-encode it unless it is already in UTF-8.



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/webtube.rb', line 312

# Sends an [[OPCODE_CLOSE]] frame, marks the connection as
# no longer alive, interrupts the receive loop if it is in
# the middle of receiving a frame, and closes the socket if
# [[@close_socket]] is set.
#
# [status_code]
#   integer packed as a 16-bit big-endian prefix of the
#   close frame's payload; if [[nil]], the payload is left
#   empty and [[explanation]] is not sent either;
# [explanation]
#   text appended to the payload after the status code,
#   converted to UTF-8; may be [[nil]] to omit it.
#
# Returns [[nil]].  Raises whatever [[send_message]] raises,
# in particular if the connection is no longer alive.
def close status_code = 1000, explanation = ""
  # prepare the payload for the close frame
  payload = ""
  if status_code then
    payload = [status_code].pack('n')
    if explanation then
      # [[pack]] gives us a binary string that usually holds
      # a non-ASCII byte, so the UTF-8 text has to be
      # appended as raw bytes; otherwise, a non-ASCII
      # explanation would raise
      # [[Encoding::CompatibilityError]].
      payload << explanation.encode(Encoding::UTF_8).b
    end
  end
  # let the other side know we're closing
  send_message payload, OPCODE_CLOSE
  # break the main reception loop
  @send_mutex.synchronize do
    @alive = false
  end
  # if waiting for a frame (or parsing one), interrupt it
  @reception_interrupt_mutex.synchronize do
    @thread.raise AbortReceiveLoop.new if @receiving_frame
  end
  @socket.close if @close_socket
  return
end

#hashObject

The application may want to store many Webtube instances in a hash or a set. In order to facilitate this, we’ll need [[hash]] and [[eql?]]. The latter is already adequately – comparing by identity – implemented by [[Object]]; in order to ensure the former hashes by identity, we’ll override it.



628
629
630
# File 'lib/webtube.rb', line 628

# Hash code of this instance: its [[object_id]], so that
# instances hash by identity.
def hash
  object_id
end

#inspectObject



335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
# File 'lib/webtube.rb', line 335

# Builds a short human-readable description of this
# instance: the object id in hexadecimal, the direction
# ("from" for a server-side instance, "to" for a client-side
# one), the URL if known or else the peer's host and port,
# and the [[@allow_rsv_bits]] and [[@allow_opcodes]]
# settings unless they are [[nil]].
def inspect
  s = "#<Webtube@0x%0x" % object_id
  # The server/client flag is stored in [[@serverp]] by the
  # constructor.
  s << " " << (@serverp ? 'from' : 'to')
  unless @url.nil? then
    s << " " << @url
  else
    # [[@socket]] is a [[Net::BufferedIO]] instance, so
    # [[@socket.io]] is either a plain socket or an SSL
    # wrapper
    _af, port, hostname = @socket.io.peeraddr
    s << " %s:%i" % [hostname, port]
  end
  s << " @allow_rsv_bits=%s" % @allow_rsv_bits.inspect \
      unless @allow_rsv_bits.nil?
  s << " @allow_opcodes=%s" % @allow_opcodes.inspect \
      unless @allow_opcodes.nil?
  s << ">"
  return s
end

#run(listener) ⇒ Object

Run a loop to read all the messages and control frames coming in via this WebSocket, and hand events to the given [[listener]]. The listener can implement the following methods:

  • onopen(webtube) will be called as soon as the channel is set up.

  • onmessage(webtube, message_body, opcode) will be called with each arriving data message once it has been defragmented. The data will be passed to it as a [[String]], encoded in [[UTF-8]] for [[OPCODE_TEXT]] messages and in [[ASCII-8BIT]] for all the other message opcodes.

  • oncontrolframe(webtube, frame) will be called upon receipt of a control frame whose opcode is listed in the [[allow_opcodes]] parameter of this [[Webtube]] instance.

    The frame is represented by an instance of [[Webtube::Frame]]. Note that [[Webtube]] handles connection closures ([[OPCODE_CLOSE]]) and ponging all the pings ([[OPCODE_PING]]) automatically.

  • onping(webtube, frame) will be called upon receipt of an [[OPCODE_PING]] frame. [[Webtube]] will take care of ponging all the pings, but the listener may want to process such an event for statistical information.

  • onpong(webtube, frame) will be called upon receipt of an [[OPCODE_PONG]] frame.

  • onclose(webtube) will be called upon closure of the connection, for any reason.

  • onannoyedclose(webtube, frame) will be called upon receipt of an [[OPCODE_CLOSE]] frame with an explicit status code other than 1000. This typically indicates that the other side is annoyed, so the listener may want to log the condition for debugging or further analysis. Normally, once the handler returns, [[Webtube]] will respond with a close frame of the same status code and close the connection, but the handler may call [[Webtube#close]] to request a closure with a different status code or without one.

  • onexception(webtube, exception) will be called if an unhandled exception is raised during the [[Webtube]]’s lifecycle, including all of the listener event handlers. It may log the exception but should return normally so that the [[Webtube]] can issue a proper close frame for the other end and invoke the [[onclose]] handler, after which the exception will be raised again so the caller of [[Webtube#run]] will have a chance to handle it.

Before calling any of the handlers, [[respond_to?]] will be used to check implementedness.

If an exception occurs during processing, it (that is, the [[Exception]] instance) may specify a status code to be passed to the other end via the [[OPCODE_CLOSE]] frame by implementing the [[websocket_close_status_code]] method returning the code as an integer. The default code, used if the exception does not specify one, is 1011 ‘unexpected condition’. An exception may explicitly suppress sending any code by having [[websocket_close_status_code]] return [[nil]] instead of an integer.



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/webtube.rb', line 134

# Runs the receive loop on the calling thread until the
# connection is no longer alive, dispatching events to
# [[listener]].  Every handler is optional; [[respond_to?]]
# is consulted before each call.
#
# The loop reads one frame at a time, validates its reserved
# bits and masking against this instance's configuration,
# reassembles fragmented data messages, and handles close,
# ping and pong control frames.  Any exception other than
# [[AbortReceiveLoop]] is reported to [[onexception]],
# answered with a close frame (if the connection is still
# alive) and then re-raised.  [[onclose]] is invoked in any
# case before this method returns or raises.
#
# Returns [[nil]].
def run listener
  @run_mutex.synchronize do
    # remembered so that [[close]] can interrupt a pending
    # frame read by raising into this thread
    @thread = Thread.current
    begin
      listener.onopen self if listener.respond_to? :onopen
      while @send_mutex.synchronize{@alive} do
        begin
          @reception_interrupt_mutex.synchronize do
            @receiving_frame = true
          end
          frame = Webtube::Frame.read_from_socket @socket
        ensure
          @reception_interrupt_mutex.synchronize do
            @receiving_frame = false
          end
        end
        unless (frame.rsv & ~@allow_rsv_bits) == 0 then
          raise Webtube::UnknownReservedBit.new(frame: frame)
        end
        if @serverp then
          unless frame.masked?
            raise Webtube::UnmaskedFrameToServer.new(
                frame: frame)
          end
        else
          unless !frame.masked? then
            raise Webtube::MaskedFrameToClient.new(
                frame: frame)
          end
        end
        if !frame.control_frame? then
          # data frame
          if frame.opcode != Webtube::OPCODE_CONTINUATION then
            # initial frame
            unless @allow_opcodes.include? frame.opcode then
              raise Webtube::UnknownOpcode.new(frame: frame)
            end
            unless @defrag_buffer.empty? then
              raise Webtube::MissingContinuationFrame.new
            end
          else
            # continuation frame
            if @defrag_buffer.empty? then
              raise Webtube::UnexpectedContinuationFrame.new(
                  frame: frame)
            end
          end
          @defrag_buffer.push frame
          if frame.fin? then
            # the message's opcode is that of its first frame
            opcode = @defrag_buffer.first.opcode
            data = @defrag_buffer.map(&:payload).join ''
            @defrag_buffer = []
            if opcode == Webtube::OPCODE_TEXT then
              # text messages must be encoded in UTF-8, per
              # RFC 6455
              data.force_encoding Encoding::UTF_8
              unless data.valid_encoding? then
                data.force_encoding Encoding::ASCII_8BIT
                raise Webtube::BadlyEncodedText.new(
                    data: data)
              end
            end
            listener.onmessage self, data, opcode \
                if listener.respond_to? :onmessage
          end
        elsif (0x08 .. 0x0F).include? frame.opcode then
          # control frame
          unless frame.fin? then
            raise Webtube::FragmentedControlFrame.new(
                frame: frame)
          end
          case frame.opcode
          when Webtube::OPCODE_CLOSE then
            message = frame.payload
            if message.length >= 2 then
              status_code, = message.unpack 'n'
              unless status_code == 1000 then
                listener.onannoyedclose self, frame \
                    if listener.respond_to? :onannoyedclose
              end
            else
              status_code = 1000
            end
            # The [[onannoyedclose]] handler may already have
            # closed the connection itself; closing again
            # would make [[send_message]] raise.
            close status_code if @send_mutex.synchronize{@alive}
          when Webtube::OPCODE_PING then
            listener.onping self, frame \
                if listener.respond_to? :onping
            send_message frame.payload, Webtube::OPCODE_PONG
          when Webtube::OPCODE_PONG then
            listener.onpong self, frame \
                if listener.respond_to? :onpong
          else
            unless @allow_opcodes.include? frame.opcode then
              raise Webtube::UnknownOpcode.new(frame: frame)
            end
          end
          listener.oncontrolframe self, frame \
              if @allow_opcodes.include?(frame.opcode) and
                  listener.respond_to?(:oncontrolframe)
        else
          raise 'assertion failed'
        end
      end
    rescue AbortReceiveLoop
      # we're out of the loop now, so nothing further to do
    rescue Exception => e
      status_code =
          if e.respond_to? :websocket_close_status_code then
            e.websocket_close_status_code
          else
            1011 # 'unexpected condition'
          end
      listener.onexception self, e \
          if listener.respond_to? :onexception
      begin
        # If the connection has been closed already, there is
        # nobody to send a close frame to, and trying would
        # only replace [[e]] with a less informative error.
        close status_code if @send_mutex.synchronize{@alive}
      rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
        # ignore, we have a bigger exception to handle
      end
      raise e
    ensure
      @thread = nil
      listener.onclose self \
          if listener.respond_to? :onclose
    end
  end
  return
end

#send_message(message, opcode = Webtube::OPCODE_TEXT) ⇒ Object

Send a given message payload to the other party, using the given opcode. By default, the [[opcode]] is [[Webtube::OPCODE_TEXT]]. Re-encodes the payload if given in a non-UTF-8 encoding and [[opcode == Webtube::OPCODE_TEXT]].



268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/webtube.rb', line 268

# Transmits [[message]] to the other party as one or more
# frames of the given [[opcode]].  For
# [[Webtube::OPCODE_TEXT]], a payload in any encoding other
# than UTF-8 is transcoded to UTF-8 first.  Raises a
# [[RuntimeError]] if the connection is no longer alive.
# Returns [[nil]].
def send_message message, opcode = Webtube::OPCODE_TEXT
  text_needing_transcoding = opcode == Webtube::OPCODE_TEXT &&
      message.encoding != Encoding::UTF_8
  if text_needing_transcoding then
    message = message.encode Encoding::UTF_8
  end

  # Only the client side masks its outgoing frames.
  outgoing_masked = !@serverp

  # We want every (data) frame to be treated atomically by
  # the local kernel during the [[write]] syscall, so the
  # whole frame is kept within 512 bytes -- the minimum
  # permitted size for [[PIPE_BUF]].  At this frame size,
  # the header takes up to four bytes if unmasked or eight
  # bytes if masked.
  #
  # (FIXME: in retrospect, that seems like an unpractical
  # consideration.  We should probably use path MTU
  # instead.)
  header_allowance = outgoing_masked ? 8 : 4
  body_limit = 512 - header_allowance

  # Holding the mutex for the whole message keeps the frames
  # of concurrently sent messages from interleaving.
  @send_mutex.synchronize do
    raise 'WebSocket connection no longer live' unless @alive
    Webtube::Frame.each_frame_for_message(
        message: message,
        opcode: opcode,
        masked: outgoing_masked,
        max_frame_body_size: body_limit) do |frame|
      @socket.write(frame.header + frame.body)
    end
  end
  return
end