Class: Impala::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/impala/connection.rb

Overview

This object represents a connection to an Impala server. It can be used to perform queries on the database.

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(host, port, options = {}) ⇒ Connection

Don’t instantiate Connections directly; instead, use Impala.connect.



8
9
10
11
12
13
14
15
16
# File 'lib/impala/connection.rb', line 8

# Builds a connection to +host+:+port+ and opens it immediately.
#
# @param host server host handed to the Thrift socket when opening
# @param port server port handed to the Thrift socket when opening
# @param options [Hash] connection options. The hash is shallow-copied, so
#   the caller's hash is never modified. +:transport+ falls back to
#   +:buffered+ when absent or nil, and +:loggers+ falls back to an empty array.
def initialize(host, port, options={})
  @host, @port = host, port
  @options = options.dup.tap { |opts| opts[:transport] ||= :buffered }
  @loggers = @options.fetch(:loggers, [])

  # Must be false before #open runs, because #open returns early when connected.
  @connected = false
  open
end

Instance Attribute Details

#host ⇒ Object

Returns the value of attribute host.



5
6
7
# File 'lib/impala/connection.rb', line 5

# Reader for the host this connection was created with (assigned in
# #initialize and used by #open when building the transport).
def host
  @host
end

#port ⇒ Object

Returns the value of attribute port.



5
6
7
# File 'lib/impala/connection.rb', line 5

# Reader for the port this connection was created with (assigned in
# #initialize and used by #open when building the transport).
def port
  @port
end

Instance Method Details

#close ⇒ Object

Close this connection. It can still be reopened with #open.



70
71
72
73
74
75
# File 'lib/impala/connection.rb', line 70

# Closes this connection. It can be reopened later with #open.
#
# Does nothing if the connection is not open. The connection is marked
# closed even if closing the transport raises, so #open? never reports a
# half-closed transport as open and a later #open builds a fresh one.
#
# @return [nil, false] nil if the connection was already closed, false otherwise
# @raise whatever the underlying transport's +close+ raises (re-raised
#   after the connection has been marked closed)
def close
  return unless @connected

  begin
    @transport.close
  ensure
    @connected = false
  end
  false
end

#close_handle(handle) ⇒ Object



117
118
119
# File 'lib/impala/connection.rb', line 117

# Closes a query handle through the Impala service client.
#
# @param handle the query handle to close (presumably one returned by
#   send_query when the query was submitted — verify against callers)
# @return whatever the service client's +close+ returns
def close_handle(handle)
  service = @service
  service.close(handle)
end

#execute(raw_query, query_options = {}) ⇒ Cursor

Perform a query and return a cursor for iterating over the results.

Parameters:

  • raw_query (String)

    the query you want to run

  • query_options (Hash) (defaults to: {})

    options for the query: :user sets the user to run the query as, and every other key sets a query configuration option (see TImpalaQueryOptions in ImpalaService.thrift)

Options Hash (query_options):

  • :user (String)

    the user to run the query as

Returns:

  • (Cursor)

    a cursor for the result rows

Raises:

  • (ConnectionError)

    if the connection is closed



106
107
108
109
110
111
112
113
114
115
# File 'lib/impala/connection.rb', line 106

# Runs a query and returns a cursor for iterating over its results.
#
# The query text is passed through sanitize_query before being sent, and
# Cursor#wait! is called on the new cursor before it is returned.
#
# @param raw_query [String] the query to run
# @param query_options [Hash] per-query options forwarded to send_query
# @return [Cursor] a cursor over the result rows
# @raise [ConnectionError] if the connection is closed
def execute(raw_query, query_options = {})
  raise ConnectionError, "Connection closed" unless open?

  sanitized = sanitize_query(raw_query)
  handle = send_query(sanitized, query_options)
  Cursor.new(handle, @service, @options).tap(&:wait!)
end

#inspect ⇒ Object



18
19
20
# File 'lib/impala/connection.rb', line 18

def inspect
  "#<#{self.class} #{@host}:#{@port}#{open? ? '' : ' (DISCONNECTED)'}>"
end

#open ⇒ Object

Open the connection if it’s currently closed.



23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/impala/connection.rb', line 23

# Opens the connection if it is currently closed; does nothing otherwise.
#
# Builds the transport via thrift_transport, opens it (passing a block that
# calls enable_keepalive on the yielded object), and wraps it in a binary
# protocol and an ImpalaService client.
#
# @return [nil, true] nil if already connected, true after opening
def open
  unless @connected
    @transport = thrift_transport(host, port)
    @transport.open { |raw| enable_keepalive(raw) }

    binary_protocol = Thrift::BinaryProtocol.new(@transport)
    @service = Protocol::ImpalaService::Client.new(binary_protocol)
    @connected = true
  end
end

#open? ⇒ Boolean

Returns true if the connection is currently open.

Returns:

  • (Boolean)


78
79
80
# File 'lib/impala/connection.rb', line 78

# Returns true if the connection is currently open.
#
# @return [Boolean] the connected flag, which #initialize sets to false,
#   #open sets to true and #close sets back to false
def open?
  @connected
end

#parse_sasl_params(sasl_params) ⇒ Object

Processes SASL connection params and returns a new hash with symbol keys, or nil if the params are not a Hash.



58
59
60
61
62
63
64
65
66
67
# File 'lib/impala/connection.rb', line 58

# Processes SASL connection params.
#
# @param sasl_params the raw +:sasl_params+ connection option
# @return [Hash{Symbol => Object}, nil] a new hash whose keys have been
#   converted with +to_sym+, or nil if +sasl_params+ is not a Hash. The
#   input hash is never modified.
def parse_sasl_params(sasl_params)
  return unless sasl_params.is_a?(Hash)

  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end

#query(raw_query, query_options = {}) ⇒ Array<Hash>

Perform a query and return all the results. This will load the entire result set into memory, so if you’re dealing with lots of rows, #execute may work better.

Parameters:

  • raw_query (String)

    the query you want to run

  • query_options (Hash) (defaults to: {})

    options for the query: :user sets the user to run the query as, and every other key sets a query configuration option (see TImpalaQueryOptions in ImpalaService.thrift)

Options Hash (query_options):

  • :user (String)

    the user to run the query as

Returns:

  • (Array<Hash>)

    an array of hashes, one for each row.



96
97
98
# File 'lib/impala/connection.rb', line 96

# Runs a query and returns every result row at once by calling
# +fetch_all+ on the cursor from #execute. The whole result set is held in
# memory, so prefer #execute for large results.
#
# @param raw_query [String] the query to run
# @param query_options [Hash] per-query options forwarded to #execute
# @return [Array<Hash>] one hash per row
def query(raw_query, query_options = {})
  cursor = execute(raw_query, query_options)
  cursor.fetch_all
end

#refresh ⇒ Object

Refresh the metadata store.

Raises:

  • (ConnectionError)

    if the connection is closed



83
84
85
86
# File 'lib/impala/connection.rb', line 83

# Refreshes the metadata store by calling ResetCatalog on the service client.
#
# @return whatever the service client's +ResetCatalog+ returns
# @raise [ConnectionError] if the connection is closed
def refresh
  if open?
    @service.ResetCatalog
  else
    raise ConnectionError, "Connection closed"
  end
end

#thrift_socket(server, port, timeout) ⇒ Object



51
52
53
54
55
# File 'lib/impala/connection.rb', line 51

# Creates a Thrift socket for +server+:+port+ with +timeout+ applied.
# The socket is only constructed here, not opened.
#
# @param server host to connect to
# @param port port to connect to
# @param timeout value assigned to the socket's +timeout+ (may be nil)
# @return [Thrift::Socket]
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end

#thrift_transport(server, port) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/impala/connection.rb', line 36

# Builds the transport selected by +@options[:transport]+ around a socket
# for +server+:+port+ (timeout from +@options[:timeout]+). This method
# does not open the transport.
#
# @param server host to connect to
# @param port port to connect to
# @return [Thrift::BufferedTransport, SASLTransport]
# @raise [RuntimeError] if the transport type is unrecognised, or if the
#   +:sasl+ transport is selected without a +:sasl_params+ Hash
def thrift_transport(server, port)
  socket = thrift_socket(server, port, @options[:timeout])

  case @options[:transport]
  when :buffered
    Thrift::BufferedTransport.new(socket)
  when :sasl
    opts = parse_sasl_params(@options[:sasl_params])
    # parse_sasl_params returns nil for anything but a Hash; fail with a
    # clear message rather than a NoMethodError on nil below.
    raise "SASL transport requires :sasl_params to be a Hash" unless opts

    # opts is a fresh copy, so deleting :mechanism leaves @options untouched.
    mechanism = opts.delete(:mechanism)
    SASLTransport.new(socket, mechanism, opts)
  else
    raise "Unrecognised transport type '#{@options[:transport]}'"
  end
end