Class: SchemaTransformer::Transform

Inherits:
Base
  • Object
show all
Includes:
Help
Defined in:
lib/schema_transformer/transform.rb

Constant Summary collapse

@@stagger =
0

Instance Attribute Summary collapse

Attributes inherited from Base

#options

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Help

#help

Constructor Details

#initialize(base = File.expand_path("..", __FILE__), options = {}) ⇒ Transform

Returns a new instance of Transform.



14
15
16
17
# File 'lib/schema_transformer/transform.rb', line 14

def initialize(base = File.expand_path("..", __FILE__), options = {})
  super
  @batch_size = options[:batch_size] || 10_000
end

Instance Attribute Details

#tableObject (readonly)

Returns the value of attribute table.



13
14
15
# File 'lib/schema_transformer/transform.rb', line 13

def table
  @table
end

#temp_tableObject (readonly)

Returns the value of attribute temp_table.



13
14
15
# File 'lib/schema_transformer/transform.rb', line 13

def temp_table
  @temp_table
end

Class Method Details

.run(options) ⇒ Object



7
8
9
10
11
# File 'lib/schema_transformer/transform.rb', line 7

def self.run(options)
  @@stagger = options[:stagger] ? options[:stagger].to_f : 0
  @transformer = SchemaTransformer::Transform.new(options[:base] || Dir.pwd)
  @transformer.run(options)
end

Instance Method Details

#cleanupObject



141
142
143
144
# File 'lib/schema_transformer/transform.rb', line 141

def cleanup
  sql = %Q{DROP TABLE #{@trash_table}}
  @conn.execute(sql)
end

#createObject



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/schema_transformer/transform.rb', line 77

def create
  if self.temp_table_exists?
    @temp_model = define_model(@temp_table)
  else
    sql_create = %{CREATE TABLE #{@temp_table} LIKE #{@table}}
    sql_mod = %{ALTER TABLE #{@temp_table} #{@mod}}
    @conn.execute(sql_create)
    @conn.execute(sql_mod)
    @temp_model = define_model(@temp_table)
  end
  reset_column_info
end

#define_model(table) ⇒ Object



211
212
213
214
215
216
217
218
219
# File 'lib/schema_transformer/transform.rb', line 211

def define_model(table)
  # Object.const_set(table.classify, Class.new(ActiveRecord::Base))
  Object.class_eval(<<-code)
    class #{table.classify} < ActiveRecord::Base
      set_table_name "#{table}"
    end
  code
  table.classify.constantize # returns the constant
end

#final_syncObject



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/schema_transformer/transform.rb', line 115

def final_sync
  @temp_model = define_model(@temp_table)
  reset_column_info
  
  sync
  columns = subset_columns.collect{|x| "#{@temp_table}.`#{x}` = #{@table}.`#{x}`" }.join(", ")
  # need to limit the final sync, if we do the entire table it takes a long time
  limit_cond = get_limit_cond
  sql = %{
    UPDATE #{@temp_table} INNER JOIN #{@table}
      ON #{@temp_table}.id = #{@table}.id
      SET #{columns}
    WHERE #{limit_cond}
  }
  # puts sql
  @conn.execute(sql)
end

#find(table, cond) ⇒ Object

returns Array of record ids



183
184
185
186
187
188
189
190
191
# File 'lib/schema_transformer/transform.rb', line 183

def find(table, cond)
  sql = "SELECT id FROM #{table} WHERE #{cond}"
  response = @conn.execute(sql)
  results = []
  while row = response.fetch_row do
    results << row[0].to_i
  end
  results
end

#find_in_batches(table, options = {}) ⇒ Object

lower memory heavy version of ActiveRecord’s find in batches



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/schema_transformer/transform.rb', line 194

def find_in_batches(table, options = {})
  raise "You can't specify an order, it's forced to be #{batch_order}" if options[:order]
  raise "You can't specify a limit, it's forced to be the batch_size"  if options[:limit]

  start = options.delete(:start).to_i
  batch_size = options.delete(:batch_size) || 1000
  order_limit = "ORDER BY id LIMIT #{batch_size}"

  records = find(table, "id >= #{start} #{order_limit}")
  while records.any?
    yield records

    break if records.size < batch_size
    records = find(table, "id > #{records.last} #{order_limit}")
  end
end

#gather_info(table) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/schema_transformer/transform.rb', line 64

def gather_info(table)
  if table.nil?
    raise UsageError, "You need to specific the table name: schema_transformer #{@action} <table_name>"
  end
  data = JSON.parse(IO.read(transform_file(table)))
  @table = data["table"]
  @mod = data["mod"]
  # variables need for rest of the program
  @temp_table = "#{@table}_st_temp"
  @trash_table = "#{@table}_st_trash"
  @model = define_model(@table)
end

#generateObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/schema_transformer/transform.rb', line 43

def generate
  data = {}
  ask "What is the name of the table you want to alter?"
  data[:table] = gets(:table)
  ask <<-TXT
What is the modification to the table?
Examples 1: 
  ADD COLUMN smart tinyint(1) DEFAULT '0'
Examples 2: 
  ADD INDEX idx_name (name)
Examples 3: 
  ADD COLUMN smart tinyint(1) DEFAULT '0', DROP COLUMN full_name
TXT
  data[:mod] = gets(:mod)
  path = transform_file(data[:table])
  FileUtils.mkdir(File.dirname(path)) unless File.exist?(File.dirname(path))
  File.open(path,"w") { |f| f << data.to_json }
  @table = data[:table]
  data
end

#get_limit_condObject



146
147
148
149
150
151
152
153
154
155
# File 'lib/schema_transformer/transform.rb', line 146

def get_limit_cond
  if @model.column_names.include?("updated_at")
    "#{@table}.updated_at >= '#{1.day.ago.strftime("%Y-%m-%d")}'"
  else
    res = @conn.execute("SELECT max(id) AS max_id FROM `#{@table}`")
    max = res.fetch_row[0].to_i + 1 # nil case is okay: [nil][0].to_i => 0
    bound = max - 100_000 < 0 ? 0 : max
    "#{@table}.id >= #{bound}"
  end
end

#gets(name = nil) ⇒ Object

the parameter is only for testing



158
159
160
# File 'lib/schema_transformer/transform.rb', line 158

def gets(name = nil)
  STDIN.gets.strip
end

#insert_columns_sqlObject



167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/schema_transformer/transform.rb', line 167

def insert_columns_sql
  # existing subset
  subset = subset_columns

  # added
  added_s = @temp_model.column_names - @model.column_names
  added = @temp_model.columns.
            select{|c| added_s.include?(c.name) }.
            collect{|c| "#{extract_default(c)} AS `#{c.name}`" }

  # combine both
  columns = subset.collect{|x| "`#{x}`"} + added
  sql = columns.join(", ")
end

#log(msg) ⇒ Object



234
235
236
# File 'lib/schema_transformer/transform.rb', line 234

def log(msg)
  @log.info(msg)
end

#reset_column_infoObject



229
230
231
232
# File 'lib/schema_transformer/transform.rb', line 229

def reset_column_info
  @model.reset_column_information
  @temp_model.reset_column_information
end

#run(options) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/schema_transformer/transform.rb', line 19

def run(options)
  @action = options[:action].first
  case @action
  when "generate"
    self.generate
    help(:generate)
  when "sync"
    help(:sync_progress)
    table = options[:action][1]
    self.gather_info(table)
    self.create
    self.sync
    help(:sync)
  when "switch"
    table = options[:action][1]
    self.gather_info(table)
    self.switch
    self.cleanup
    help(:switch)
  else
    raise UsageError, "Invalid action #{@action}"
  end
end

#subset_columnsObject



162
163
164
165
# File 'lib/schema_transformer/transform.rb', line 162

def subset_columns
  removed = @model.column_names - @temp_model.column_names
  subset  = @model.column_names - removed
end

#switchObject



133
134
135
136
137
138
139
# File 'lib/schema_transformer/transform.rb', line 133

def switch
  final_sync
  to_trash  = %Q{RENAME TABLE #{@table} TO #{@trash_table}}
  from_temp = %Q{RENAME TABLE #{@temp_table} TO #{@table}}
  @conn.execute(to_trash)
  @conn.execute(from_temp)
end

#syncObject



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/schema_transformer/transform.rb', line 90

def sync
  res = @conn.execute("SELECT max(id) AS max_id FROM `#{@temp_table}`")
  start = res.fetch_row[0].to_i + 1 # nil case is okay: [nil][0].to_i => 0
  find_in_batches(@table, :start => start, :batch_size => @batch_size) do |batch|
    # puts "batch #{batch.inspect}"
    lower = batch.first
    upper = batch.last
    
    columns = insert_columns_sql
    sql = %Q{
      INSERT INTO #{@temp_table} (
        SELECT #{columns}
      	FROM #{@table} WHERE id >= #{lower} AND id <= #{upper}
      )
    }
    # puts sql
    @conn.execute(sql)
    
    if @@stagger > 0
      log("Staggering: delaying for #{@@stagger} seconds before next batch insert")
      sleep(@@stagger)
    end
  end
end

#temp_table_exists?Boolean

Returns:

  • (Boolean)


225
226
227
# File 'lib/schema_transformer/transform.rb', line 225

def temp_table_exists?
  @conn.table_exists?(@temp_table)
end

#transform_file(table) ⇒ Object



221
222
223
# File 'lib/schema_transformer/transform.rb', line 221

def transform_file(table)
  @base+"/config/schema_transformations/#{table}.json"
end