Class: Flydata::SyncFileManager

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/sync_file_manager.rb

Constant Summary collapse

DUMP_DIR =
FLYDATA_DUMP_DIR
BACKUP_DIR =
FLYDATA_BACKUP_DIR
TABLE_POSITIONS_DIR =
FLYDATA_TABLE_POSITIONS_DIR
INITIAL_SYNC_SEQ =
1
SOURCE_TABLE_EXT =
"mysql_table"

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data_entry, source = nil) ⇒ SyncFileManager

Returns a new instance of SyncFileManager.



16
17
18
19
20
# File 'lib/flydata/sync_file_manager.rb', line 16

# Builds a file manager bound to a single data entry.
#
# @param data_entry [Hash-like] data entry settings; its 'name' value is used
#   to derive per-entry file names
# @param source [Object, nil] optional source-specific helper; it is expected
#   to respond to +source_pos+ when positions are parsed
def initialize(data_entry, source = nil)
  @source = source # for source-dependent objects
  @data_entry = data_entry
  # Cache of open position File objects, keyed by table name.
  @table_position_files = {}
end

Class Method Details

.clear_sync_client_resourcesObject

Rename instead of deleting files



515
516
517
518
519
520
521
522
523
524
525
# File 'lib/flydata/sync_file_manager.rb', line 515

# Moves the sync client's working directories (client buffer, table
# positions, dump) aside instead of deleting them. Each one is renamed to
# "<dir>_<UTC timestamp>". Directories that do not exist are skipped.
def self.clear_sync_client_resources
  timestamp = Time.now.utc.strftime("%Y%m%d_%H%M%S")
  [FLYDATA_CLIENT_BUFFER_DIR,
   FLYDATA_TABLE_POSITIONS_DIR,
   DUMP_DIR,
  ].each do |dir_path|
    # Dir.exists? was removed in Ruby 3.2; Dir.exist? works on every version.
    next unless Dir.exist?(dir_path)
    FileUtils.mv(dir_path, "#{dir_path}_#{timestamp}")
  end
end

Instance Method Details

#backup_dirObject



495
496
497
# File 'lib/flydata/sync_file_manager.rb', line 495

# Root directory under which dump backups are stored.
#
# @return [String] the value of the BACKUP_DIR constant
def backup_dir
  BACKUP_DIR
end

#backup_dump_dirObject



485
486
487
488
489
490
491
492
493
# File 'lib/flydata/sync_file_manager.rb', line 485

# Moves the dump artifacts out of the dump directory into a new subdirectory
# of BACKUP_DIR named after the current local time (YYYYmmddHHMMSS). The
# artifacts are every *.info, *.pos, *.stats and *.<SOURCE_TABLE_EXT> file.
#
# @raise [Errno::EEXIST] if a backup with the same timestamp (same second)
#   already exists
def backup_dump_dir
  # mkdir_p is a no-op for an existing directory, so no existence check is
  # needed. The old check used Dir.exists?, which was removed in Ruby 3.2.
  FileUtils.mkdir_p(BACKUP_DIR)
  dest_dir = File.join(BACKUP_DIR, Time.now.strftime("%Y%m%d%H%M%S"))
  FileUtils.mkdir(dest_dir)
  ['info', 'pos', 'stats', SOURCE_TABLE_EXT].each do |ext|
    FileUtils.mv(Dir.glob("#{dump_dir}/*.#{ext}"), dest_dir)
  end
end

#closeObject



26
27
28
29
# File 'lib/flydata/sync_file_manager.rb', line 26

# Closes every cached table position file and resets the cache.
def close
  @table_position_files.each_value(&:close)
  @table_position_files = {}
end

#delete_dump_fileObject



481
482
483
# File 'lib/flydata/sync_file_manager.rb', line 481

# Removes this data entry's dump file. A missing file is silently skipped.
def delete_dump_file
  path = dump_file_path
  # File.exists? was removed in Ruby 3.2.
  FileUtils.rm(path) if File.exist?(path)
end

#delete_dump_filesObject



398
399
400
401
402
403
404
405
406
407
408
409
# File 'lib/flydata/sync_file_manager.rb', line 398

# Removes the dump-phase files of this data entry: dump, dump position,
# source-table marshal, sync.info and stats. Missing files are skipped.
def delete_dump_files
  files_to_delete = [
    dump_file_path,
    dump_pos_path,
    source_table_marshal_dump_path,
    sync_info_file,
    stats_path
  ]
  files_to_delete.flatten.each do |file_to_delete|
    # File.exists? was removed in Ruby 3.2.
    FileUtils.rm(file_to_delete) if File.exist?(file_to_delete)
  end
end

#delete_master_position_filesObject



411
412
413
414
415
416
417
418
419
420
# File 'lib/flydata/sync_file_manager.rb', line 411

# Removes the master position file, the sent position file and the lock pid
# file. Missing files are skipped.
def delete_master_position_files
  files_to_delete = [
    source_pos_path,
    sent_source_pos_path,
    lock_pid_file,
  ]
  files_to_delete.flatten.each do |file_to_delete|
    # File.exists? was removed in Ruby 3.2.
    FileUtils.rm(file_to_delete) if File.exist?(file_to_delete)
  end
end

#delete_sync_infoObject



296
297
298
# File 'lib/flydata/sync_file_manager.rb', line 296

# Removes the sync.info file.
#
# Unlike the other delete_* helpers, this does not check for existence first.
# @raise [Errno::ENOENT] if the file does not exist
def delete_sync_info
  info_path = sync_info_file
  FileUtils.rm(info_path)
end

#delete_table_ddl_files(*tables) ⇒ Object



380
381
382
383
384
385
# File 'lib/flydata/sync_file_manager.rb', line 380

# Removes the ".generated_ddl" marker files for +tables+, or every such file
# in the table positions directory when no table is given. Missing files are
# skipped.
def delete_table_ddl_files(*tables)
  table_ddl_file_paths(*tables).each do |path|
    # File.exists? was removed in Ruby 3.2.
    FileUtils.rm(path) if File.exist?(path)
  end
end

#delete_table_position_files(*tables) ⇒ Object



362
363
364
365
366
367
368
369
370
371
# File 'lib/flydata/sync_file_manager.rb', line 362

# Removes the per-table control files: the sequence (.pos), source position
# (.binlog.pos) and initial source position (.binlog.pos.init) files. With no
# arguments, every such file in the table positions directory is removed.
# Missing files are skipped. The existence check also covers duplicates,
# such as a .binlog.pos path listed twice.
def delete_table_position_files(*tables)
  files_to_delete = [
    table_position_file_paths(*tables),
    table_source_pos_paths(*tables),
    table_source_pos_init_paths(*tables),
  ].flatten
  files_to_delete.each do |path|
    # File.exists? was removed in Ruby 3.2.
    FileUtils.rm(path) if File.exist?(path)
  end
end

#delete_table_rev_files(*tables) ⇒ Object



373
374
375
376
377
378
# File 'lib/flydata/sync_file_manager.rb', line 373

# Removes the ".rev" revision files for +tables+, or every such file in the
# table positions directory when no table is given. Missing files are
# skipped.
def delete_table_rev_files(*tables)
  table_rev_file_paths(*tables).each do |path|
    # File.exists? was removed in Ruby 3.2.
    FileUtils.rm(path) if File.exist?(path)
  end
end

#delete_table_source_pos(table_name) ⇒ Object



423
424
425
426
427
428
429
430
# File 'lib/flydata/sync_file_manager.rb', line 423

# Removes the table's ".binlog.pos" file from the table positions directory.
# When the file is missing, a warning is printed to stdout instead of raising.
def delete_table_source_pos(table_name)
  # Reuse the shared path helper instead of rebuilding the same path here.
  file = table_source_pos_file_path(table_name)
  # File.exists? was removed in Ruby 3.2.
  if File.exist?(file)
    FileUtils.rm(file, force: true)
  else
    puts "#{file} does not exist. Something is wrong. Did you delete the file manually when flydata was running?"
  end
end

#dump_file_pathObject



31
32
33
# File 'lib/flydata/sync_file_manager.rb', line 31

# Path of this data entry's dump file: "<dump_dir>/<entry name>.dump".
def dump_file_path
  base = File.join(dump_dir, @data_entry['name'])
  "#{base}.dump"
end

#dump_pos_pathObject

Path of the dump position file, used to resume an interrupted dump.



36
37
38
# File 'lib/flydata/sync_file_manager.rb', line 36

# Path of the dump position file used to resume a dump: the dump file path
# with ".pos" appended.
def dump_pos_path
  "#{dump_file_path}.pos"
end

#get_new_table_list(tables, file_type) ⇒ Object



79
80
81
82
83
84
85
# File 'lib/flydata/sync_file_manager.rb', line 79

# Selects the tables that have no "<table>.<file_type>" file yet in the table
# positions directory.
#
# @param tables [Array<String>] candidate table names
# @param file_type [String] file extension to look for, e.g. "pos"
# @return [Array<String>] the tables without such a file, in input order
def get_new_table_list(tables, file_type)
  tables.reject do |table|
    # File.exists? was removed in Ruby 3.2.
    File.exist?(File.join(table_positions_dir_path, "#{table}.#{file_type}"))
  end
end

#get_next_table_position(table_name) ⇒ Object



278
279
280
281
# File 'lib/flydata/sync_file_manager.rb', line 278

# Returns the sequence number that would follow the table's current one,
# without saving it.
def get_next_table_position(table_name)
  increment_table_position(get_table_position(table_name))
end

#get_table_position(table_name) ⇒ Object



271
272
273
274
275
276
# File 'lib/flydata/sync_file_manager.rb', line 271

# Reads the raw contents of the table's cached position file, then rewinds it
# so the next read or write starts at offset 0.
#
# @return [String] the position file contents as stored
def get_table_position(table_name)
  position_file = open_table_position_file(table_name)
  position_file.read.tap { position_file.rewind }
end

#get_table_source_pos(table_name) ⇒ Object



448
449
450
451
452
453
# File 'lib/flydata/sync_file_manager.rb', line 448

# Parses the table's stored source position.
#
# @return [Object, nil] the object built by
#   source.source_pos.create_source_pos, or nil when there is no raw position
def get_table_source_pos(table_name)
  raw_pos = get_table_source_raw_pos(table_name)
  raw_pos ? source.source_pos.create_source_pos(raw_pos) : nil
end

#get_table_source_pos_init(table_name) ⇒ Object



324
325
326
327
328
329
# File 'lib/flydata/sync_file_manager.rb', line 324

# Parses the source position saved for the table at initial sync, read from
# "<table>.binlog.pos.init" in the table positions directory. Only the first
# line is read.
#
# @return [Object, nil] the object built by
#   source.source_pos.create_source_pos, or nil when the file is absent
# @raise [EOFError] if the file exists but is empty
def get_table_source_pos_init(table_name)
  file = File.join(table_positions_dir_path, table_name + ".binlog.pos.init")
  # File.exists? was removed in Ruby 3.2.
  return nil unless File.exist?(file)

  # Block form closes the handle; the previous code leaked it.
  first_line = File.open(file, 'r') { |f| f.readline }
  source.source_pos.create_source_pos(first_line)
end

#get_table_source_raw_pos(table_name) ⇒ Object

returns String. interface for fluentd



455
456
457
458
459
460
# File 'lib/flydata/sync_file_manager.rb', line 455

# Returns the first line of the table's ".binlog.pos" file as an unparsed
# String. This is the interface used by fluentd.
#
# @return [String, nil] nil when the file does not exist
# @raise [EOFError] if the file exists but is empty
def get_table_source_raw_pos(table_name) #returns String. interface for fluentd
  file = table_source_pos_file_path(table_name)
  # File.exists? was removed in Ruby 3.2.
  return nil unless File.exist?(file)

  # Block form closes the handle; the previous code leaked it.
  File.open(file, 'r') { |f| f.readline }
end

#increment_and_save_table_position(table_name) {|seq| ... } ⇒ Object

Read a sequence number from the table’s position file, increment the number and pass the number to a block. After executing the block, saves the value to the position file.

Yields:

  • (seq)


217
218
219
220
221
222
223
224
225
226
# File 'lib/flydata/sync_file_manager.rb', line 217

# Computes the table's next sequence number, yields it to the block, and
# persists it only after the block finishes. If the block raises, the stored
# position is left unchanged.
#
# @yieldparam seq the next sequence number
# @return the result of save_table_position
def increment_and_save_table_position(table_name)
  next_seq = increment_table_position(get_table_position(table_name))

  # logical transaction: the work in the block, then the save
  yield next_seq
  save_table_position(table_name, next_seq)
end

#increment_table_position(seq) ⇒ Object



207
208
209
210
211
# File 'lib/flydata/sync_file_manager.rb', line 207

# Returns the sequence number after +seq+. +seq+ is converted with to_i, so
# "1.sync" counts as 1. The very first sequence (1) is returned as the
# String "1.sync"; every later one is an Integer.
def increment_table_position(seq)
  next_seq = seq.to_i + 1
  next_seq == 1 ? "#{next_seq}.sync" : next_seq
end

#increment_table_rev(table_name, base_rev) ⇒ Object



353
354
355
356
357
358
359
360
# File 'lib/flydata/sync_file_manager.rb', line 353

# Writes base_rev + 1 to the table's ".rev" file, overwriting it, and returns
# the new revision.
def increment_table_rev(table_name, base_rev)
  rev_path = table_rev_file_path(table_name)
  (base_rev + 1).tap do |next_rev|
    File.open(rev_path, "w") { |io| io.write(next_rev) }
  end
end

#install_table_source_pos_files(tables) ⇒ Object



466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/flydata/sync_file_manager.rb', line 466

# For each table, moves "<table>.binlog.pos" from the dump directory into the
# table positions directory. It also keeps a "<table>.binlog.pos.init" copy
# of that initial-sync position, used for repair if necessary.
#
# @raise [RuntimeError] if a table's source file is missing from the dump
#   directory (tables processed before it stay installed)
def install_table_source_pos_files(tables)
  # mkdir_p is a no-op for an existing directory. The old guard used
  # Dir.exists?, which was removed in Ruby 3.2.
  FileUtils.mkdir_p(table_positions_dir_path)
  tables.each do |table_name|
    file_name = table_name + ".binlog.pos"
    src_file = File.join(dump_dir, file_name)
    raise "#{src_file} does not exist. Error!!" unless File.exist?(src_file)

    FileUtils.mv(src_file, table_positions_dir_path)
    installed = File.join(table_positions_dir_path, file_name)
    FileUtils.cp(installed, installed + ".init")
  end
end

#load_dump_posObject



47
48
49
50
51
52
53
54
# File 'lib/flydata/sync_file_manager.rb', line 47

# Loads the dump resume state. It parses the first line of the dump position
# file with dump_pos_content_to_hash and adds the unmarshalled source table
# under :source_table.
#
# @return [Hash] empty when no dump position file exists
def load_dump_pos
  path = dump_pos_path
  # File.exists? was removed in Ruby 3.2.
  return {} unless File.exist?(path)
  # Block form closes the handle; the previous code leaked it.
  content = File.open(path, 'r') { |f| f.readline }
  source_table = load_source_table_marshal_dump

  dump_pos_content_to_hash(content).merge(source_table: source_table)
end

#load_generated_ddl(tables) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/flydata/sync_file_manager.rb', line 56

# Reads the ".generated_ddl" marker contents for one table or an array of
# tables.
#
# @return [Array<String, nil>] one entry per path, nil where the file is
#   missing
def load_generated_ddl(tables)
  table_list = tables.kind_of?(Array) ? tables : [tables]
  table_ddl_file_paths(*table_list).map do |ddl_path|
    begin
      File.open(ddl_path, &:read)
    rescue Errno::ENOENT
      nil
    end
  end
end

#load_sent_source_pos(file_path = sent_source_pos_path) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/flydata/sync_file_manager.rb', line 132

# Reads and parses the position recorded as already sent.
#
# @param file_path [String] defaults to sent_source_pos_path
# @return [Object, nil] the parsed source position, or nil when the file is
#   missing or the parser raises RuntimeError
def load_sent_source_pos(file_path = sent_source_pos_path)
  # File.exists? was removed in Ruby 3.2.
  return nil unless File.exist?(file_path)

  # File.read (not IO.read): IO.read spawns a subprocess for "|"-prefixed paths.
  source_pos_str = File.read(file_path).strip
  begin
    source.source_pos.create_source_pos(source_pos_str)
  rescue RuntimeError
    nil
  end
end

#load_source_pos(file_path = source_pos_path) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/flydata/sync_file_manager.rb', line 108

# Reads and parses the master source position file.
#
# @param file_path [String] defaults to source_pos_path
# @return [Object, nil] the parsed source position, or nil when the file is
#   missing or the parser raises RuntimeError
def load_source_pos(file_path = source_pos_path)
  # File.exists? was removed in Ruby 3.2.
  return nil unless File.exist?(file_path)

  # File.read (not IO.read): IO.read spawns a subprocess for "|"-prefixed paths.
  source_pos_str = File.read(file_path).strip
  begin
    context = source.source_pos
    context.create_source_pos(source_pos_str)
  rescue RuntimeError
    nil
  end
end

#load_statsObject



509
510
511
512
# File 'lib/flydata/sync_file_manager.rb', line 509

# Loads the stats file, stored as alternating tab-separated key/value
# fields.
#
# @return [Hash{String=>String}, nil] nil when the stats file does not exist
def load_stats
  path = stats_path
  # File.exists? was removed in Ruby 3.2.
  return nil unless File.exist?(path)
  Hash[*File.read(path).split(/\t/)]
end

#load_sync_infoObject



311
312
313
314
315
316
317
318
319
320
321
322
# File 'lib/flydata/sync_file_manager.rb', line 311

# Loads the resume information from sync.info.
#
# The current format is a JSON object with symbolized keys. As a fallback,
# the legacy "<initial_sync>\t<space-separated tables>" line is also accepted.
#
# @return [Hash, nil] nil when the file does not exist
# @raise [EOFError] if the file exists but is empty
def load_sync_info
  path = sync_info_file
  # File.exists? was removed in Ruby 3.2.
  return nil unless File.exist?(path)
  # Block form closes the handle; the previous code leaked it.
  line = File.open(path, 'r') { |f| f.readline }
  begin
    JSON.parse(line, symbolize_names: true)
  rescue JSON::ParserError
    # For compatibility with the legacy tab-separated format. Only parse
    # errors fall through here, so unrelated failures are no longer hidden.
    items = line.split("\t")
    { initial_sync: (items[0] == 'true'),
      tables: items[1].split(" ") }
  end
end

#lock_pid_fileObject



283
284
285
# File 'lib/flydata/sync_file_manager.rb', line 283

# Path of the lock pid file, i.e. the FLYDATA_LOCK constant.
def lock_pid_file
  FLYDATA_LOCK
end

#open_table_position_file(table_name) ⇒ Object



232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/flydata/sync_file_manager.rb', line 232

# Returns the table's position file, opened read/write with sync enabled and
# cached in @table_position_files. If the file does not exist, it is first
# created with the initial value '0'.
def open_table_position_file(table_name)
  path = table_position_file_path(table_name)
  created = false
  begin
    @table_position_files[table_name] ||= File.open(path, File::RDWR).tap { |io| io.sync = true }
  rescue Errno::ENOENT
    # Already retried once, so the error must come from a different file
    # (e.g. a missing parent directory).
    raise if created
    # File does not exist. Create one with an initial value of '0'.
    File.open(path, "w") { |io| io.write('0') }
    created = true
    retry
  end
  @table_position_files[table_name]
end

#reset_table_position_files(tables, options = {}) ⇒ Object

Resets each given table's position (.pos) file to 0.



176
177
178
179
180
181
# File 'lib/flydata/sync_file_manager.rb', line 176

# Overwrites each table's ".pos" file with "0", creating it if needed.
# +options+ is accepted for interface compatibility and is unused.
def reset_table_position_files(tables, options = {})
  tables.each do |table|
    File.write(File.join(table_positions_dir_path, table + ".pos"), "0")
  end
end

#save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil) ⇒ Object



40
41
42
43
44
45
# File 'lib/flydata/sync_file_manager.rb', line 40

# Writes the dump resume position file, with content built by
# dump_pos_content.
#
# @raise [RuntimeError] if +source_pos+ is nil or false
def save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil)
  raise "Cannot create dump pos file because source position is unavailable." unless source_pos
  # Build the content before opening: 'w' truncates the file, so a failure
  # while building it would otherwise leave an empty resume file behind.
  content = dump_pos_content(status, table_name, last_pos, source_pos, state, substate)
  File.open(dump_pos_path, 'w') { |f| f.write(content) }
end

#save_generated_ddl(tables, contents = "1") ⇒ Object



68
69
70
71
72
73
74
75
76
77
# File 'lib/flydata/sync_file_manager.rb', line 68

# Writes a "<table>.generated_ddl" marker (default contents "1") for one table
# or an array of tables. The table positions directory is created if needed.
def save_generated_ddl(tables, contents = "1")
  table_list = tables.kind_of?(Array) ? tables : [tables]
  # Create the positions directory on first use.
  FileUtils.mkdir_p(table_positions_dir_path) unless File.directory?(table_positions_dir_path)
  table_list.each do |table|
    marker_path = File.join(table_positions_dir_path, "#{table}.generated_ddl")
    File.open(marker_path, 'w') { |io| io.write(contents) }
  end
end

#save_record_count_stat(table, record_count) ⇒ Object



503
504
505
506
507
# File 'lib/flydata/sync_file_manager.rb', line 503

# Adds +record_count+ to the table's running count in the stats file, then
# saves all stats.
def save_record_count_stat(table, record_count)
  counts = load_stats || {}
  previous = counts[table]
  counts[table] = previous ? previous.to_i + record_count : record_count
  save_stats(counts)
end

#save_sent_source_pos(source_pos) ⇒ Object

sent source pos file (binlog.pos)



126
127
128
129
130
# File 'lib/flydata/sync_file_manager.rb', line 126

# Overwrites the sent position file with source_pos.to_s.
def save_sent_source_pos(source_pos)
  sent_path = sent_source_pos_path
  File.open(sent_path, 'w') { |io| io.write(source_pos.to_s) }
end

#save_source_pos(source_pos) ⇒ Object

master binlog.pos file



101
102
103
104
105
106
# File 'lib/flydata/sync_file_manager.rb', line 101

# Overwrites the master position file with source_pos.to_s.
def save_source_pos(source_pos)
  File.open(source_pos_path, 'w') { |io| io.write(source_pos.to_s) }
end

#save_source_table_marshal_dump(source_table) ⇒ Object



94
95
96
97
98
# File 'lib/flydata/sync_file_manager.rb', line 94

# Serializes +source_table+ with Marshal into the source-table marshal file.
def save_source_table_marshal_dump(source_table)
  # Marshal output is binary; 'wb' avoids the newline conversion and
  # encoding transcoding that text mode may apply.
  File.open(source_table_marshal_dump_path, 'wb') do |f|
    f.write Marshal.dump(source_table)
  end
end

#save_ssl_ca(ssl_ca_content, path = ssl_ca_path) ⇒ Object



156
157
158
159
160
# File 'lib/flydata/sync_file_manager.rb', line 156

# Writes the SSL CA content to +path+ (default: ssl_ca_path), overwriting it.
def save_ssl_ca(ssl_ca_content, path = ssl_ca_path)
  File.open(path, 'w') { |io| io.write(ssl_ca_content) }
end

#save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) ⇒ Object



169
170
171
172
173
# File 'lib/flydata/sync_file_manager.rb', line 169

# Writes the SSL cipher content to +path+ (default: ssl_cipher_path),
# overwriting it.
def save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path)
  File.open(path, 'w') { |io| io.write(ssl_cipher_content) }
end

#save_sync_info(initial_sync, tables, auto_create_status = nil) ⇒ Object



300
301
302
303
304
305
306
307
308
309
# File 'lib/flydata/sync_file_manager.rb', line 300

# Writes sync.info as a JSON object with the keys initial_sync, tables and
# auto_create_status.
def save_sync_info(initial_sync, tables, auto_create_status = nil)
  File.open(sync_info_file, "w") do |io|
    payload = {
      initial_sync: initial_sync,
      tables: tables,
      auto_create_status: auto_create_status,
    }
    io.write(payload.to_json)
  end
end

#save_table_position(table_name, seq) ⇒ Object



247
248
249
250
251
252
253
254
255
256
257
258
259
260
# File 'lib/flydata/sync_file_manager.rb', line 247

# Overwrites the table's cached position file in place with seq.to_s.
#
# If the new value is shorter than the current file, it is first padded with
# spaces to the old length and written, and only then is the file truncated
# to the new length.
# NOTE(review): presumably this ordering is so an interruption between write
# and truncate leaves "<seq><spaces>", which still parses via to_i. Confirm
# the intent before reordering.
# Assumes the cached File is positioned at offset 0: it is freshly opened,
# or get_table_position and this method rewound it after use.
def save_table_position(table_name, seq)
  f = open_table_position_file(table_name)
  prev_seq_len = f.size

  seq_to_write =  seq.to_s
  new_seq_len = seq_to_write.size

  if new_seq_len < prev_seq_len
    seq_to_write += " " * (prev_seq_len - new_seq_len)
  end
  f.write(seq_to_write)
  f.truncate(new_seq_len) if new_seq_len < prev_seq_len
  # Leave the file at offset 0 for the next read or write.
  f.rewind
end

#save_table_positions(table_names, seq) ⇒ Object

This doesn't cache the File object.



263
264
265
266
267
268
269
# File 'lib/flydata/sync_file_manager.rb', line 263

# Writes seq.to_s to the position file of each given table (a single name or
# an array). Unlike save_table_position, this opens each file anew and does
# not use the @table_position_files cache.
def save_table_positions(table_names, seq)
  Array(table_names).each do |name|
    File.write(table_position_file_path(name), seq.to_s)
  end
end

#save_table_source_pos(tables, source_pos, options = {}) ⇒ Object



432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/flydata/sync_file_manager.rb', line 432

# Writes source_pos.to_s into "<table>.binlog.pos" for one table or an array
# of tables.
#
# @param options [Hash] :destination => :positions writes into the table
#   positions directory; any other value (including :dump or none) writes
#   into the dump directory
def save_table_source_pos(tables, source_pos, options = {})
  dest_dir = :positions === options[:destination] ? table_positions_dir_path : dump_dir

  table_list = tables.kind_of?(Array) ? tables : [tables]
  table_list.each do |table|
    pos_file = File.join(dest_dir, table + ".binlog.pos")
    File.open(pos_file, "w") { |io| io.write(source_pos.to_s) }
  end
end

#sent_source_pos_path(master_source_pos_path = source_pos_path) ⇒ Object



144
145
146
147
# File 'lib/flydata/sync_file_manager.rb', line 144

# Path of the file recording the last sent position. It is the master
# position path with its trailing 4 characters (".pos") replaced by
# ".sent.pos", e.g. "x.binlog.pos" -> "x.binlog.sent.pos".
def sent_source_pos_path(master_source_pos_path = source_pos_path)
  validate_master_source_pos_path(master_source_pos_path)
  stem = master_source_pos_path[0...-4]
  "#{stem}.sent.pos"
end

#sourceObject



22
23
24
# File 'lib/flydata/sync_file_manager.rb', line 22

# The source-specific helper given to the constructor (may be nil).
def source
  @source
end

#source_pos_pathObject



121
122
123
# File 'lib/flydata/sync_file_manager.rb', line 121

# Path of the master position file: "<FLYDATA_HOME>/<entry name>.binlog.pos".
def source_pos_path
  home = FLYDATA_HOME
  File.join(home, @data_entry['name'] + ".binlog.pos")
end

#source_table_marshal_dump_pathObject

SourceTable marshal file



90
91
92
# File 'lib/flydata/sync_file_manager.rb', line 90

# Path of the marshalled source-table file: the dump file path followed by
# "." and SOURCE_TABLE_EXT.
def source_table_marshal_dump_path
  "#{dump_file_path}.#{SOURCE_TABLE_EXT}"
end

#ssl_ca_path(master_source_pos_path = source_pos_path) ⇒ Object

ssl_ca file path



150
151
152
153
154
# File 'lib/flydata/sync_file_manager.rb', line 150

# Path of the SSL CA file, "<data-entry-name>.ssl_ca.pem". It is derived by
# dropping the trailing 11 characters (".binlog.pos") from the master
# position path.
def ssl_ca_path(master_source_pos_path = source_pos_path)
  validate_master_source_pos_path(master_source_pos_path)
  stem = master_source_pos_path[0...-11]
  "#{stem}.ssl_ca.pem"
end

#ssl_cipher_path(master_source_pos_path = source_pos_path) ⇒ Object

ssl_cipher file path



163
164
165
166
167
# File 'lib/flydata/sync_file_manager.rb', line 163

# Path of the SSL cipher file, "<data-entry-name>.ssl_cipher". It is derived
# by dropping the trailing 11 characters (".binlog.pos") from the master
# position path.
def ssl_cipher_path(master_source_pos_path = source_pos_path)
  validate_master_source_pos_path(master_source_pos_path)
  stem = master_source_pos_path[0...-11]
  "#{stem}.ssl_cipher"
end

#stats_pathObject



499
500
501
# File 'lib/flydata/sync_file_manager.rb', line 499

# Path of this data entry's stats file: "<dump_dir>/<entry name>.stats".
def stats_path
  base = File.join(dump_dir, @data_entry['name'])
  "#{base}.stats"
end

#sync_info_fileObject

“sync.info” file includes initial sync information for resuming

  • initial_sync: True if initial sync is full initial sync

  • tables: target tables for initial sync

  • auto_create_status: START|SENT_DDL|CREATED_TABLES



292
293
294
# File 'lib/flydata/sync_file_manager.rb', line 292

# Path of "sync.info" inside the dump directory.
def sync_info_file
  dir = dump_dir
  File.join(dir, "sync.info")
end

#table_ddl_file_paths(*tables) ⇒ Object



192
193
194
195
# File 'lib/flydata/sync_file_manager.rb', line 192

# Paths of ".generated_ddl" marker files: one per given table, or every
# existing marker in the table positions directory when no table is given.
def table_ddl_file_paths(*tables)
  dir = table_positions_dir_path
  if tables.empty?
    Dir.glob(File.join(dir, '*.generated_ddl'))
  else
    tables.map { |table| File.join(dir, table + '.generated_ddl') }
  end
end

#table_position_file_path(table_name) ⇒ Object



228
229
230
# File 'lib/flydata/sync_file_manager.rb', line 228

# Path of the table's sequence file, "<table>.pos", in the table positions
# directory.
def table_position_file_path(table_name)
  dir = table_positions_dir_path
  File.join(dir, table_name + ".pos")
end

#table_position_file_paths(*tables) ⇒ Object



187
188
189
190
# File 'lib/flydata/sync_file_manager.rb', line 187

# Paths of "<table>.pos" files for the given tables. With no tables, this
# globs "*.pos" in the table positions directory. That glob also matches
# "*.binlog.pos" files.
def table_position_file_paths(*tables)
  dir = table_positions_dir_path
  if tables.empty?
    Dir.glob(File.join(dir, '*.pos'))
  else
    tables.map { |table| File.join(dir, table + '.pos') }
  end
end

#table_positions_dir_pathObject



183
184
185
# File 'lib/flydata/sync_file_manager.rb', line 183

# Directory holding the per-table control files (the TABLE_POSITIONS_DIR
# constant).
def table_positions_dir_path
  TABLE_POSITIONS_DIR
end

#table_rev(table_name) ⇒ Object



340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/flydata/sync_file_manager.rb', line 340

# Returns the table's current revision from its ".rev" file.
#
# @return [Integer] the stored revision, or 1 (the default) when the file is
#   missing or empty
def table_rev(table_name)
  file = table_rev_file_path(table_name)
  # File.exists? was removed in Ruby 3.2.
  return 1 unless File.exist?(file) # default revision is 1
  # Read-only access: the old "r+" mode required write permission just to
  # read.
  content = File.read(file)
  content.empty? ? 1 : content.to_i
end

#table_rev_file_path(table_name) ⇒ Object



331
332
333
# File 'lib/flydata/sync_file_manager.rb', line 331

# Path of the table's revision file, "<table>.rev", in the table positions
# directory.
def table_rev_file_path(table_name)
  dir = table_positions_dir_path
  File.join(dir, table_name + ".rev")
end

#table_rev_file_paths(*tables) ⇒ Object



335
336
337
338
# File 'lib/flydata/sync_file_manager.rb', line 335

# Paths of ".rev" files: one per given table, or every existing ".rev" file in
# the table positions directory when no table is given.
def table_rev_file_paths(*tables)
  return Dir.glob(File.join(table_positions_dir_path, "*.rev")) if tables.empty?

  tables.map(&method(:table_rev_file_path))
end

#table_source_pos_file_path(table_name) ⇒ Object



462
463
464
# File 'lib/flydata/sync_file_manager.rb', line 462

# Path of the table's source position file, "<table>.binlog.pos", in the
# table positions directory.
def table_source_pos_file_path(table_name)
  dir = table_positions_dir_path
  File.join(dir, table_name + ".binlog.pos")
end

#table_source_pos_init_paths(*tables) ⇒ Object



202
203
204
205
# File 'lib/flydata/sync_file_manager.rb', line 202

# Paths of ".binlog.pos.init" files: one per given table, or every existing
# such file in the table positions directory when no table is given.
def table_source_pos_init_paths(*tables)
  dir = table_positions_dir_path
  if tables.empty?
    Dir.glob(File.join(dir, '*.binlog.pos.init'))
  else
    tables.map { |table| File.join(dir, table + '.binlog.pos.init') }
  end
end

#table_source_pos_paths(*tables) ⇒ Object



197
198
199
200
# File 'lib/flydata/sync_file_manager.rb', line 197

# Paths of ".binlog.pos" files: one per given table, or every existing such
# file in the table positions directory when no table is given.
def table_source_pos_paths(*tables)
  dir = table_positions_dir_path
  if tables.empty?
    Dir.glob(File.join(dir, '*.binlog.pos'))
  else
    tables.map { |table| File.join(dir, table + '.binlog.pos') }
  end
end

#tables_from_positions_dirObject



387
388
389
390
391
392
393
394
395
396
# File 'lib/flydata/sync_file_manager.rb', line 387

# Lists the distinct table names that have any control file (.pos,
# .generated_ddl, .init or .rev) in the table positions directory. The table
# name is the part of the file name before its first ".".
def tables_from_positions_dir
  pattern = File.join(table_positions_dir_path, '*.{pos,generated_ddl,init,rev}')
  Dir.glob(pattern).map { |path| File.basename(path).split('.', 2).first }.uniq
end