Class: LargeHadronMigrator

Inherits: ActiveRecord::Migration
Defined in: lib/large-hadron-migrator.rb

Overview

Copyright © 2011, SoundCloud Ltd., Rany Keddo, Tobias Bielohlawek

Migrate large tables without downtime by copying to a temporary table in
chunks. The old table is not dropped. Instead, it is renamed to
lhmo_<timestamp>_<table_name> for verification.

WARNING:
   - this is an unlocked online operation. updates will probably become
     inconsistent during migration.
   - may cause the universe to implode.

USAGE:

class AddIndexToEmails < LargeHadronMigrator
  def self.up
    large_hadron_migrate :emails, :wait => 0.2 do |table_name|
      execute %Q{
        alter table %s
          add index index_emails_on_hashed_address (hashed_address)
      } % table_name
    end
  end
end

How to deploy large hadrons with capistrano
-------------------------------------------

1. Run cap deploy:update_code. The new release directory is not symlinked,
   so that restarts will not load the new code.

2. Run rake db:migrate from the new release directory on an appserver,
   preferably in a screen session.

3. Wait for the migration to replicate to all slaves, then run cap deploy.

Restarting before step 2 is done
--------------------------------

- When adding a column

1. Migration running on master; no effect, nothing has changed.
2. Tables switched on master. Slave out of sync, migration still running.
   a. Given the app server reads columns from slave on restart, nothing
      happens.
   b. Given the app server reads columns from master on restart, bad
      things happen: queries are built with the new columns, e.g. on
      :include the explicit column list (rather than *) will be built
      for the included table. Since the new column does not exist on
      the slave, those queries will break.

3. Tables switched on slave
  -  Same as 2b. Just do a cap deploy instead of cap deploy:restart.

- When removing a column

1. Migration running on master; no effect, nothing has changed.
2. Tables switched on master. slave out of sync, migration running.
   a. Given the app server reads columns from slave on restart:
      - Writes against master will fail, since the app still includes
        the removed column.
      - Reads will succeed against slaves, but fail against master.

   b. Given the app server reads columns from master on restart:
     - Writes against master might succeed. Old code referencing the
       removed columns will fail.
     - Reads might or might not succeed, for the same reason.

tl;dr: Never restart during migrations that remove columns with large
hadron. You can restart while adding columns as long as Active Record
reads column definitions from the slave.

Pushing out hotfixes while migrating in step 2
----------------------------------------------

- Check out the currently running (old) code ref.
- Branch from it, make your changes, and push the branch up.
- Deploy this version.

Deploying the new version will hurt your head. Don't do it.


Class Method Details

.add_trigger_on_action(table, journal_table, action) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 284

def self.add_trigger_on_action(table, journal_table, action)
  columns     = table_column_names(table)
  table_alias = (action == 'delete') ? 'OLD' : 'NEW'
  fallback    = if action == 'delete'
                  "`hadron_action` = 'delete'"
                else
                  columns.map { |c| "#{tick(c)} = #{table_alias}.#{tick(c)}" }.join(",")
                end

  execute %Q{
    create trigger %s
      after #{action} on %s for each row
      begin
        insert into %s (%s, `hadron_action`)
        values (%s, '#{ action }')
        ON DUPLICATE KEY UPDATE %s;
      end
  } % [trigger_name(action, table),
       table,
       journal_table,
       columns.map { |c| tick(c) }.join(","),
       columns.map { |c| "#{table_alias}.#{tick(c)}" }.join(","),
       fallback
      ]
end
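
For illustration, this is roughly the trigger SQL generated for a
hypothetical emails table with columns (id, hashed_address) and the
insert action (table and journal names here are assumptions):

  create trigger `after_insert_emails`
    after insert on emails for each row
    begin
      insert into lhmj_2011_03_01_12_00_00_000_emails (`id`,`hashed_address`, `hadron_action`)
      values (NEW.`id`,NEW.`hashed_address`, 'insert')
      ON DUPLICATE KEY UPDATE `id` = NEW.`id`,`hashed_address` = NEW.`hashed_address`;
    end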

.chunked_insert(last_insert_id, chunk_size, new_table, insertion_columns, curr_table, wait, where = "") ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 160

def self.chunked_insert(last_insert_id, chunk_size, new_table, insertion_columns, curr_table, wait, where = "")
  # do the inserts in chunks. helps to reduce io contention and keeps the
  # undo log small.
  chunks = (last_insert_id / chunk_size.to_f).ceil
  times = []
  (1..chunks).each do |chunk|

    times << Benchmark.measure do
      execute "start transaction"

      execute %Q{
        insert into %s
        (%s)
        select %s
        from %s
        where (id between %d and %d) %s

      } % [
        new_table,
        insertion_columns.keys.join(","),
        insertion_columns.values.join(","),
        curr_table,
        ((chunk - 1) * chunk_size) + 1,
        [chunk * chunk_size, last_insert_id].min,
        where
      ]
      execute "COMMIT"
    end

    say_remaining_estimate(times, chunks, chunk, wait)

    # larger values trade greater inconsistency for less io
    sleep wait
  end
end
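
A minimal sketch of the chunk boundaries the arithmetic above produces,
assuming last_insert_id = 100_000 and chunk_size = 35_000 (both values
hypothetical):

  chunks = (100_000 / 35_000.0).ceil   # => 3
  (1..chunks).map do |chunk|
    [((chunk - 1) * 35_000) + 1, [chunk * 35_000, 100_000].min]
  end
  # => [[1, 35000], [35001, 70000], [70001, 100000]]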

.chunked_update(last_insert_id, chunk_size, new_table, insertion_columns, curr_table, wait, where = "") ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 196

def self.chunked_update(last_insert_id, chunk_size, new_table, insertion_columns, curr_table, wait, where = "")
  # do the updates in chunks. helps to reduce io contention and keeps the
  # undo log small.
  chunks = (last_insert_id / chunk_size.to_f).ceil
  times = []
  (1..chunks).each do |chunk|

    times << Benchmark.measure do
      execute "start transaction"

      execute %Q{
        update %s as t1
        join %s as t2 on t1.id = t2.id
        set %s
        where (t2.id between %d and %d) %s
      } % [
        new_table,
        curr_table,
        insertion_columns.keys.map { |key| "t1.#{key} = t2.#{key}" }.join(","),
        ((chunk - 1) * chunk_size) + 1,
        [chunk * chunk_size, last_insert_id].min,
        where
      ]
      execute "COMMIT"
    end

    say_remaining_estimate(times, chunks, chunk, wait)

    # larger values trade greater inconsistency for less io
    sleep wait
  end
end
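
For a hypothetical chunk covering ids 1 to 35000, the generated statement
looks roughly like this (table and column names are assumptions):

  update emails as t1
  join lhmj_2011_03_01_12_00_00_000_emails as t2 on t1.id = t2.id
  set t1.`id` = t2.`id`,t1.`hashed_address` = t2.`hashed_address`
  where (t2.id between 1 and 35000) AND hadron_action = 'update'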

.cleanup(table) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 314

def self.cleanup(table)
  delete_trigger_on_action(table, "insert")
  delete_trigger_on_action(table, "update")
  delete_trigger_on_action(table, "delete")
end

.clone_table(source, dest, window = 0, add_action_column = false) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 268

def self.clone_table(source, dest, window = 0, add_action_column = false)
  execute schema_sql(source, dest, window, add_action_column)
end

.clone_table_for_changes(table, journal_table) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 276

def self.clone_table_for_changes(table, journal_table)
  clone_table(table, journal_table, 0, true)
end

.common_columns(t1, t2) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 272

def self.common_columns(t1, t2)
  table_column_names(t1) & table_column_names(t2)
end

.delete_trigger_on_action(table, action) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 306

def self.delete_trigger_on_action(table, action)
  execute "drop trigger if exists %s" % trigger_name(action, table)
end

.format_time(time) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 397

def self.format_time(time)
  if RUBY_VERSION < "1.9"
    time.strftime("%Y_%m_%d_%H_%M_%S_#{"%03d" % (time.usec / 1000)}")
  else
    time.strftime("%Y_%m_%d_%H_%M_%S_%3N")
  end
end
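
Example output, assuming an illustrative UTC timestamp:

  format_time(Time.utc(2011, 3, 1, 12, 0, 0))  # => "2011_03_01_12_00_00_000"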

.large_hadron_migrate(curr_table, *args, &block) ⇒ Object

id_window must be larger than the number of inserts added to the journal table. If this is not the case, inserts will be lost in the replay phase.



# File 'lib/large-hadron-migrator.rb', line 88

def self.large_hadron_migrate(curr_table, *args, &block)
  opts = args.extract_options!.reverse_merge :wait => 0.5,
      :chunk_size => 35_000,
      :id_window => 11_000

  curr_table = curr_table.to_s
  chunk_size = opts[:chunk_size].to_i

  # we are in dev/test mode - so speed it up
  chunk_size = 10_000_000 if Rails.env.development? or Rails.env.test?
  wait = opts[:wait].to_f
  id_window = opts[:id_window]

  raise "chunk_size must be >= 1" unless chunk_size >= 1

  started = format_time(Time.now)
  new_table      = "lhmn_%s" % curr_table
  old_table      = "lhmo_%s_%s" % [started, curr_table]
  journal_table  = "lhmj_%s_%s" % [started, curr_table]

  begin
    # clean tables. old tables are never deleted to guard against rollbacks.
    execute %Q{drop table if exists %s} % new_table

    clone_table(curr_table, new_table, id_window)
    clone_table_for_changes(curr_table, journal_table)

    # add triggers
    add_trigger_on_action(curr_table, journal_table, "insert")
    add_trigger_on_action(curr_table, journal_table, "update")
    add_trigger_on_action(curr_table, journal_table, "delete")

    last_insert_id = last_smaller_insert_id(curr_table, journal_table)
    say "last inserted id in #{curr_table}: #{last_insert_id}"

    # alter new table
    default_values = {}
    yield new_table, default_values

    insertion_columns = prepare_insertion_columns(new_table, curr_table, default_values)
    raise "insertion_columns empty" if insertion_columns.empty?

    chunked_insert \
      last_insert_id,
      chunk_size,
      new_table,
      insertion_columns,
      curr_table,
      wait

    rename_tables [[ curr_table, old_table ], [ new_table, curr_table ]]
    cleanup(curr_table)

    # replay changes from the changes journal
    replay_insert_changes(curr_table, journal_table, chunk_size, wait)
    replay_update_changes(curr_table, journal_table, chunk_size, wait)
    replay_delete_changes(curr_table, journal_table)

    old_table
  ensure
    cleanup(curr_table)
  end
end
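
For a hypothetical emails migration started at 2011_03_01_12_00_00_000
(the timestamp is illustrative), the tables involved are:

  lhmn_emails                           # new table, receives the chunked copy
  lhmj_2011_03_01_12_00_00_000_emails   # journal capturing concurrent changes
  lhmo_2011_03_01_12_00_00_000_emails   # the original table, kept after the swap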

.last_insert_id(curr_table) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 240

def self.last_insert_id(curr_table)
  with_master do
    select_value("SELECT max(id) FROM #{curr_table}").to_i
  end
end

.last_smaller_insert_id(curr_table, journal_table) ⇒ Object

Returns the highest id in curr_table that is not yet present in the journal table, so the bulk copy and the journal replay do not overlap (to satisfy the spec “does not lose records during table clone step”).



# File 'lib/large-hadron-migrator.rb', line 230

def self.last_smaller_insert_id(curr_table, journal_table)
  with_master do
    select_value(%{
      SELECT max(#{curr_table}.id) as max_id from #{curr_table}
      LEFT OUTER JOIN #{journal_table} ON #{journal_table}.id = #{curr_table}.id
      WHERE #{journal_table}.id IS NULL
    }).to_i
  end
end
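
For example (values illustrative): if curr_table holds ids 1..100 and the
triggers have already journaled ids 99 and 100, this returns 98; rows 99
and 100 are left to the replay phase.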

.prepare_insertion_columns(new_table, table, default_values = {}) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 152

def self.prepare_insertion_columns(new_table, table, default_values = {})
  {}.tap do |columns|
    (common_columns(new_table, table) | default_values.keys).each do |column|
      columns[tick(column)] = default_values[column] || tick(column)
    end
  end
end
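
A sketch of the resulting hash, assuming the shared columns are id and
hashed_address and the caller supplied a default for a hypothetical new
column:

  prepare_insertion_columns("lhmn_emails", "emails", "deleted_flag" => "0")
  # => { "`id`" => "`id`", "`hashed_address`" => "`hashed_address`", "`deleted_flag`" => "0" }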

.rename_tables(tables = {}) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 280

def self.rename_tables(tables = {})
  execute "rename table %s" % tables.map{ |old_table, new_table| "#{old_table} to #{new_table}" }.join(', ')
end
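
Both renames happen in a single atomic rename table statement, e.g.
(names illustrative):

  rename_tables [["emails", "lhmo_2011_03_01_12_00_00_000_emails"],
                 ["lhmn_emails", "emails"]]
  # executes: rename table emails to lhmo_2011_03_01_12_00_00_000_emails,
  #           lhmn_emails to emails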

.replay_delete_changes(table, journal_table) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 343

def self.replay_delete_changes(table, journal_table)
  with_master do
    if connection.select_values("select id from #{journal_table} where hadron_action = 'delete' LIMIT 1").any?
      execute %Q{
        delete from #{table} where id in (
          select id from #{journal_table} where hadron_action = 'delete'
        )
      }
    end
  end
end

.replay_insert_changes(table, journal_table, chunk_size = 10000, wait = 0.2) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 329

def self.replay_insert_changes(table, journal_table, chunk_size = 10000, wait = 0.2)
  last_insert_id = last_insert_id(journal_table)
  columns = prepare_insertion_columns(table, journal_table)

  chunked_insert \
    last_insert_id,
    chunk_size,
    table,
    columns,
    journal_table,
    wait,
    "AND hadron_action = 'insert'"
end

.replay_update_changes(table, journal_table, chunk_size = 10000, wait = 0.2) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 355

def self.replay_update_changes(table, journal_table, chunk_size = 10000, wait = 0.2)
  last_insert_id = last_insert_id(journal_table)
  columns = prepare_insertion_columns(table, journal_table)

  chunked_update \
    last_insert_id,
    chunk_size,
    table,
    columns,
    journal_table,
    wait,
    "AND hadron_action = 'update'"
end

.say_remaining_estimate(times, chunks, chunk, wait) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 320

def self.say_remaining_estimate(times, chunks, chunk, wait)
  avg = times.inject(0) { |s, t| s + t.real } / times.size.to_f
  remaining = chunks - chunk
  say "%d more chunks to go, estimated end: %s" % [
    remaining,
    Time.now + (remaining * (avg + wait))
  ]
end
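
A quick worked example, assuming two measured chunks averaging 2.0s of
real time, 5 chunks total and wait = 0.5: after chunk 2, remaining = 3,
so the estimated end is Time.now + 3 * (2.0 + 0.5), i.e. 7.5 seconds out.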

.schema_sql(source, dest, window, add_action_column = false) ⇒ Object

Use show create instead of create table like. There was some weird
behavior with the latter where the auto_increment of the source table
got modified when updating the destination.


# File 'lib/large-hadron-migrator.rb', line 374

def self.schema_sql(source, dest, window, add_action_column = false)
  show_create(source).tap do |schema|
    schema.gsub!(/auto_increment=(\d+)/i) do
      "auto_increment=#{  $1.to_i + window }"
    end

    if add_action_column
      schema.sub!(/\) ENGINE=/,
        ", hadron_action ENUM('update', 'insert', 'delete'), INDEX hadron_action (hadron_action) USING BTREE) ENGINE=")
    end

    schema.gsub!('CREATE TABLE `%s`' % source, 'CREATE TABLE `%s`' % dest)
  end
end
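
A sketch of the transformation, assuming a hypothetical source schema
(elided columns are illustrative):

  CREATE TABLE `emails` (...) ENGINE=InnoDB AUTO_INCREMENT=1000

becomes, with dest = "lhmj_2011_03_01_12_00_00_000_emails", window = 11_000
and add_action_column = true (note the regex lowercases the attribute):

  CREATE TABLE `lhmj_2011_03_01_12_00_00_000_emails` (...,
    hadron_action ENUM('update', 'insert', 'delete'),
    INDEX hadron_action (hadron_action) USING BTREE) ENGINE=InnoDB auto_increment=12000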

.show_create(t1) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 389

def self.show_create(t1)
  (execute "show create table %s" % t1).fetch_row.last
end

.table_column_names(table_name) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 246

def self.table_column_names(table_name)
  with_master do
    select_values %Q{
      select column_name
        from information_schema.columns
       where table_name = "%s"
         and table_schema = "%s"

    } % [table_name, connection.current_database]
  end
end

.tick(col) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 393

def self.tick(col)
  "`#{ col }`"
end

.trigger_name(action, table) ⇒ Object



# File 'lib/large-hadron-migrator.rb', line 310

def self.trigger_name(action, table)
  tick("after_#{action}_#{table}")
end

.with_masterObject



# File 'lib/large-hadron-migrator.rb', line 258

def self.with_master
  if ActiveRecord::Base.respond_to? :with_master
    ActiveRecord::Base.with_master do
      yield
    end
  else
    yield
  end
end