Class: Subscriber::WebSocketClient
- Includes:
- Celluloid::IO
- Defined in:
- lib/nchan_tools/pubsub.rb
Defined Under Namespace
Modules: WebSocketDriverExtensions Classes: WebSocketBundle
Instance Attribute Summary collapse
-
#etag ⇒ Object
Returns the value of attribute etag.
-
#last_modified ⇒ Object
Returns the value of attribute last_modified.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
-
#ws ⇒ Object
Returns the value of attribute ws.
Attributes inherited from Client
Class Method Summary collapse
Instance Method Summary collapse
- #close(bundle) ⇒ Object
-
#initialize(subscr, opt = {}) ⇒ WebSocketClient
constructor
A new instance of WebSocketClient.
- #listen(bundle) ⇒ Object
- #on_ping ⇒ Object
- #on_pong ⇒ Object
- #provides_msgid? ⇒ Boolean
- #run(was_success = nil) ⇒ Object
- #send_binary(data) ⇒ Object
- #send_close(code = 1000, reason = nil) ⇒ Object
- #send_data(data) ⇒ Object
- #send_ping(data = nil) ⇒ Object
- #stop(msg = "Stopped", src_bundle = nil) ⇒ Object
Methods inherited from Client
#error, #handle_bundle_error, inherited, lookup, #poke, unique_aliases
Constructor Details
#initialize(subscr, opt = {}) ⇒ WebSocketClient
Returns a new instance of WebSocketClient.
509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 |
# File 'lib/nchan_tools/pubsub.rb', line 509 def initialize(subscr, opt={}) super @last_modified, @etag, @timeout = opt[:last_modified], opt[:etag], opt[:timeout].to_i || 10 @connect_timeout = opt[:connect_timeout] @subscriber=subscr @subprotocol = opt[:subprotocol] @url=subscr.url @url = @url.gsub(/^h(ttp|2)(s)?:/, "ws\\2:") if opt[:permessage_deflate] @permessage_deflate = true end @permessage_deflate_max_window_bits = opt[:permessage_deflate_max_window_bits] @permessage_deflate_server_max_window_bits = opt[:permessage_deflate_server_max_window_bits] @concurrency=(opt[:concurrency] || opt[:clients] || 1).to_i @retry_delay=opt[:retry_delay] @ws = {} @connected=0 @nomsg = opt[:nomsg] @http2 = opt[:http2] @extra_headers = opt[:extra_headers] end |
Instance Attribute Details
#etag ⇒ Object
Returns the value of attribute etag.
508 509 510 |
# File 'lib/nchan_tools/pubsub.rb', line 508 def etag @etag end |
#last_modified ⇒ Object
Returns the value of attribute last_modified.
508 509 510 |
# File 'lib/nchan_tools/pubsub.rb', line 508 def last_modified @last_modified end |
#timeout ⇒ Object
Returns the value of attribute timeout.
508 509 510 |
# File 'lib/nchan_tools/pubsub.rb', line 508 def timeout @timeout end |
#ws ⇒ Object
Returns the value of attribute ws.
508 509 510 |
# File 'lib/nchan_tools/pubsub.rb', line 508 def ws @ws end |
Class Method Details
.aliases ⇒ Object
415 416 417 |
# File 'lib/nchan_tools/pubsub.rb', line 415 def self.aliases [:websocket, :ws] end |
Instance Method Details
#close(bundle) ⇒ Object
699 700 701 702 703 704 705 706 707 708 709 |
# File 'lib/nchan_tools/pubsub.rb', line 699 def close(bundle) if bundle @ws.delete bundle bundle.sock.close unless bundle.sock.closed? end @connected -= 1 if @connected <= 0 binding.pry unless @ws.count == 0 @cooked.signal true end end |
#listen(bundle) ⇒ Object
658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 |
# File 'lib/nchan_tools/pubsub.rb', line 658 def listen(bundle) while @ws[bundle] begin bundle.read rescue IOError => e @subscriber.on_failure error(0, "Connection closed: #{e}"), bundle close bundle return false rescue EOFError bundle.sock.close close bundle return rescue Errno::ECONNRESET close bundle return end end end |
#on_ping ⇒ Object
651 652 653 |
# File 'lib/nchan_tools/pubsub.rb', line 651 def on_ping @on_ping = Proc.new if block_given? end |
#on_pong ⇒ Object
654 655 656 |
# File 'lib/nchan_tools/pubsub.rb', line 654 def on_pong @on_pong = Proc.new if block_given? end |
#provides_msgid? ⇒ Boolean
504 505 506 |
# File 'lib/nchan_tools/pubsub.rb', line 504 def provides_msgid? @subprotocol == "ws+meta.nchan" end |
#run(was_success = nil) ⇒ Object
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 |
# File 'lib/nchan_tools/pubsub.rb', line 541 def run(was_success = nil) uri = URI.parse_possibly_unix_socket(@url) uri.port ||= (uri.scheme == "ws" || uri.scheme == "unix" ? 80 : 443) @cooked=Celluloid::Condition.new @connected = @concurrency if @http2 @subscriber.on_failure error(0, "Refusing to try websocket over HTTP/2") @connected = 0 @notready = 0 @cooked_ready.signal false @cooked.signal true return end raise ArgumentError, "invalid websocket scheme #{uri.scheme} in #{@url}" unless uri.scheme == "unix" || uri.scheme.match(/^wss?$/) @notready=@concurrency if @timeout @timer = after(@timeout) do stop "Timeout" end end @concurrency.times do |i| begin sock = ParserBundle.new(uri).open_socket.sock rescue SystemCallError => e @subscriber.on_failure error(0, e.to_s) close nil return end if uri.scheme == "unix" hs_url="http://#{uri.host.match "[^/]+$"}#{uri.path}#{uri.query && "?#{uri.query}"}" else hs_url=@url end bundle = WebSocketBundle.new hs_url, sock, id: i, permessage_deflate: @permessage_deflate, subprotocol: @subprotocol, logger: @logger, permessage_deflate_max_window_bits: @permessage_deflate_max_window_bits, permessage_deflate_server_max_window_bits: @permessage_deflate_server_max_window_bits, extra_headers: @extra_headers bundle.ws.on :open do |ev| bundle.connected = true @notready-=1 @cooked_ready.signal true if @notready == 0 end bundle.ws.on :ping do |ev| @on_ping.call if @on_ping end bundle.ws.on :pong do |ev| @on_pong.call if @on_pong end bundle.ws.on :error do |ev| http_error_match = ev..match(/Unexpected response code: (\d+)/) @subscriber.on_failure error(http_error_match ? http_error_match[1] : 0, ev., bundle) close bundle end bundle.ws.on :close do |ev| @subscriber.on_failure error(ev.code, ev.reason, bundle) bundle.connected = false close bundle end bundle.ws.on :message do |ev| @timer.reset if @timer data = ev.data if Array === data #binary String data = data.map(&:chr).join data.force_encoding "ASCII-8BIT" bundle.=:binary else bundle.=:text end if bundle.ws.protocol == "ws+meta.nchan" @meta_regex ||= /^id: (?<id>\d+:[^n]+)\n(content-type: (?<content_type>[^\n]+)\n)?\n(?<data>.*)/m match = @meta_regex.match data if not match @subscriber.on_failure error(0, "Invalid ws+meta.nchan message received") close bundle else if @nomsg msg = match[:data] else msg= Message.new match[:data] msg.content_type = match[:content_type] msg.id = match[:id] end end else msg= @nomsg ? data : Message.new(data) end bundle.=Time.now.to_f if @subscriber.(msg, bundle) == false close bundle end end @ws[bundle]=true #handhsake bundle.send_handshake async.listen bundle end end |
#send_binary(data) ⇒ Object
695 696 697 |
# File 'lib/nchan_tools/pubsub.rb', line 695 def send_binary(data) ws_client.send_binary data end |
#send_close(code = 1000, reason = nil) ⇒ Object
689 690 691 |
# File 'lib/nchan_tools/pubsub.rb', line 689 def send_close(code=1000, reason=nil) ws_client.send_close code, reason end |
#send_data(data) ⇒ Object
692 693 694 |
# File 'lib/nchan_tools/pubsub.rb', line 692 def send_data(data) ws_client.send_data data end |
#send_ping(data = nil) ⇒ Object
686 687 688 |
# File 'lib/nchan_tools/pubsub.rb', line 686 def send_ping(data=nil) ws_client.ping data end |
#stop(msg = "Stopped", src_bundle = nil) ⇒ Object
533 534 535 536 537 538 539 |
# File 'lib/nchan_tools/pubsub.rb', line 533 def stop(msg = "Stopped", src_bundle = nil) super msg, (@ws.first && @ws.first.first) @ws.each do |b, v| close b end @timer.cancel if @timer end |