Class: RBHive::TCLIConnection
- Inherits:
-
Object
- Object
- RBHive::TCLIConnection
show all
- Defined in:
- lib/rbhive/t_c_l_i_connection.rb
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_columns(schema) ⇒ Object
-
#close ⇒ Object
-
#close_session ⇒ Object
-
#create_table(schema) ⇒ Object
-
#drop_table(name) ⇒ Object
-
#execute(query) ⇒ Object
-
#explain(query) ⇒ Object
Performs an EXPLAIN of the supplied query on the server and returns the result as an ExplainResult.
-
#fetch(query, max_rows = 100) ⇒ Object
Performs a query on the server, fetches up to max_rows rows and returns them as an array.
-
#fetch_in_batch(query, batch_size = 1000, &block) ⇒ Object
Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.
-
#initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) ⇒ TCLIConnection
constructor
A new instance of TCLIConnection.
-
#method_missing(meth, *args) ⇒ Object
-
#open ⇒ Object
-
#open_session ⇒ Object
-
#parse_sasl_params(sasl_params) ⇒ Object
Processes SASL connection params and returns a hash with symbol keys, or nil if the params are not a hash.
-
#priority=(priority) ⇒ Object
-
#queue=(queue) ⇒ Object
-
#replace_columns(schema) ⇒ Object
-
#session ⇒ Object
-
#set(name, value) ⇒ Object
-
#thrift_hive_protocol(version) ⇒ Object
-
#thrift_socket(server, port, timeout) ⇒ Object
-
#thrift_transport(server, port) ⇒ Object
Constructor Details
#initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new) ⇒ TCLIConnection
Returns a new instance of TCLIConnection.
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 83
# Builds a HiveServer2 connection object. It validates and defaults
# +options+, resolves the Thrift protocol version, and creates the
# transport, binary protocol and TCLIService client. It does not call #open.
#
# @param server [String] HiveServer2 host
# @param port [Integer] HiveServer2 port (default 10_000)
# @param options [Hash, nil] nil is treated as {}. Recognised keys:
#   :transport (:buffered, :sasl or :http; default :buffered),
#   :sasl_params (required when :transport is :sasl),
#   :hive_version (default 10), :timeout (socket timeout, default 1800)
# @param logger object responding to #info
# @raise [RuntimeError] if +options+ is not a Hash, or if :sasl is
#   requested without :sasl_params
def initialize(server, port=10_000, options={}, logger=StdOutLogger.new)
  options ||= {}
  raise "'options' parameter must be a hash" unless options.is_a?(Hash)
  # Work on a copy so the defaults filled in below do not leak into a hash
  # the caller may reuse for other connections.
  options = options.dup
  if options[:transport] == :sasl && options[:sasl_params].nil?
    raise ":transport is set to :sasl, but no :sasl_params option was supplied"
  end
  options[:transport] ||= :buffered
  options[:hive_version] ||= 10
  options[:timeout] ||= 1800
  @options = options
  @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])
  # @logger must be set before thrift_transport, which logs through it.
  @logger = logger
  @transport = thrift_transport(server, port)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
  @session = nil
  @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
  @mutex = Mutex.new
end
|
Dynamic Method Handling
This class handles dynamic methods through the method_missing method, which forwards any call it does not define to the underlying Thrift client.
#method_missing(meth, *args) ⇒ Object
272
273
274
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 272
# Forwards any method this connection does not define to the underlying
# Thrift client (see #client). This lets raw TCLIService RPCs such as
# FetchResults be called directly on the connection.
#
# @param meth [Symbol] name of the missing method
# @param args arguments passed through unchanged
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? consistent with method_missing: the connection reports
# that it responds to anything the wrapped client responds to.
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
|
Instance Attribute Details
#client ⇒ Object
Returns the value of attribute client.
81
82
83
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 81
# Reader for the Hive2 TCLIService Thrift client built in the constructor.
def client; @client; end
|
Instance Method Details
#add_columns(schema) ⇒ Object
268
269
270
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 268
# Runs the statement returned by +schema+.add_columns_statement.
#
# @param schema object responding to #add_columns_statement
# @return whatever #execute returns
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
|
#close ⇒ Object
150
151
152
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 150
# Closes the Thrift transport created in the constructor.
# This does not send CloseSession; #close_session does that.
def close
  transport = @transport
  transport.close
end
|
#close_session ⇒ Object
158
159
160
161
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 158
# Sends a CloseSession request to the server, then forgets the stored
# session response.
#
# @return nil
def close_session
  request = prepare_close_session
  @client.CloseSession(request)
  @session = nil
end
|
#create_table(schema) ⇒ Object
255
256
257
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 255
# Runs the statement returned by +schema+.create_table_statement.
#
# @param schema object responding to #create_table_statement
# @return whatever #execute returns
def create_table(schema)
  ddl = schema.create_table_statement
  execute(ddl)
end
|
#drop_table(name) ⇒ Object
259
260
261
262
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 259
# Drops a table. +name+ may be a table name or a TableSchema; for a
# TableSchema, its #name is used. The name is wrapped in backticks
# without any escaping.
#
# @return whatever #execute returns
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
|
#execute(query) ⇒ Object
171
172
173
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 171
# Public entry point for running a statement. Delegates to #execute_safe
# and returns its result unchanged.
def execute(query)
  result = execute_safe(query)
  result
end
|
#explain(query) ⇒ Object
190
191
192
193
194
195
196
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 190
# Runs EXPLAIN on +query+ through #fetch_in_batch and collects the :Explain
# value of every returned row, across all batches, into an ExplainResult.
#
# @param query [String] statement to explain; "EXPLAIN " is prepended
# @return [ExplainResult]
def explain(query)
  explain_sql = "EXPLAIN " + query
  plan_chunks = []
  fetch_in_batch(explain_sql) do |batch|
    plan_chunks.push(batch.map { |row| row[:Explain] })
  end
  ExplainResult.new(plan_chunks.flatten)
end
|
#fetch(query, max_rows = 100) ⇒ Object
Performs a query on the server, fetches up to max_rows rows and returns them as an array.
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 199
# Executes +query+ and makes a single FetchResults request (orientation
# :first) for at most +max_rows+ rows. The rows are wrapped in a
# TCLIResultSet together with their schema definition.
#
# @param query [String] HiveQL statement
# @param max_rows [Integer] row limit passed to the fetch request
# @return [TCLIResultSet]
def fetch(query, max_rows = 100)
  safe do
    handle = execute_unsafe(query).tap { |res| raise_error_if_failed!(res) }.operationHandle
    request = prepare_fetch_results(handle, :first, max_rows)
    response = client.FetchResults(request).tap { |res| raise_error_if_failed!(res) }
    result_rows = response.results.rows
    definition = TCLISchemaDefinition.new(get_schema_for(handle), result_rows.first)
    TCLIResultSet.new(result_rows, definition)
  end
end
|
#fetch_in_batch(query, batch_size = 1000, &block) ⇒ Object
Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 222
# Executes +query+, then repeatedly issues FetchResults (orientation :next)
# for up to +batch_size+ rows. Each non-empty batch is yielded to the given
# block as a TCLIResultSet; fetching stops at the first empty batch.
#
# @param query [String] HiveQL statement
# @param batch_size [Integer] maximum rows requested per FetchResults call
# @yield [TCLIResultSet] one result set per non-empty batch
# @raise [RuntimeError] if no block is given
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?
  safe do
    exec_result = execute_unsafe(query)
    raise_error_if_failed!(exec_result)
    op_handle = exec_result.operationHandle
    fetch_req = prepare_fetch_results(op_handle, :next, batch_size)
    # Declared outside `loop do` on purpose. A variable first assigned inside
    # the loop block is block-local and resets to nil on every iteration,
    # which would re-request the schema metadata for each batch.
    the_schema = nil
    loop do
      fetch_results = client.FetchResults(fetch_req)
      raise_error_if_failed!(fetch_results)
      rows = fetch_results.results.rows
      break if rows.empty?
      the_schema ||= TCLISchemaDefinition.new(get_schema_for(op_handle), rows.first)
      yield TCLIResultSet.new(rows, the_schema)
    end
  end
end
|
#open ⇒ Object
146
147
148
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 146
# Opens the Thrift transport built in the constructor.
# This does not start a session; #open_session does that.
def open
  transport = @transport
  transport.open
end
|
#open_session ⇒ Object
154
155
156
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 154
# Sends an OpenSession request for the configured Thrift protocol version
# and stores the server's response in @session. #session extracts the
# handle from that response.
#
# @return the OpenSession response
def open_session
  request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(request)
end
|
#parse_sasl_params(sasl_params) ⇒ Object
Processes SASL connection params and returns a hash with symbol keys, or nil if the params are not a hash.
135
136
137
138
139
140
141
142
143
144
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 135
# Normalises SASL connection params.
#
# @param sasl_params [Hash, Object] params as supplied in the options
# @return [Hash, nil] a new hash with every key converted via #to_sym, or
#   nil when +sasl_params+ is not a Hash
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)
  sasl_params.each_with_object({}) { |(key, value), symbolized| symbolized[key.to_sym] = value }
end
|
#priority=(priority) ⇒ Object
175
176
177
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 175
# Issues "SET mapred.job.priority=<priority>" through #set.
def priority=(priority)
  property = "mapred.job.priority"
  set(property, priority)
end
|
#queue=(queue) ⇒ Object
179
180
181
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 179
# Issues "SET mapred.job.queue.name=<queue>" through #set.
def queue=(queue)
  property = "mapred.job.queue.name"
  set(property, queue)
end
|
#replace_columns(schema) ⇒ Object
264
265
266
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 264
# Runs the statement returned by +schema+.replace_columns_statement.
#
# @param schema object responding to #replace_columns_statement
# @return whatever #execute returns
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
|
#session ⇒ Object
163
164
165
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 163
# Returns the sessionHandle from the stored OpenSession response. When no
# session is stored, returns @session itself (nil).
def session
  return @session unless @session
  @session.sessionHandle
end
|
#set(name, value) ⇒ Object
183
184
185
186
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 183
# Logs, then executes "SET name=value" on the server.
# NOTE(review): name and value are interpolated verbatim into the
# statement, so they should come from trusted code, not user input.
#
# @return whatever #execute returns
def set(name, value)
  @logger.info("Setting #{name}=#{value}")
  statement = "SET #{name}=#{value}"
  execute(statement)
end
|
#thrift_hive_protocol(version) ⇒ Object
109
110
111
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 109
# Looks up the Thrift protocol value for a Hive version in
# HIVE_THRIFT_MAPPING.
#
# @raise [RuntimeError] "Invalid Hive version" when the lookup is nil/false
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise "Invalid Hive version" unless protocol
  protocol
end
|
#thrift_socket(server, port, timeout) ⇒ Object
128
129
130
131
132
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 128
# Creates a Thrift::Socket for server:port with its timeout set to
# +timeout+. The socket is returned unopened.
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end
|
#thrift_transport(server, port) ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/rbhive/t_c_l_i_connection.rb', line 113
# Builds the Thrift transport selected by @options[:transport].
#
# :buffered and :sasl wrap a socket that uses @options[:timeout]. :sasl
# also passes the normalised @options[:sasl_params]. :http targets
# http://server:port/cliservice; no timeout is applied to it here.
#
# @param server [String] HiveServer2 host
# @param port [Integer] HiveServer2 port
# @return the transport object (not yet opened)
# @raise [RuntimeError] for an unrecognised transport type
def thrift_transport(server, port)
  transport_type = @options[:transport]
  @logger.info("Initializing transport #{transport_type}")
  case transport_type
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@options[:sasl_params]))
  when :http
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # Report the configured value so the caller sees what was rejected.
    raise "Unrecognised transport type '#{transport_type}'"
  end
end
|