Class: RBHive::TCLIConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/rbhive/t_c_l_i_connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) ⇒ TCLIConnection

Returns a new instance of TCLIConnection.



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/rbhive/t_c_l_i_connection.rb', line 83

# Builds a HiveServer2 connection object (transport, protocol and Thrift client).
#
# @param server [String] HiveServer2 host name.
# @param port [Integer] HiveServer2 port (default 10_000).
# @param options [Hash, nil] connection options; nil is accepted and treated as {}
#   for backwards compatibility. Keys read here: :transport (default :buffered),
#   :hive_version (default 10), :timeout (default 1800) and :sasl_params
#   (required when :transport is :sasl).
# @param logger [#info] logger used for informational messages.
# @raise [RuntimeError] if options is not a Hash, or if the :sasl transport is
#   requested without :sasl_params.
def initialize(server, port=10_000, options={}, logger=StdOutLogger.new)
  options ||= {} # backwards compatibility
  raise "'options' parameter must be a hash" unless options.is_a?(Hash)

  # Work on a copy so filling in defaults below never mutates the caller's hash
  # (and a frozen options hash does not blow up with FrozenError).
  options = options.dup

  if options[:transport] == :sasl && options[:sasl_params].nil?
    raise ":transport is set to :sasl, but no :sasl_params option was supplied"
  end

  # Defaults to buffered transport, Hive 0.10, 1800 second timeout
  options[:transport]     ||= :buffered
  options[:hive_version]  ||= 10
  options[:timeout]       ||= 1800
  @options = options

  # Look up the appropriate Thrift protocol version for the supplied Hive version
  @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])

  @logger = logger
  # thrift_transport reads @options and @logger, so both must be set first.
  @transport = thrift_transport(server, port)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
  @session = nil
  # NOTE(review): this only logs; the transport appears to be opened later via #open.
  @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
  @mutex = Mutex.new
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(meth, *args) ⇒ Object



272
273
274
# File 'lib/rbhive/t_c_l_i_connection.rb', line 272

# Delegates any method this connection does not define to the underlying
# Thrift client, so raw TCLIService calls (e.g. FetchResults) can be made on
# the connection object directly.
#
# @param meth [Symbol] name of the missing method.
# @param args [Array] positional arguments, forwarded unchanged.
# @param block [Proc, nil] optional block, forwarded as well.
# @return [Object] whatever the client method returns.
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps #respond_to? (and #method) consistent with #method_missing by
# reporting the delegated client's methods as available on this object.
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



81
82
83
# File 'lib/rbhive/t_c_l_i_connection.rb', line 81

# Read-only accessor for the Hive2::Thrift::TCLIService::Client built in the
# constructor; fetch and fetch_in_batch issue their FetchResults calls through it.
def client
  @client
end

Instance Method Details

#add_columns(schema) ⇒ Object



268
269
270
# File 'lib/rbhive/t_c_l_i_connection.rb', line 268

# Executes the statement returned by schema.add_columns_statement
# (presumably an ALTER TABLE ... ADD COLUMNS — defined on the schema object, not here).
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end

#close ⇒ Object



150
151
152
# File 'lib/rbhive/t_c_l_i_connection.rb', line 150

# Closes the Thrift transport created in the constructor (counterpart of #open).
def close
  transport = @transport
  transport.close
end

#close_session ⇒ Object



158
159
160
161
# File 'lib/rbhive/t_c_l_i_connection.rb', line 158

# Sends a CloseSession request for the current session, then forgets it.
# @return [nil]
def close_session
  request = prepare_close_session
  @client.CloseSession(request)
  @session = nil
end

#create_table(schema) ⇒ Object



255
256
257
# File 'lib/rbhive/t_c_l_i_connection.rb', line 255

# Executes the statement returned by schema.create_table_statement.
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end

#drop_table(name) ⇒ Object



259
260
261
262
# File 'lib/rbhive/t_c_l_i_connection.rb', line 259

# Drops a table given either its name or a TableSchema (whose #name is used).
# NOTE(review): the name is placed inside back-quotes without escaping; only
# pass trusted table names.
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end

#execute(query) ⇒ Object



171
172
173
# File 'lib/rbhive/t_c_l_i_connection.rb', line 171

# Public entry point for running a statement; delegates to #execute_safe
# and returns whatever it returns.
def execute(query)
  execute_safe(query)
end

#explain(query) ⇒ Object

Performs an EXPLAIN of the supplied query on the server and returns the output as an ExplainResult. (On Hive 0.12 this only works if the patch from https://issues.apache.org/jira/browse/HIVE-5492 is applied.)



190
191
192
193
194
195
196
# File 'lib/rbhive/t_c_l_i_connection.rb', line 190

# Runs "EXPLAIN <query>" and wraps the :Explain column of every returned row,
# across all batches and in order, in an ExplainResult.
def explain(query)
  lines = []
  fetch_in_batch("EXPLAIN " + query) do |batch|
    batch.each { |row| lines << row[:Explain] }
  end
  ExplainResult.new(lines.flatten)
end

#fetch(query, max_rows = 100) ⇒ Object

Performs a query on the server, fetches up to max_rows rows and returns them as an array.



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/rbhive/t_c_l_i_connection.rb', line 199

# Runs +query+ and returns at most +max_rows+ rows (a single FetchResults call
# with the :first orientation) as a TCLIResultSet.
def fetch(query, max_rows = 100)
  safe do
    # Run the statement and stop early if the server reported a failure.
    exec_result = execute_unsafe(query)
    raise_error_if_failed!(exec_result)
    handle = exec_result.operationHandle

    # Request the first page of results for that operation.
    request = prepare_fetch_results(handle, :first, max_rows)
    response = client.FetchResults(request)
    raise_error_if_failed!(response)

    # The schema definition is built from the result metadata plus a sample row.
    data = response.results.rows
    schema = TCLISchemaDefinition.new(get_schema_for(handle), data.first)
    TCLIResultSet.new(data, schema)
  end
end

#fetch_in_batch(query, batch_size = 1000, &block) ⇒ Object

Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.



222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/rbhive/t_c_l_i_connection.rb', line 222

# Runs +query+ and yields its results to the given block in batches.
#
# @param query [String] HiveQL statement to run.
# @param batch_size [Integer] maximum rows requested per FetchResults call.
# @yieldparam batch [TCLIResultSet] one non-empty batch of rows.
# @raise [RuntimeError] if no block is given.
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?
  safe do
    # Execute the query and check the result
    exec_result = execute_unsafe(query)
    raise_error_if_failed!(exec_result)

    # Get search operation handle to fetch the results
    op_handle = exec_result.operationHandle

    # Prepare fetch results request (reused for every batch with :next)
    fetch_req = prepare_fetch_results(op_handle, :next, batch_size)

    # Declared outside the loop so the `||=` below memoizes across batches:
    # a local first assigned inside the block would be reset on every iteration,
    # re-fetching the result metadata for each batch.
    schema_for_req = nil
    the_schema = nil

    loop do
      # Fetch next batch and raise an exception if it failed
      fetch_results = client.FetchResults(fetch_req)
      raise_error_if_failed!(fetch_results)

      # An empty batch marks the end of the result set
      rows = fetch_results.results.rows
      break if rows.empty?

      # Look up the result schema once; build the definition from the first batch's first row
      schema_for_req ||= get_schema_for(op_handle)
      the_schema ||= TCLISchemaDefinition.new(schema_for_req, rows.first)

      # Format the results and yield them to the given block
      yield TCLIResultSet.new(rows, the_schema)
    end
  end
end

#open ⇒ Object



146
147
148
# File 'lib/rbhive/t_c_l_i_connection.rb', line 146

# Opens the Thrift transport created in the constructor (counterpart of #close).
def open
  transport = @transport
  transport.open
end

#open_session ⇒ Object



154
155
156
# File 'lib/rbhive/t_c_l_i_connection.rb', line 154

# Sends an OpenSession request for the configured Thrift protocol version and
# stores (and returns) the server's response.
def open_session
  request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(request)
end

#parse_sasl_params(sasl_params) ⇒ Object

Processes SASL connection params and returns a copy of the hash with symbol keys, or nil if the params are not a Hash.



135
136
137
138
139
140
141
142
143
144
# File 'lib/rbhive/t_c_l_i_connection.rb', line 135

# Returns a new Hash with the same values and every key converted with #to_sym,
# or nil when +sasl_params+ is not a Hash (including nil).
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end

#priority=(priority) ⇒ Object



175
176
177
# File 'lib/rbhive/t_c_l_i_connection.rb', line 175

# Sets the job priority by issuing SET mapred.job.priority=<priority>.
def priority=(priority)
  property = "mapred.job.priority"
  set(property, priority)
end

#queue=(queue) ⇒ Object



179
180
181
# File 'lib/rbhive/t_c_l_i_connection.rb', line 179

# Sets the job queue by issuing SET mapred.job.queue.name=<queue>.
def queue=(queue)
  property = "mapred.job.queue.name"
  set(property, queue)
end

#replace_columns(schema) ⇒ Object



264
265
266
# File 'lib/rbhive/t_c_l_i_connection.rb', line 264

# Executes the statement returned by schema.replace_columns_statement.
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end

#session ⇒ Object



163
164
165
# File 'lib/rbhive/t_c_l_i_connection.rb', line 163

# Handle of the currently open session, or the (falsy) @session value when
# no session has been opened.
def session
  return @session unless @session

  @session.sessionHandle
end

#set(name, value) ⇒ Object



183
184
185
186
# File 'lib/rbhive/t_c_l_i_connection.rb', line 183

# Logs and then executes a Hive "SET name=value" statement.
def set(name,value)
  assignment = "#{name}=#{value}"
  @logger.info("Setting #{assignment}")
  execute("SET #{assignment}")
end

#thrift_hive_protocol(version) ⇒ Object



109
110
111
# File 'lib/rbhive/t_c_l_i_connection.rb', line 109

# Looks up the Thrift protocol version for a Hive version in HIVE_THRIFT_MAPPING.
# @raise [RuntimeError] "Invalid Hive version" when there is no (truthy) entry.
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise("Invalid Hive version") unless protocol
  protocol
end

#thrift_socket(server, port, timeout) ⇒ Object



128
129
130
131
132
# File 'lib/rbhive/t_c_l_i_connection.rb', line 128

# Builds a Thrift::Socket for server:port with the given timeout applied.
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap do |sock|
    sock.timeout = timeout
  end
end

#thrift_transport(server, port) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/rbhive/t_c_l_i_connection.rb', line 113

# Builds the Thrift transport selected by @options[:transport].
#
# @param server [String] HiveServer2 host.
# @param port [Integer] HiveServer2 port.
# @return a Thrift::BufferedTransport, Thrift::SaslClientTransport or
#   Thrift::HTTPClientTransport.
# @raise [RuntimeError] for an unknown transport type.
def thrift_transport(server, port)
  transport_type = @options[:transport]
  @logger.info("Initializing transport #{transport_type}")
  case transport_type
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@options[:sasl_params]))
  when :http
    # NOTE(review): @options[:timeout] is not applied to the HTTP transport.
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # Previously interpolated an undefined `transport`, raising NameError instead.
    raise "Unrecognised transport type '#{transport_type}'"
  end
end