Class: Embulk::Input::Presto

Inherits:
InputPlugin
  • Object
show all
Defined in:
lib/embulk/input/presto.rb,
lib/embulk/input/presto/connection.rb,
lib/embulk/input/presto/explain_parser.rb,
lib/embulk/input/presto/type_converter.rb

Defined Under Namespace

Classes: Connection, ExplainParser, TypeConverter

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build_output_columns(task) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
# File 'lib/embulk/input/presto.rb', line 41

# Derives the output schema by asking Presto to EXPLAIN the configured query.
#
# The EXPLAIN statement is built once, logged at debug level, and that same
# string is executed, so the logged SQL always matches what was run.
#
# @param task [Hash] plugin task; "query" is read here and the whole task is
#   passed to Connection.get_client
# @return [Array<Column>] one Column per (name, type) pair produced by
#   ExplainParser.parse, indexed in the order the parser yields them
# @raise [TypeError] if task["query"] is nil (String#+ rejects nil)
def self.build_output_columns(task)
  explain_query = "explain (FORMAT TEXT) " + task["query"]
  Embulk.logger.debug("SQL: #{explain_query}")
  # Execute exactly the statement that was logged above.
  explain_result = Connection.get_client(task).run(explain_query)

  ExplainParser.parse(explain_result).each_with_index.map do |(name, type), i|
    Column.new(i, name, TypeConverter.get_type(type))
  end
end

.resume(task, columns, count, &control) ⇒ Object



34
35
36
37
38
39
# File 'lib/embulk/input/presto.rb', line 34

# Hands the task, columns and task count to the caller-supplied block and
# reports that nothing in the configuration needs to change for the next run.
#
# @param task [Hash] plugin task passed through to the block unchanged
# @param columns [Array] output columns passed through to the block unchanged
# @param count [Integer] task count passed through to the block unchanged
# @yield [task, columns, count] invoked exactly once; its return value (the
#   task reports) is not used
# @return [Hash] the next config diff, always an empty hash
# @raise [LocalJumpError] if no block is given
def self.resume(task, columns, count, &control)
  # The reports returned by the block are intentionally discarded.
  yield(task, columns, count)

  {}
end

.transaction(config, &control) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/embulk/input/presto.rb', line 12

# Builds the plugin task from the configuration, works out the output columns
# and delegates to resume with a task count of 1.
#
# Columns come from the "columns" setting when it is present; otherwise they
# are obtained from build_output_columns.
#
# @param config [#param] configuration source queried for each setting
# @yield forwarded unchanged to resume
# @return [Object] whatever resume returns
def self.transaction(config, &control)
  task = {
    "host"    => config.param("host", :string, default: "localhost"),
    "port"    => config.param("port", :integer, default: 8080),
    "schema"  => config.param("schema", :string, default: "default"),
    "catalog" => config.param("catalog", :string, default: "native"),
    "query"   => config.param("query", :string),
    "user"    => config.param("user", :string, default: "embulk"),
    "columns" => config.param("columns", :array, default: nil)
  }

  declared = task["columns"]
  columns =
    if declared
      # Explicitly configured columns: index follows their position in the list.
      declared.each_with_index.map do |spec, position|
        Column.new(position, spec["name"], spec["type"].to_sym)
      end
    else
      build_output_columns(task)
    end

  resume(task, columns, 1, &control)
end

Instance Method Details

#init ⇒ Object



53
54
55
56
57
58
59
# File 'lib/embulk/input/presto.rb', line 53

# Prepares per-task state: a client obtained from Connection, the query text
# taken from the task, and a fresh TypeConverter. The query is then logged at
# info level.
#
# @return [Object] the result of the logger call
def init
  @client = Connection.get_client(task)
  @query = task["query"]
  @type_converter = TypeConverter.new

  Embulk.logger.info("SQL: #{@query}")
end

#run ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/embulk/input/presto.rb', line 61

# Executes the stored query, converts every value of every row with the type
# converter (using the schema entry at the same position) and adds the
# converted row to the page builder, then finishes the page builder.
#
# @return [Hash] task report of the form { size: <number of rows added> }
def run
  size = 0
  @client.query(@query) do |q|
    q.each_row do |row|
      converted_values = row.each_with_index.map do |value, i|
        @type_converter.convert_value(value, schema[i])
      end
      page_builder.add(converted_values)
      # Count rows as they are emitted instead of asking the query for its
      # rows again afterwards; the count then never depends on whether the
      # client can still produce the rows once iteration has finished.
      size += 1
    end
  end

  page_builder.finish

  { size: size }
end