Class: NchanTools::Subscriber::WebSocketClient

Inherits:
Client
  • Object
show all
Includes:
Celluloid::IO
Defined in:
lib/nchan_tools/pubsub.rb

Defined Under Namespace

Modules: WebSocketDriverExtensions Classes: WebSocketBundle

Instance Attribute Summary collapse

Attributes inherited from Client

#concurrency

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Client

#error, #handle_bundle_error, inherited, lookup, #poke, unique_aliases

Constructor Details

#initialize(subscr, opt = {}) ⇒ WebSocketClient

Returns a new instance of WebSocketClient.



514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
# File 'lib/nchan_tools/pubsub.rb', line 514

# Sets up a WebSocket subscriber client for +subscr+.
#
# The subscriber's URL is rewritten to a WebSocket scheme: a leading
# "http:" or "h2:" becomes "ws:", and "https:" or "h2s:" becomes "wss:".
#
# @param subscr [Object] owning subscriber; must respond to #url
# @param opt [Hash] client options; the keys read here are :last_modified,
#   :etag, :timeout, :connect_timeout, :subprotocol, :permessage_deflate,
#   :permessage_deflate_max_window_bits,
#   :permessage_deflate_server_max_window_bits, :concurrency, :clients,
#   :retry_delay, :nomsg, :http2 and :extra_headers
def initialize(subscr, opt={})
  super
  @last_modified = opt[:last_modified]
  @etag = opt[:etag]
  # Apply the default BEFORE converting: nil.to_i is 0, and 0 is truthy in
  # Ruby, so `opt[:timeout].to_i || 10` could never fall back to 10.
  @timeout = (opt[:timeout] || 10).to_i
  @connect_timeout = opt[:connect_timeout]
  @subscriber = subscr
  @subprotocol = opt[:subprotocol]
  @url = subscr.url.gsub(/^h(ttp|2)(s)?:/, "ws\\2:")

  # Only set when requested; otherwise the ivar stays unset (nil).
  @permessage_deflate = true if opt[:permessage_deflate]
  @permessage_deflate_max_window_bits = opt[:permessage_deflate_max_window_bits]
  @permessage_deflate_server_max_window_bits = opt[:permessage_deflate_server_max_window_bits]

  # :concurrency takes precedence over :clients; defaults to one connection.
  @concurrency = (opt[:concurrency] || opt[:clients] || 1).to_i
  @retry_delay = opt[:retry_delay]
  @ws = {}         # open bundles, used as a set (bundle => true)
  @connected = 0
  @nomsg = opt[:nomsg]
  @http2 = opt[:http2]
  @extra_headers = opt[:extra_headers]
end

Instance Attribute Details

#etag ⇒ Object

Returns the value of attribute etag.



513
514
515
# File 'lib/nchan_tools/pubsub.rb', line 513

# Reader for @etag, which #initialize takes from opt[:etag].
def etag
  @etag
end

#last_modified ⇒ Object

Returns the value of attribute last_modified.



513
514
515
# File 'lib/nchan_tools/pubsub.rb', line 513

# Reader for @last_modified, which #initialize takes from opt[:last_modified].
def last_modified
  @last_modified
end

#timeout ⇒ Object

Returns the value of attribute timeout.



513
514
515
# File 'lib/nchan_tools/pubsub.rb', line 513

# Reader for @timeout, which #initialize derives from opt[:timeout] and which
# #run passes to `after` to schedule the "Timeout" stop.
def timeout
  @timeout
end

#ws ⇒ Object

Returns the value of attribute ws.



513
514
515
# File 'lib/nchan_tools/pubsub.rb', line 513

# Reader for @ws, the Hash of currently open bundles (bundle => true) that
# #run fills and #close empties.
def ws
  @ws
end

Class Method Details

.aliases ⇒ Object



412
413
414
# File 'lib/nchan_tools/pubsub.rb', line 412

# Symbolic names for this client type.
#
# @return [Array<Symbol>] :websocket and :ws
def self.aliases
  %i[websocket ws]
end

Instance Method Details

#close(bundle) ⇒ Object



697
698
699
700
701
702
703
704
705
706
707
708
709
# File 'lib/nchan_tools/pubsub.rb', line 697

# Releases one connection slot and, once none remain, signals @cooked.
#
# When +bundle+ is truthy it is removed from @ws and its socket is closed
# (unless already closed). @connected is decremented on every call, including
# calls with a nil bundle. When the count reaches zero or below, the method
# waits in 0.1s steps until @ws is empty and then signals @cooked with true.
#
# @param bundle [Object, nil] connection bundle to drop, or nil
def close(bundle)
  if bundle
    @ws.delete(bundle)
    socket = bundle.sock
    socket.close unless socket.closed?
  end

  @connected -= 1
  return if @connected > 0

  # Wait for every bundle to be removed before announcing completion.
  sleep 0.1 until @ws.empty?
  @cooked.signal(true)
end

#listen(bundle) ⇒ Object



656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
# File 'lib/nchan_tools/pubsub.rb', line 656

# Reads from +bundle+ for as long as it is registered in @ws.
#
# Every rescue path closes the bundle and ends the loop.
#
# NOTE(review): EOFError is a subclass of IOError, so the EOFError clause
# below can never be selected; an EOF is handled by the IOError clause and is
# reported to the subscriber as a failure. Reordering the clauses would change
# what the subscriber is told, so confirm the intended behavior first.
#
# @param bundle [Object] connection bundle; must respond to #read and #sock
# @return [false, nil] false after an IOError, nil otherwise
def listen(bundle)
  bundle.read while @ws[bundle]
rescue IOError => err
  @subscriber.on_failure(error(0, "Connection closed: #{err}"), bundle)
  close(bundle)
  false
rescue EOFError
  bundle.sock.close
  close(bundle)
  nil
rescue Errno::ECONNRESET
  close(bundle)
  nil
end

#provides_msgid? ⇒ Boolean

Returns:

  • (Boolean)


509
510
511
# File 'lib/nchan_tools/pubsub.rb', line 509

# Whether the configured subprotocol is "ws+meta.nchan", the subprotocol for
# which #run parses an "id:" header out of each incoming message.
#
# @return [Boolean]
def provides_msgid?
  @subprotocol == 'ws+meta.nchan'
end

#run(was_success = nil) ⇒ Object

Raises:

  • (ArgumentError)


546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
# File 'lib/nchan_tools/pubsub.rb', line 546

# Opens @concurrency WebSocket connections to @url. For each one it registers
# the driver event handlers, records the bundle in @ws, sends the handshake
# and starts an asynchronous #listen loop.
#
# When the :http2 option was given, nothing is opened: a failure is reported
# to the subscriber and the ready/finished conditions are signalled at once.
# (@cooked_ready is assigned outside this section.)
#
# If opening a socket raises a SystemCallError, the failure is reported, one
# connection slot is released via `close nil`, and the method returns without
# opening the remaining connections.
#
# @param was_success [Object, nil] not used by this method
# @raise [ArgumentError] if the URL scheme is not ws, wss or unix
def run(was_success = nil)
  uri = URI.parse_possibly_unix_socket(@url)
  uri.port ||= (uri.scheme == "ws" || uri.scheme == "unix" ? 80 : 443)
  @cooked=Celluloid::Condition.new
  @connected = @concurrency
  if @http2
    @subscriber.on_failure error(0, "Refusing to try websocket over HTTP/2")
    @connected = 0
    @notready = 0
    @cooked_ready.signal false
    @cooked.signal true
    return
  end
  raise ArgumentError, "invalid websocket scheme #{uri.scheme} in #{@url}" unless uri.scheme == "unix" || uri.scheme.match(/^wss?$/)
  @notready=@concurrency
  if @timeout
    # Inactivity timer; it is reset by every incoming message (see :message).
    @timer = after(@timeout) do
      stop "Timeout"
    end
  end
  @concurrency.times do |i|
    begin
      sock = ParserBundle.new(uri).open_socket.sock
    rescue SystemCallError => e
      @subscriber.on_failure error(0, e.to_s)
      close nil
      return
    end
    
    if uri.scheme == "unix"
      # Build an http:// handshake URL from the last path segment of the
      # unix-socket "host" plus the request path and query.
      hs_url="http://#{uri.host.match "[^/]+$"}#{uri.path}#{uri.query && "?#{uri.query}"}"
    else
      hs_url=@url
    end        
    
    bundle = WebSocketBundle.new hs_url, sock, id: i, permessage_deflate: @permessage_deflate, subprotocol: @subprotocol, logger: @logger, permessage_deflate_max_window_bits: @permessage_deflate_max_window_bits, permessage_deflate_server_max_window_bits: @permessage_deflate_server_max_window_bits, extra_headers: @extra_headers
    
    bundle.ws.on :open do |ev|
      bundle.connected = true
      @notready-=1
      # Signal readiness once every connection has opened.
      @cooked_ready.signal true if @notready == 0
    end
    
    bundle.ws.on :ping do |ev|
      @subscriber.on(:ping).call ev, bundle
    end
    
    bundle.ws.on :pong do |ev|
      @subscriber.on(:pong).call ev, bundle
    end
    
    bundle.ws.on :error do |ev|
      # Use the HTTP status from the driver's message as the error code when present.
      http_error_match = ev.message.match(/Unexpected response code: (\d+)/)
      @subscriber.on_failure error(http_error_match ? http_error_match[1] : 0, ev.message, bundle)
      close bundle
    end
    
    bundle.ws.on :close do |ev|
      @subscriber.on_failure error(ev.code, ev.reason, bundle)
      bundle.connected = false
      close bundle
    end
    
    bundle.ws.on :message do |ev|
      @timer.reset if @timer
      
      data = ev.data
      if Array === data # binary frame delivered as an Array of bytes
        data = data.map(&:chr).join
        data.force_encoding "ASCII-8BIT"
        bundle.last_message_frame_type=:binary
      else
        bundle.last_message_frame_type=:text
      end
      
      if bundle.ws.protocol == "ws+meta.nchan"
        # Header lines must not span newlines: the id class is [^\n] (the
        # original [^n] excluded the letter "n" and let the id swallow body
        # text), and \A anchors the header to the very start of the message
        # instead of the start of any line.
        @meta_regex ||= /\Aid: (?<id>\d+:[^\n]+)\n(content-type: (?<content_type>[^\n]+)\n)?\n(?<data>.*)/m
        match = @meta_regex.match data
        unless match
          @subscriber.on_failure error(0, "Invalid ws+meta.nchan message received")
          close bundle
          # Nothing was parsed, so do not hand a nil message to the subscriber.
          next
        end
        if @nomsg
          msg = match[:data]
        else
          msg = Message.new match[:data]
          msg.content_type = match[:content_type]
          msg.id = match[:id]
        end
      else
        msg= @nomsg ? data : Message.new(data)
      end
      
      bundle.last_message_time=Time.now.to_f
      # The subscriber returns false to ask for this connection to be closed.
      if @subscriber.on_message(msg, bundle) == false
        close bundle
      end
      
    end
    
    @ws[bundle]=true
    
    # handshake
    bundle.send_handshake
    
    async.listen bundle
  end
end

#send_binary(data) ⇒ Object



693
694
695
# File 'lib/nchan_tools/pubsub.rb', line 693

# Forwards +data+ to ws_client.send_binary and returns its result.
# NOTE(review): ws_client is not defined in this section — confirm where it
# comes from.
#
# @param data [Object] payload handed through unchanged
def send_binary(data)
  ws_client.send_binary(data)
end

#send_close(reason = nil, code = 1000) ⇒ Object



687
688
689
# File 'lib/nchan_tools/pubsub.rb', line 687

# Forwards a close request to ws_client.send_close and returns its result.
# NOTE(review): ws_client is not defined in this section — confirm where it
# comes from.
#
# @param reason [Object, nil] close reason, passed through unchanged
# @param code [Integer] close code, 1000 by default
def send_close(reason=nil, code=1000)
  ws_client.send_close(reason, code)
end

#send_data(data) ⇒ Object



690
691
692
# File 'lib/nchan_tools/pubsub.rb', line 690

# Forwards +data+ to ws_client.send_data and returns its result.
# NOTE(review): ws_client is not defined in this section — confirm where it
# comes from.
#
# @param data [Object] payload handed through unchanged
def send_data(data)
  ws_client.send_data(data)
end

#send_ping(data = nil) ⇒ Object



684
685
686
# File 'lib/nchan_tools/pubsub.rb', line 684

# Forwards a ping to ws_client.send_ping and returns its result.
# NOTE(review): ws_client is not defined in this section — confirm where it
# comes from.
#
# @param data [Object, nil] optional ping payload, passed through unchanged
def send_ping(data=nil)
  ws_client.send_ping(data)
end

#stop(msg = "Stopped", src_bundle = nil) ⇒ Object



538
539
540
541
542
543
544
# File 'lib/nchan_tools/pubsub.rb', line 538

# Stops the client: delegates to the parent #stop, closes every bundle still
# registered in @ws, and cancels the timer if one was set.
#
# The bundle passed to the parent is the first key of @ws (nil when @ws is
# empty).
# NOTE(review): +src_bundle+ is accepted but never read here — confirm whether
# it was meant to be forwarded to the parent.
#
# @param msg [String] reason passed to the parent #stop
# @param src_bundle [Object, nil] unused by this method
def stop(msg = "Stopped", src_bundle = nil)
  first_bundle = @ws.keys.first
  super(msg, first_bundle)
  @ws.each_key { |open_bundle| close(open_bundle) }
  @timer.cancel if @timer
end