Class: Kontena::Websocket::Client
- Inherits:
-
Object
- Object
- Kontena::Websocket::Client
- Includes:
- Logging
- Defined in:
- lib/kontena/websocket/client.rb,
lib/kontena/websocket/client/version.rb
Overview
def websocket_connect
  Kontena::Websocket::Client.connect(url, ...) do |client|
    on_open(client)
    client.read do |message|
      on_message(message)
    end
    on_close(client.close_code, client.close_reason) # client closed connection
  end
rescue Kontena::Websocket::CloseError => exc
  on_close(exc.code, exc.reason) # server closed connection
rescue Kontena::Websocket::Error => exc
  on_error(exc)
ensure
  # disconnected
end
Defined Under Namespace
Classes: Connection
Constant Summary collapse
- FRAME_SIZE =
4 * 1024
- CONNECT_TIMEOUT =
60.0
- OPEN_TIMEOUT =
60.0
- PING_INTERVAL =
60.0
- PING_TIMEOUT =
10.0
- PING_STRFTIME =
high-precision RFC 3339
'%FT%T.%NZ'
- CLOSE_TIMEOUT =
60.0
- WRITE_TIMEOUT =
60.0
- VERSION =
"0.1.1"
Instance Attribute Summary collapse
-
#uri ⇒ Object
readonly
Returns the value of attribute uri.
Class Method Summary collapse
-
.connect(url, **options) {|client| ... } ⇒ Object
Connect, send websocket handshake, and loop reading responses.
-
.ruby_version?(gte_version) ⇒ Boolean
Running ruby >= version?.
Instance Method Summary collapse
-
#close(code = 1000, reason = nil) ⇒ Object
Send close frame.
-
#close_code ⇒ Integer
Valid once #run returns, when closed? or closing?.
-
#close_reason ⇒ String
Valid once #run returns, when closed?.
-
#closed!(code, reason) ⇒ Object
Server closed, completing close handshake if closing.
-
#closed? ⇒ Boolean
Server has closed websocket connection.
-
#closing!(code) ⇒ Object
Client closing connection with code.
-
#closing? ⇒ Boolean
Client has sent close frame, but socket is not yet closed.
-
#connect ⇒ Object
Create @socket, @connection and open @driver.
-
#connect_ssl ⇒ OpenSSL::SSL::SSLSocket
Connect to TCP server, perform SSL handshake, verify if required.
-
#connect_tcp ⇒ TCPSocket
Connect to TCP server.
-
#connected? ⇒ Boolean
Connected to server.
-
#disconnect ⇒ Object
Clear connection state, close socket.
- #host ⇒ String
-
#http_headers ⇒ WebSocket::Driver::Headers
Valid once open.
-
#http_status ⇒ Integer
Valid once open.
-
#initialize(url, headers: {}, ssl_params: {}, ssl_hostname: nil, connect_timeout: CONNECT_TIMEOUT, open_timeout: OPEN_TIMEOUT, ping_interval: PING_INTERVAL, ping_timeout: PING_TIMEOUT, close_timeout: CLOSE_TIMEOUT, write_timeout: WRITE_TIMEOUT) ⇒ Client
constructor
A new instance of Client.
-
#on_driver_close(event) ⇒ Object
Mark client as closed, allowing #run to return (and disconnect from the server).
- #on_driver_error(exc) ⇒ Object
-
#on_driver_message(event) ⇒ Object
Queue up received messages. Causes #read to return/yield once websocket_read returns with the driver mutex unlocked.
-
#on_driver_open(event) ⇒ Object
Mark client as opened, calling the block passed to #run.
-
#on_pong {|delay| ... } ⇒ Object
Register pong handler.
-
#open? ⇒ Boolean
Server has accepted websocket connection.
-
#opened! ⇒ Object
Server completed open handshake.
-
#opening! ⇒ Object
Start read deadline for @open_timeout.
-
#opening? ⇒ Boolean
Client has started websocket handshake, but is not yet open.
-
#ping ⇒ Object
Send ping message.
-
#ping_delay ⇒ Float?
Measured ping-pong delay from previous ping.
-
#pinged! ⇒ Float
Stop read deadline for @ping_timeout.
-
#pinging! ⇒ Time
Start read deadline for @ping_timeout.
-
#pinging? ⇒ Boolean
Waiting for pong from ping.
- #port ⇒ Integer
-
#read(&block) ⇒ Object
Read messages from websocket.
-
#read_closed ⇒ Boolean
Connection was closed.
-
#read_return ⇒ String, Array<Integer>
Nil when websocket closed.
-
#read_state_timeout ⇒ Symbol, Float
Return read deadline for current read state.
-
#read_yield {|message| ... } ⇒ Object
Yields queued messages; returns once the websocket is closed.
-
#scheme ⇒ String
Ws or wss.
-
#send(message) ⇒ Object
Send message frame, either text or binary.
- #socket_connect ⇒ Connection
-
#socket_read(timeout = nil) ⇒ Object
Read from socket with timeout, parse websocket frames.
- #ssl? ⇒ Boolean
- #ssl_cert ⇒ nil, OpenSSL::X509::Certificate
-
#ssl_cert! ⇒ nil, OpenSSL::X509::Certificate
Verify and return SSL cert.
- #ssl_cert_store ⇒ OpenSSL::X509::Store
-
#ssl_connect(ssl_socket) ⇒ Object
TODO: connect_deadline to impose a single deadline on the entire process. XXX: specs.
- #ssl_context ⇒ OpenSSL::SSL::SSLContext
- #ssl_hostname ⇒ String
-
#ssl_verify? ⇒ Boolean
Connecting with SSL cert/host verification?.
- #ssl_verify_cert!(ssl_cert, ssl_cert_chain) ⇒ Object
- #ssl_verify_error(verify_result, ssl_cert = nil, ssl_cert_chain = nil) ⇒ Kontena::Websocket::SSLVerifyError
- #url ⇒ String
-
#websocket_open(connection) ⇒ WebSocket::Driver::Client
Create websocket driver using connection, and send websocket handshake.
-
#websocket_read ⇒ Object
Read socket with timeout, parse websocket frames via @driver, emitting on_driver_* to enqueue messages.
-
#with_driver {|driver| ... } ⇒ Object
Call into driver with locked Mutex.
Methods included from Logging
#debug, #error, #info, initialize_logger, #logger, logger, logger=, #logging_prefix, #warn
Constructor Details
#initialize(url, headers: {}, ssl_params: {}, ssl_hostname: nil, connect_timeout: CONNECT_TIMEOUT, open_timeout: OPEN_TIMEOUT, ping_interval: PING_INTERVAL, ping_timeout: PING_TIMEOUT, close_timeout: CLOSE_TIMEOUT, write_timeout: WRITE_TIMEOUT) ⇒ Client
Returns a new instance of Client.
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/kontena/websocket/client.rb', line 88 def initialize(url, headers: {}, ssl_params: {}, ssl_hostname: nil, connect_timeout: CONNECT_TIMEOUT, open_timeout: OPEN_TIMEOUT, ping_interval: PING_INTERVAL, ping_timeout: PING_TIMEOUT, close_timeout: CLOSE_TIMEOUT, write_timeout: WRITE_TIMEOUT ) @uri = URI.parse(url) @headers = headers @ssl_params = ssl_params @ssl_hostname = ssl_hostname @connect_timeout = connect_timeout @open_timeout = open_timeout @ping_interval = ping_interval @ping_timeout = ping_timeout @close_timeout = close_timeout @write_timeout = write_timeout unless @uri.scheme == 'ws' || @uri.scheme == 'wss' raise ArgumentError, "Invalid websocket URL: #{@uri}" end @mutex = Mutex.new # for @driver # written by #enqueue from @driver callbacks with the @mutex held # drained by #read without the @mutex held @message_queue = [] # sequential ping-pongs @ping_at = Time.now # fake for first ping_interval end |
Instance Attribute Details
#uri ⇒ Object (readonly)
Returns the value of attribute uri.
31 32 33 |
# File 'lib/kontena/websocket/client.rb', line 31 def uri @uri end |
Class Method Details
.connect(url, **options) {|client| ... } ⇒ Object
Connect, send websocket handshake, and loop reading responses.
Passed block is called once websocket is open. Raises on errors. Returns once websocket is closed by server.
Intended to be called using a dedicated per-websocket thread. Other threads can then call the other threadsafe methods:
* send
* close
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/kontena/websocket/client.rb', line 58 def self.connect(url, **options, &block) client = new(url, **options) client.connect begin yield client ensure # ensure socket is closed and client disconnected on any of: # * connect error # * open error # * read error # * close client.disconnect end end |
.ruby_version?(gte_version) ⇒ Boolean
Running ruby >= version?
7 8 9 10 11 12 |
# File 'lib/kontena/websocket/client/version.rb', line 7 def self.ruby_version?(gte_version) ruby_version = RUBY_VERSION.split('.').map{|x|x.to_i} gte_version = gte_version.split('.').map{|x|x.to_i} return (ruby_version <=> gte_version) >= 0 end |
Instance Method Details
#close(code = 1000, reason = nil) ⇒ Object
Send close frame. Does not disconnect, but allows #read to return once server completes close handshake.
Imposes a close timeout when called from #run blocks (run/on_message/on_pong do …). If called from a different thread, then #run should eventually return, once either:
- server sends close frame
- server sends any other frame after the close timeout expires
- the ping interval expires
XXX: prevent #send after close?
385 386 387 388 389 390 391 392 393 |
# File 'lib/kontena/websocket/client.rb', line 385 def close(code = 1000, reason = nil) debug "close code=#{code}: #{reason}" with_driver do |driver| fail unless driver.close(reason, code) # swapped argument order closing! code end end |
#close_code ⇒ Integer
Valid once #run returns, when closed? or closing?
201 202 203 |
# File 'lib/kontena/websocket/client.rb', line 201 def close_code @closed_code || @closing_code end |
#close_reason ⇒ String
Valid once #run returns, when closed?
208 209 210 |
# File 'lib/kontena/websocket/client.rb', line 208 def close_reason @closed_reason end |
#closed!(code, reason) ⇒ Object
Server closed, completing close handshake if closing.
716 717 718 719 720 721 |
# File 'lib/kontena/websocket/client.rb', line 716 def closed!(code, reason) @open = false @closed = true @closed_code = code @closed_reason = reason end |
#closed? ⇒ Boolean
Server has closed websocket connection.
194 195 196 |
# File 'lib/kontena/websocket/client.rb', line 194 def closed? !!@closed end |
#closing!(code) ⇒ Object
Client closing connection with code.
Start read deadline for @close_timeout
705 706 707 708 709 710 |
# File 'lib/kontena/websocket/client.rb', line 705 def closing!(code) @open = false # fail any further sends after close @closing = true @closing_at = Time.now @closing_code = code # checked by #read_closed end |
#closing? ⇒ Boolean
Client has sent close frame, but socket is not yet closed.
187 188 189 |
# File 'lib/kontena/websocket/client.rb', line 187 def closing? !!@closing && !@closed end |
#connect ⇒ Object
Create @socket, @connection and open @driver.
Blocks until open. Disconnects if fails.
219 220 221 222 223 224 225 226 227 228 229 230 |
# File 'lib/kontena/websocket/client.rb', line 219 def connect @connection = self.socket_connect @driver = self.websocket_open(@connection) # blocks self.websocket_read until @opened rescue disconnect raise end |
#connect_ssl ⇒ OpenSSL::SSL::SSLSocket
Connect to TCP server, perform SSL handshake, verify if required.
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 |
# File 'lib/kontena/websocket/client.rb', line 549 def connect_ssl tcp_socket = self.connect_tcp ssl_context = self.ssl_context debug "connect_ssl: #{ssl_context.inspect} hostname=#{self.ssl_hostname}" ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) ssl_socket.sync_close = true # close TCPSocket after SSL shutdown ssl_socket.hostname = self.ssl_hostname # SNI begin self.ssl_connect(ssl_socket) rescue OpenSSL::SSL::SSLError => exc # SSL_connect returned=1 errno=0 state=error: certificate verify failed if exc.message.end_with? 'certificate verify failed' # ssl_socket.peer_cert is not set on errors :( raise ssl_verify_error(ssl_socket.verify_result) else raise Kontena::Websocket::SSLConnectError, exc end end # raises Kontena::Websocket::SSLVerifyError self.ssl_verify_cert!(ssl_socket.peer_cert, ssl_socket.peer_cert_chain) if ssl_verify? ssl_socket end |
#connect_tcp ⇒ TCPSocket
Connect to TCP server.
430 431 432 433 434 435 436 437 438 439 440 |
# File 'lib/kontena/websocket/client.rb', line 430 def connect_tcp debug "connect_tcp: timeout=#{@connect_timeout}" Socket.tcp(self.host, self.port, connect_timeout: @connect_timeout) rescue Errno::ETIMEDOUT => exc raise Kontena::Websocket::TimeoutError, "Connect timeout after #{@connect_timeout}s" # XXX: actual delay rescue SocketError => exc raise Kontena::Websocket::ConnectError, exc rescue SystemCallError => exc raise Kontena::Websocket::ConnectError, exc end |
#connected? ⇒ Boolean
Connected to server. Not necessarily open yet.
165 166 167 |
# File 'lib/kontena/websocket/client.rb', line 165 def connected? !!@socket && !!@connection && !!@driver end |
#disconnect ⇒ Object
Clear connection state, close socket. Does not send websocket close frame, or wait for server to close.
397 398 399 400 401 402 403 404 405 406 407 |
# File 'lib/kontena/websocket/client.rb', line 397 def disconnect debug "disconnect" @open = false @driver = nil @connection = nil # TODO: errors and timeout? SSLSocket.close in particular is bidirectional? @socket.close if @socket @socket = nil end |
#host ⇒ String
153 154 155 |
# File 'lib/kontena/websocket/client.rb', line 153 def host @uri.host end |
#http_headers ⇒ WebSocket::Driver::Headers
Valid once open
272 273 274 275 276 |
# File 'lib/kontena/websocket/client.rb', line 272 def http_headers with_driver do |driver| driver.headers end end |
#http_status ⇒ Integer
Valid once open
262 263 264 265 266 |
# File 'lib/kontena/websocket/client.rb', line 262 def http_status with_driver do |driver| driver.status end end |
#on_driver_close(event) ⇒ Object
Mark client as closed, allowing #run to return (and disconnect from the server).
662 663 664 665 666 |
# File 'lib/kontena/websocket/client.rb', line 662 def on_driver_close(event) debug "closed code=#{event.code}: #{event.reason}" closed! event.code, event.reason end |
#on_driver_error(exc) ⇒ Object
634 635 636 637 638 639 640 |
# File 'lib/kontena/websocket/client.rb', line 634 def on_driver_error(exc) # this will presumably propagate up out of #recv_loop, not this function raise exc rescue WebSocket::Driver::ProtocolError => exc raise Kontena::Websocket::ProtocolError, exc end |
#on_driver_message(event) ⇒ Object
Queue up received messages. Causes #read to return/yield once websocket_read returns with the driver mutex unlocked.
655 656 657 |
# File 'lib/kontena/websocket/client.rb', line 655 def on_driver_message(event) @message_queue << event.data end |
#on_driver_open(event) ⇒ Object
Mark client as opened, calling the block passed to #run.
645 646 647 648 649 |
# File 'lib/kontena/websocket/client.rb', line 645 def on_driver_open(event) debug "opened" opened! end |
#on_pong {|delay| ... } ⇒ Object
Register pong handler. Called from the #read thread every ping_interval, after a pong is received. XXX: called with the driver @mutex locked.
The ping interval should be longer than the ping timeout. If new pings are sent before old pings get any response, then the older pings do not yield on pong.
321 322 323 |
# File 'lib/kontena/websocket/client.rb', line 321 def on_pong(&block) @on_pong = block end |
#open? ⇒ Boolean
Server has accepted websocket connection.
180 181 182 |
# File 'lib/kontena/websocket/client.rb', line 180 def open? !!@open end |
#opened! ⇒ Object
Server completed open handshake.
674 675 676 677 |
# File 'lib/kontena/websocket/client.rb', line 674 def opened! @opened = true @open = true end |
#opening! ⇒ Object
Start read deadline for @open_timeout
669 670 671 |
# File 'lib/kontena/websocket/client.rb', line 669 def opening! @opening_at = Time.now end |
#opening? ⇒ Boolean
Client has started websocket handshake, but is not yet open.
172 173 174 175 |
# File 'lib/kontena/websocket/client.rb', line 172 def opening? # XXX: also true after disconnect !!@opening_at && !@opened end |
#ping ⇒ Object
Send ping message.
This is intended to be automatically called from #run per @ping_interval. Calling it from the #run callbacks also works, but calling it from a different thread while #run is blocked on read() will ignore the @ping_timeout, unless the server happens to send something else.
333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/kontena/websocket/client.rb', line 333 def ping with_driver do |driver| # start ping timeout for next read ping_at = pinging! debug "pinging at #{ping_at}" fail unless driver.ping(ping_at.utc.strftime(PING_STRFTIME)) do # called from read -> @driver.parse with @mutex held! debug "pong for #{ping_at}" # resolve ping timeout, unless this pong is late and we already sent a new one if ping_at == @ping_at ping_delay = pinged! debug "ping-pong at #{ping_at} in #{ping_delay}s" # XXX: defer call without mutex? @on_pong.call(ping_delay) if @on_pong # TODO: also pass ping_at end end end end |
#ping_delay ⇒ Float?
Measured ping-pong delay from previous ping
nil if not pinged yet
369 370 371 |
# File 'lib/kontena/websocket/client.rb', line 369 def ping_delay @ping_delay end |
#pinged! ⇒ Float
Stop read deadline for @ping_timeout
692 693 694 695 696 697 698 |
# File 'lib/kontena/websocket/client.rb', line 692 def pinged! @pinging = false @pong_at = Time.now @ping_delay = @pong_at - @ping_at @ping_delay end |
#pinging! ⇒ Time
Start read deadline for @ping_timeout
682 683 684 685 686 687 688 |
# File 'lib/kontena/websocket/client.rb', line 682 def pinging! @pinging = true @ping_at = Time.now @pong_at = nil @ping_at end |
#pinging? ⇒ Boolean
Waiting for pong from ping
360 361 362 |
# File 'lib/kontena/websocket/client.rb', line 360 def pinging? !!@pinging end |
#port ⇒ Integer
158 159 160 |
# File 'lib/kontena/websocket/client.rb', line 158 def port @uri.port || (@uri.scheme == "ws" ? 80 : 443) end |
#read(&block) ⇒ Object
Read messages from websocket.
If a block is given, then this loops and yields messages until closed. Otherwise, returns the next message, or nil if closed.
284 285 286 287 288 289 290 |
# File 'lib/kontena/websocket/client.rb', line 284 def read(&block) if block read_yield(&block) else read_return end end |
#read_closed ⇒ Boolean
Returns connection was closed.
725 726 727 728 729 730 731 732 733 734 |
# File 'lib/kontena/websocket/client.rb', line 725 def read_closed if !@closed return false elsif @closing && @closing_code == @closed_code # client sent close, server responded with close return true else raise Kontena::Websocket::CloseError.new(@closed_code, @closed_reason) end end |
#read_return ⇒ String, Array<Integer>
Returns nil when websocket closed.
738 739 740 741 742 743 744 745 746 |
# File 'lib/kontena/websocket/client.rb', line 738 def read_return # important to first drain message queue before failing on read_closed! while @message_queue.empty? && !self.read_closed self.websocket_read end # returns nil if @closed, once the message queue is empty return @message_queue.shift end |
#read_state_timeout ⇒ Symbol, Float
Return read deadline for current read state
768 769 770 771 772 773 774 775 776 777 778 779 780 781 |
# File 'lib/kontena/websocket/client.rb', line 768 def read_state_timeout case when opening? && @open_timeout [:open, @opening_at, @open_timeout] when pinging? && @ping_timeout [:pong, @ping_at, @ping_timeout] when closing? && @close_timeout [:close, @closing_at, @close_timeout] when @ping_interval [:ping, @ping_at, @ping_interval] else [nil, nil, nil] end end |
#read_yield {|message| ... } ⇒ Object
Returns websocket closed.
752 753 754 755 756 757 758 759 760 761 762 763 |
# File 'lib/kontena/websocket/client.rb', line 752 def read_yield loop do while msg = @message_queue.shift yield msg end # drain queue before returning on close return if self.read_closed self.websocket_read end end |
#scheme ⇒ String
Returns ws or wss.
131 132 133 |
# File 'lib/kontena/websocket/client.rb', line 131 def scheme @uri.scheme end |
#send(message) ⇒ Object
Send message frame, either text or binary.
297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/kontena/websocket/client.rb', line 297 def send(message) case message when String with_driver do |driver| fail unless driver.text(message) end when Array with_driver do |driver| fail unless driver.binary(message) end else raise ArgumentError, "Invalid type: #{message.class}" end end |
#socket_connect ⇒ Connection
578 579 580 581 582 583 584 585 586 587 588 |
# File 'lib/kontena/websocket/client.rb', line 578 def socket_connect if ssl? @socket = self.connect_ssl else @socket = self.connect_tcp end return Connection.new(@uri, @socket, write_timeout: @write_timeout, ) end |
#socket_read(timeout = nil) ⇒ Object
Read from socket with timeout, parse websocket frames. Invokes on_driver_*, which enqueues messages. The websocket must be connected.
826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 |
# File 'lib/kontena/websocket/client.rb', line 826 def socket_read(timeout = nil) if timeout && timeout <= 0.0 raise Kontena::Websocket::TimeoutError, "read deadline expired" end begin data = @connection.read(FRAME_SIZE, timeout: timeout) rescue EOFError => exc debug "read EOF" raise Kontena::Websocket::EOFError, 'Server closed connection without sending close frame' rescue IOError => exc # socket was closed => IOError: closed stream debug "read IOError: #{exc}" raise Kontena::Websocket::SocketError, exc # TODO: Errno::ECONNRESET etc end end |
#ssl? ⇒ Boolean
136 137 138 |
# File 'lib/kontena/websocket/client.rb', line 136 def ssl? @uri.scheme == 'wss' end |
#ssl_cert ⇒ nil, OpenSSL::X509::Certificate
235 236 237 238 239 240 |
# File 'lib/kontena/websocket/client.rb', line 235 def ssl_cert fail "not connected" unless @socket return nil unless ssl? return @socket.peer_cert end |
#ssl_cert! ⇒ nil, OpenSSL::X509::Certificate
Verify and return SSL cert. Validates even if not ssl_verify.
248 249 250 251 252 253 254 255 256 |
# File 'lib/kontena/websocket/client.rb', line 248 def ssl_cert! fail "not connected" unless @socket return nil unless ssl? # raises Kontena::Websocket::SSLVerifyError self.ssl_verify_cert! @socket.peer_cert, @socket.peer_cert_chain return @socket.peer_cert end |
#ssl_cert_store ⇒ OpenSSL::X509::Store
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 |
# File 'lib/kontena/websocket/client.rb', line 444 def ssl_cert_store @ssl_cert_store ||= OpenSSL::X509::Store.new.tap do |ssl_cert_store| ca_file = @ssl_params[:ca_file] || ENV['SSL_CERT_FILE'] ca_path = @ssl_params[:ca_path] || ENV['SSL_CERT_PATH'] if ca_file || ca_path if ca_file debug "add cert store file: #{ca_file}" begin ssl_cert_store.add_file ca_file rescue OpenSSL::X509::StoreError raise ArgumentError, "Failed adding cert store file: #{ca_file}" end end if ca_path debug "add cert store path: #{ca_path}" begin # XXX: does not actually raise ssl_cert_store.add_path ca_path rescue OpenSSL::X509::StoreError raise ArgumentError, "Failed adding cert store path: #{ca_path}" end end else debug "use default cert store paths" ssl_cert_store.set_default_paths end end end |
#ssl_connect(ssl_socket) ⇒ Object
TODO: connect_deadline to impose a single deadline on the entire process. XXX: specs.
528 529 530 531 532 533 534 535 536 537 538 539 540 541 |
# File 'lib/kontena/websocket/client.rb', line 528 def ssl_connect(ssl_socket) debug "ssl_connect..." ret = ssl_socket.connect_nonblock rescue IO::WaitReadable debug "ssl_connect wait read: timeout=#{@connect_timeout}" ssl_socket.wait_readable(@connect_timeout) or raise Kontena::Websocket::TimeoutError, "SSL connect read timeout after #{@connect_timeout}s" retry rescue IO::WaitWritable debug "ssl_connect wait write: timeout=#{@connect_timeout}" ssl_socket.wait_writable(@connect_timeout) or raise Kontena::Websocket::TimeoutError, "SSL connect write timeout after #{@connect_timeout}s" retry else debug "ssl_connect: #{ret}" end |
#ssl_context ⇒ OpenSSL::SSL::SSLContext
506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 |
# File 'lib/kontena/websocket/client.rb', line 506 def ssl_context @ssl_context ||= OpenSSL::SSL::SSLContext.new().tap do |ssl_context| params = { cert_store: self.ssl_cert_store, **@ssl_params } if Kontena::Websocket::Client.ruby_version? '2.4' # prefer our own ssl_verify_cert! logic params[:verify_hostname] = false end ssl_context.set_params(params) end end |
#ssl_hostname ⇒ String
148 149 150 |
# File 'lib/kontena/websocket/client.rb', line 148 def ssl_hostname @ssl_hostname || @uri.host end |
#ssl_verify? ⇒ Boolean
Connecting with SSL cert/host verification?
143 144 145 |
# File 'lib/kontena/websocket/client.rb', line 143 def ssl_verify? ssl? && ssl_context.verify_mode != OpenSSL::SSL::VERIFY_NONE end |
#ssl_verify_cert!(ssl_cert, ssl_cert_chain) ⇒ Object
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 |
# File 'lib/kontena/websocket/client.rb', line 480 def ssl_verify_cert!(ssl_cert, ssl_cert_chain) unless ssl_cert raise Kontena::Websocket::SSLVerifyError.new(OpenSSL::X509::V_OK, ssl_cert, ssl_cert_chain), "No certificate" end ssl_verify_context = OpenSSL::X509::StoreContext.new(ssl_cert_store, ssl_cert, ssl_cert_chain) unless ssl_verify_context.verify raise Kontena::Websocket::SSLVerifyError.new(ssl_verify_context.error, ssl_cert, ssl_cert_chain), ssl_verify_context.error_string end unless OpenSSL::SSL.verify_certificate_identity(ssl_cert, self.ssl_hostname) raise Kontena::Websocket::SSLVerifyError.new(OpenSSL::X509::V_OK, ssl_cert, ssl_cert_chain), "Subject does not match hostname #{self.ssl_hostname}: #{ssl_cert.subject}" end end |
#ssl_verify_error(verify_result, ssl_cert = nil, ssl_cert_chain = nil) ⇒ Kontena::Websocket::SSLVerifyError
498 499 500 501 502 503 |
# File 'lib/kontena/websocket/client.rb', line 498 def ssl_verify_error(verify_result, ssl_cert = nil, ssl_cert_chain = nil) ssl_verify_context = OpenSSL::X509::StoreContext.new(ssl_cert_store) ssl_verify_context.error = verify_result Kontena::Websocket::SSLVerifyError.new(ssl_verify_context.error, ssl_cert, ssl_cert_chain, ssl_verify_context.error_string) end |
#url ⇒ String
126 127 128 |
# File 'lib/kontena/websocket/client.rb', line 126 def url @uri.to_s end |
#websocket_open(connection) ⇒ WebSocket::Driver::Client
Create websocket driver using connection, and send websocket handshake. Registers driver handlers to set @open, @closed states, enqueue messages, or raise errors.
596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 |
# File 'lib/kontena/websocket/client.rb', line 596 def websocket_open(connection) debug "websocket open..." driver = ::WebSocket::Driver.client(connection) @headers.each do |k, v| driver.set_header(k, v) end # these are called from read_loop -> with_driver { driver.parse } with the @mutex held # do not recurse back into with_driver! driver.on :error do |event| self.on_driver_error(event) end driver.on :open do |event| self.on_driver_open(event) end driver.on :message do |event| self.on_driver_message(event) end driver.on :close do |event| self.on_driver_close(event) end # not expected to emit anything, not even :error fail unless driver.start debug "opening" opening! return driver end |
#websocket_read ⇒ Object
Read socket with timeout, parse websocket frames via @driver, emitting on_driver_* to enqueue messages. Sends ping on interval timeout.
785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 |
# File 'lib/kontena/websocket/client.rb', line 785 def websocket_read read_state, state_start, state_timeout = self.read_state_timeout if state_timeout read_deadline = state_start + state_timeout read_timeout = read_deadline - Time.now else read_timeout = nil end debug "read (#{read_state})" begin # read from socket data = self.socket_read(read_timeout) rescue Kontena::Websocket::TimeoutError => exc if read_state == :ping debug "ping on #{exc}" self.ping elsif read_state raise exc.class.new("#{exc} while waiting #{state_timeout}s for #{read_state}") else raise end else # parse data with @driver @mutex held with_driver do |driver| # call into the driver, causing it to emit the events registered in #start driver.parse(data) end end end |
#with_driver {|driver| ... } ⇒ Object
Call into driver with locked Mutex
416 417 418 419 420 421 422 |
# File 'lib/kontena/websocket/client.rb', line 416 def with_driver @mutex.synchronize { fail "not connected" unless @driver yield @driver } end |