Class: Embulk::InputPresto
- Inherits:
-
InputPlugin
- Object
- InputPlugin
- Embulk::InputPresto
- Defined in:
- lib/embulk/input/presto.rb
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.resume(task, columns, count, &control) ⇒ Object
25 26 27 28 29 30 |
# File 'lib/embulk/input/presto.rb', line 25 def self.resume(task, columns, count, &control) task_reports = yield(task, columns, count) next_config_diff = {} return next_config_diff end |
.transaction(config, &control) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/embulk/input/presto.rb', line 7 def self.transaction(config, &control) task = { "host" => config.param("host", :string, default: "localhost"), "port" => config.param("port", :integer, default: 8080), "schema" => config.param("schema", :string, default: "default"), "catalog" => config.param("catalog", :string, default: "native"), "query" => config.param("query", :string), "user" => config.param("user", :string, default: "embulk"), "columns" => config.param("columns", :array, default: []) } columns = task['columns'].each_with_index.map do |c, i| Column.new(i, c["name"], c["type"].to_sym) end resume(task, columns, 1, &control) end |
Instance Method Details
#init ⇒ Object
32 33 34 35 36 37 38 39 40 |
# File 'lib/embulk/input/presto.rb', line 32 def init @client = Presto::Client.new( server: "#{task['host']}:#{task['port']}", catalog: task['catalog'], user: task['user'], schema: task['schema'] ) @query = task["query"] end |
#run ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/embulk/input/presto.rb', line 42 def run @client.query(@query) do |q| q.each_row {|row| page_builder.add(row) } end page_builder.finish task_report = {} return task_report end |