Class: Kontena::Websocket::Client

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/kontena/websocket/client.rb,
lib/kontena/websocket/client/version.rb

Overview

def websocket_connect

    Kontena::Websocket::Client.connect(url, ...) do |client|
      on_open(client)

      client.read do |message|
        on_message(message)
      end

      on_close(client.close_code, client.close_reason) # client closed connection
    end
  rescue Kontena::Websocket::CloseError => exc
    on_close(exc.code, exc.reason) # server closed connection
  rescue Kontena::Websocket::Error => exc
    on_error(exc)
  ensure
    # disconnected
  end
end

Defined Under Namespace

Classes: Connection

Constant Summary collapse

FRAME_SIZE =
4 * 1024
CONNECT_TIMEOUT =
60.0
OPEN_TIMEOUT =
60.0
PING_INTERVAL =
60.0
PING_TIMEOUT =
10.0
PING_STRFTIME =

high-percision RFC 3339

'%FT%T.%NZ'
CLOSE_TIMEOUT =
60.0
WRITE_TIMEOUT =
60.0
VERSION =
"0.1.1"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

#debug, #error, #info, initialize_logger, #logger, logger, logger=, #logging_prefix, #warn

Constructor Details

#initialize(url, headers: {}, ssl_params: {}, ssl_hostname: nil, connect_timeout: CONNECT_TIMEOUT, open_timeout: OPEN_TIMEOUT, ping_interval: PING_INTERVAL, ping_timeout: PING_TIMEOUT, close_timeout: CLOSE_TIMEOUT, write_timeout: WRITE_TIMEOUT) ⇒ Client

Returns a new instance of Client.

Parameters:

  • url (String)
  • headers (Hash{String => String}) (defaults to: {})
  • ssl_params (Hash) (defaults to: {})

    @see OpenSSL::SSL::SSLContext The DEFAULT_PARAMS includes verify_mode: OpenSSL::SSL::VERIFY_PEER. Use { verify_mode: OpenSSL::SSL::VERIFY_NONE } to disable Kontena::Websocket::SSLVerifyError on connect.

  • ssl_hostname (String) (defaults to: nil)

    override hostname for SSL SNI, certificate identity matching

  • connect_timeout (Float) (defaults to: CONNECT_TIMEOUT)

    timeout for TCP handshake; XXX: each phase of the SSL handshake

  • open_timeout (Float) (defaults to: OPEN_TIMEOUT)

    expect open frame after #start

  • ping_interval (Float) (defaults to: PING_INTERVAL)

    send pings every interval seconds after previous ping

  • ping_timeout (Float) (defaults to: PING_TIMEOUT)

    expect pong frame after #ping

  • close_timeout (Float) (defaults to: CLOSE_TIMEOUT)

    expect close frame after #close

  • write_timeout (Float) (defaults to: WRITE_TIMEOUT)

    block #send when sending faster than the server is able to receive, fail if no progress is made

Raises:

  • (ArgumentError)

    Invalid websocket URI



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/kontena/websocket/client.rb', line 88

def initialize(url,
    headers: {},
    ssl_params: {},
    ssl_hostname: nil,
    connect_timeout: CONNECT_TIMEOUT,
    open_timeout: OPEN_TIMEOUT,
    ping_interval: PING_INTERVAL,
    ping_timeout: PING_TIMEOUT,
    close_timeout: CLOSE_TIMEOUT,
    write_timeout: WRITE_TIMEOUT
)
  @uri = URI.parse(url)
  @headers = headers
  @ssl_params = ssl_params
  @ssl_hostname = ssl_hostname

  @connect_timeout = connect_timeout
  @open_timeout = open_timeout
  @ping_interval = ping_interval
  @ping_timeout = ping_timeout
  @close_timeout = close_timeout
  @write_timeout = write_timeout

  unless @uri.scheme == 'ws' || @uri.scheme == 'wss'
    raise ArgumentError, "Invalid websocket URL: #{@uri}"
  end

  @mutex = Mutex.new # for @driver

  # written by #enqueue from @driver callbacks with the @mutex held
  # drained by #read without the @mutex held
  @message_queue = []

  # sequential ping-pongs
  @ping_at = Time.now # fake for first ping_interval
end

Instance Attribute Details

#uriObject (readonly)

Returns the value of attribute uri.



31
32
33
# File 'lib/kontena/websocket/client.rb', line 31

def uri
  @uri
end

Class Method Details

.connect(url, **options) {|client| ... } ⇒ Object

Connect, send websocket handshake, and loop reading responses.

Passed block is called once websocket is open. Raises on errors. Returns once websocket is closed by server.

Intended to be called using a dedicated per-websocket thread. Other threads can then call the other threadsafe methods:

* send
* close

Yields:

  • (client)

    websocket open

Returns:

  • websocket closed by server, @see #close_code #close_reason

Raises:



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/kontena/websocket/client.rb', line 58

def self.connect(url, **options, &block)
  client = new(url, **options)

  client.connect

  begin
    yield client
  ensure
    # ensure socket is closed and client disconnected on any of:
    #   * connect error
    #   * open error
    #   * read error
    #   * close
    client.disconnect
  end
end

.ruby_version?(gte_version) ⇒ Boolean

Running ruby >= version?

Parameters:

  • gte_version (String)

Returns:

  • (Boolean)


7
8
9
10
11
12
# File 'lib/kontena/websocket/client/version.rb', line 7

def self.ruby_version?(gte_version)
  ruby_version = RUBY_VERSION.split('.').map{|x|x.to_i}
  gte_version = gte_version.split('.').map{|x|x.to_i}

  return (ruby_version <=> gte_version) >= 0
end

Instance Method Details

#close(code = 1000, reason = nil) ⇒ Object

Send close frame. Does not disconnect, but allows #read to return once server completes close handshake.

Imposes a close timeout when called from #run blocks (run/on_message/on_pong do …). If called from a different thread, then #run should eventually return, once either:

  • server sends close frame

  • server sends any other frame after the close timeout expires

  • the ping interval expires

XXX: prevent #send after close?

Parameters:

  • close (Integer)
  • reason (String) (defaults to: nil)


385
386
387
388
389
390
391
392
393
# File 'lib/kontena/websocket/client.rb', line 385

def close(code = 1000, reason = nil)
  debug "close code=#{code}: #{reason}"

  with_driver do |driver|
    fail unless driver.close(reason, code) # swapped argument order

    closing! code
  end
end

#close_codeInteger

Valid once #run returns, when closed? or closing?

Returns:

  • (Integer)


201
202
203
# File 'lib/kontena/websocket/client.rb', line 201

def close_code
  @closed_code || @closing_code
end

#close_reasonString

Valid once #run returns, when closed?

Returns:

  • (String)


208
209
210
# File 'lib/kontena/websocket/client.rb', line 208

def close_reason
  @closed_reason
end

#closed!(code, reason) ⇒ Object

Server closed, completing close handshake if closing.

Parameters:

  • code (Integer)
  • reason (String)


716
717
718
719
720
721
# File 'lib/kontena/websocket/client.rb', line 716

def closed!(code, reason)
  @open = false
  @closed = true
  @closed_code = code
  @closed_reason = reason
end

#closed?Boolean

Server has closed websocket connection.

Returns:

  • (Boolean)


194
195
196
# File 'lib/kontena/websocket/client.rb', line 194

def closed?
  !!@closed
end

#closing!(code) ⇒ Object

Client closing connection with code.

Start read deadline for @open_timeout

Parameters:

  • code (Integer)

    expecting close frame from server with matching code



705
706
707
708
709
710
# File 'lib/kontena/websocket/client.rb', line 705

def closing!(code)
  @open = false # fail any further sends after close
  @closing = true
  @closing_at = Time.now
  @closing_code = code # cheked by #read_closed
end

#closing?Boolean

Client has sent close frame, but socket is not yet closed.

Returns:

  • (Boolean)


187
188
189
# File 'lib/kontena/websocket/client.rb', line 187

def closing?
  !!@closing && !@closed
end

#connectObject

Create @socket, @connection and open @driver.

Blocks until open. Disconnects if fails.

Returns:

  • once websocket is open

Raises:



219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/kontena/websocket/client.rb', line 219

def connect
  @connection = self.socket_connect

  @driver = self.websocket_open(@connection)

  # blocks
  self.websocket_read until @opened

rescue
  disconnect
  raise
end

#connect_sslOpenSSL::SSL::SSLSocket

Connect to TCP server, perform SSL handshake, verify if required.

Returns:

Raises:



549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
# File 'lib/kontena/websocket/client.rb', line 549

def connect_ssl
  tcp_socket = self.connect_tcp
  ssl_context = self.ssl_context

  debug "connect_ssl: #{ssl_context.inspect} hostname=#{self.ssl_hostname}"

  ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
  ssl_socket.sync_close = true # close TCPSocket after SSL shutdown
  ssl_socket.hostname = self.ssl_hostname # SNI

  begin
    self.ssl_connect(ssl_socket)
  rescue OpenSSL::SSL::SSLError => exc
    # SSL_connect returned=1 errno=0 state=error: certificate verify failed
    if exc.message.end_with? 'certificate verify failed'
      # ssl_socket.peer_cert is not set on errors :(
      raise ssl_verify_error(ssl_socket.verify_result)
    else
      raise Kontena::Websocket::SSLConnectError, exc
    end
  end

  # raises Kontena::Websocket::SSLVerifyError
  self.ssl_verify_cert!(ssl_socket.peer_cert, ssl_socket.peer_cert_chain) if ssl_verify?

  ssl_socket
end

#connect_tcpTCPSocket

Connect to TCP server.

Returns:

  • (TCPSocket)

Raises:



430
431
432
433
434
435
436
437
438
439
440
# File 'lib/kontena/websocket/client.rb', line 430

def connect_tcp
  debug "connect_tcp: timeout=#{@connect_timeout}"

  Socket.tcp(self.host, self.port, connect_timeout: @connect_timeout)
rescue Errno::ETIMEDOUT => exc
  raise Kontena::Websocket::TimeoutError, "Connect timeout after #{@connect_timeout}s" # XXX: actual delay
rescue SocketError => exc
  raise Kontena::Websocket::ConnectError, exc
rescue SystemCallError => exc
  raise Kontena::Websocket::ConnectError, exc
end

#connected?Boolean

Connected to server. Not necessarily open yet.

Returns:

  • (Boolean)


165
166
167
# File 'lib/kontena/websocket/client.rb', line 165

def connected?
  !!@socket && !!@connection && !!@driver
end

#disconnectObject

Clear connection state, close socket. Does not send websocket close frame, or wait for server to close.



397
398
399
400
401
402
403
404
405
406
407
# File 'lib/kontena/websocket/client.rb', line 397

def disconnect
  debug "disconnect"

  @open = false
  @driver = nil
  @connection = nil

  # TODO: errors and timeout? SSLSocket.close in particular is bidirectional?
  @socket.close if @socket
  @socket = nil
end

#hostString

Returns:

  • (String)


153
154
155
# File 'lib/kontena/websocket/client.rb', line 153

def host
  @uri.host
end

#http_headersWebsocket::Driver::Headers

Valid once open

Returns:

  • (Websocket::Driver::Headers)

Raises:

  • (RuntimeError)

    not connected



272
273
274
275
276
# File 'lib/kontena/websocket/client.rb', line 272

def http_headers
  with_driver do |driver|
    driver.headers
  end
end

#http_statusInteger

Valid once open

Returns:

  • (Integer)

Raises:

  • (RuntimeError)

    not connected



262
263
264
265
266
# File 'lib/kontena/websocket/client.rb', line 262

def http_status
  with_driver do |driver|
    driver.status
  end
end

#on_driver_close(event) ⇒ Object

Mark client as closed, allowing #run to return (and disconnect from the server).

Parameters:

  • event (WebSocket::Driver::CloseEvent)

    code, reason



662
663
664
665
666
# File 'lib/kontena/websocket/client.rb', line 662

def on_driver_close(event)
  debug "closed code=#{event.code}: #{event.reason}"

  closed! event.code, event.reason
end

#on_driver_error(exc) ⇒ Object

Parameters:

  • exc (WebSocket::Driver::ProtocolError)


634
635
636
637
638
639
640
# File 'lib/kontena/websocket/client.rb', line 634

def on_driver_error(exc)
  # this will presumably propagate up out of #recv_loop, not this function
  raise exc

rescue WebSocket::Driver::ProtocolError => exc
  raise Kontena::Websocket::ProtocolError, exc
end

#on_driver_message(event) ⇒ Object

Queue up received messages Causes #read to return/yield once websocket_read returns with the driver mutex unlocked.

Parameters:

  • event (WebSocket::Driver::MessageEvent)

    data



655
656
657
# File 'lib/kontena/websocket/client.rb', line 655

def on_driver_message(event)
  @message_queue << event.data
end

#on_driver_open(event) ⇒ Object

Mark client as opened, calling the block passed to #run.

Parameters:

  • event (WebSocket::Driver::OpenEvent)

    no attrs



645
646
647
648
649
# File 'lib/kontena/websocket/client.rb', line 645

def on_driver_open(event)
  debug "opened"

  opened!
end

#on_pong {|delay| ... } ⇒ Object

Register pong handler. Called from the #read thread every ping_interval after received pong. XXX: called with driver @mutex locked

The ping interval should be longer than the ping timeout. If new pings are sent before old pings get any response, then the older pings do not yield on pong.

Yields:

  • (delay)

    received pong

Yield Parameters:

  • delay (Float)

    ping-pong delay in seconds



321
322
323
# File 'lib/kontena/websocket/client.rb', line 321

def on_pong(&block)
  @on_pong = block
end

#open?Boolean

Server has accepted websocket connection.

Returns:

  • (Boolean)


180
181
182
# File 'lib/kontena/websocket/client.rb', line 180

def open?
  !!@open
end

#opened!Object

Server completed open handshake.



674
675
676
677
# File 'lib/kontena/websocket/client.rb', line 674

def opened!
  @opened = true
  @open = true
end

#opening!Object

Start read deadline for @open_timeout



669
670
671
# File 'lib/kontena/websocket/client.rb', line 669

def opening!
  @opening_at = Time.now
end

#opening?Boolean

Client has started websocket handshake, but is not yet open.

Returns:

  • (Boolean)


172
173
174
175
# File 'lib/kontena/websocket/client.rb', line 172

def opening?
  # XXX: also true after disconnect
  !!@opening_at && !@opened
end

#pingObject

Send ping message.

This is intended to be automatically called from #run per @ping_interval. Calling it from the #run callbacks also works, but calling it from a different thread while #run is blocked on read() will ignore the @ping_timeout, unless the server happens to send something else.

Raises:

  • (RuntimeError)

    not connected



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/kontena/websocket/client.rb', line 333

def ping
  with_driver do |driver|
    # start ping timeout for next read
    ping_at = pinging!

    debug "pinging at #{ping_at}"

    fail unless driver.ping(ping_at.utc.strftime(PING_STRFTIME)) do
      # called from read -> @driver.parse with @mutex held!
      debug "pong for #{ping_at}"

      # resolve ping timeout, unless this pong is late and we already sent a new one
      if ping_at == @ping_at
        ping_delay = pinged!

        debug "ping-pong at #{ping_at} in #{ping_delay}s"

        # XXX: defer call without mutex?
        @on_pong.call(ping_delay) if @on_pong # TODO: also pass ping_at
      end
    end
  end
end

#ping_delayFloat?

Measured ping-pong delay from previous ping

nil if not pinged yet

Returns:

  • (Float, nil)


369
370
371
# File 'lib/kontena/websocket/client.rb', line 369

def ping_delay
  @ping_delay
end

#pinged!Float

Stop read deadline for @ping_timeout

Returns:

  • (Float)

    ping delay



692
693
694
695
696
697
698
# File 'lib/kontena/websocket/client.rb', line 692

def pinged!
  @pinging = false
  @pong_at = Time.now
  @ping_delay = @pong_at - @ping_at

  @ping_delay
end

#pinging!Time

Start read deadline for @ping_timeout

Returns:

  • (Time)

    ping at



682
683
684
685
686
687
688
# File 'lib/kontena/websocket/client.rb', line 682

def pinging!
  @pinging = true
  @ping_at = Time.now
  @pong_at = nil

  @ping_at
end

#pinging?Boolean

Waiting for pong from ping

Returns:

  • (Boolean)


360
361
362
# File 'lib/kontena/websocket/client.rb', line 360

def pinging?
  !!@pinging
end

#portInteger

Returns:

  • (Integer)


158
159
160
# File 'lib/kontena/websocket/client.rb', line 158

def port
  @uri.port || (@uri.scheme == "ws" ? 80 : 443)
end

#read(&block) ⇒ Object

Read messages from websocket.

If a block is given, then this loops and yields messages until closed. Otherwise, returns the next message, or nil if closed.



284
285
286
287
288
289
290
# File 'lib/kontena/websocket/client.rb', line 284

def read(&block)
  if block
    read_yield(&block)
  else
    read_return
  end
end

#read_closedBoolean

Returns connection was closed.

Returns:

  • (Boolean)

    connection was closed

Raises:



725
726
727
728
729
730
731
732
733
734
# File 'lib/kontena/websocket/client.rb', line 725

def read_closed
  if !@closed
    return false
  elsif @closing && @closing_code == @closed_code
    # client sent close, server responded with close
    return true
  else
    raise Kontena::Websocket::CloseError.new(@closed_code, @closed_reason)
  end
end

#read_returnString, Array<Integer>

Returns nil when websocket closed.

Returns:

  • (String, Array<Integer>)

    nil when websocket closed

Raises:



738
739
740
741
742
743
744
745
746
# File 'lib/kontena/websocket/client.rb', line 738

def read_return
  # important to first drain message queue before failing on read_closed!
  while @message_queue.empty? && !self.read_closed
    self.websocket_read
  end

  # returns nil if @closed, once the message queue is empty
  return @message_queue.shift
end

#read_state_timeoutSymbol, Float

Return read deadline for current read state

Returns:

  • (Symbol, Float, Float)

    state, at, timeout



768
769
770
771
772
773
774
775
776
777
778
779
780
781
# File 'lib/kontena/websocket/client.rb', line 768

def read_state_timeout
  case
  when opening? && @open_timeout
    [:open, @opening_at, @open_timeout]
  when pinging? && @ping_timeout
    [:pong, @ping_at, @ping_timeout]
  when closing? && @close_timeout
    [:close, @closing_at, @close_timeout]
  when @ping_interval
    [:ping, @ping_at, @ping_interval]
  else
    [nil, nil, nil]
  end
end

#read_yield {|message| ... } ⇒ Object

Returns websocket closed.

Yields:

  • (message)

    received websocket message payload

Yield Parameters:

  • message (String, Array<integer>)

    text or binary

Returns:

  • websocket closed

Raises:



752
753
754
755
756
757
758
759
760
761
762
763
# File 'lib/kontena/websocket/client.rb', line 752

def read_yield
  loop do
    while msg = @message_queue.shift
      yield msg
    end

    # drain queue before returning on close
    return if self.read_closed

    self.websocket_read
  end
end

#schemeString

Returns ws or wss.

Returns:

  • (String)

    ws or wss



131
132
133
# File 'lib/kontena/websocket/client.rb', line 131

def scheme
  @uri.scheme
end

#send(message) ⇒ Object

Send message frame, either text or binary.

Parameters:

  • message (String, Array<Integer>)

Raises:

  • (ArgumentError)

    invalid type

  • (RuntimeError)

    unable to send (socket closed?)



297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/kontena/websocket/client.rb', line 297

def send(message)
  case message
  when String
    with_driver do |driver|
      fail unless driver.text(message)
    end
  when Array
    with_driver do |driver|
      fail unless driver.binary(message)
    end
  else
    raise ArgumentError, "Invalid type: #{message.class}"
  end
end

#socket_connectConnection

Returns:



578
579
580
581
582
583
584
585
586
587
588
# File 'lib/kontena/websocket/client.rb', line 578

def socket_connect
  if ssl?
    @socket = self.connect_ssl
  else
    @socket = self.connect_tcp
  end

  return Connection.new(@uri, @socket,
    write_timeout: @write_timeout,
  )
end

#socket_read(timeout = nil) ⇒ Object

Read from socket with timeout, parse websocket frames. Invokes on_driver_*, which enqueues messages. The websocket must be connected.

Parameters:

  • timeout (Flaot) (defaults to: nil)

    seconds

Raises:



826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
# File 'lib/kontena/websocket/client.rb', line 826

def socket_read(timeout = nil)
  if timeout && timeout <= 0.0
    raise Kontena::Websocket::TimeoutError, "read deadline expired"
  end

  begin
    data = @connection.read(FRAME_SIZE, timeout: timeout)

  rescue EOFError => exc
    debug "read EOF"

    raise Kontena::Websocket::EOFError, 'Server closed connection without sending close frame'

  rescue IOError => exc
    # socket was closed => IOError: closed stream
    debug "read IOError: #{exc}"

    raise Kontena::Websocket::SocketError, exc

  # TODO: Errno::ECONNRESET etc
  end
end

#ssl?Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/kontena/websocket/client.rb', line 136

def ssl?
  @uri.scheme == 'wss'
end

#ssl_certnil, OpenSSL::X509::Certificate

Returns:

  • (nil)

    not an ssl connection, or no peer cert

  • (OpenSSL::X509::Certificate)

Raises:

  • (RuntimeError)

    not connected



235
236
237
238
239
240
# File 'lib/kontena/websocket/client.rb', line 235

def ssl_cert
  fail "not connected" unless @socket
  return nil unless ssl?

  return @socket.peer_cert
end

#ssl_cert!nil, OpenSSL::X509::Certificate

Verify and return SSL cert. Validates even if not ssl_verify.

Returns:

  • (nil)

    not an ssl connection

  • (OpenSSL::X509::Certificate)

Raises:



248
249
250
251
252
253
254
255
256
# File 'lib/kontena/websocket/client.rb', line 248

def ssl_cert!
  fail "not connected" unless @socket
  return nil unless ssl?

  # raises Kontena::Websocket::SSLVerifyError
  self.ssl_verify_cert! @socket.peer_cert, @socket.peer_cert_chain

  return @socket.peer_cert
end

#ssl_cert_storeOpenSSL::X509::Store

Returns:

  • (OpenSSL::X509::Store)

Raises:

  • (ArgumentError)

    Failed adding cert store file/path: …



444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
# File 'lib/kontena/websocket/client.rb', line 444

def ssl_cert_store
  @ssl_cert_store ||= OpenSSL::X509::Store.new.tap do |ssl_cert_store|
    ca_file = @ssl_params[:ca_file] || ENV['SSL_CERT_FILE']
    ca_path = @ssl_params[:ca_path] || ENV['SSL_CERT_PATH']

    if ca_file || ca_path
      if ca_file
        debug "add cert store file: #{ca_file}"

        begin
          ssl_cert_store.add_file ca_file
        rescue OpenSSL::X509::StoreError
          raise ArgumentError, "Failed adding cert store file: #{ca_file}"
        end
      end

      if ca_path
        debug "add cert store path: #{ca_path}"

        begin
          # XXX: does not actually raise
          ssl_cert_store.add_path ca_path
        rescue OpenSSL::X509::StoreError
          raise ArgumentError, "Failed adding cert store path: #{ca_path}"
        end
      end
    else
      debug "use default cert store paths"

      ssl_cert_store.set_default_paths
    end
  end
end

#ssl_connect(ssl_socket) ⇒ Object

TODO: connect_deadline to impose a single deadline on the entire process XXX: specs

Parameters:

Raises:



528
529
530
531
532
533
534
535
536
537
538
539
540
541
# File 'lib/kontena/websocket/client.rb', line 528

def ssl_connect(ssl_socket)
  debug "ssl_connect..."
  ret = ssl_socket.connect_nonblock
rescue IO::WaitReadable
  debug "ssl_connect wait read: timeout=#{@connect_timeout}"
  ssl_socket.wait_readable(@connect_timeout) or raise Kontena::Websocket::TimeoutError, "SSL connect read timeout after #{@connect_timeout}s"
  retry
rescue IO::WaitWritable
  debug "ssl_connect wait write: timeout=#{@connect_timeout}"
  ssl_socket.wait_writable(@connect_timeout) or raise Kontena::Websocket::TimeoutError, "SSL connect write timeout after #{@connect_timeout}s"
  retry
else
  debug "ssl_connect: #{ret}"
end

#ssl_contextOpenSSL::SSL::SSLContext

Returns:

  • (OpenSSL::SSL::SSLContext)


506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
# File 'lib/kontena/websocket/client.rb', line 506

def ssl_context
  @ssl_context ||= OpenSSL::SSL::SSLContext.new().tap do |ssl_context|
    params = {
      cert_store: self.ssl_cert_store,
      **@ssl_params
    }

    if Kontena::Websocket::Client.ruby_version? '2.4'
      # prefer our own ssl_verify_cert! logic
      params[:verify_hostname] = false
    end

    ssl_context.set_params(params)
  end
end

#ssl_hostnameString

Returns:

  • (String)


148
149
150
# File 'lib/kontena/websocket/client.rb', line 148

def ssl_hostname
  @ssl_hostname || @uri.host
end

#ssl_verify?Boolean

Connecting with SSL cert/host verification?

Returns:

  • (Boolean)


143
144
145
# File 'lib/kontena/websocket/client.rb', line 143

def ssl_verify?
  ssl? && ssl_context.verify_mode != OpenSSL::SSL::VERIFY_NONE
end

#ssl_verify_cert!(ssl_cert, ssl_cert_chain) ⇒ Object

Parameters:

  • ssl_cert (OpenSSL::X509::Certificate)

Raises:



480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
# File 'lib/kontena/websocket/client.rb', line 480

def ssl_verify_cert!(ssl_cert, ssl_cert_chain)
  unless ssl_cert
    raise Kontena::Websocket::SSLVerifyError.new(OpenSSL::X509::V_OK, ssl_cert, ssl_cert_chain), "No certificate"
  end

  ssl_verify_context = OpenSSL::X509::StoreContext.new(ssl_cert_store, ssl_cert, ssl_cert_chain)

  unless ssl_verify_context.verify
    raise Kontena::Websocket::SSLVerifyError.new(ssl_verify_context.error, ssl_cert, ssl_cert_chain), ssl_verify_context.error_string
  end

  unless OpenSSL::SSL.verify_certificate_identity(ssl_cert, self.ssl_hostname)
    raise Kontena::Websocket::SSLVerifyError.new(OpenSSL::X509::V_OK, ssl_cert, ssl_cert_chain), "Subject does not match hostname #{self.ssl_hostname}: #{ssl_cert.subject}"
  end
end

#ssl_verify_error(verify_result, ssl_cert = nil, ssl_cert_chain = nil) ⇒ Kontena::Websocket::SSLVerifyError

Parameters:

  • verify_result (Integer)

    OpenSSL::SSL::SSLSocket#verify_result

Returns:



498
499
500
501
502
503
# File 'lib/kontena/websocket/client.rb', line 498

def ssl_verify_error(verify_result, ssl_cert = nil, ssl_cert_chain = nil)
  ssl_verify_context = OpenSSL::X509::StoreContext.new(ssl_cert_store)
  ssl_verify_context.error = verify_result

  Kontena::Websocket::SSLVerifyError.new(ssl_verify_context.error, ssl_cert, ssl_cert_chain, ssl_verify_context.error_string)
end

#urlString

Returns:

  • (String)


126
127
128
# File 'lib/kontena/websocket/client.rb', line 126

def url
  @uri.to_s
end

#websocket_open(connection) ⇒ WebSocket::Driver::Client

Create websocket driver using connection, and send websocket handshake. Registers driver handlers to set @open, @closed states, enqueue messages, or raise errors.

Parameters:

Returns:

  • (WebSocket::Driver::Client)

Raises:

  • (RuntimeError)

    already started?



596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
# File 'lib/kontena/websocket/client.rb', line 596

def websocket_open(connection)
  debug "websocket open..."

  driver = ::WebSocket::Driver.client(connection)

  @headers.each do |k, v|
    driver.set_header(k, v)
  end

  # these are called from read_loop -> with_driver { driver.parse } with the @mutex held
  # do not recurse back into with_driver!
  driver.on :error do |event|
    self.on_driver_error(event)
  end

  driver.on :open do |event|
    self.on_driver_open(event)
  end

  driver.on :message do |event|
    self.on_driver_message(event)
  end

  driver.on :close do |event|
    self.on_driver_close(event)
  end

  # not expected to emit anything, not even :error
  fail unless driver.start

  debug "opening"

  opening!

  return driver
end

#websocket_readObject

Read socket with timeout, parse websocket frames via @driver, emitting on-driver_* to enqueue messages. Sends ping on interval timeout.



785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
# File 'lib/kontena/websocket/client.rb', line 785

def websocket_read
  read_state, state_start, state_timeout = self.read_state_timeout

  if state_timeout
    read_deadline = state_start + state_timeout
    read_timeout = read_deadline - Time.now
  else
    read_timeout = nil
  end

  debug "read (#{read_state})"

  begin
    # read from socket
    data = self.socket_read(read_timeout)
  rescue Kontena::Websocket::TimeoutError => exc
    if read_state == :ping
      debug "ping on #{exc}"
      self.ping
    elsif read_state
      raise exc.class.new("#{exc} while waiting #{state_timeout}s for #{read_state}")
    else
      raise
    end
  else
    # parse data with @driver @mutex held
    with_driver do |driver|
      # call into the driver, causing it to emit the events registered in #start
      driver.parse(data)
    end
  end
end

#with_driver {|driver| ... } ⇒ Object

Call into driver with locked Mutex

Yields:

  • (driver)

Yield Parameters:

  • driver (Websocket::Driver)

Raises:

  • (RuntimeError)

    not connected



416
417
418
419
420
421
422
# File 'lib/kontena/websocket/client.rb', line 416

def with_driver
  @mutex.synchronize {
    fail "not connected" unless @driver

    yield @driver
  }
end