Class: NchanTools::Subscriber::WebSocketClient
- Includes:
- Celluloid::IO
- Defined in:
- lib/nchan_tools/pubsub.rb
Defined Under Namespace
Modules: WebSocketDriverExtensions Classes: WebSocketBundle
Instance Attribute Summary collapse
-
#etag ⇒ Object
Returns the value of attribute etag.
-
#last_modified ⇒ Object
Returns the value of attribute last_modified.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
-
#ws ⇒ Object
Returns the value of attribute ws.
Attributes inherited from Client
Class Method Summary collapse
Instance Method Summary collapse
- #close(bundle) ⇒ Object
-
#initialize(subscr, opt = {}) ⇒ WebSocketClient
constructor
A new instance of WebSocketClient.
- #listen(bundle) ⇒ Object
- #provides_msgid? ⇒ Boolean
- #run(was_success = nil) ⇒ Object
- #send_binary(data) ⇒ Object
- #send_close(reason = nil, code = 1000) ⇒ Object
- #send_data(data) ⇒ Object
- #send_ping(data = nil) ⇒ Object
- #stop(msg = "Stopped", src_bundle = nil) ⇒ Object
Methods inherited from Client
#error, #handle_bundle_error, inherited, lookup, #poke, unique_aliases
Constructor Details
#initialize(subscr, opt = {}) ⇒ WebSocketClient
Returns a new instance of WebSocketClient.
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 |
# File 'lib/nchan_tools/pubsub.rb', line 514
#
# Sets up a websocket client for the given subscriber.
#
# @param subscr [Object] owning subscriber; its #url is read and rewritten
#   to a ws:/wss: URL
# @param opt [Hash] options; keys read here are :last_modified, :etag,
#   :timeout, :connect_timeout, :subprotocol, :permessage_deflate,
#   :permessage_deflate_max_window_bits,
#   :permessage_deflate_server_max_window_bits, :concurrency, :clients,
#   :retry_delay, :nomsg, :http2 and :extra_headers
# @return [WebSocketClient] a new instance of WebSocketClient
def initialize(subscr, opt={})
  super
  @last_modified = opt[:last_modified]
  @etag = opt[:etag]
  # Apply the default before converting. The previous form,
  # `opt[:timeout].to_i || 10`, could never produce 10: nil.to_i is 0 and
  # 0 is truthy in Ruby, so a missing :timeout silently became 0.
  @timeout = (opt[:timeout] || 10).to_i
  @connect_timeout = opt[:connect_timeout]
  @subscriber = subscr
  @subprotocol = opt[:subprotocol]
  # "http:", "https:", "h2:" and "h2s:" become "ws:" or "wss:"; the optional
  # "s" is carried over through the second capture group.
  @url = subscr.url.gsub(/^h(ttp|2)(s)?:/, "ws\\2:")
  # NOTE(review): the extracted listing had lost the left-hand sides of the
  # next three assignments; the names below are restored from the option
  # keys they read -- confirm against lib/nchan_tools/pubsub.rb.
  @permessage_deflate = true if opt[:permessage_deflate]
  @permessage_deflate_max_window_bits = opt[:permessage_deflate_max_window_bits]
  @permessage_deflate_server_max_window_bits = opt[:permessage_deflate_server_max_window_bits]
  # :concurrency wins over :clients; a single connection is the default.
  @concurrency = (opt[:concurrency] || opt[:clients] || 1).to_i
  @retry_delay = opt[:retry_delay]
  @ws = {}
  @connected = 0
  @nomsg = opt[:nomsg]
  @http2 = opt[:http2]
  @extra_headers = opt[:extra_headers]
end
Instance Attribute Details
#etag ⇒ Object
Returns the value of attribute etag.
513 514 515 |
# File 'lib/nchan_tools/pubsub.rb', line 513
#
# Reader for the @etag instance variable.
#
# @return [Object] the current value of @etag
def etag
  @etag
end
#last_modified ⇒ Object
Returns the value of attribute last_modified.
513 514 515 |
# File 'lib/nchan_tools/pubsub.rb', line 513
#
# Reader for the @last_modified instance variable.
#
# @return [Object] the current value of @last_modified
def last_modified
  @last_modified
end
#timeout ⇒ Object
Returns the value of attribute timeout.
513 514 515 |
# File 'lib/nchan_tools/pubsub.rb', line 513
#
# Reader for the @timeout instance variable.
#
# @return [Object] the current value of @timeout
def timeout
  @timeout
end
#ws ⇒ Object
Returns the value of attribute ws.
513 514 515 |
# File 'lib/nchan_tools/pubsub.rb', line 513
#
# Reader for the @ws instance variable.
#
# @return [Object] the current value of @ws
def ws
  @ws
end
Class Method Details
.aliases ⇒ Object
412 413 414 |
# File 'lib/nchan_tools/pubsub.rb', line 412
#
# Alias symbols for this client class.
#
# @return [Array<Symbol>] always [:websocket, :ws]
def self.aliases
  %i[websocket ws]
end
Instance Method Details
#close(bundle) ⇒ Object
697 698 699 700 701 702 703 704 705 706 707 708 709 |
# File 'lib/nchan_tools/pubsub.rb', line 697
#
# Drops one connection and, once the connection counter reaches zero or
# below, waits for @ws to drain and signals @cooked.
#
# @param bundle [Object, nil] connection bundle to remove from @ws; its
#   socket is closed unless it already reports closed?. A nil bundle skips
#   the socket handling but still decrements the counter.
# @return [Object, nil] the result of @cooked.signal(true) when the counter
#   is at or below zero, nil otherwise
def close(bundle)
  if bundle
    @ws.delete(bundle)
    bundle.sock.close unless bundle.sock.closed?
  end

  @connected -= 1
  return if @connected > 0

  # Poll until every bundle has been removed from @ws before signalling.
  sleep 0.1 until @ws.empty?
  @cooked.signal(true)
end
#listen(bundle) ⇒ Object
656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 |
# File 'lib/nchan_tools/pubsub.rb', line 656
#
# Reads from the bundle for as long as it is still registered in @ws.
#
# @param bundle [Object] connection bundle; must respond to #read and #sock
# @return [false] after an IOError other than EOFError (the failure is
#   reported through @subscriber.on_failure first)
# @return [nil] after EOFError or Errno::ECONNRESET, or once the bundle is
#   no longer present in @ws
def listen(bundle)
  while @ws[bundle]
    begin
      bundle.read
    rescue EOFError
      # EOFError is a subclass of IOError, so this clause has to come first;
      # listed after IOError it could never be reached.
      bundle.sock.close
      close bundle
      return
    rescue IOError => e
      @subscriber.on_failure error(0, "Connection closed: #{e}"), bundle
      close bundle
      return false
    rescue Errno::ECONNRESET
      close bundle
      return
    end
  end
end
#provides_msgid? ⇒ Boolean
509 510 511 |
# File 'lib/nchan_tools/pubsub.rb', line 509
#
# Tells whether the configured subprotocol is "ws+meta.nchan".
#
# @return [Boolean] true only when @subprotocol equals "ws+meta.nchan"
def provides_msgid?
  @subprotocol == "ws+meta.nchan"
end
#run(was_success = nil) ⇒ Object
546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 |
# run(was_success = nil): opens @concurrency websocket connections to @url
# and wires up their event handlers. Steps visible in the listing below:
#   * parses @url; the port defaults to 80 for the "ws" and "unix" schemes
#     and to 443 otherwise
#   * when @http2 is set: reports "Refusing to try websocket over HTTP/2"
#     through @subscriber.on_failure, zeroes @connected and @notready,
#     signals @cooked_ready with false and @cooked with true, then returns
#   * raises ArgumentError for any scheme other than unix, ws or wss
#   * when @timeout is truthy, arms a timer that calls stop "Timeout"; the
#     :message handler resets that timer
#   * for each connection: opens a socket through ParserBundle (a
#     SystemCallError is reported via on_failure, followed by close nil and
#     return), builds a WebSocketBundle, registers :open, :ping, :pong,
#     :error, :close and :message handlers, stores the bundle in @ws, sends
#     the handshake and starts async.listen for it
# NOTE(review): this listing appears to have lost several identifiers during
#   extraction (e.g. "permessage_deflate: ,", "ev..match", "bundle.=:binary",
#   "||= /^id: ...", "= .match data", "@subscriber.(msg, bundle)"); consult
#   lib/nchan_tools/pubsub.rb for the exact names before relying on it.
# NOTE(review): in the ws+meta.nchan pattern, "[^n]+" excludes the letter n;
#   "[^\n]+" was presumably intended -- confirm.
# NOTE(review): was_success is not referenced anywhere in the visible body.
# File 'lib/nchan_tools/pubsub.rb', line 546 def run(was_success = nil) uri = URI.parse_possibly_unix_socket(@url) uri.port ||= (uri.scheme == "ws" || uri.scheme == "unix" ? 80 : 443) @cooked=Celluloid::Condition.new @connected = @concurrency if @http2 @subscriber.on_failure error(0, "Refusing to try websocket over HTTP/2") @connected = 0 @notready = 0 @cooked_ready.signal false @cooked.signal true return end raise ArgumentError, "invalid websocket scheme #{uri.scheme} in #{@url}" unless uri.scheme == "unix" || uri.scheme.match(/^wss?$/) @notready=@concurrency if @timeout @timer = after(@timeout) do stop "Timeout" end end @concurrency.times do |i| begin sock = ParserBundle.new(uri).open_socket.sock rescue SystemCallError => e @subscriber.on_failure error(0, e.to_s) close nil return end if uri.scheme == "unix" hs_url="http://#{uri.host.match "[^/]+$"}#{uri.path}#{uri.query && "?#{uri.query}"}" else hs_url=@url end bundle = WebSocketBundle.new hs_url, sock, id: i, permessage_deflate: , subprotocol: @subprotocol, logger: @logger, permessage_deflate_max_window_bits: , permessage_deflate_server_max_window_bits: , extra_headers: @extra_headers bundle.ws.on :open do |ev| bundle.connected = true @notready-=1 @cooked_ready.signal true if @notready == 0 end bundle.ws.on :ping do |ev| @subscriber.on(:ping).call ev, bundle end bundle.ws.on :pong do |ev| @subscriber.on(:pong).call ev, bundle end bundle.ws.on :error do |ev| http_error_match = ev..match(/Unexpected response code: (\d+)/) @subscriber.on_failure error(http_error_match ? 
http_error_match[1] : 0, ev., bundle) close bundle end bundle.ws.on :close do |ev| @subscriber.on_failure error(ev.code, ev.reason, bundle) bundle.connected = false close bundle end bundle.ws.on :message do |ev| @timer.reset if @timer data = ev.data if Array === data #binary String data = data.map(&:chr).join data.force_encoding "ASCII-8BIT" bundle.=:binary else bundle.=:text end if bundle.ws.protocol == "ws+meta.nchan" ||= /^id: (?<id>\d+:[^n]+)\n(content-type: (?<content_type>[^\n]+)\n)?\n(?<data>.*)/m match = .match data if not match @subscriber.on_failure error(0, "Invalid ws+meta.nchan message received") close bundle else if @nomsg msg = match[:data] else msg= Message.new match[:data] msg.content_type = match[:content_type] msg.id = match[:id] end end else msg= @nomsg ? data : Message.new(data) end bundle.=Time.now.to_f if @subscriber.(msg, bundle) == false close bundle end end @ws[bundle]=true #handhsake bundle.send_handshake async.listen bundle end end |
#send_binary(data) ⇒ Object
693 694 695 |
# File 'lib/nchan_tools/pubsub.rb', line 693
#
# Forwards a binary payload to ws_client.send_binary.
# NOTE(review): ws_client is not defined in the visible part of this class;
# confirm where it comes from.
#
# @param data [Object] payload handed through unchanged
# @return [Object] whatever ws_client.send_binary returns
def send_binary(data)
  ws_client.send_binary(data)
end
#send_close(reason = nil, code = 1000) ⇒ Object
687 688 689 |
# File 'lib/nchan_tools/pubsub.rb', line 687
#
# Forwards a close request to ws_client.send_close.
# NOTE(review): ws_client is not defined in the visible part of this class;
# confirm where it comes from.
#
# @param reason [Object, nil] close reason, nil by default
# @param code [Integer] close code, 1000 by default
# @return [Object] whatever ws_client.send_close returns
def send_close(reason=nil, code=1000)
  ws_client.send_close(reason, code)
end
#send_data(data) ⇒ Object
690 691 692 |
# File 'lib/nchan_tools/pubsub.rb', line 690
#
# Forwards a payload to ws_client.send_data.
# NOTE(review): ws_client is not defined in the visible part of this class;
# confirm where it comes from.
#
# @param data [Object] payload handed through unchanged
# @return [Object] whatever ws_client.send_data returns
def send_data(data)
  ws_client.send_data(data)
end
#send_ping(data = nil) ⇒ Object
684 685 686 |
# File 'lib/nchan_tools/pubsub.rb', line 684
#
# Forwards a ping to ws_client.send_ping.
# NOTE(review): ws_client is not defined in the visible part of this class;
# confirm where it comes from.
#
# @param data [Object, nil] optional ping payload, nil by default
# @return [Object] whatever ws_client.send_ping returns
def send_ping(data=nil)
  ws_client.send_ping(data)
end
#stop(msg = "Stopped", src_bundle = nil) ⇒ Object
538 539 540 541 542 543 544 |
# File 'lib/nchan_tools/pubsub.rb', line 538
#
# Stops the client: hands off to the inherited #stop, closes every bundle
# still registered in @ws, then cancels the timer if one is set.
#
# @param msg [String] message passed on to the inherited #stop
# @param src_bundle [Object, nil] accepted for interface compatibility; the
#   visible body does not use it and passes the first key of @ws to super
# @return [Object, nil] the result of @timer.cancel, or nil without a timer
def stop(msg = "Stopped", src_bundle = nil)
  super msg, @ws.keys.first
  # #close deletes entries from @ws, so walk a snapshot of the keys instead
  # of the live hash, and skip bundles that were removed in the meantime.
  @ws.keys.each do |bundle|
    close bundle if @ws.key?(bundle)
  end
  @timer.cancel if @timer
end