Class: Embulk::InputPresto

Inherits:
InputPlugin
  • Object
show all
Defined in:
lib/embulk/input/presto.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.resume(task, columns, count, &control) ⇒ Object



25
26
27
28
29
30
# File 'lib/embulk/input/presto.rb', line 25

# Hands the task definition to the caller-supplied block and reports no
# configuration changes.
#
# @param task [Hash] task settings, passed to the block unchanged
# @param columns [Array] column definitions, passed to the block unchanged
# @param count [Integer] passed to the block unchanged (presumably the
#   number of tasks to run -- confirm against the Embulk plugin API)
# @yield [task, columns, count] invoked exactly once; its result is ignored
# @return [Hash] always an empty hash
def self.resume(task, columns, count, &control)
  # Whatever the block returns is deliberately discarded.
  yield(task, columns, count)

  {}
end

.transaction(config, &control) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/embulk/input/presto.rb', line 7

# Builds the task hash from the plugin configuration, derives the column
# list from the "columns" setting, and delegates to +resume+ with a count
# of 1.
#
# Settings are read through +config.param+ in this order: host, port,
# schema, catalog, query, user, columns. Every setting except "query" is
# read with a default value.
#
# @param config [#param] configuration object answering
#   +param(name, type, default: ...)+
# @param control [Proc] block forwarded to +resume+
# @return [Object] whatever +resume+ returns
def self.transaction(config, &control)
  task = {}
  task["host"]    = config.param("host", :string, default: "localhost")
  task["port"]    = config.param("port", :integer, default: 8080)
  task["schema"]  = config.param("schema", :string, default: "default")
  task["catalog"] = config.param("catalog", :string, default: "native")
  task["query"]   = config.param("query", :string)
  task["user"]    = config.param("user", :string, default: "embulk")
  task["columns"] = config.param("columns", :array, default: [])

  # Each configured column entry is a hash with "name" and "type" keys; its
  # position in the list becomes the column index.
  columns = task["columns"].map.with_index do |column, index|
    Column.new(index, column["name"], column["type"].to_sym)
  end

  resume(task, columns, 1, &control)
end

Instance Method Details

#init ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/embulk/input/presto.rb', line 32

# Creates the Presto client from the task settings and remembers the query
# text for later use.
#
# Reads "host", "port", "catalog", "user", "schema" and "query" from +task+
# and stores the results in +@client+ and +@query+.
#
# @return [Object] the value stored in +@query+
def init
  # The server address is the host and port joined by a colon.
  server = "#{task['host']}:#{task['port']}"

  @client = Presto::Client.new(
    server: server,
    catalog: task["catalog"],
    user: task["user"],
    schema: task["schema"]
  )
  @query = task["query"]
end

#run ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/embulk/input/presto.rb', line 42

# Runs the stored query through the client, adds every row it produces to
# the page builder, then finishes the page builder.
#
# Uses +@client+ and +@query+, which must have been set beforehand.
#
# @return [Hash] always an empty hash
def run
  @client.query(@query) do |result|
    result.each_row { |row| page_builder.add(row) }
  end
  page_builder.finish

  {}
end