Class: Vertica::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/vertica/connection.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Connection

Opens a connection to a Vertica server.

Parameters:

  • options (Hash) (defaults to: {})

    The connection options to use.



15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/vertica/connection.rb', line 15

# Builds a connection from the given options and, unless startup is
# skipped, boots it via #boot_connection.
#
# Option keys are normalized to symbols, and entries whose value is nil or
# false are dropped. :port defaults to 5433, :read_timeout to 30 and
# :row_style to :hash.
#
# @param options [Hash] the connection options to use; keys may be strings
#   or symbols. A truthy :skip_startup suppresses the call to
#   #boot_connection.
def initialize(options = {})
  reset_values

  @options = {}
  options.each { |key, value| @options[key.to_s.to_sym] = value if value }

  @options[:port] ||= 5433
  @options[:read_timeout] ||= 30

  @row_style = @options[:row_style] || :hash

  # Read the flag from the normalized hash so that a string-keyed
  # 'skip_startup' is honored exactly like every other option.
  boot_connection unless @options[:skip_startup]
end

Instance Attribute Details

#backend_key ⇒ Object (readonly)

Returns the value of attribute backend_key.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for @backend_key. The value is taken from the +key+ of a
# BackendKeyData message in #process_message and is passed to the
# CancelRequest built by #cancel.
def backend_key
  @backend_key
end

#backend_pid ⇒ Object (readonly)

Returns the value of attribute backend_pid.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for @backend_pid. The value is taken from the +pid+ of a
# BackendKeyData message in #process_message; #opened? treats a missing
# value as "not opened".
def backend_pid
  @backend_pid
end

#debug ⇒ Object

Returns the value of attribute debug.



7
8
9
# File 'lib/vertica/connection.rb', line 7

# Reader for @debug. When the flag is truthy, #write and #read_message
# print each message's +inspect+ output with +puts+.
def debug
  @debug
end

#notice_handler ⇒ Object (readonly)

Returns the value of attribute notice_handler.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for @notice_handler: the block registered through #on_notice,
# which #process_message calls with each NoticeResponse message.
def notice_handler
  @notice_handler
end

#notices ⇒ Object (readonly)

Returns the value of attribute notices.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for @notices.
# NOTE(review): nothing in the visible methods assigns @notices; confirm
# where (or whether) it is populated.
def notices
  @notices
end

#options ⇒ Object

Returns the value of attribute options.



7
8
9
# File 'lib/vertica/connection.rb', line 7

# Reader for @options: the symbol-keyed option hash assembled in
# #initialize. #cancel and #interrupt merge overrides into it to build
# helper connections.
def options
  @options
end

#parameters ⇒ Object (readonly)

Returns the value of attribute parameters.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for @parameters: a name => value collection that
# #process_message fills from ParameterStatus messages.
def parameters
  @parameters
end

#row_style ⇒ Object

Returns the value of attribute row_style.



7
8
9
# File 'lib/vertica/connection.rb', line 7

# Reader for @row_style. #initialize sets it from the :row_style option
# (falling back to :hash), and #query / #copy pass it to each
# Vertica::Query as the :row_style option.
def row_style
  @row_style
end

#session_id ⇒ Object (readonly)

Returns the value of attribute session_id.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for @session_id. #interrupt refuses to run while this is nil,
# and #interruptable? reports whether it is set.
def session_id
  @session_id
end

#transaction_status ⇒ Object (readonly)

Returns the value of attribute transaction_status.



5
6
7
# File 'lib/vertica/connection.rb', line 5

# Reader for @transaction_status, which #process_message refreshes from
# every ReadyForQuery message.
def transaction_status
  @transaction_status
end

Class Method Details

.cancel(existing_conn) ⇒ Object



9
10
11
# File 'lib/vertica/connection.rb', line 9

# Class-level shortcut that forwards to the #cancel method of the given
# connection.
#
# @param existing_conn [#cancel] the connection to cancel.
# @return [Object] whatever +existing_conn.cancel+ returns.
def self.cancel(existing_conn)
  existing_conn.cancel
end

Instance Method Details

#boot_connection ⇒ Object



100
101
102
103
# File 'lib/vertica/connection.rb', line 100

# Brings the connection up by running startup_connection followed by
# initialize_connection (both are defined outside this listing).
# Called from #initialize (unless startup is skipped) and from
# #reset_connection.
def boot_connection
  startup_connection
  initialize_connection
end

#busy? ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/vertica/connection.rb', line 64

# Inverse of #ready_for_query?.
#
# @return [Boolean] true while a current job is set on this connection.
def busy?
  !ready_for_query?
end

#cancel ⇒ Object



105
106
107
108
109
110
# File 'lib/vertica/connection.rb', line 105

# Sends a cancel request for this connection's backend over a second,
# short-lived connection.
#
# The helper connection is created from the same options plus
# :skip_startup => true, is sent a CancelRequest carrying this
# connection's backend pid and key followed by a Flush, and then has its
# socket closed.
#
# @return [Object] the result of closing the helper connection's socket.
def cancel
  canceller_options = options.merge(:skip_startup => true)
  canceller = self.class.new(canceller_options)

  canceller.write(Vertica::Messages::CancelRequest.new(backend_pid, backend_key))
  canceller.write(Vertica::Messages::Flush.new)

  canceller.socket.close
end

#close ⇒ Object



81
82
83
84
85
# File 'lib/vertica/connection.rb', line 81

# Closes the connection: sends a Terminate message and then closes the
# socket via #close_socket, which runs even if sending the message raises.
#
# @return [Object] the result of writing the Terminate message.
def close
  begin
    write(Vertica::Messages::Terminate.new)
  ensure
    # Always release the socket, whether or not the goodbye was delivered.
    close_socket
  end
end

#close_socket ⇒ Object



87
88
89
90
91
92
93
# File 'lib/vertica/connection.rb', line 87

# Closes the underlying socket, if one is open, and resets the connection
# state via reset_values.
#
# The @socket instance variable is used directly instead of the lazy
# #socket accessor: going through the accessor would open a brand-new
# connection just to close it whenever no socket exists (for example when
# #close calls this after #write has already closed the socket on error).
#
# SystemCallError raised while closing is deliberately ignored (best
# effort), and @socket is cleared even in that case so that a broken
# socket is never reused.
#
# @return [nil]
def close_socket
  @socket.close if @socket
rescue SystemCallError
  # Best effort: the socket is being discarded anyway.
  nil
ensure
  @socket = nil
  reset_values
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/vertica/connection.rb', line 60

# Inverse of #opened?.
#
# @return [Boolean] true when #opened? is falsy.
def closed?
  !opened?
end

#copy(sql, source = nil, &block) ⇒ Object



167
168
169
170
171
172
173
174
175
176
177
# File 'lib/vertica/connection.rb', line 167

# Runs +sql+ as a Vertica::Query job, optionally attaching a copy handler
# that supplies the data.
#
# The handler is chosen in this order:
# 1. the given block;
# 2. +source+ when its string form names an existing file
#    (handled by file_copy_handler);
# 3. +source+ when it responds to both +read+ and +eof?+
#    (handled by io_copy_handler).
# If none applies, the job runs without a copy handler.
#
# @param sql [String] the statement to execute.
# @param source [String, #read, nil] a file path or an IO-like object.
# @yield [data] used as the copy handler when a block is given.
# @return [Object] the result of run_with_job_lock for the job.
def copy(sql, source = nil, &block)
  job = Vertica::Query.new(self, sql, :row_style => @row_style)
  if block_given?
    job.copy_handler = block
  # File.exist? rather than File.exists?: the latter was deprecated and
  # then removed in Ruby 3.2.
  elsif source && File.exist?(source.to_s)
    job.copy_handler = lambda { |data| file_copy_handler(source, data) }
  elsif source.respond_to?(:read) && source.respond_to?(:eof?)
    job.copy_handler = lambda { |data| io_copy_handler(source, data) }
  end
  run_with_job_lock(job)
end

#inspect ⇒ Object



179
180
181
182
# File 'lib/vertica/connection.rb', line 179

def inspect
  safe_options = @options.reject{ |name, _| name == :password }
  "#<Vertica::Connection:#{object_id} @parameters=#{@parameters.inspect} @backend_pid=#{@backend_pid}, @backend_key=#{@backend_key}, @transaction_status=#{@transaction_status}, @socket=#{@socket}, @options=#{safe_options.inspect}, @row_style=#{@row_style}>"
end

#interrupt ⇒ Object



112
113
114
115
116
117
118
# File 'lib/vertica/connection.rb', line 112

# Interrupts this session by opening a second connection and running
# CLOSE_SESSION for this connection's session ID on it.
#
# The helper connection reuses this connection's options with
# :interruptable set to false and :role / :search_path cleared. It is
# closed in an +ensure+ so that it is not leaked when the query raises.
#
# @return [Object] the single value returned by the CLOSE_SESSION query.
# @raise [Vertica::Error::InterruptImpossible] if the session ID is not
#   known.
def interrupt
  raise Vertica::Error::InterruptImpossible, "Session cannot be interrupted because the session ID is not known!" if session_id.nil?

  conn = self.class.new(options.merge(:interruptable => false, :role => nil, :search_path => nil))
  begin
    conn.query("SELECT CLOSE_SESSION(#{Vertica.quote(session_id)})").the_value
  ensure
    conn.close
  end
end

#interruptable? ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/vertica/connection.rb', line 120

# Whether #interrupt can be attempted, i.e. whether a session ID is known.
#
# @return [Boolean] true when session_id is not nil.
def interruptable?
  !session_id.nil?
end

#on_notice(&block) ⇒ Object



29
30
31
# File 'lib/vertica/connection.rb', line 29

# Registers the block to be called with each NoticeResponse message seen
# by #process_message. A later call replaces the previous handler; calling
# without a block stores nil, which disables the callback.
#
# @yield [message] the notice message.
# @return [Proc, nil] the stored handler.
def on_notice(&block)
  @notice_handler = block
end

#opened? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/vertica/connection.rb', line 56

# Whether the connection is fully established: a socket exists, a backend
# pid has been received and a transaction status is known.
#
# The result is coerced to a real boolean so that the predicate matches
# its documented return type instead of leaking the transaction status
# (or nil) to callers.
#
# @return [Boolean]
def opened?
  !!(@socket && @backend_pid && @transaction_status)
end

#process_message(message) ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/vertica/connection.rb', line 142

# Applies a connection-level backend message to this connection's state.
#
# * ErrorResponse   - raised as Vertica::Error::ConnectionError.
# * NoticeResponse  - handed to the registered notice handler, if any.
# * BackendKeyData  - stores the backend pid and key.
# * ParameterStatus - records the name/value pair in @parameters.
# * ReadyForQuery   - stores the transaction status and clears the
#   current job.
#
# The branch order is intentionally left as it was: +case+ takes the first
# matching class.
#
# @param message [Object] a backend message.
# @raise [Vertica::Error::ConnectionError] for an ErrorResponse.
# @raise [Vertica::Error::MessageError] for any other message type.
def process_message(message)
  case message
  when Vertica::Messages::ErrorResponse
    raise Vertica::Error::ConnectionError.new(message.error_message)
  when Vertica::Messages::NoticeResponse
    handler = @notice_handler
    handler.call(message) if handler
  when Vertica::Messages::BackendKeyData
    @backend_pid = message.pid
    @backend_key = message.key
  when Vertica::Messages::ParameterStatus then @parameters.store(message.name, message.value)
  when Vertica::Messages::ReadyForQuery
    @transaction_status = message.transaction_status
    @current_job = nil
  else
    raise Vertica::Error::MessageError, "Unhandled message: #{message.inspect}"
  end
end

#query(sql, options = {}, &block) ⇒ Object



161
162
163
164
165
# File 'lib/vertica/connection.rb', line 161

# Runs +sql+ as a Vertica::Query job on this connection.
#
# The connection's row style is the default for :row_style; any entry in
# +options+ overrides it. When a block is given it becomes the job's row
# handler.
#
# @param sql [String] the statement to execute.
# @param options [Hash] per-query options passed to Vertica::Query.
# @yield [row] optional row handler.
# @return [Object] the result of run_with_job_lock for the job.
def query(sql, options = {}, &block)
  query_options = { :row_style => @row_style }.merge(options)
  job = Vertica::Query.new(self, sql, query_options)
  job.row_handler = block if block
  run_with_job_lock(job)
end

#read_message ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/vertica/connection.rb', line 124

# Reads one backend message from the socket.
#
# Waits up to the :read_timeout option for the socket to become readable.
# A message is framed as a 1-byte type followed by a 4-byte big-endian
# size; the payload read afterwards is +size - 4+ bytes, so the size must
# be at least 4.
#
# @return [Object] the message built by
#   Vertica::Messages::BackendMessage.factory.
# @raise [Vertica::Error::TimedOutError] if nothing becomes readable in
#   time (the connection is closed first).
# @raise [Vertica::Error::MessageError] if the announced size is below 4.
# @raise [Vertica::Error::ConnectionError] if a SystemCallError occurs
#   (the socket is closed first).
def read_message
  readable = IO.select([socket], nil, nil, @options[:read_timeout])
  unless readable
    close
    raise Vertica::Error::TimedOutError.new("Connection timed out.")
  end

  type = read_bytes(1)
  size = read_bytes(4).unpack('N').first
  raise Vertica::Error::MessageError.new("Bad message size: #{size}.") unless size >= 4

  payload = read_bytes(size - 4)
  message = Vertica::Messages::BackendMessage.factory(type, payload)
  puts "<= #{message.inspect}" if @debug
  message
rescue SystemCallError => error
  close_socket
  raise Vertica::Error::ConnectionError.new(error.message)
end

#ready_for_query? ⇒ Boolean

Returns:

  • (Boolean)


68
69
70
# File 'lib/vertica/connection.rb', line 68

# Whether the connection has no job in progress. @current_job is cleared
# by #process_message when a ReadyForQuery message arrives.
#
# @return [Boolean] true when @current_job is nil.
def ready_for_query?
  @current_job.nil?
end

#reset_connection ⇒ Object



95
96
97
98
# File 'lib/vertica/connection.rb', line 95

# Re-establishes the connection: closes it with #close, then boots it
# again with #boot_connection.
def reset_connection
  close
  boot_connection
end

#socket ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/vertica/connection.rb', line 33

# Returns the socket for this connection, opening it on first use and
# memoizing it in @socket.
#
# A TCP connection is made to the :host / :port options. When the :ssl
# option is set, an SslRequest is sent; if the server answers 'S' the
# connection is wrapped in an OpenSSL::SSL::SSLSocket, otherwise
# SSLNotSupported is raised.
#
# If anything fails after the TCP socket has been opened (SSL refused,
# handshake error, write/read error) the TCP socket is closed before the
# exception propagates, so a failed attempt does not leak a descriptor.
#
# NOTE(review): the SSL context is a bare SSLContext.new; confirm whether
# certificate verification needs to be configured here.
#
# @return [TCPSocket, OpenSSL::SSL::SSLSocket]
# @raise [Vertica::Error::SSLNotSupported] if SSL was requested but the
#   server did not accept it.
def socket
  @socket ||= begin
    tcp_socket = TCPSocket.new(@options[:host], @options[:port].to_i)
    begin
      if @options[:ssl]
        # Loaded lazily so OpenSSL is only required when SSL is requested.
        require 'openssl'
        tcp_socket.write Vertica::Messages::SslRequest.new.to_bytes
        unless tcp_socket.read(1) == 'S'
          raise Vertica::Error::SSLNotSupported.new("SSL requested but server doesn't support it.")
        end

        ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, OpenSSL::SSL::SSLContext.new)
        ssl_socket.sync = true
        ssl_socket.connect
        ssl_socket
      else
        tcp_socket
      end
    rescue StandardError
      # Do not leak the raw connection when negotiation fails.
      tcp_socket.close unless tcp_socket.closed?
      raise
    end
  end
end

#ssl? ⇒ Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/vertica/connection.rb', line 52

# Whether the connection's socket is an SSL socket.
#
# Returns false straight away when the OpenSSL constant is not defined;
# otherwise the socket (obtained through #socket) is checked against
# OpenSSL::SSL::SSLSocket.
#
# @return [Boolean]
def ssl?
  return false unless Object.const_defined?('OpenSSL')

  socket.is_a?(OpenSSL::SSL::SSLSocket)
end

#write(message) ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/vertica/connection.rb', line 72

# Sends a message to the server by writing its +to_bytes+ output to the
# socket. The message is echoed with +puts+ first when @debug is set.
#
# @param message [#to_bytes] the message to send.
# @return [Object] the result of the socket write.
# @raise [ArgumentError] if the message does not respond to +to_bytes+.
# @raise [Vertica::Error::ConnectionError] if a SystemCallError occurs
#   (the socket is closed first).
def write(message)
  unless message.respond_to?(:to_bytes)
    raise ArgumentError, "invalid message: (#{message.inspect})"
  end

  puts "=> #{message.inspect}" if @debug
  socket.write(message.to_bytes)
rescue SystemCallError => error
  close_socket
  raise Vertica::Error::ConnectionError.new(error.message)
end