Class: RethinkDB::Connection
Constant Summary collapse
- @@last =
nil
- @@magic_number =
VersionDummy::Version::V1_0
- @@protocol_version =
0
- @@wire_protocol =
VersionDummy::Protocol::JSON
- @@fast_auth_func =
lambda {|*args| digest = OpenSSL::Digest::SHA256.new OpenSSL::PKCS5.pbkdf2_hmac(*args, digest.digest_length, digest) }
- @@slow_auth_func =
lambda {|password, salt, iter| mac = OpenSSL::HMAC.new(password, OpenSSL::Digest::SHA256.new) acc = t = mac.update(salt + "\0\0\0\1").digest mac.reset (iter-1).times{acc = xor(acc, t = mac.update(t).digest); mac.reset} res = acc }
- @@auth_func =
@@fast_auth_func
- @@auth_func_mutex =
Mutex.new
- @@auth_cache =
{}
- @@auth_cache_mutex =
Mutex.new
Instance Attribute Summary collapse
-
#conn_id ⇒ Object
readonly
Returns the value of attribute conn_id.
-
#default_db ⇒ Object
readonly
Returns the value of attribute default_db.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
Class Method Summary collapse
Instance Method Summary collapse
- #auto_reconnect(x = true) ⇒ Object
- #base_socket ⇒ Object
- #check_nonce(server_nonce, nonce) ⇒ Object
- #check_server_signature_claim(server_signature_claim, server_signature) ⇒ Object
- #check_version(server_info) ⇒ Object
- #client_address ⇒ Object
- #client_port ⇒ Object
- #close(opts = {}) ⇒ Object
- #closed? ⇒ Boolean
- #connect ⇒ Object
- #const_eq(a, b) ⇒ Object
- #create_context(options) ⇒ Object
- #debug_socket ⇒ Object
- #dispatch(msg, token) ⇒ Object
- #do_handshake ⇒ Object
- #hmac(*args) ⇒ Object
- #init_socket ⇒ Object
-
#initialize(opts = {}) ⇒ Connection
constructor
A new instance of Connection.
- #inspect ⇒ Object
- #is_open ⇒ Object
- #new_token ⇒ Object
- #noreply_wait ⇒ Object
-
#note_data(token, data) ⇒ Object
Synchronize around this!
-
#note_error(token, e) ⇒ Object
Synchronize around this!
- #pbkdf2_hmac_sha256(*args) ⇒ Object
- #rcv_json ⇒ Object
-
#reconnect(opts = {}) ⇒ Object
Reconnect to the server.
- #register_query(token, opts, callback = nil) ⇒ Object
- #remove_em_waiters ⇒ Object
- #repl ⇒ Object
- #run(msg, opts, b) ⇒ Object
- #run_internal(q, opts, token) ⇒ Object
- #send(packet) ⇒ Object
- #send_json(x) ⇒ Object
- #server ⇒ Object
- #sha256(str) ⇒ Object
- #start_listener ⇒ Object
- #stop(token) ⇒ Object
-
#use(new_default_db) ⇒ Object
Change the default database of a connection.
- #verify_cert!(socket, context) ⇒ Object
- #wait(token, timeout) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 |
# File 'lib/net.rb', line 468
# Builds a connection from +opts+ and connects immediately.
#
# +opts+ is either a Hash (string or symbol keys) or a String, which is
# taken as the host name. Keys read here: :host (default "localhost"),
# :port (default 28015), :db, :user (default "admin"), :password or
# :auth_key (mutually exclusive), :timeout (non-positive or missing
# becomes 20) and :ssl.
#
# Raises ReqlDriverError when both :password and :auth_key are given.
def initialize(opts={})
  # Use IRB's abort support when IRB is loaded, the fallback otherwise.
  @abort_module = begin
    ::IRB
  rescue NameError
    Faux_Abort
  end

  if opts.is_a?(Hash)
    opts = opts.each_with_object({}) { |(key, value), acc| acc[key.to_sym] = value }
  end
  opts = { :host => opts } if opts.is_a?(String)

  @host = opts[:host] || "localhost"
  @port = (opts[:port] || 28015).to_i
  @default_db = opts[:db]
  # '=' and ',' are escaped because the user name is embedded in the
  # comma-separated "n=...,r=..." handshake message.
  @user = (opts[:user] || "admin").gsub("=", "=3D").gsub(",", "=2C")

  if opts[:password] && opts[:auth_key]
    raise ReqlDriverError, "Cannot specify both a password and an auth key."
  end
  @password = opts[:password] || opts[:auth_key] || ""
  @nonce = SecureRandom.base64(18)

  requested_timeout = opts[:timeout].to_i
  @timeout = requested_timeout > 0 ? requested_timeout : 20
  @ssl_opts = opts[:ssl] || {}

  @@last = self
  @default_opts = if @default_db
    { :db => RQL.new.db(@default_db) }
  else
    {}
  end
  @conn_id = 0
  @token_cnt = 0
  @token_cnt_mutex = Mutex.new

  connect()
end
Instance Attribute Details
#conn_id ⇒ Object (readonly)
Returns the value of attribute conn_id.
500 501 502 |
# File 'lib/net.rb', line 500 def conn_id @conn_id end |
#default_db ⇒ Object (readonly)
Returns the value of attribute default_db.
500 501 502 |
# File 'lib/net.rb', line 500 def default_db @default_db end |
#host ⇒ Object (readonly)
Returns the value of attribute host.
500 501 502 |
# File 'lib/net.rb', line 500 def host @host end |
#port ⇒ Object (readonly)
Returns the value of attribute port.
500 501 502 |
# File 'lib/net.rb', line 500 def port @port end |
Class Method Details
.last ⇒ Object
794 795 796 797 |
# File 'lib/net.rb', line 794
# Returns the most recently constructed connection (the constructor
# stores itself in @@last).
#
# Raises ReqlRuntimeError when no connection has been created yet.
def self.last
  unless @@last
    raise ReqlRuntimeError, "No last connection. Use RethinkDB::Connection.new."
  end
  @@last
end
.xor(str1, str2) ⇒ Object
935 936 937 938 939 940 941 |
# File 'lib/net.rb', line 935
# Byte-wise XOR of two strings.
#
# Returns a new string (a modified copy of +str1+); neither argument is
# changed. +str2+ must supply a byte for every byte of +str1+.
def self.xor(str1, str2)
  result = str1.dup
  (0...result.bytesize).each do |idx|
    result.setbyte(idx, result.getbyte(idx) ^ str2.getbyte(idx))
  end
  result
end
Instance Method Details
#auto_reconnect(x = true) ⇒ Object
462 463 464 465 |
# File 'lib/net.rb', line 462
# Stores +x+ in @auto_reconnect (read by #run, which reconnects a closed
# connection when the flag is set). Returns self so calls can be chained.
def auto_reconnect(x=true)
  tap { @auto_reconnect = x }
end
#base_socket ⇒ Object
705 706 707 708 709 710 |
# File 'lib/net.rb', line 705
# Opens a TCP socket to @host:@port with TCP_NODELAY and SO_KEEPALIVE
# enabled, and returns it.
def base_socket
  TCPSocket.open(@host, @port).tap do |sock|
    sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)
  end
end
#check_nonce(server_nonce, nonce) ⇒ Object
875 876 877 878 879 |
# File 'lib/net.rb', line 875
# Verifies that the nonce returned by the server begins with the nonce
# this client sent.
#
# Returns nil on success; raises ReqlDriverError otherwise.
def check_nonce(server_nonce, nonce)
  return if server_nonce.start_with?(nonce)
  raise ReqlDriverError, "Invalid nonce #{server_nonce} received from server."
end
#check_server_signature_claim(server_signature_claim, server_signature) ⇒ Object
952 953 954 955 956 957 958 959 |
# File 'lib/net.rb', line 952
# Compares the signature the server claimed with the one computed
# locally, using #const_eq.
#
# Returns nil when they match; raises ReqlAuthError (with both values
# Base64-encoded in the message) when they do not.
def check_server_signature_claim(server_signature_claim, server_signature)
  return if const_eq(server_signature_claim, server_signature)

  sig1 = Base64.strict_encode64(server_signature_claim)
  sig2 = Base64.strict_encode64(server_signature)
  raise ReqlAuthError, "Server signature #{sig1} does "+
    "not match expected signature #{sig2}."
end
#check_version(server_info) ⇒ Object
866 867 868 869 870 871 872 873 |
# File 'lib/net.rb', line 866
# Ensures the driver's protocol version lies inside the range advertised
# by the server.
#
# server_info - parsed handshake reply; its 'min_protocol_version' and
#               'max_protocol_version' entries are read.
#
# Returns nil when compatible; raises ReqlDriverError otherwise.
def check_version(server_info)
  min_version = server_info['min_protocol_version']
  max_version = server_info['max_protocol_version']
  if min_version > @@protocol_version || max_version < @@protocol_version
    # Report the same fields that were just compared (the message used to
    # read 'min_version'/'max_version', which this check never consults).
    raise ReqlDriverError, "Version mismatch: Driver uses #{@@protocol_version} "+
      "but server accepts "+
      "[#{min_version}, #{max_version}]."
  end
end
#client_address ⇒ Object
505 506 507 |
# File 'lib/net.rb', line 505
# Element 3 of the socket's #addr array while the connection is open,
# nil otherwise.
def client_address
  return nil unless is_open()
  @socket.addr[3]
end
#client_port ⇒ Object
502 503 504 |
# File 'lib/net.rb', line 502
# Element 1 of the socket's #addr array while the connection is open,
# nil otherwise.
def client_port
  return nil unless is_open()
  @socket.addr[1]
end
#close(opts = {}) ⇒ Object
739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 |
# File 'lib/net.rb', line 739
# Closes the connection.
#
# opts - Hash whose only accepted key is :noreply_wait (default true).
#        When truthy and the connection is open, #noreply_wait is called
#        before the socket is torn down.
#
# All pending waiters are woken or force-closed first. The listener
# thread and socket are released even if #noreply_wait raises; the error
# still propagates to the caller. The caller's hash is not modified.
#
# Returns self. Raises ArgumentError for a non-Hash argument or unknown
# option keys.
def close(opts={})
  EM_Guard.unregister(self)
  raise ArgumentError, "Argument to close must be a hash." if opts.class != Hash
  unknown_keys = opts.keys - [:noreply_wait]
  if !unknown_keys.empty?
    raise ArgumentError, "close does not understand these options: " +
      unknown_keys.to_s
  end
  # Read the default without writing it back into the caller's hash
  # (which may be frozen or reused by the caller).
  wait_for_noreply = opts.fetch(:noreply_wait, true)

  @mon.synchronize {
    @opts.clear
    @data.clear
    @waiters.each {|k,v|
      case v
      when QueryHandle
        v.handle_force_close
      when MonitorMixin::ConditionVariable
        # Clear the slot before signalling the waiting thread.
        @waiters[k] = nil
        v.signal
      end
    }
    @waiters.clear
  }

  begin
    noreply_wait() if wait_for_noreply && is_open()
  ensure
    # Always release the thread and socket, even when noreply_wait failed.
    if @listener
      @listener.terminate
      @listener.join
    end
    @socket.close if @socket
    @listener = nil
    @socket = nil
  end
  self
end
#closed? ⇒ Boolean
735 736 737 |
# File 'lib/net.rb', line 735
# Boolean negation of #is_open.
def closed?
  is_open ? false : true
end
#connect ⇒ Object
680 681 682 683 684 685 686 687 688 689 690 |
# File 'lib/net.rb', line 680
# Opens the socket, resets the per-connection bookkeeping (monitor,
# waiters, options, data), bumps @conn_id and starts the listener.
#
# Returns self. Raises RuntimeError if a socket is already present.
def connect()
  if @socket
    raise RuntimeError, "Connection must be closed before calling connect."
  end

  init_socket
  @mon = Monitor.new
  @waiters, @opts, @data = {}, {}, {}
  @conn_id += 1
  start_listener
  self
end
#const_eq(a, b) ⇒ Object
943 944 945 946 947 948 949 950 |
# File 'lib/net.rb', line 943
# Compares two strings byte by byte without stopping at the first
# difference.
#
# The length guard uses bytesize because the comparison itself is
# byte-wise: a character-count guard let multibyte strings with equal
# character counts but different byte counts reach the XOR loop, where
# the shorter side supplied nil and `Integer ^ nil` raised TypeError.
#
# Returns true when both strings hold exactly the same bytes.
def const_eq(a, b)
  return false if a.bytesize != b.bytesize
  residue = 0
  a.bytes.zip(b.bytes) { |x, y| residue |= (x ^ y) }
  residue == 0
end
#create_context(options) ⇒ Object
712 713 714 715 716 717 718 719 720 721 722 |
# File 'lib/net.rb', line 712
# Builds the OpenSSL context used for TLS connections.
#
# options - Hash of SSL options; :ca_certs (path to a CA certificate
#           file) is required.
#
# Returns an OpenSSL::SSL::SSLContext set to TLS 1.2 with peer
# verification enabled. Raises RuntimeError when :ca_certs is missing.
def create_context(options)
  context = OpenSSL::SSL::SSLContext.new
  context.ssl_version = :TLSv1_2
  if options[:ca_certs]
    context.ca_file = options[:ca_certs]
    context.verify_mode = OpenSSL::SSL::VERIFY_PEER
  else
    raise 'ssl options provided but missing required "ca_certs" option'
  end
  context
end
#debug_socket ⇒ Object
669 |
# File 'lib/net.rb', line 669 def debug_socket; @socket; end |
#dispatch(msg, token) ⇒ Object
605 606 607 608 609 610 |
# File 'lib/net.rb', line 605
# Serialises +msg+ to JSON and writes one framed packet to the socket:
# the token as a little-endian 64-bit value, the payload byte length as
# a little-endian 32-bit value, then the payload itself.
#
# Returns +token+.
def dispatch(msg, token)
  body = Shim.dump_json(msg).force_encoding('BINARY')
  header = [token, body.bytesize].pack('Q<L<')
  send(header + body)
  token
end
#do_handshake ⇒ Object
961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 |
# File 'lib/net.rb', line 961
# Performs the connection handshake: sends the magic number, then runs a
# SCRAM-SHA-256 exchange with the server.
#
# Steps:
#   1. Send the client-first message ("n=<user>,r=<nonce>") along with
#      the protocol version.
#   2. Read the server's version reply and check it with #check_version.
#   3. Read the server-first message (nonce 'r', salt 's', iteration
#      count 'i') and check the nonce with #check_nonce.
#   4. Derive the salted password, compute the client proof and send the
#      client-final message.
#   5. Verify the server's signature ('v') against the locally computed one.
#
# ReqlError exceptions propagate unchanged; anything else is wrapped in
# ReqlDriverError.
def do_handshake
  begin
    send([@@magic_number].pack('L<'))

    client_first_message_bare = "n=#{@user},r=#{@nonce}"
    send_json({
      protocol_version: @@protocol_version,
      authentication_method: "SCRAM-SHA-256",
      authentication: "n,,#{client_first_message_bare}"
    })

    server_version_msg = rcv_json
    check_version(server_version_msg)

    auth_resp = rcv_json
    server_first_message = auth_resp['authentication']
    fields = Hash[server_first_message.split(',').map{|x| x.split('=', 2)}]
    server_nonce = fields['r']
    # "biws" is Base64 for the "n,," header sent in the first message.
    client_final_message_without_proof = "c=biws,r=#{server_nonce}"
    # Both signatures below are computed over this transcript.
    auth_message = "#{client_first_message_bare},#{server_first_message},"+
                   client_final_message_without_proof
    check_nonce(server_nonce, @nonce)

    salt = Base64.decode64(fields['s'])
    iter = fields['i'].to_i
    salted_password = pbkdf2_hmac_sha256(@password, salt, iter)

    client_key = hmac(salted_password, "Client Key")
    stored_key = sha256(client_key)
    client_signature = hmac(stored_key, auth_message)
    client_proof = Connection::xor(client_key, client_signature)
    cproof_64 = Base64.strict_encode64(client_proof)

    msg = "#{client_final_message_without_proof},p=#{cproof_64}"
    send_json({authentication: msg})

    sig_resp = rcv_json
    fields = Hash[sig_resp['authentication'].split(',').map{|x| x.split('=', 2)}]
    server_signature_claim = Base64::decode64(fields['v'])
    server_key = hmac(salted_password, "Server Key")
    server_signature = hmac(server_key, auth_message)
    check_server_signature_claim(server_signature_claim, server_signature)
  rescue ReqlError => e
    raise e
  rescue Exception => e
    raise ReqlDriverError, "Error during handshake: #{e.inspect} #{e.backtrace}"
  end
end
#hmac(*args) ⇒ Object
927 928 929 |
# File 'lib/net.rb', line 927
# HMAC-SHA256 of the given (key, data) arguments, returned as a raw
# binary digest.
def hmac(*args)
  sha256_digest = OpenSSL::Digest.new('SHA256')
  OpenSSL::HMAC.digest(sha256_digest, *args)
end
#init_socket ⇒ Object
692 693 694 695 696 697 698 699 700 701 702 703 |
# File 'lib/net.rb', line 692
# Sets @socket. Without SSL options this is a plain TCP socket; with
# them, the TCP socket is wrapped in an SSL socket whose certificate is
# checked by #verify_cert! after connecting.
def init_socket
  if @ssl_opts.empty?
    @socket = base_socket
  else
    @tcp_socket = base_socket
    context = create_context(@ssl_opts)
    @socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket, context)
    # Closing the SSL socket also closes the wrapped TCP socket.
    @socket.sync_close = true
    @socket.connect
    verify_cert!(@socket, context)
  end
end
#inspect ⇒ Object
657 658 659 660 661 662 |
# File 'lib/net.rb', line 657
# Human-readable summary: object id, host:port, default database
# ('test' when none is configured) and open/closed state.
def inspect
  current_db = @default_opts[:db] || RQL.new.db('test')
  details = "(#{@host}:#{@port}) (Default DB: #{current_db.inspect})"
  status = if is_open()
    "(open)"
  else
    "(closed)"
  end
  "#<RethinkDB::Connection:#{object_id} #{details} #{status}>"
end
#is_open ⇒ Object
732 733 734 |
# File 'lib/net.rb', line 732
# Truthy only when both the socket and the listener thread are present.
# Returns the listener when the socket is set, otherwise the (falsy)
# socket value itself.
def is_open
  @socket ? @listener : @socket
end
#new_token ⇒ Object
509 510 511 |
# File 'lib/net.rb', line 509
# Increments the token counter under its mutex and returns the new value.
def new_token
  @token_cnt_mutex.synchronize do
    @token_cnt = @token_cnt + 1
  end
end
#noreply_wait ⇒ Object
774 775 776 777 778 779 780 781 782 |
# File 'lib/net.rb', line 774
# Sends a NOREPLY_WAIT query and blocks until the server answers.
#
# Returns nil. Raises ReqlRuntimeError when the connection is closed or
# the reply is not WAIT_COMPLETE.
def noreply_wait
  raise ReqlRuntimeError, "Connection is closed." unless is_open()

  query = [Query::QueryType::NOREPLY_WAIT]
  reply = run_internal(query, {noreply: false}, new_token)
  unless reply['t'] == Response::ResponseType::WAIT_COMPLETE
    raise ReqlRuntimeError, "Unexpected response to noreply_wait: " + PP.pp(reply, "")
  end
  nil
end
#note_data(token, data) ⇒ Object
Synchronize around this!
810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 |
# File 'lib/net.rb', line 810
# Delivers +data+ to whoever is waiting on +token+.
# Synchronize around this!
#
# The token's stored options and waiter are removed. A condition-variable
# waiter gets the data stored in @data and is signalled; a QueryHandle
# waiter gets its callback invoked; a missing waiter is ignored.
#
# Raises ReqlDriverError for any other waiter value.
def note_data(token, data)
  @opts.delete(token)
  waiter = @waiters.delete(token)
  if waiter.is_a?(MonitorMixin::ConditionVariable)
    @data[token] = data
    waiter.signal
  elsif waiter.is_a?(QueryHandle)
    waiter.callback(data)
  elsif waiter.nil?
    # nothing
  else
    raise ReqlDriverError, "Unrecognized value #{waiter.inspect} in `@waiters`."
  end
end
#note_error(token, e) ⇒ Object
Synchronize around this!
826 827 828 829 830 831 832 833 |
# File 'lib/net.rb', line 826
# Reports exception +e+ to the waiter registered for +token+ by wrapping
# it in a CLIENT_ERROR response and handing that to #note_data.
# Synchronize around this!
def note_error(token, e)
  data = {
    't' => Response::ResponseType::CLIENT_ERROR,
    # The response payload carries the exception's message text.
    'r' => [e.message],
    'b' => []
  }
  note_data(token, data)
end
#pbkdf2_hmac_sha256(*args) ⇒ Object
898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 |
# File 'lib/net.rb', line 898 def pbkdf2_hmac_sha256(*args) auth_func = res = nil @@auth_cache_mutex.synchronize { res = @@auth_cache[args] return res if res } @@auth_func_mutex.synchronize { auth_func = @@auth_func } begin res = auth_func.call(*args) rescue NotImplementedError => e if auth_func != @@slow_auth_func @@auth_func_mutex.synchronize { auth_func = @@auth_func = @@slow_auth_func } res = auth_func.call(*args) else raise e end end @@auth_cache_mutex.synchronize { @@auth_cache[args] = res } return res end |
#rcv_json ⇒ Object
835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 |
# File 'lib/net.rb', line 835
# Reads one NUL-terminated JSON message from the socket (one byte at a
# time, each read bounded by @timeout) and parses it.
#
# Returns the parsed object when its 'success' entry is truthy.
# Raises ReqlAuthError when 'error_code' is between 10 and 20 inclusive
# (using the server's 'error' text when it is non-empty), and
# ReqlDriverError for any other failure, including parse errors.
def rcv_json
  raw = ""
  raw += @socket.read_exn(1, @timeout) until raw[-1..-1] == "\0"

  begin
    parsed = JSON.parse(raw[0...-1])
    unless parsed['success']
      text = "Handshake error (#{parsed})."
      code = parsed['error_code']
      auth_failure = code && code >= 10 && code <= 20
      raise ReqlDriverError, text unless auth_failure
      text = parsed['error'] unless parsed['error'].to_s == ""
      raise ReqlAuthError, text
    end
  rescue Exception => err
    # Driver errors pass through untouched; everything else is wrapped.
    raise err if err.class.ancestors.include?(ReqlDriverError)
    raise ReqlDriverError, "Connection closed by server (#{err})."
  end
  parsed
end
#reconnect(opts = {}) ⇒ Object
Reconnect to the server. This will interrupt all queries on the server (if :noreply_wait => false) and invalidate all outstanding enumerables on the client.
674 675 676 677 678 |
# File 'lib/net.rb', line 674
# Reconnect to the server: closes the connection with +opts+ (see
# #close for the accepted keys) and connects again.
#
# Returns self (the result of #connect). Raises ArgumentError when
# +opts+ is not a Hash.
def reconnect(opts={})
  unless opts.class == Hash
    raise ArgumentError, "Argument to reconnect must be a hash."
  end
  close(opts)
  connect()
end
#register_query(token, opts, callback = nil) ⇒ Object
513 514 515 516 517 518 519 520 521 522 523 |
# File 'lib/net.rb', line 513
# Records a waiter and the query options for +token+ so the reply can be
# routed back. Does nothing for noreply queries.
#
# The waiter is +callback+ when one is given, otherwise a new condition
# variable on the connection monitor.
#
# Raises ReqlDriverError if +token+ already has a waiter.
def register_query(token, opts, callback=nil)
  return if opts[:noreply]

  @mon.synchronize do
    if @waiters.has_key?(token)
      raise ReqlDriverError, "Internal driver error, token already in use."
    end
    @waiters[token] = callback || @mon.new_cond
    @opts[token] = opts
  end
end
#remove_em_waiters ⇒ Object
799 800 801 802 803 804 805 806 807 808 |
# File 'lib/net.rb', line 799
# Force-closes every QueryHandle waiter and removes it from @waiters,
# leaving other waiters untouched. Runs under the connection monitor.
def remove_em_waiters
  @mon.synchronize do
    @waiters.delete_if do |_token, waiter|
      next false unless waiter.is_a?(QueryHandle)
      waiter.handle_force_close
      true
    end
  end
end
#repl ⇒ Object
466 |
# File 'lib/net.rb', line 466
# Registers this connection as the default via RQL.set_default_conn.
def repl
  RQL.set_default_conn(self)
end
#run(msg, opts, b) ⇒ Object
535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 |
# File 'lib/net.rb', line 535
# Sends a START query built from +msg+ and returns its result.
#
# msg  - the query term.
# opts - per-query options, merged over the connection defaults; a
#        :noreply entry is coerced to true/false.
# b    - nil, a callable that receives the result, or a Handler.
#
# With a Handler, the query is registered with a QueryHandle and the
# handle is returned immediately. Otherwise the call blocks for the
# reply: partial and sequence responses become Cursors, and the value is
# wrapped as {"profile" => ..., "value" => ...} when the response has a
# 'p' entry. When +b+ is callable it is invoked with the result and any
# Cursor is closed afterwards.
#
# Raises ReqlRuntimeError when the connection is closed (after an
# optional automatic reconnect).
def run(msg, opts, b)
  reconnect(:noreply_wait => false) if @auto_reconnect && !is_open()
  raise ReqlRuntimeError, "Connection is closed." unless is_open()

  all_opts = @default_opts.merge(opts)
  all_opts[:noreply] = !!all_opts[:noreply] if all_opts.key?(:noreply)

  token = new_token
  optargs = all_opts.each_with_object({}) do |(key, val), acc|
    acc[key.to_s] = val.is_a?(RQL) ? val.to_pb : RQL.new.expr(val).to_pb
  end
  q = [Query::QueryType::START, msg, optargs]

  if b.is_a?(Handler)
    handle = QueryHandle.new(b, msg, all_opts, token, self)
    register_query(token, all_opts, handle)
    dispatch(q, token)
    return handle
  end

  res = run_internal(q, all_opts, token)
  return res unless res

  native = Shim.response_to_native(res, msg, opts)
  kind = res['t']
  value = if kind == Response::ResponseType::SUCCESS_PARTIAL
    Cursor.new(native, msg, self, opts, token, true)
  elsif kind == Response::ResponseType::SUCCESS_SEQUENCE
    Cursor.new(native, msg, self, opts, token, false)
  else
    native
  end

  real_val = res['p'] ? { "profile" => res['p'], "value" => value } : value
  return real_val unless b

  begin
    b.call(real_val)
  ensure
    value.close if value.is_a?(Cursor)
  end
end
#run_internal(q, opts, token) ⇒ Object
524 525 526 527 528 |
# File 'lib/net.rb', line 524
# Registers and dispatches query +q+ under +token+.
#
# Returns nil for noreply queries; otherwise blocks in #wait (with no
# timeout) and returns the response.
def run_internal(q, opts, token)
  register_query(token, opts)
  dispatch(q, token)
  return nil if opts[:noreply]
  wait(token, nil)
end
#send(packet) ⇒ Object
591 592 593 594 595 596 597 598 599 600 601 602 603 |
# File 'lib/net.rb', line 591
# Writes the whole of +packet+ to the socket, looping until every byte
# has been accepted. Runs under the connection monitor.
#
# Progress is tracked in bytes on a binary copy of the packet.
# IO#syswrite reports bytes written, so measuring and slicing by
# characters sent the wrong remainder after a partial write whenever the
# packet contained multibyte characters. +packet+ itself is not modified.
def send(packet)
  bytes = packet.dup.force_encoding('BINARY')
  total = bytes.bytesize
  @mon.synchronize {
    written = 0
    while written < total
      # We use IO::syswrite here rather than IO::write because of
      # incompatibilities in JRuby regarding filling up the TCP send
      # buffer. Reference:
      # https://github.com/rethinkdb/rethinkdb/issues/3795
      written += @socket.syswrite(bytes.byteslice(written, total - written))
    end
  }
end
#send_json(x) ⇒ Object
862 863 864 |
# File 'lib/net.rb', line 862
# Sends +x+ as JSON followed by a terminating NUL byte.
def send_json(x)
  send("#{x.to_json}\0")
end
#server ⇒ Object
784 785 786 787 788 789 790 791 792 |
# File 'lib/net.rb', line 784
# Sends a SERVER_INFO query and returns the first element of the
# response's 'r' array.
#
# Raises ReqlRuntimeError when the connection is closed or the reply is
# not of type SERVER_INFO.
def server
  raise ReqlRuntimeError, "Connection is closed." unless is_open()

  query = [Query::QueryType::SERVER_INFO]
  reply = run_internal(query, {noreply: false}, new_token)
  unless reply['t'] == Response::ResponseType::SERVER_INFO
    raise ReqlRuntimeError, "Unexpected response to server_info: " + PP.pp(reply, "")
  end
  reply['r'][0]
end
#sha256(str) ⇒ Object
931 932 933 |
# File 'lib/net.rb', line 931
# SHA-256 of +str+ as a raw binary digest.
def sha256(str)
  OpenSSL::Digest::SHA256.digest(str)
end
#start_listener ⇒ Object
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 |
# File 'lib/net.rb', line 1009 def start_listener class << @socket def maybe_timeout(sec=nil, &b) sec ? Timeout::timeout(sec, &b) : b.call end def read_exn(len, timeout_sec=nil) maybe_timeout(timeout_sec) { buf = read len if !buf || buf.length != len raise ReqlRuntimeError, "Connection closed by server." end return buf } end end do_handshake @listener = Thread.new { while true begin token = nil token = @socket.read_exn(8).unpack('q<')[0] response_length = @socket.read_exn(4).unpack('L<')[0] response = @socket.read_exn(response_length) begin data = Shim.load_json(response, @opts[token]) rescue Exception => e raise ReqlRuntimeError, "Bad response, server is buggy.\n" + "#{e.inspect}\n" + response end @mon.synchronize{note_data(token, data)} rescue Exception => e @mon.synchronize { @waiters.keys.each{ |k| note_error(k, e) } @listener = nil Thread.current.terminate abort("unreachable") } end end } end |
#stop(token) ⇒ Object
529 530 531 532 533 534 |
# File 'lib/net.rb', line 529
# Sends a STOP query for +token+ and drops its waiter.
#
# Returns true when a waiter was registered for the token, false
# otherwise.
def stop(token)
  dispatch([Query::QueryType::STOP], token)
  removed = @mon.synchronize { @waiters.delete(token) }
  removed ? true : false
end
#use(new_default_db) ⇒ Object
Change the default database of a connection.
652 653 654 655 |
# File 'lib/net.rb', line 652
# Change the default database of a connection: updates @default_db and
# the :db entry of the default query options.
#
# Returns the new database term.
def use(new_default_db)
  @default_db = new_default_db
  @default_opts.store(:db, RQL.new.db(new_default_db))
end
#verify_cert!(socket, context) ⇒ Object
724 725 726 727 728 729 730 |
# File 'lib/net.rb', line 724
# When the context verifies peers, checks that the peer certificate
# presented on +socket+ matches this connection's host name.
#
# Returns nil. Raises RuntimeError on a hostname mismatch.
def verify_cert!(socket, context)
  return unless context.verify_mode == OpenSSL::SSL::VERIFY_PEER
  return if OpenSSL::SSL.verify_certificate_identity(socket.peer_cert, host)
  raise 'SSL handshake failed due to a hostname mismatch.'
end
#wait(token, timeout) ⇒ Object
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 |
# File 'lib/net.rb', line 612 def wait(token, timeout) begin @mon.synchronize { end_time = timeout ? Time.now.to_f + timeout : nil loop { res = @data.delete(token) return res if res # Theoretically we only need to check the second property, # but this is safer in case someone makes changes to # `close` in the future. if !is_open() || !@waiters.has_key?(token) raise ReqlRuntimeError, "Connection is closed." end if end_time cur_time = Time.now.to_f if cur_time >= end_time raise Timeout::Error, "Timed out waiting for cursor response." else # We can't use `wait_while` because it doesn't take a # timeout, and we can't use an external `timeout { # ... }` block because in Ruby 1.9.1 it seems to confuse # the synchronization in `@mon` to be timed out while # waiting in a synchronize block. @waiters[token].wait(end_time - cur_time) end else @waiters[token].wait end } } rescue @abort_module::Abort => e print "\nAborting query and reconnecting...\n" reconnect(:noreply_wait => false) raise e end end |