Class: Embulk::EmbulkRunner

Inherits:
Object
  • Object
show all
Defined in:
lib/embulk/runner.rb

Overview

Embulk.setup initializes: Runner = EmbulkRunner.new

Instance Method Summary collapse

Constructor Details

#initialize(embed) ⇒ EmbulkRunner



7
8
9
# File 'lib/embulk/runner.rb', line 7

# Builds a runner around an embedded Embulk engine.
#
# The handle is kept in @embed; the guess, preview and run commands shown in
# this file all delegate their work to it.
#
# @param embed [org.embulk.EmbulkEmbed] the embedded engine to delegate to
def initialize(embed)
  # org.embulk.EmbulkEmbed
  @embed = embed
end

Instance Method Details

#guess(config, options = {}) ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/embulk/runner.rb', line 11

# Guesses missing configuration and emits the completed config.
#
# The config source is loaded with read_config, the embedded engine is asked
# for a guessed diff, and the merged result is handed to write_config. The
# text returned by write_config is echoed on STDERR, and a hint about the
# output file is printed on standard output.
#
# @param config forwarded to read_config
# @param options [Hash] forwarded to read_config; :next_config_output_path
#   selects the file passed to write_config (may be nil)
# @return [nil]
def guess(config, options={})
  source = read_config(config, options)
  destination = options[:next_config_output_path]

  # Checked before guessing, so the check runs ahead of the engine call.
  check_file_writable(destination)

  guessed = source.merge(@embed.guess(source))
  STDERR.puts write_config(destination, guessed)

  hint =
    if destination
      "Created '#{destination}' file."
    else
      "Use -o PATH option to write the guessed config file to a file."
    end
  puts hint

  nil
end

#preview(config, options = {}) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/embulk/runner.rb', line 31

# Runs a preview of the given configuration and prints the resulting pages.
#
# The requested output format is validated before the preview is executed,
# so an unsupported format fails immediately rather than after the embedded
# engine has already done the preview work.
#
# @param config forwarded to read_config
# @param options [Hash] forwarded to read_config; :format selects the
#   printer, either "table" (used when the option is absent) or "vertical"
# @return [nil]
# @raise [ArgumentError] if options[:format] is neither "table" nor "vertical"
def preview(config, options={})
  config_source = read_config(config, options)
  format = options[:format]

  # Pick the printer class up front; only its construction needs the
  # preview result (for the schema), so the format check can happen first.
  printer_class =
    case format || "table"
    when "table"
      org.embulk.command.TablePreviewPrinter
    when "vertical"
      org.embulk.command.VerticalPreviewPrinter
    else
      raise ArgumentError, "Unknown preview output format '#{format}'. Supported formats: table, vertical"
    end

  preview_result = @embed.preview(config_source)
  model_manager = @embed.getModelManager

  printer = printer_class.new(java.lang.System.out, model_manager, preview_result.getSchema)
  printer.printAllPages(preview_result.getPages)
  printer.finish

  nil
end

#run(config, options = {}) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/embulk/runner.rb', line 54

# Executes the transaction described by the configuration.
#
# Without :resume_state_path the engine's plain run is used. With it, the
# run is resumable: a readable, non-empty resume state file is resumed from,
# otherwise a fresh resumable run is started. When a resumable run fails, its
# resume state is written to the resume state path and the cause is raised.
# On success the resume state file is removed (best effort) and the config
# merged with the resulting config diff is passed to write_config.
#
# @param config forwarded to read_config
# @param options [Hash] forwarded to read_config; also reads
#   :next_config_output_path and :resume_state_path (both may be nil)
# @return whatever write_config returns for the next config
# @raise the cause reported by the resumable result when it is unsuccessful
def run(config, options={})
  config_source = read_config(config, options)
  next_config_path = options[:next_config_output_path]
  resume_path = options[:resume_state_path]

  [next_config_path, resume_path].each { |path| check_file_writable(path) }

  # Any failure while loading the resume state is treated as "no state".
  resume_config = nil
  if resume_path
    begin
      loaded = read_yaml_config_file(resume_path)
      resume_config = loaded unless loaded.isEmpty
    rescue
      # TODO log?
      resume_config = nil
    end
  end

  execution_result = nil
  resumable_result = nil
  if resume_path
    resumable_result =
      if resume_config
        @embed.resumeState(config_source, resume_config).resume
      else
        @embed.runResumable(config_source)
      end
  else
    execution_result = @embed.run(config_source)
  end

  unless execution_result
    if resumable_result.isSuccessful
      execution_result = resumable_result.getSuccessfulResult
    else
      Embulk.logger.info "Writing resume state to '#{resume_path}'"
      write_config(resume_path, resumable_result.getResumeState)
      Embulk.logger.info "Resume state is written. Run the transaction again with -r option to resume or use \"cleanup\" subcommand to delete intermediate data."
      raise resumable_result.getCause
    end
  end

  # The resume file is no longer needed; errors while deleting are ignored.
  if resume_path
    begin
      File.delete(resume_path)
    rescue
      nil
    end
  end

  config_diff = execution_result.getConfigDiff
  Embulk.logger.info("Committed.")
  Embulk.logger.info("Next config diff: #{config_diff.toString}")

  write_config(next_config_path, config_source.merge(config_diff))
end