Class: Embulk::Input::Presto
- Inherits:
-
InputPlugin
- Object
- InputPlugin
- Embulk::Input::Presto
- Defined in:
- lib/embulk/input/presto.rb,
lib/embulk/input/presto/connection.rb,
lib/embulk/input/presto/explain_parser.rb,
lib/embulk/input/presto/type_converter.rb
Defined Under Namespace
Classes: Connection, ExplainParser, TypeConverter
Class Method Summary collapse
- .build_output_columns(task) ⇒ Object
- .resume(task, columns, count, &control) ⇒ Object
- .transaction(config, &control) ⇒ Object
Instance Method Summary collapse
Class Method Details
.build_output_columns(task) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/embulk/input/presto.rb', line 41 def self.build_output_columns(task) explain_query = "explain (FORMAT TEXT) " + task["query"] Embulk.logger.debug("SQL: #{explain_query}") explain_result = Connection.get_client(task).run("explain (FORMAT TEXT) " + task["query"]) columns = [] ExplainParser.parse(explain_result).each_with_index do |(name, type), i| columns << Column.new(i, name, TypeConverter.get_type(type)) end columns end |
.resume(task, columns, count, &control) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/embulk/input/presto.rb', line 34 def self.resume(task, columns, count, &control) task_reports = yield(task, columns, count) next_config_diff = {} return next_config_diff end |
.transaction(config, &control) ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/embulk/input/presto.rb', line 12 def self.transaction(config, &control) task = { "host" => config.param("host", :string, default: "localhost"), "port" => config.param("port", :integer, default: 8080), "schema" => config.param("schema", :string, default: "default"), "catalog" => config.param("catalog", :string, default: "native"), "query" => config.param("query", :string), "user" => config.param("user", :string, default: "embulk"), "columns" => config.param("columns", :array, default: nil) } columns = if task['columns'] task['columns'].each_with_index.map do |c, i| Column.new(i, c["name"], c["type"].to_sym) end else build_output_columns(task) end resume(task, columns, 1, &control) end |
Instance Method Details
#init ⇒ Object
53 54 55 56 57 58 59 |
# File 'lib/embulk/input/presto.rb', line 53 def init @client = Connection.get_client(task) @query = task["query"] @type_converter = TypeConverter.new Embulk.logger.info "SQL: #{@query}" end |
#run ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/embulk/input/presto.rb', line 61 def run size = 0 @client.query(@query) do |q| q.each_row {|row| converted_values = row.map.with_index { |value,i| @type_converter.convert_value(value, schema[i]) } page_builder.add(converted_values) } size = q.rows.size end page_builder.finish task_report = { size: size } return task_report end |