Class: RethinkDB::Connection

Inherits:
Object
  • Object
show all
Includes:
OpenSSL
Defined in:
lib/net.rb

Constant Summary collapse

@@last =
nil
@@magic_number =
VersionDummy::Version::V1_0
@@protocol_version =
0
@@wire_protocol =
VersionDummy::Protocol::JSON
@@fast_auth_func =
lambda {|*args|
  digest = OpenSSL::Digest::SHA256.new
  OpenSSL::PKCS5.pbkdf2_hmac(*args, digest.digest_length, digest)
}
@@slow_auth_func =
lambda {|password, salt, iter|
  mac = OpenSSL::HMAC.new(password, OpenSSL::Digest::SHA256.new)
  acc = t = mac.update(salt + "\0\0\0\1").digest
  mac.reset
  (iter-1).times{acc = xor(acc, t = mac.update(t).digest); mac.reset}
  res = acc
}
@@auth_func =
@@fast_auth_func
@@auth_func_mutex =
Mutex.new
@@auth_cache =
{}
@@auth_cache_mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.



468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
# File 'lib/net.rb', line 468

def initialize(opts={})
  begin
    @abort_module = ::IRB
  rescue NameError => e
    @abort_module = Faux_Abort
  end

  opts = Hash[opts.map{|(k,v)| [k.to_sym,v]}] if opts.is_a?(Hash)
  opts = {:host => opts} if opts.is_a?(String)
  @host = opts[:host] || "localhost"
  @port = (opts[:port] || 28015).to_i
  @default_db = opts[:db]
  @user = opts[:user] || "admin"
  @user = @user.gsub("=", "=3D").gsub(",","=2C")
  if opts[:password] && opts[:auth_key]
    raise ReqlDriverError, "Cannot specify both a password and an auth key."
  end
  @password = opts[:password] || opts[:auth_key] || ""
  @nonce = SecureRandom.base64(18)
  @timeout = opts[:timeout].to_i
  @timeout = 20 if @timeout <= 0
  @ssl_opts = opts[:ssl] || {}

  @@last = self
  @default_opts = @default_db ? {:db => RQL.new.db(@default_db)} : {}
  @conn_id = 0

  @token_cnt = 0
  @token_cnt_mutex = Mutex.new

  connect()
end

Instance Attribute Details

#conn_idObject (readonly)

Returns the value of attribute conn_id.



500
501
502
# File 'lib/net.rb', line 500

def conn_id
  @conn_id
end

#default_dbObject (readonly)

Returns the value of attribute default_db.



500
501
502
# File 'lib/net.rb', line 500

def default_db
  @default_db
end

#hostObject (readonly)

Returns the value of attribute host.



500
501
502
# File 'lib/net.rb', line 500

def host
  @host
end

#portObject (readonly)

Returns the value of attribute port.



500
501
502
# File 'lib/net.rb', line 500

def port
  @port
end

Class Method Details

.lastObject

Raises:

  • (ReqlRuntimeError)


794
795
796
797
# File 'lib/net.rb', line 794

def self.last
  return @@last if @@last
  raise ReqlRuntimeError, "No last connection.  Use RethinkDB::Connection.new."
end

.xor(str1, str2) ⇒ Object



935
936
937
938
939
940
941
# File 'lib/net.rb', line 935

def self.xor(str1, str2)
  out = str1.dup
  out.bytesize.times {|i|
    out.setbyte(i, out.getbyte(i) ^ str2.getbyte(i))
  }
  return out
end

Instance Method Details

#auto_reconnect(x = true) ⇒ Object



462
463
464
465
# File 'lib/net.rb', line 462

def auto_reconnect(x=true)
  @auto_reconnect = x
  self
end

#base_socketObject



705
706
707
708
709
710
# File 'lib/net.rb', line 705

def base_socket
  socket = TCPSocket.open(@host, @port)
  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
  socket
end

#check_nonce(server_nonce, nonce) ⇒ Object



875
876
877
878
879
# File 'lib/net.rb', line 875

def check_nonce(server_nonce, nonce)
  if !server_nonce.start_with?(nonce)
    raise ReqlDriverError, "Invalid nonce #{server_nonce} received from server."
  end
end

#check_server_signature_claim(server_signature_claim, server_signature) ⇒ Object



952
953
954
955
956
957
958
959
# File 'lib/net.rb', line 952

def check_server_signature_claim(server_signature_claim, server_signature)
  if !const_eq(server_signature_claim, server_signature)
    sig1 = Base64.strict_encode64(server_signature_claim)
    sig2 = Base64.strict_encode64(server_signature)
    raise ReqlAuthError, "Server signature #{sig1} does "+
      "not match expected signature #{sig2}."
  end
end

#check_version(server_info) ⇒ Object



866
867
868
869
870
871
872
873
# File 'lib/net.rb', line 866

def check_version(server_info)
  if server_info['min_protocol_version'] > @@protocol_version ||
      server_info['max_protocol_version'] < @@protocol_version
    raise ReqlDriverError, "Version mismatch: Driver uses #{@@protocol_version} "+
      "but server accepts "+
      "[#{server_info['min_version']}, #{server_info['max_version']}]."
  end
end

#client_addressObject



505
506
507
# File 'lib/net.rb', line 505

def client_address
  is_open() ? @socket.addr[3] : nil
end

#client_portObject



502
503
504
# File 'lib/net.rb', line 502

def client_port
  is_open() ? @socket.addr[1] : nil
end

#close(opts = {}) ⇒ Object

Raises:

  • (ArgumentError)


739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
# File 'lib/net.rb', line 739

def close(opts={})
  EM_Guard.unregister(self)
  raise ArgumentError, "Argument to close must be a hash." if opts.class != Hash
  if !(opts.keys - [:noreply_wait]).empty?
    raise ArgumentError, "close does not understand these options: " +
      (opts.keys - [:noreply_wait]).to_s
  end
  opts[:noreply_wait] = true if !opts.keys.include?(:noreply_wait)

  @mon.synchronize {
    @opts.clear
    @data.clear
    @waiters.each {|k,v|
      case v
      when QueryHandle
        v.handle_force_close
      when MonitorMixin::ConditionVariable
        @waiters[k] = nil
        v.signal
      end
    }
    @waiters.clear
  }

  noreply_wait() if opts[:noreply_wait] && is_open()
  if @listener
    @listener.terminate
    @listener.join
  end
  @socket.close if @socket
  @listener = nil
  @socket = nil
  self
end

#closed?Boolean

Returns:

  • (Boolean)


735
736
737
# File 'lib/net.rb', line 735

def closed?
  !is_open
end

#connectObject

Raises:

  • (RuntimeError)


680
681
682
683
684
685
686
687
688
689
690
# File 'lib/net.rb', line 680

def connect()
  raise RuntimeError, "Connection must be closed before calling connect." if @socket
  init_socket
  @mon = Monitor.new
  @waiters = {}
  @opts = {}
  @data = {}
  @conn_id += 1
  start_listener
  self
end

#const_eq(a, b) ⇒ Object



943
944
945
946
947
948
949
950
# File 'lib/net.rb', line 943

def const_eq(a, b)
  return false if a.size != b.size
  residue = 0
  a.unpack('C*').zip(b.unpack('C*')).each{|x,y|
    residue |= (x ^ y)
  }
  return residue == 0
end

#create_context(options) ⇒ Object



712
713
714
715
716
717
718
719
720
721
722
# File 'lib/net.rb', line 712

def create_context(options)
  context = OpenSSL::SSL::SSLContext.new
  context.ssl_version = :TLSv1_2
  if options[:ca_certs]
    context.ca_file = options[:ca_certs]
    context.verify_mode = OpenSSL::SSL::VERIFY_PEER
  else
    raise 'ssl options provided but missing required "ca_certs" option'
  end
  context
end

#debug_socketObject



669
# File 'lib/net.rb', line 669

def debug_socket; @socket; end

#dispatch(msg, token) ⇒ Object



605
606
607
608
609
610
# File 'lib/net.rb', line 605

def dispatch(msg, token)
  payload = Shim.dump_json(msg).force_encoding('BINARY')
  prefix = [token, payload.bytesize].pack('Q<L<')
  send(prefix + payload)
  return token
end

#do_handshakeObject



961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
# File 'lib/net.rb', line 961

def do_handshake
  begin
    send([@@magic_number].pack('L<'))
    client_first_message_bare = "n=#{@user},r=#{@nonce}"
    send_json({ protocol_version: @@protocol_version,
                authentication_method: "SCRAM-SHA-256",
                authentication: "n,,#{client_first_message_bare}" })

    server_version_msg = rcv_json
    check_version(server_version_msg)

    auth_resp = rcv_json
    server_first_message = auth_resp['authentication']

    fields = Hash[server_first_message.split(',').map{|x| x.split('=', 2)}]
    server_nonce = fields['r']

    client_final_message_without_proof = "c=biws,r=#{server_nonce}"
    auth_message = "#{client_first_message_bare},#{server_first_message},"+
      client_final_message_without_proof
    check_nonce(server_nonce, @nonce)
    salt = Base64.decode64(fields['s'])
    iter = fields['i'].to_i

    salted_password = pbkdf2_hmac_sha256(@password, salt, iter)

    client_key = hmac(salted_password, "Client Key")
    stored_key = sha256(client_key)
    client_signature = hmac(stored_key, auth_message)
    client_proof = Connection::xor(client_key, client_signature)
    cproof_64 = Base64.strict_encode64(client_proof)

    msg = "#{client_final_message_without_proof},p=#{cproof_64}"
    send_json({authentication: msg})

    sig_resp = rcv_json
    fields = Hash[sig_resp['authentication'].split(',').map{|x| x.split('=', 2)}]
    server_signature_claim = Base64::decode64(fields['v'])
    server_key = hmac(salted_password, "Server Key")
    server_signature = hmac(server_key, auth_message)
    check_server_signature_claim(server_signature_claim, server_signature)
  rescue ReqlError => e
    raise e
  rescue Exception => e
    raise ReqlDriverError, "Error during handshake: #{e.inspect} #{e.backtrace}"
  end
end

#hmac(*args) ⇒ Object



927
928
929
# File 'lib/net.rb', line 927

def hmac(*args)
  OpenSSL::HMAC.digest(OpenSSL::Digest::SHA256.new, *args)
end

#init_socketObject



692
693
694
695
696
697
698
699
700
701
702
703
# File 'lib/net.rb', line 692

def init_socket
  unless @ssl_opts.empty?
    @tcp_socket = base_socket
    context = create_context(@ssl_opts)
    @socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, context)
    @socket.sync_close = true
    @socket.connect
    verify_cert!(@socket, context)
  else
    @socket = base_socket
  end
end

#inspectObject



657
658
659
660
661
662
# File 'lib/net.rb', line 657

def inspect
  db = @default_opts[:db] || RQL.new.db('test')
  properties = "(#{@host}:#{@port}) (Default DB: #{db.inspect})"
  state = is_open() ? "(open)" : "(closed)"
  "#<RethinkDB::Connection:#{object_id} #{properties} #{state}>"
end

#is_openObject



732
733
734
# File 'lib/net.rb', line 732

def is_open
  @socket && @listener
end

#new_tokenObject



509
510
511
# File 'lib/net.rb', line 509

def new_token
  @token_cnt_mutex.synchronize{@token_cnt += 1}
end

#noreply_waitObject

Raises:

  • (ReqlRuntimeError)


774
775
776
777
778
779
780
781
782
# File 'lib/net.rb', line 774

def noreply_wait
  raise ReqlRuntimeError, "Connection is closed." if !is_open()
  q = [Query::QueryType::NOREPLY_WAIT]
  res = run_internal(q, {noreply: false}, new_token)
  if res['t'] != Response::ResponseType::WAIT_COMPLETE
    raise ReqlRuntimeError, "Unexpected response to noreply_wait: " + PP.pp(res, "")
  end
  nil
end

#note_data(token, data) ⇒ Object

Synchronize around this!



810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
# File 'lib/net.rb', line 810

def note_data(token, data) # Synchronize around this!
  @opts.delete(token)
  w = @waiters.delete(token)
  case w
  when MonitorMixin::ConditionVariable
    @data[token] = data
    w.signal
  when QueryHandle
    w.callback(data)
  when nil
    # nothing
  else
    raise ReqlDriverError, "Unrecognized value #{w.inspect} in `@waiters`."
  end
end

#note_error(token, e) ⇒ Object

Synchronize around this!



826
827
828
829
830
831
832
833
# File 'lib/net.rb', line 826

def note_error(token, e) # Synchronize around this!
  data = {
    't' => Response::ResponseType::CLIENT_ERROR,
    'r' => [e.message],
    'b' => []
  }
  note_data(token, data)
end

#pbkdf2_hmac_sha256(*args) ⇒ Object



898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
# File 'lib/net.rb', line 898

def pbkdf2_hmac_sha256(*args)
  auth_func = res = nil
  @@auth_cache_mutex.synchronize {
    res = @@auth_cache[args]
    return res if res
  }

  @@auth_func_mutex.synchronize {
    auth_func = @@auth_func
  }
  begin
    res = auth_func.call(*args)
  rescue NotImplementedError => e
    if auth_func != @@slow_auth_func
      @@auth_func_mutex.synchronize {
        auth_func = @@auth_func = @@slow_auth_func
      }
      res = auth_func.call(*args)
    else
      raise e
    end
  end

  @@auth_cache_mutex.synchronize {
    @@auth_cache[args] = res
  }
  return res
end

#rcv_jsonObject



835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
# File 'lib/net.rb', line 835

def rcv_json
  response = ""
  while response[-1..-1] != "\0"
    response += @socket.read_exn(1, @timeout)
  end
  begin
    res = JSON.parse(response[0...-1])
    if !res['success']
      msg = "Handshake error (#{res})."
      ecode = res['error_code']
      if ecode && ecode >= 10 && ecode <= 20
        msg = res['error'] if res['error'].to_s != ""
        raise ReqlAuthError, msg
      else
        raise ReqlDriverError, msg
      end
    end
  rescue Exception => e
    if !e.class.ancestors.include?(ReqlDriverError)
      raise ReqlDriverError, "Connection closed by server (#{e})."
    else
      raise e
    end
  end
  return res
end

#reconnect(opts = {}) ⇒ Object

Reconnect to the server. This will interrupt all queries on the server (if :noreply_wait => false) and invalidate all outstanding enumerables on the client.

Raises:

  • (ArgumentError)


674
675
676
677
678
# File 'lib/net.rb', line 674

def reconnect(opts={})
  raise ArgumentError, "Argument to reconnect must be a hash." if opts.class != Hash
  close(opts)
  connect()
end

#register_query(token, opts, callback = nil) ⇒ Object



513
514
515
516
517
518
519
520
521
522
523
# File 'lib/net.rb', line 513

def register_query(token, opts, callback=nil)
  if !opts[:noreply]
    @mon.synchronize {
      if @waiters.has_key?(token)
        raise ReqlDriverError, "Internal driver error, token already in use."
      end
      @waiters[token] = callback ? callback : @mon.new_cond
      @opts[token] = opts
    }
  end
end

#remove_em_waitersObject



799
800
801
802
803
804
805
806
807
808
# File 'lib/net.rb', line 799

def remove_em_waiters
  @mon.synchronize {
    @waiters.each {|k,v|
      if v.is_a? QueryHandle
        v.handle_force_close
        @waiters.delete(k)
      end
    }
  }
end

#replObject



466
# File 'lib/net.rb', line 466

def repl; RQL.set_default_conn self; end

#run(msg, opts, b) ⇒ Object

Raises:

  • (ReqlRuntimeError)


535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
# File 'lib/net.rb', line 535

def run(msg, opts, b)
  reconnect(:noreply_wait => false) if @auto_reconnect && !is_open()
  raise ReqlRuntimeError, "Connection is closed." if !is_open()

  global_optargs = {}
  all_opts = @default_opts.merge(opts)
  if all_opts.keys.include?(:noreply)
    all_opts[:noreply] = !!all_opts[:noreply]
  end

  token = new_token
  q = [Query::QueryType::START,
       msg,
       Hash[all_opts.map {|k,v|
              [k.to_s, (v.is_a?(RQL) ? v.to_pb : RQL.new.expr(v).to_pb)]
            }]]

  if b.is_a? Handler
    callback = QueryHandle.new(b, msg, all_opts, token, self)
    register_query(token, all_opts, callback)
    dispatch(q, token)
    return callback
  else
    res = run_internal(q, all_opts, token)
    return res if !res
    if res['t'] == Response::ResponseType::SUCCESS_PARTIAL
      value = Cursor.new(Shim.response_to_native(res, msg, opts),
                         msg, self, opts, token, true)
    elsif res['t'] == Response::ResponseType::SUCCESS_SEQUENCE
      value = Cursor.new(Shim.response_to_native(res, msg, opts),
                         msg, self, opts, token, false)
    else
      value = Shim.response_to_native(res, msg, opts)
    end

    if res['p']
      real_val = {
        "profile" => res['p'],
        "value" => value
      }
    else
      real_val = value
    end

    if b
      begin
        b.call(real_val)
      ensure
        value.close if value.is_a?(Cursor)
      end
    else
      real_val
    end
  end
end

#run_internal(q, opts, token) ⇒ Object



524
525
526
527
528
# File 'lib/net.rb', line 524

def run_internal(q, opts, token)
  register_query(token, opts)
  dispatch(q, token)
  opts[:noreply] ? nil : wait(token, nil)
end

#send(packet) ⇒ Object



591
592
593
594
595
596
597
598
599
600
601
602
603
# File 'lib/net.rb', line 591

def send packet
  @mon.synchronize {
    written = 0
    while written < packet.length
      # Supposedly slice will not copy the array if it goes all
      # the way to the end We use IO::syswrite here rather than
      # IO::write because of incompatibilities in JRuby regarding
      # filling up the TCP send buffer.  Reference:
      # https://github.com/rethinkdb/rethinkdb/issues/3795
      written += @socket.syswrite(packet.slice(written, packet.length))
    end
  }
end

#send_json(x) ⇒ Object



862
863
864
# File 'lib/net.rb', line 862

def send_json(x)
  send(x.to_json + "\0")
end

#serverObject

Raises:

  • (ReqlRuntimeError)


784
785
786
787
788
789
790
791
792
# File 'lib/net.rb', line 784

def server
  raise ReqlRuntimeError, "Connection is closed." if !is_open()
  q = [Query::QueryType::SERVER_INFO]
  res = run_internal(q, {noreply: false}, new_token)
  if res['t'] != Response::ResponseType::SERVER_INFO
    raise ReqlRuntimeError, "Unexpected response to server_info: " + PP.pp(res, "")
  end
  res['r'][0]
end

#sha256(str) ⇒ Object



931
932
933
# File 'lib/net.rb', line 931

def sha256(str)
  OpenSSL::Digest.digest("SHA256", str)
end

#start_listenerObject



1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
# File 'lib/net.rb', line 1009

def start_listener
  class << @socket
    def maybe_timeout(sec=nil, &b)
      sec ? Timeout::timeout(sec, &b) : b.call
    end
    def read_exn(len, timeout_sec=nil)
      maybe_timeout(timeout_sec) {
        buf = read len
        if !buf || buf.length != len
          raise ReqlRuntimeError, "Connection closed by server."
        end
        return buf
      }
    end
  end
  do_handshake
  @listener = Thread.new {
    while true
      begin
        token = nil
        token = @socket.read_exn(8).unpack('q<')[0]
        response_length = @socket.read_exn(4).unpack('L<')[0]
        response = @socket.read_exn(response_length)
        begin
          data = Shim.load_json(response, @opts[token])
        rescue Exception => e
          raise ReqlRuntimeError, "Bad response, server is buggy.\n" +
            "#{e.inspect}\n" + response
        end
        @mon.synchronize{note_data(token, data)}
      rescue Exception => e
        @mon.synchronize {
          @waiters.keys.each{ |k| note_error(k, e) }
          @listener = nil
          Thread.current.terminate
          abort("unreachable")
        }
      end
    end
  }
end

#stop(token) ⇒ Object



529
530
531
532
533
534
# File 'lib/net.rb', line 529

def stop(token)
  dispatch([Query::QueryType::STOP], token)
  @mon.synchronize {
    !!@waiters.delete(token)
  }
end

#use(new_default_db) ⇒ Object

Change the default database of a connection.



652
653
654
655
# File 'lib/net.rb', line 652

def use(new_default_db)
  @default_db = new_default_db
  @default_opts[:db] = RQL.new.db(new_default_db)
end

#verify_cert!(socket, context) ⇒ Object



724
725
726
727
728
729
730
# File 'lib/net.rb', line 724

def verify_cert!(socket, context)
  if context.verify_mode == OpenSSL::SSL::VERIFY_PEER
    unless OpenSSL::SSL.verify_certificate_identity(socket.peer_cert, host)
      raise 'SSL handshake failed due to a hostname mismatch.'
    end
  end
end

#wait(token, timeout) ⇒ Object



612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
# File 'lib/net.rb', line 612

def wait(token, timeout)
  begin
    @mon.synchronize {
      end_time = timeout ? Time.now.to_f + timeout : nil
      loop {
        res = @data.delete(token)
        return res if res

        # Theoretically we only need to check the second property,
        # but this is safer in case someone makes changes to
        # `close` in the future.
        if !is_open() || !@waiters.has_key?(token)
          raise ReqlRuntimeError, "Connection is closed."
        end

        if end_time
          cur_time = Time.now.to_f
          if cur_time >= end_time
            raise Timeout::Error, "Timed out waiting for cursor response."
          else
            # We can't use `wait_while` because it doesn't take a
            # timeout, and we can't use an external `timeout {
            # ... }` block because in Ruby 1.9.1 it seems to confuse
            # the synchronization in `@mon` to be timed out while
            # waiting in a synchronize block.
            @waiters[token].wait(end_time - cur_time)
          end
        else
          @waiters[token].wait
        end
      }
    }
  rescue @abort_module::Abort => e
    print "\nAborting query and reconnecting...\n"
    reconnect(:noreply_wait => false)
    raise e
  end
end