Class: RBHive::TCLIConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/rbhive/t_c_l_i_connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) ⇒ TCLIConnection

Returns a new instance of TCLIConnection.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/rbhive/t_c_l_i_connection.rb', line 84

# Builds a HiveServer2 (TCLIService) connection object. The Thrift transport,
# protocol and client are constructed here, but no session is opened; see
# #open and #open_session.
#
# @param server [String] HiveServer2 host name
# @param port [Integer] HiveServer2 port (defaults to 10_000)
# @param options [Hash, nil] connection options; nil is accepted for
#   backwards compatibility. Keys read here and by #thrift_transport:
#   :transport (:buffered, :sasl or :http; default :buffered),
#   :sasl_params (required when :transport is :sasl),
#   :hive_version (looked up via #thrift_hive_protocol; default 10),
#   :timeout (passed to the socket for :buffered/:sasl; default 1800)
# @param logger [#info] receives informational log messages
# @raise [RuntimeError] if options is not a Hash, or :transport is :sasl
#   without :sasl_params
def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new)
  options ||= {} # backwards compatibility
  raise "'options' parameter must be a hash" unless options.is_a?(Hash)

  if options[:transport] == :sasl && options[:sasl_params].nil?
    raise ":transport is set to :sasl, but no :sasl_params option was supplied"
  end

  # Work on a copy so the defaults below are not written back into the
  # caller's hash (which may be reused for other connections).
  options = options.dup

  # Defaults to buffered transport, Hive 0.10, 1800 second timeout
  options[:transport]     ||= :buffered
  options[:hive_version]  ||= 10
  options[:timeout]       ||= 1800
  @options = options

  # Look up the appropriate Thrift protocol version for the supplied Hive version
  @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])

  # The logger must be set before thrift_transport, which logs through it.
  @logger = logger
  @transport = thrift_transport(server, port)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
  @session = nil
  @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
end

Dynamic Method Handling

This class handles dynamic methods through method_missing, forwarding any unknown method call to the underlying Thrift client.

#method_missing(meth, *args) ⇒ Object



371
372
373
# File 'lib/rbhive/t_c_l_i_connection.rb', line 371

# Forwards any method this connection does not define to the underlying
# Thrift TCLIService client (see #client), so raw client RPCs can be invoked
# on the connection itself.
#
# @param meth [Symbol] name of the missing method
# @param args [Array] positional arguments, forwarded unchanged
# @param block [Proc, nil] block, forwarded unchanged
# @return [Object] whatever the client method returns
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? consistent with method_missing: the connection reports
# that it responds to any method the wrapped client responds to.
#
# @param meth [Symbol] method name being queried
# @param include_private [Boolean] whether private methods count
# @return [Boolean]
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end

Instance Attribute Details

#client ⇒ Object (readonly)

Returns the value of attribute client.



82
83
84
# File 'lib/rbhive/t_c_l_i_connection.rb', line 82

# Read-only accessor for the Thrift TCLIService client created by the
# constructor.
# @return [Object] the client instance stored in @client
def client; @client; end

Instance Method Details

#add_columns(schema) ⇒ Object



367
368
369
# File 'lib/rbhive/t_c_l_i_connection.rb', line 367

# Executes the statement returned by schema#add_columns_statement.
# @param schema [Object] object responding to #add_columns_statement
#   (presumably a TableSchema — confirm with callers)
# @return [Object] the result of #execute
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end

#async_cancel(handles) ⇒ Object



232
233
234
# File 'lib/rbhive/t_c_l_i_connection.rb', line 232

# Sends a CancelOperation RPC for the operation identified by handles.
# @param handles [Hash] handles as returned by #async_execute
# @return [Object] the client's CancelOperation response
def async_cancel(handles)
  cancel_request = prepare_cancel_request(handles)
  @client.CancelOperation(cancel_request)
end

#async_close_session(handles) ⇒ Object



294
295
296
297
# File 'lib/rbhive/t_c_l_i_connection.rb', line 294

# Closes the session referenced by handles[:session], after validating the
# handles.
# @param handles [Hash] handles as returned by #async_execute
# @return [Object] the client's CloseSession response
def async_close_session(handles)
  validate_handles!(handles)
  close_request = Hive2::Thrift::TCloseSessionReq.new(sessionHandle: handles[:session])
  @client.CloseSession(close_request)
end

#async_execute(query) ⇒ Object

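Executes a query asynchronously and returns a hash of handles (session, guid, secret) identifying the running operation.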



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# File 'lib/rbhive/t_c_l_i_connection.rb', line 193

# Submits query with runAsync set and returns without waiting for it to
# finish. The returned hash identifies the operation. The other async_*
# methods accept it to poll, fetch, cancel or close the session later.
#
# @param query [String] HiveQL statement to run
# @return [Hash] { session:, guid:, secret: }
# @raise [RuntimeError] presumably, from raise_error_if_failed!, when the
#   server rejects the statement — confirm against that helper
def async_execute(query)
  @logger.info("Executing query asynchronously: #{query}")
  request = Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: @session.sessionHandle,
    statement: query,
    runAsync: true
  )
  response = @client.ExecuteStatement(request)
  raise_error_if_failed!(response)

  operation_id = response.operationHandle.operationId
  { session: @session.sessionHandle, guid: operation_id.guid, secret: operation_id.secret }
end

#async_fetch(handles, max_rows = 100) ⇒ Object

Fetches up to max_rows rows from a query started with #async_execute; raises if the query has not finished.



267
268
269
270
271
272
273
274
275
# File 'lib/rbhive/t_c_l_i_connection.rb', line 267

# Fetches up to max_rows rows, from the start of the result set, of an
# asynchronously executed query.
#
# @param handles [Hash] handles as returned by #async_execute
# @param max_rows [Integer] maximum number of rows to return
# @return [Object] the result of #fetch_rows
# @raise [RuntimeError] if the query is not in the :finished state
def async_fetch(handles, max_rows = 100)
  # Results can only be read once the operation has finished.
  raise "Can't perform fetch on a query in state: #{async_state(handles)}" unless async_is_complete?(handles)

  operation_handle = prepare_operation_handle(handles)
  fetch_rows(operation_handle, :first, max_rows)
end

#async_fetch_in_batch(handles, batch_size = 1000, &block) ⇒ Object

Fetches the results of a finished asynchronous query in batches of batch_size rows and yields each batch to the given block as an array of rows.



279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/rbhive/t_c_l_i_connection.rb', line 279

# Streams the results of a finished asynchronous query to the given block,
# batch_size rows at a time. It stops at the first empty batch.
#
# @param handles [Hash] handles as returned by #async_execute
# @param batch_size [Integer] rows requested per fetch
# @yieldparam rows [Object] one non-empty batch from #fetch_rows
# @return [nil]
# @raise [RuntimeError] if no block is given or the query is not :finished
def async_fetch_in_batch(handles, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?
  raise "Can't perform fetch on a query in state: #{async_state(handles)}" unless async_is_complete?(handles)

  loop do
    batch = fetch_rows(prepare_operation_handle(handles), :next, batch_size)
    break if batch.empty?
    yield batch
  end
end

#async_is_cancelled?(handles) ⇒ Boolean

Returns:

  • (Boolean)


228
229
230
# File 'lib/rbhive/t_c_l_i_connection.rb', line 228

# Has the operation been cancelled?
# @param handles [Hash] handles as returned by #async_execute
# @return [Boolean] true when #async_state reports :cancelled
def async_is_cancelled?(handles)
  state = async_state(handles)
  state == :cancelled
end

#async_is_complete?(handles) ⇒ Boolean

Is the query complete?

Returns:

  • (Boolean)


214
215
216
# File 'lib/rbhive/t_c_l_i_connection.rb', line 214

# Has the operation finished successfully?
# @param handles [Hash] handles as returned by #async_execute
# @return [Boolean] true when #async_state reports :finished
def async_is_complete?(handles)
  state = async_state(handles)
  state == :finished
end

#async_is_failed?(handles) ⇒ Boolean

Has the query failed?

Returns:

  • (Boolean)


224
225
226
# File 'lib/rbhive/t_c_l_i_connection.rb', line 224

# Has the operation failed?
# @param handles [Hash] handles as returned by #async_execute
# @return [Boolean] true when #async_state reports :error
def async_is_failed?(handles)
  state = async_state(handles)
  state == :error
end

#async_is_running?(handles) ⇒ Boolean

Is the query actually running?

Returns:

  • (Boolean)


219
220
221
# File 'lib/rbhive/t_c_l_i_connection.rb', line 219

# Is the operation currently running?
# @param handles [Hash] handles as returned by #async_execute
# @return [Boolean] true when #async_state reports :running
def async_is_running?(handles)
  state = async_state(handles)
  state == :running
end

#async_state(handles) ⇒ Object

Polls the server for the operation's state and maps it to a symbol.



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/rbhive/t_c_l_i_connection.rb', line 237

# Asks the server for the operation's status and translates the Thrift
# TOperationState value into a symbol.
#
# @param handles [Hash] handles as returned by #async_execute
# @return [Symbol] one of :finished, :initialized, :running, :cancelled,
#   :closed, :error, :unknown, :pending, or :state_not_in_protocol for any
#   value not listed here
# @raise [RuntimeError] when the response carries no operation state
def async_state(handles)
  status_request = Hive2::Thrift::TGetOperationStatusReq.new(
    operationHandle: prepare_operation_handle(handles)
  )
  state = @client.GetOperationStatus(status_request).operationState
  states = Hive2::Thrift::TOperationState

  case state
  when states::FINISHED_STATE    then :finished
  when states::INITIALIZED_STATE then :initialized
  when states::RUNNING_STATE     then :running
  when states::CANCELED_STATE    then :cancelled
  when states::CLOSED_STATE      then :closed
  when states::ERROR_STATE       then :error
  # NOTE: "UKNOWN" spelling presumably mirrors the generated Thrift constant; do not "fix" it here.
  when states::UKNOWN_STATE      then :unknown
  when states::PENDING_STATE     then :pending
  when nil
    raise "No operation state found for handles - has the session been closed?"
  else
    :state_not_in_protocol
  end
end

#close ⇒ Object



150
151
152
# File 'lib/rbhive/t_c_l_i_connection.rb', line 150

# Closes the Thrift transport built by the constructor.
# @return [Object] whatever the transport's close returns
def close
  transport = @transport
  transport.close
end

#close_session ⇒ Object



158
159
160
161
# File 'lib/rbhive/t_c_l_i_connection.rb', line 158

# Sends CloseSession for the current session, then forgets it locally.
# @return [nil]
def close_session
  request = prepare_close_session
  @client.CloseSession(request)
  @session = nil
end

#create_table(schema) ⇒ Object



354
355
356
# File 'lib/rbhive/t_c_l_i_connection.rb', line 354

# Executes the statement returned by schema#create_table_statement.
# @param schema [Object] object responding to #create_table_statement
#   (presumably a TableSchema — confirm with callers)
# @return [Object] the result of #execute
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end

#drop_table(name) ⇒ Object



358
359
360
361
# File 'lib/rbhive/t_c_l_i_connection.rb', line 358

# Drops a table given either its name or a TableSchema (whose #name is used).
# The name is wrapped in backticks but otherwise not escaped.
# @param name [String, TableSchema] table name or schema
# @return [Object] the result of #execute
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end

#execute(query) ⇒ Object



171
172
173
174
175
176
177
# File 'lib/rbhive/t_c_l_i_connection.rb', line 171

# Runs query synchronously through the client's ExecuteStatement RPC.
# @param query [String] HiveQL to execute
# @return [Object] the ExecuteStatement response (after raise_error_if_failed! has inspected it)
def execute(query)
  @logger.info("Executing Hive Query: #{query}")
  client.ExecuteStatement(prepare_execute_statement(query)).tap do |response|
    raise_error_if_failed!(response)
  end
end

#explain(query) ⇒ Object

Performs an EXPLAIN of the supplied query on the server and returns the plan as an ExplainResult. (On Hive 0.12 this only works with the patch from https://issues.apache.org/jira/browse/HIVE-5492.)



310
311
312
313
314
315
316
# File 'lib/rbhive/t_c_l_i_connection.rb', line 310

# Runs "EXPLAIN <query>" and collects the :Explain column of every result
# row into an ExplainResult.
# @param query [String] HiveQL to explain
# @return [ExplainResult]
def explain(query)
  plan_lines = []
  fetch_in_batch("EXPLAIN " + query) do |batch|
    batch.each { |row| plan_lines << row[:Explain] }
  end
  ExplainResult.new(plan_lines.flatten)
end

#fetch(query, max_rows = 100) ⇒ Object

Performs a query on the server, fetches up to max_rows rows and returns them as an array.



319
320
321
322
323
324
325
326
327
328
329
# File 'lib/rbhive/t_c_l_i_connection.rb', line 319

# Executes query and returns up to max_rows rows from the start of its
# result set.
#
# @param query [String] HiveQL to execute
# @param max_rows [Integer] maximum number of rows to return
# @return [Object] the result of #fetch_rows
def fetch(query, max_rows = 100)
  # execute already runs raise_error_if_failed! on the response, so the
  # result can be used directly.
  op_handle = execute(query).operationHandle

  fetch_rows(op_handle, :first, max_rows)
end

#fetch_in_batch(query, batch_size = 1000, &block) ⇒ Object

Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.



333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/rbhive/t_c_l_i_connection.rb', line 333

# Executes query, then yields its results to the block in batches of
# batch_size rows. It stops at the first empty batch.
#
# @param query [String] HiveQL to execute
# @param batch_size [Integer] rows requested per fetch
# @yieldparam rows [Object] one non-empty batch from #fetch_rows
# @return [nil]
# @raise [RuntimeError] if no block is given
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # execute already runs raise_error_if_failed! on the response.
  op_handle = execute(query).operationHandle

  loop do
    rows = fetch_rows(op_handle, :next, batch_size)
    break if rows.empty?
    yield rows
  end
end

#fetch_rows(op_handle, orientation = :first, max_rows = 1000) ⇒ Object

Pull rows from the query result



300
301
302
303
304
305
306
# File 'lib/rbhive/t_c_l_i_connection.rb', line 300

# Issues one FetchResults RPC and wraps the returned rows in a
# TCLIResultSet. The schema is built from the operation's schema and the
# first row (nil when no rows came back).
#
# @param op_handle [Object] Thrift operation handle
# @param orientation [Symbol] fetch orientation passed to prepare_fetch_results
# @param max_rows [Integer] maximum rows to request
# @return [TCLIResultSet]
def fetch_rows(op_handle, orientation = :first, max_rows = 1000)
  response = @client.FetchResults(prepare_fetch_results(op_handle, orientation, max_rows))
  raise_error_if_failed!(response)
  raw_rows = response.results.rows
  schema = TCLISchemaDefinition.new(get_schema_for(op_handle), raw_rows.first)
  TCLIResultSet.new(raw_rows, schema)
end

#open ⇒ Object



146
147
148
# File 'lib/rbhive/t_c_l_i_connection.rb', line 146

# Opens the Thrift transport built by the constructor.
# @return [Object] whatever the transport's open returns
def open
  transport = @transport
  transport.open
end

#open_session ⇒ Object



154
155
156
# File 'lib/rbhive/t_c_l_i_connection.rb', line 154

# Opens a server session using the protocol version chosen in the
# constructor and stores the response in @session.
# @return [Object] the OpenSession response
def open_session
  request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(request)
end

#parse_sasl_params(sasl_params) ⇒ Object

Processes SASL connection params and returns a hash with symbol keys, or nil if the params are not a Hash.



135
136
137
138
139
140
141
142
143
144
# File 'lib/rbhive/t_c_l_i_connection.rb', line 135

# Normalises SASL connection parameters.
# @param sasl_params [Object] typically a Hash with string or symbol keys
# @return [Hash, nil] a new hash with every key converted via to_sym, or nil
#   when sasl_params is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.is_a?(Hash)

  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end

#priority=(priority) ⇒ Object



179
180
181
# File 'lib/rbhive/t_c_l_i_connection.rb', line 179

# Sets the Hive session property mapred.job.priority.
# @param priority [Object] value interpolated into the SET statement
def priority=(priority)
  property = "mapred.job.priority"
  set(property, priority)
end

#queue=(queue) ⇒ Object



183
184
185
# File 'lib/rbhive/t_c_l_i_connection.rb', line 183

# Sets the Hive session property mapred.job.queue.name.
# @param queue [Object] queue name interpolated into the SET statement
def queue=(queue)
  property = "mapred.job.queue.name"
  set(property, queue)
end

#replace_columns(schema) ⇒ Object



363
364
365
# File 'lib/rbhive/t_c_l_i_connection.rb', line 363

# Executes the statement returned by schema#replace_columns_statement.
# @param schema [Object] object responding to #replace_columns_statement
#   (presumably a TableSchema — confirm with callers)
# @return [Object] the result of #execute
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end

#session ⇒ Object



163
164
165
# File 'lib/rbhive/t_c_l_i_connection.rb', line 163

# Returns the session handle of the currently open session.
# @return [Object, nil] @session.sessionHandle, or @session itself when no
#   session is open
def session
  if @session
    @session.sessionHandle
  else
    @session
  end
end

#set(name, value) ⇒ Object



187
188
189
190
# File 'lib/rbhive/t_c_l_i_connection.rb', line 187

# Runs "SET name=value" on the server after logging it. Neither argument is
# escaped.
# @param name [Object] property name
# @param value [Object] property value
# @return [Object] the result of #execute
def set(name, value)
  assignment = "#{name}=#{value}"
  @logger.info("Setting #{assignment}")
  execute("SET #{assignment}")
end

#thrift_hive_protocol(version) ⇒ Object



109
110
111
# File 'lib/rbhive/t_c_l_i_connection.rb', line 109

# Looks up the Thrift protocol version for a Hive version in
# HIVE_THRIFT_MAPPING.
# @param version [Object] key into HIVE_THRIFT_MAPPING
# @return [Object] the mapped protocol version
# @raise [RuntimeError] "Invalid Hive version" when the mapping has no truthy entry
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise("Invalid Hive version") unless protocol
  protocol
end

#thrift_socket(server, port, timeout) ⇒ Object



128
129
130
131
132
# File 'lib/rbhive/t_c_l_i_connection.rb', line 128

# Builds a Thrift::Socket for server:port with the given timeout applied.
# @param server [String] host name
# @param port [Integer] port number
# @param timeout [Object] value assigned to the socket's timeout
# @return [Thrift::Socket]
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end

#thrift_transport(server, port) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/rbhive/t_c_l_i_connection.rb', line 113

# Builds the Thrift transport selected by @options[:transport].
#
# @param server [String] host name
# @param port [Integer] port number
# @return [Object] a Thrift::BufferedTransport (:buffered),
#   Thrift::SaslClientTransport (:sasl) or Thrift::HTTPClientTransport (:http)
# @raise [RuntimeError] for any other transport type
def thrift_transport(server, port)
  transport_type = @options[:transport]
  @logger.info("Initializing transport #{transport_type}")
  case transport_type
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@options[:sasl_params]))
  when :http
    # NOTE: @options[:timeout] is not passed to the HTTP transport here.
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # Previously interpolated an undefined `transport`, which raised a
    # NameError/NoMethodError instead of this message.
    raise "Unrecognised transport type '#{transport_type}'"
  end
end