Class: Subscriber::WebSocketClient

Inherits:
Client
  • Object
show all
Includes:
Celluloid::IO
Defined in:
lib/nchan_tools/pubsub.rb

Defined Under Namespace

Modules: WebSocketDriverExtensions Classes: WebSocketBundle

Instance Attribute Summary collapse

Attributes inherited from Client

#concurrency

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Client

#error, #handle_bundle_error, inherited, lookup, #poke, unique_aliases

Constructor Details

#initialize(subscr, opt = {}) ⇒ WebSocketClient

Returns a new instance of WebSocketClient.



509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
# File 'lib/nchan_tools/pubsub.rb', line 509

def initialize(subscr, opt={})
  super
  @last_modified, @etag, @timeout = opt[:last_modified], opt[:etag], opt[:timeout].to_i || 10
  @connect_timeout = opt[:connect_timeout]
  @subscriber=subscr
  @subprotocol = opt[:subprotocol]
  @url=subscr.url
  @url = @url.gsub(/^h(ttp|2)(s)?:/, "ws\\2:")
  
  if opt[:permessage_deflate]
    @permessage_deflate = true
  end
  @permessage_deflate_max_window_bits = opt[:permessage_deflate_max_window_bits]
  @permessage_deflate_server_max_window_bits = opt[:permessage_deflate_server_max_window_bits]
  
  @concurrency=(opt[:concurrency] || opt[:clients] || 1).to_i
  @retry_delay=opt[:retry_delay]
  @ws = {}
  @connected=0
  @nomsg = opt[:nomsg]
  @http2 = opt[:http2]
  @extra_headers = opt[:extra_headers]
end

Instance Attribute Details

#etagObject

Returns the value of attribute etag.



508
509
510
# File 'lib/nchan_tools/pubsub.rb', line 508

def etag
  @etag
end

#last_modifiedObject

Returns the value of attribute last_modified.



508
509
510
# File 'lib/nchan_tools/pubsub.rb', line 508

def last_modified
  @last_modified
end

#timeoutObject

Returns the value of attribute timeout.



508
509
510
# File 'lib/nchan_tools/pubsub.rb', line 508

def timeout
  @timeout
end

#wsObject

Returns the value of attribute ws.



508
509
510
# File 'lib/nchan_tools/pubsub.rb', line 508

def ws
  @ws
end

Class Method Details

.aliasesObject



415
416
417
# File 'lib/nchan_tools/pubsub.rb', line 415

def self.aliases
  [:websocket, :ws]
end

Instance Method Details

#close(bundle) ⇒ Object



699
700
701
702
703
704
705
706
707
708
709
# File 'lib/nchan_tools/pubsub.rb', line 699

def close(bundle)
  if bundle
    @ws.delete bundle
    bundle.sock.close unless bundle.sock.closed?
  end
  @connected -= 1
  if @connected <= 0
    binding.pry unless @ws.count == 0
    @cooked.signal true
  end
end

#listen(bundle) ⇒ Object



658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
# File 'lib/nchan_tools/pubsub.rb', line 658

def listen(bundle)
  while @ws[bundle]
    begin
      bundle.read
    rescue IOError => e
      @subscriber.on_failure error(0, "Connection closed: #{e}"), bundle
      close bundle
      return false
    rescue EOFError
      bundle.sock.close
      close bundle
      return
    rescue Errno::ECONNRESET
      close bundle
      return
    end
  end
end

#on_pingObject



651
652
653
# File 'lib/nchan_tools/pubsub.rb', line 651

def on_ping
  @on_ping = Proc.new if block_given?
end

#on_pongObject



654
655
656
# File 'lib/nchan_tools/pubsub.rb', line 654

def on_pong
  @on_pong = Proc.new if block_given?
end

#provides_msgid?Boolean

Returns:

  • (Boolean)


504
505
506
# File 'lib/nchan_tools/pubsub.rb', line 504

def provides_msgid?
  @subprotocol == "ws+meta.nchan"
end

#run(was_success = nil) ⇒ Object

Raises:

  • (ArgumentError)


541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
# File 'lib/nchan_tools/pubsub.rb', line 541

def run(was_success = nil)
  uri = URI.parse_possibly_unix_socket(@url)
  uri.port ||= (uri.scheme == "ws" || uri.scheme == "unix" ? 80 : 443)
  @cooked=Celluloid::Condition.new
  @connected = @concurrency
  if @http2
    @subscriber.on_failure error(0, "Refusing to try websocket over HTTP/2")
    @connected = 0
    @notready = 0
    @cooked_ready.signal false
    @cooked.signal true
    return
  end
  raise ArgumentError, "invalid websocket scheme #{uri.scheme} in #{@url}" unless uri.scheme == "unix" || uri.scheme.match(/^wss?$/)
  @notready=@concurrency
  if @timeout
    @timer = after(@timeout) do
      stop "Timeout"
    end
  end
  @concurrency.times do |i|
    begin
      sock = ParserBundle.new(uri).open_socket.sock
    rescue SystemCallError => e
      @subscriber.on_failure error(0, e.to_s)
      close nil
      return
    end
    
    if uri.scheme == "unix"
      hs_url="http://#{uri.host.match "[^/]+$"}#{uri.path}#{uri.query && "?#{uri.query}"}"
    else
      hs_url=@url
    end        
    
    bundle = WebSocketBundle.new hs_url, sock, id: i, permessage_deflate: @permessage_deflate, subprotocol: @subprotocol, logger: @logger, permessage_deflate_max_window_bits: @permessage_deflate_max_window_bits, permessage_deflate_server_max_window_bits: @permessage_deflate_server_max_window_bits, extra_headers: @extra_headers
    
    bundle.ws.on :open do |ev|
      bundle.connected = true
      @notready-=1
      @cooked_ready.signal true if @notready == 0
    end
    
    bundle.ws.on :ping do |ev|
      @on_ping.call if @on_ping
    end
    
    bundle.ws.on :pong do |ev|
      @on_pong.call if @on_pong
    end
    
    bundle.ws.on :error do |ev|
      http_error_match = ev.message.match(/Unexpected response code: (\d+)/)
      @subscriber.on_failure error(http_error_match ? http_error_match[1] : 0, ev.message, bundle)
      close bundle
    end
    
    bundle.ws.on :close do |ev|
      @subscriber.on_failure error(ev.code, ev.reason, bundle)
      bundle.connected = false
      close bundle
    end
    
    bundle.ws.on :message do |ev|
      @timer.reset if @timer
      
      data = ev.data
      if Array === data #binary String
        data = data.map(&:chr).join
        data.force_encoding "ASCII-8BIT"
        bundle.last_message_frame_type=:binary
      else
        bundle.last_message_frame_type=:text
      end
      
      if bundle.ws.protocol == "ws+meta.nchan"
        @meta_regex ||= /^id: (?<id>\d+:[^n]+)\n(content-type: (?<content_type>[^\n]+)\n)?\n(?<data>.*)/m
        match = @meta_regex.match data
        if not match
          @subscriber.on_failure error(0, "Invalid ws+meta.nchan message received")
          close bundle
        else
          if @nomsg 
            msg = match[:data]
          else
            msg= Message.new match[:data]
            msg.content_type = match[:content_type]
            msg.id = match[:id]
          end
        end
      else
        msg= @nomsg ? data : Message.new(data)
      end
      
      bundle.last_message_time=Time.now.to_f
      if @subscriber.on_message(msg, bundle) == false
        close bundle
      end
      
    end
    
    @ws[bundle]=true
    
    #handhsake
    bundle.send_handshake
    
    async.listen bundle
  end
end

#send_binary(data) ⇒ Object



695
696
697
# File 'lib/nchan_tools/pubsub.rb', line 695

def send_binary(data)
  ws_client.send_binary data
end

#send_close(code = 1000, reason = nil) ⇒ Object



689
690
691
# File 'lib/nchan_tools/pubsub.rb', line 689

def send_close(code=1000, reason=nil)
  ws_client.send_close code, reason
end

#send_data(data) ⇒ Object



692
693
694
# File 'lib/nchan_tools/pubsub.rb', line 692

def send_data(data)
  ws_client.send_data data
end

#send_ping(data = nil) ⇒ Object



686
687
688
# File 'lib/nchan_tools/pubsub.rb', line 686

def send_ping(data=nil)
  ws_client.ping data
end

#stop(msg = "Stopped", src_bundle = nil) ⇒ Object



533
534
535
536
537
538
539
# File 'lib/nchan_tools/pubsub.rb', line 533

def stop(msg = "Stopped", src_bundle = nil)
  super msg, (@ws.first && @ws.first.first)
  @ws.each do |b, v|
    close b
  end
  @timer.cancel if @timer
end