Class: Embulk::Filter::Mysql

Inherits:
FilterPlugin
  • Object
show all
Defined in:
lib/embulk/filter/mysql.rb,
lib/embulk/filter/mysql/field.rb

Defined Under Namespace

Classes: Field

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.get_type(name, type) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/embulk/filter/mysql.rb', line 34

# Translates a MySQL field type code into the Embulk column type symbol.
#
# The code is compared against the Mysql::Field::TYPE_* constants in a fixed
# order; the first group that contains it decides the result.
#
# @param name [Object] column name; only interpolated into the error message
# @param type [Object] field type code (presumably an Integer; verify against caller)
# @return [Symbol] :boolean, :long, :double, :timestamp, :string or :json
# @raise [ConfigError] when the code matches none of the handled constants
def self.get_type(name, type)
  field = Mysql::Field

  if [field::TYPE_TINY].include?(type)
    :boolean
  elsif [field::TYPE_SHORT, field::TYPE_LONG, field::TYPE_LONGLONG].include?(type)
    :long
  elsif [field::TYPE_DOUBLE, field::TYPE_FLOAT].include?(type)
    :double
  elsif [field::TYPE_DATE, field::TYPE_DATETIME, field::TYPE_TIMESTAMP].include?(type)
    :timestamp
  elsif [field::TYPE_BLOB, field::TYPE_STRING, field::TYPE_VAR_STRING, field::TYPE_VARCHAR].include?(type)
    :string
  elsif [field::TYPE_JSON].include?(type)
    :json
  else
    raise ConfigError.new("Not support column [#{name}], type_no => [#{type}]")
  end
end

.transaction(config, in_schema) {|task, columns| ... } ⇒ Object

Yields:

  • (task, columns)


10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/embulk/filter/mysql.rb', line 10

# Builds the task hash from the plugin configuration, derives the output
# columns from the prepared query's result metadata, and yields both.
#
# A connection is opened only to prepare the query and read the result
# field definitions; it is closed again before the block is yielded to.
#
# @param config [#param] plugin configuration; read through `config.param`
# @param in_schema [Array] input columns, prepended to the output columns
#   when the "keep_input" option is true
# @yield [task, columns] the task hash and the output column list
# @return [Object] whatever the block returns
def self.transaction(config, in_schema, &control)
  task = {
    "host" => config.param("host", :string, default: 'localhost'),
    "user" => config.param("user", :string),
    "password" => config.param("password", :string),
    "database" => config.param("database", :string),
    "port" => config.param("port", :integer, default: 3306),
    "query" => config.param("query", :string),
    # Optional: a query without placeholders needs no bind parameters.
    "params" => config.param("params", :array, default: []),
    "keep_input" => config.param("keep_input", :bool, default: false)
  }

  connection = ::Mysql.real_connect(task['host'], task['user'], task['password'], task['database'], task['port'])
  begin
    Embulk.logger.info "PREPARE SQL:\n#{task['query']}"
    statement = connection.prepare(task['query'])

    columns = []
    columns += in_schema if task['keep_input']

    # The prepared statement's result metadata lists the fields the query
    # will return; each one becomes an output column.
    columns += statement.result_metadata.fetch_fields.map do |field|
      Column.new(nil, field.name, get_type(field.name, field.type))
    end
  ensure
    # This connection exists only for schema discovery; do not leak it.
    connection.close
  end

  yield(task, columns)
end

Instance Method Details

#add(page) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/embulk/filter/mysql.rb', line 65

# Runs the prepared statement once per input record and emits the results.
#
# For every record, the values of the columns listed in @params are bound
# to the statement. Each result row is passed through #cast value by value
# and added to the page builder, preceded by the input record's own values
# when @keep_input is set.
#
# @param page [#each] collection of input records (each record an Array)
# @return [Object] the result of `page.each`
def add(page)
  page.each do |record|
    row_by_name = in_schema.names.zip(record).to_h
    bind_values = @params ? @params.map { |key| row_by_name[key] } : []

    @statement.execute(*bind_values).each do |values|
      leading = @keep_input ? record : []
      page_builder.add(leading + values.map { |value| cast(value) })
    end
  end
end

#cast(value) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/embulk/filter/mysql.rb', line 85

# Normalises a single value taken from a query result row.
#
# * A String (exactly that class) is re-tagged in place as UTF-8.
# * A ::Mysql::Time (exactly that class) is rebuilt as a local-zone Time from
#   its year, month, day, hour, minute and second.
# * Anything else is returned untouched.
#
# @param value [Object] raw value from the result row
# @return [Object] the converted value
def cast(value)
  return value.force_encoding('UTF-8') if value.instance_of?(String)
  return value unless value.instance_of?(::Mysql::Time)

  parts = [value.year, value.month, value.day, value.hour, value.minute, value.second]
  Time.local(*parts)
end

#close ⇒ Object



60
61
62
63
# File 'lib/embulk/filter/mysql.rb', line 60

# Logs that the connection is closing, then closes @connection.
#
# @return [Object] whatever `@connection.close` returns
def close
  Embulk.logger.info("connection closing...")
  @connection.close
end

#finish ⇒ Object



81
82
83
# File 'lib/embulk/filter/mysql.rb', line 81

# Delegates to `page_builder.finish` and returns its result.
# NOTE(review): presumably this flushes the rows queued by #add; confirm
# against the page builder's contract.
def finish
  page_builder.finish
end

#init ⇒ Object



53
54
55
56
57
58
# File 'lib/embulk/filter/mysql.rb', line 53

# Prepares per-task state from the `task` settings: opens the MySQL
# connection, prepares the configured query on it, and remembers the bind
# parameter names and the keep_input flag for later use.
#
# @return [Object] the value of task['keep_input'] (the last assignment)
def init
  connect_args = %w[host user password database port].map { |key| task[key] }
  @connection = ::Mysql.real_connect(*connect_args)
  @statement = @connection.prepare(task['query'])
  @params = task['params']
  @keep_input = task['keep_input']
end