Class: Embulk::Output::Sqlite3OutputPlugin

Inherits:
OutputPlugin
  • Object
show all
Defined in:
lib/embulk/output/sqlite3.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.connect(task) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/embulk/output/sqlite3.rb', line 48

# Opens a JDBC connection to the SQLite database named by task['database'].
#
# Without a block, the open connection is returned and the caller is
# responsible for closing it. With a block, the connection is yielded and
# closed once the block finishes (also when the block raises); the same,
# by then closed, connection object is still the return value.
#
# @param task [Hash] task settings; only the 'database' entry is read here
# @return [Object] the connection object returned by org.sqlite.JDBC#connect
def self.connect(task)
  jdbc_url = "jdbc:sqlite:#{task['database']}"
  connection = org.sqlite.JDBC.new.connect(jdbc_url, java.util.Properties.new)
  return connection unless block_given?

  begin
    yield connection
  ensure
    connection.close
  end
  connection
end

.execute_sql(sqlite, sql, *args) ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/embulk/output/sqlite3.rb', line 61

# Runs one SQL string on the given connection using a fresh statement and
# closes that statement afterwards, also when execution raises.
#
# @param sqlite [Object] connection object responding to createStatement
# @param sql [String] SQL text passed to the statement's execute
# @param args [Array] accepted but not used by this method
# @return [Object] whatever the statement's execute call returns
def self.execute_sql(sqlite, sql, *args)
  statement = sqlite.createStatement
  begin
    return statement.execute(sql)
  ensure
    statement.close
  end
end

.to_sqlite_column_type(type) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/embulk/output/sqlite3.rb', line 33

# Translates a column type name into the type name used for SQLite columns.
#
# 'long' becomes 'integer', 'string' and 'timestamp' become 'text', and
# 'double' becomes 'real'. Any other value is handed back unchanged.
#
# @param type [String] source type name
# @return [String] the mapped SQLite type, or +type+ itself when unmapped
def self.to_sqlite_column_type(type)
  mapping = {
    'long' => 'integer',
    'string' => 'text',
    'timestamp' => 'text',
    'double' => 'real'
  }
  mapping.fetch(type) { type }
end

.to_sqlite_schema(schema) ⇒ Object



29
30
31
# File 'lib/embulk/output/sqlite3.rb', line 29

# Builds the column-definition list for a CREATE TABLE statement.
#
# Each column contributes its backtick-quoted name followed by the SQLite
# type obtained from to_sqlite_column_type; entries are joined with commas.
#
# @param schema [Enumerable] columns responding to +name+ and +type+
# @return [String] e.g. "`id` integer,`name` text"
def self.to_sqlite_schema(schema)
  definitions = schema.map do |col|
    quoted_name = "`#{col.name}`"
    sqlite_type = to_sqlite_column_type(col.type.to_s)
    "#{quoted_name} #{sqlite_type}"
  end
  definitions.join(',')
end

.transaction(config, schema, count, &control) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/embulk/output/sqlite3.rb', line 9

# Prepares the output task, makes sure the target table exists, and then
# hands the task to the caller's block.
#
# The task hash carries the 'database' and 'table' settings read from
# +config+, the backtick-quoted column names, and the SQLite type of each
# column. The table is created with "create table if not exists" on a
# connection that is opened and closed just for that statement.
#
# @param config [Object] configuration responding to param(name, type)
# @param schema [Enumerable] columns responding to +name+ and +type+
# @param count [Object] not used by this method
# @yieldparam task [Hash] the task hash described above
# @return [Hash] always an empty hash; the block's result is discarded
def self.transaction(config, schema, count, &control)
  quoted_names = schema.map { |col| "`#{col.name}`" }
  sqlite_types = schema.map do |col|
    "#{to_sqlite_column_type(col.type.to_s)}"
  end

  task = {}
  task['database'] = config.param('database', :string)
  task['table'] = config.param('table', :string)
  task['columns'] = quoted_names
  task['column_types'] = sqlite_types

  connect(task) do |sqlite|
    ddl = %[create table if not exists #{task['table']}(#{to_sqlite_schema(schema)})]
    execute_sql(sqlite, ddl)
  end

  yield(task)
  {}
end

Instance Method Details

#abort ⇒ Object



110
111
# File 'lib/embulk/output/sqlite3.rb', line 110

# No-op: the body is empty, so calling this does nothing and returns nil.
def abort
end

#add(page) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/embulk/output/sqlite3.rb', line 79

# Inserts every record of +page+ into the configured table using a single
# prepared INSERT statement, and counts each inserted row in @records.
#
# The entries of @task['column_types'] are SQLite type names ('integer',
# 'real', 'text', or a passed-through name), so binding dispatches on
# those names:
# * 'integer' is bound with setLong so 64-bit values are not truncated,
# * 'real' is bound with setDouble so the value is stored as a number,
# * everything else is bound as the value's string form.
# A nil value is bound as SQL NULL regardless of the column type.
#
# @param page [Enumerable] records; each record is indexed by column position
# @return [Object] the result of page.each
def add(page)
  column_list = @task['columns'].join(',')
  placeholders = Array.new(@task['columns'].size, '?').join(',')
  prep = @sqlite.prepareStatement(%[insert into #{@task['table']}(#{column_list}) values (#{placeholders})])
  begin
    page.each do |record|
      @task['column_types'].each_with_index do |type, index|
        position = index + 1 # JDBC parameter positions are 1-based
        value = record[index]

        if value.nil?
          # Store a real NULL instead of "" (or failing on a nil integer).
          prep.setObject(position, nil)
          next
        end

        case type
        when 'integer'
          prep.setLong(position, value)
        when 'real'
          prep.setDouble(position, value.to_f)
        else
          prep.setString(position, value.to_s)
        end
      end

      prep.execute
      @records += 1
    end
  ensure
    # Release the statement even when binding or execution fails.
    prep.close
  end
end

#close ⇒ Object



75
76
77
# File 'lib/embulk/output/sqlite3.rb', line 75

# Closes the connection held in @sqlite and returns whatever that
# close call returns.
def close
  @sqlite.close
end

#commit ⇒ Object



113
114
115
116
117
118
# File 'lib/embulk/output/sqlite3.rb', line 113

# Reports how many records this instance has counted so far.
#
# @return [Hash] a single-entry hash: "records" => current value of @records
def commit
  { "records" => @records }
end

#finish ⇒ Object



107
108
# File 'lib/embulk/output/sqlite3.rb', line 107

# No-op: the body is empty, so calling this does nothing and returns nil.
def finish
end

#init ⇒ Object



70
71
72
73
# File 'lib/embulk/output/sqlite3.rb', line 70

# Sets up per-instance state: opens a connection through the class-level
# connect method (called without a block, so the connection is not closed
# there) and stores it in @sqlite, then resets the @records counter to 0.
#
# NOTE(review): +task+ is not defined in this block; presumably it is an
# accessor provided by the parent class — confirm.
def init
  @sqlite = self.class.connect(task)
  @records = 0
end