Class: Embulk::Filter::Mysql
- Inherits: FilterPlugin
  - Object
    - FilterPlugin
      - Embulk::Filter::Mysql
- Defined in: lib/embulk/filter/mysql.rb, lib/embulk/filter/mysql/field.rb
Defined Under Namespace
Classes: Field
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.get_type(name, type) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/embulk/filter/mysql.rb', line 34
#
# Translates a MySQL field type code into the Embulk column type symbol
# used for the output schema.
#
# @param name [String] column name; only used in the error message
# @param type [Object] field type code, compared against the
#   Mysql::Field::TYPE_* constants
# @return [Symbol] one of :boolean, :long, :double, :timestamp, :string, :json
# @raise [ConfigError] when the type code matches none of the handled constants
def self.get_type(name, type)
  f = Mysql::Field

  embulk_type =
    case type
    when f::TYPE_TINY then :boolean
    when f::TYPE_SHORT, f::TYPE_LONG, f::TYPE_LONGLONG then :long
    when f::TYPE_DOUBLE, f::TYPE_FLOAT then :double
    when f::TYPE_DATE, f::TYPE_DATETIME, f::TYPE_TIMESTAMP then :timestamp
    when f::TYPE_BLOB, f::TYPE_STRING, f::TYPE_VAR_STRING, f::TYPE_VARCHAR then :string
    when f::TYPE_JSON then :json
    end
  return embulk_type if embulk_type

  # Anything not listed above is rejected rather than guessed at.
  raise ConfigError.new("Not support column [#{name}], type_no => [#{type}]")
end
.transaction(config, in_schema) {|task, columns| ... } ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/embulk/filter/mysql.rb', line 10
#
# Reads the plugin configuration into a task hash, prepares the configured
# query once to discover its result columns, and yields the task together
# with the output column list.
#
# The connection opened here is only used for schema discovery, so it is
# closed (together with the prepared statement) before yielding.
#
# @param config [Object] plugin configuration, read through +config.param+
# @param in_schema [Array] input columns; prepended to the output columns
#   when +keep_input+ is true
# @yield [task, columns] the task hash and the list of output columns
# @raise [ConfigError] from +get_type+ when a result column has a type that
#   is not supported
def self.transaction(config, in_schema, &control)
  task = {
    "host" => config.param("host", :string, default: 'localhost'),
    "user" => config.param("user", :string),
    "password" => config.param("password", :string),
    "database" => config.param("database", :string),
    "port" => config.param("port", :integer, default: 3306),
    "query" => config.param("query", :string),
    # Optional: #add already treats a missing list as "no bind parameters".
    "params" => config.param("params", :array, default: []),
    "keep_input" => config.param("keep_input", :bool, default: false)
  }

  columns = []
  columns = columns + in_schema if task['keep_input']

  connection = ::Mysql.real_connect(task['host'], task['user'], task['password'], task['database'], task['port'])
  begin
    Embulk.logger.info "PREPARE SQL:\n#{task['query']}"
    statement = connection.prepare(task['query'])
    begin
      # NOTE(review): result_metadata is expected to be nil when the statement
      # yields no result columns - confirm against the driver version in use.
      metadata = statement.result_metadata
      fields = metadata ? metadata.fetch_fields : []
      fields.each do |field|
        columns << Column.new(nil, field.name, get_type(field.name, field.type))
      end
    ensure
      statement.close
    end
  ensure
    connection.close
  end

  yield(task, columns)
end
Instance Method Details
#add(page) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/embulk/filter/mysql.rb', line 65
#
# Runs the prepared statement once per input record and emits one output
# row per result row.
#
# Bind values are looked up in the record by the column names listed in
# @params. When @keep_input is set, each output row starts with the input
# record's values, followed by the query result values passed through #cast.
#
# @param page [#each] input records, each an array of column values
def add(page)
  page.each do |record|
    row_by_name = in_schema.names.zip(record).to_h
    bind_values = row_by_name.values_at(*(@params || []))

    @statement.execute(*bind_values).each do |values|
      casted = values.map { |value| cast(value) }
      page_builder.add(@keep_input ? record + casted : casted)
    end
  end
end
#cast(value) ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/embulk/filter/mysql.rb', line 85
#
# Normalises a single value returned by the query.
#
# * String values are re-tagged as UTF-8 in place (no transcoding).
# * ::Mysql::Time values are rebuilt as a local-time Time from their
#   year/month/day/hour/minute/second fields.
# * Everything else is returned untouched.
#
# @param value [Object] a value from a result row
# @return [Object] the normalised value
def cast(value)
  return value.force_encoding('UTF-8') if value.instance_of?(String)
  return value unless value.instance_of?(::Mysql::Time)

  Time.local(value.year, value.month, value.day, value.hour, value.minute, value.second)
end
#close ⇒ Object
60 61 62 63 |
# File 'lib/embulk/filter/mysql.rb', line 60
#
# Releases the database resources acquired in #init.
#
# The prepared statement is closed first; the connection is closed in an
# ensure clause so it is released even if closing the statement raises.
# Both steps are skipped when the corresponding object was never created
# (for example when #init failed part-way).
def close
  Embulk.logger.info "connection closing..."
  @statement.close if @statement
ensure
  @connection.close if @connection
end
#finish ⇒ Object
81 82 83 |
# File 'lib/embulk/filter/mysql.rb', line 81
#
# Signals the end of input by delegating to the page builder's +finish+.
def finish
  builder = page_builder
  builder.finish
end
#init ⇒ Object
53 54 55 56 57 58 |
# File 'lib/embulk/filter/mysql.rb', line 53
#
# Per-instance setup: opens a MySQL connection from the task settings,
# prepares the configured query on it, and caches the bind-parameter
# column names and the keep_input flag for use in #add.
def init
  connect_args = %w[host user password database port].map { |key| task[key] }
  @connection = ::Mysql.real_connect(*connect_args)
  @statement = @connection.prepare(task['query'])

  @params = task['params']
  @keep_input = task['keep_input']
end