Class: Flydata::SyncFileManager

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/sync_file_manager.rb

Constant Summary

# Directory holding dump artifacts; the FLYDATA_DUMP environment variable overrides the default.
DUMP_DIR = ENV.fetch('FLYDATA_DUMP') { File.join(FLYDATA_HOME, 'dump') }
# Root directory for timestamped dump backups; FLYDATA_BACKUP overrides the default.
BACKUP_DIR = ENV.fetch('FLYDATA_BACKUP') { File.join(FLYDATA_HOME, 'backup') }
# Directory for per-table control files (positions, revisions, generated DDL).
TABLE_POSITIONS_DIR = FLYDATA_TABLE_POSITIONS_DIR
# File extension of the marshaled source-table dump.
SOURCE_TABLE_EXT = "mysql_table"

Instance Method Summary

Constructor Details

#initialize(data_entry, source = nil) ⇒ SyncFileManager

Returns a new instance of SyncFileManager.



14
15
16
17
18
# File 'lib/flydata/sync_file_manager.rb', line 14

# Builds a manager for one data entry.
#
# data_entry - Hash-like entry; its 'name' is used to derive file names.
# source     - optional source-dependent helper object (nil by default).
def initialize(data_entry, source = nil)
  @source = source
  @data_entry = data_entry
  # Open table position File objects, cached per table name.
  @table_position_files = {}
end

Instance Method Details

#backup_dir ⇒ Object



448
449
450
# File 'lib/flydata/sync_file_manager.rb', line 448

# Returns the root directory for dump backups (the BACKUP_DIR constant).
def backup_dir
  BACKUP_DIR
end

#backup_dump_dir ⇒ Object



438
439
440
441
442
443
444
445
446
# File 'lib/flydata/sync_file_manager.rb', line 438

# Moves this dump's control artifacts (*.info, *.pos, *.stats and
# *.<SOURCE_TABLE_EXT>) out of dump_dir into a new subdirectory of BACKUP_DIR
# named after the current time (YYYYmmddHHMMSS).
#
# Raises Errno::EEXIST if a backup directory with the same timestamp already
# exists (two calls within the same second).
def backup_dump_dir
  # mkdir_p is a no-op for an existing directory, so no existence check is
  # needed (Dir.exists? was removed in Ruby 3.2).
  FileUtils.mkdir_p(BACKUP_DIR)
  dest_dir = File.join(BACKUP_DIR, Time.now.strftime("%Y%m%d%H%M%S"))
  FileUtils.mkdir(dest_dir)
  ['info', 'pos', 'stats', SOURCE_TABLE_EXT].each do |ext|
    FileUtils.mv(Dir.glob("#{dump_dir}/*.#{ext}"), dest_dir)
  end
end

#close ⇒ Object



24
25
26
27
# File 'lib/flydata/sync_file_manager.rb', line 24

# Closes every cached table position file and clears the cache, so the next
# access reopens the file.
def close
  @table_position_files.each_value(&:close)
  @table_position_files = {}
end

#delete_dump_file ⇒ Object



434
435
436
# File 'lib/flydata/sync_file_manager.rb', line 434

# Deletes this data entry's dump file if it exists; does nothing otherwise.
def delete_dump_file
  path = dump_file_path
  # File.exist? (File.exists? was removed in Ruby 3.2).
  FileUtils.rm(path) if File.exist?(path)
end

#delete_dump_files ⇒ Object



351
352
353
354
355
356
357
358
359
360
361
362
# File 'lib/flydata/sync_file_manager.rb', line 351

# Deletes the per-dump artifacts of this data entry: the dump file, its
# position file, the marshaled source table, the sync info file and the stats
# file. Files that do not exist are skipped.
def delete_dump_files
  files_to_delete = [
      dump_file_path,
      dump_pos_path,
      source_table_marshal_dump_path,
      sync_info_file,
      stats_path
  ]
  files_to_delete.flatten.each do |file_to_delete|
    # File.exist? (File.exists? was removed in Ruby 3.2).
    FileUtils.rm(file_to_delete) if File.exist?(file_to_delete)
  end
end

#delete_master_position_files ⇒ Object



364
365
366
367
368
369
370
371
372
373
# File 'lib/flydata/sync_file_manager.rb', line 364

# Deletes the master source position file, the sent source position file and
# the lock pid file. Files that do not exist are skipped.
def delete_master_position_files
  files_to_delete = [
    source_pos_path,
    sent_source_pos_path,
    lock_pid_file,
  ]
  files_to_delete.flatten.each do |file_to_delete|
    # File.exist? (File.exists? was removed in Ruby 3.2).
    FileUtils.rm(file_to_delete) if File.exist?(file_to_delete)
  end
end

#delete_table_control_files(*tables) ⇒ Object



327
328
329
330
331
332
333
334
335
336
337
338
# File 'lib/flydata/sync_file_manager.rb', line 327

# Deletes the per-table control files (.pos, .binlog.pos, .binlog.pos.init,
# .rev and .generated_ddl) for the given tables. With no tables, the path
# helpers glob every such file in the positions directory. Missing files are
# skipped.
def delete_table_control_files(*tables)
  files_to_delete = [
      table_position_file_paths(*tables),
      table_source_pos_paths(*tables),
      table_source_pos_init_paths(*tables),
      table_rev_file_paths(*tables),
      table_ddl_file_paths(*tables)
  ]
  files_to_delete.flatten.each do |path|
    # File.exist? (File.exists? was removed in Ruby 3.2).
    FileUtils.rm(path) if File.exist?(path)
  end
end

#delete_table_source_pos(table_name) ⇒ Object



376
377
378
379
380
381
382
383
# File 'lib/flydata/sync_file_manager.rb', line 376

# Deletes the table's "<table>.binlog.pos" file from the positions directory.
# If the file is missing, prints a warning to stdout instead of raising.
def delete_table_source_pos(table_name)
  # Reuse the shared path helper instead of rebuilding the same path here.
  file = table_source_pos_file_path(table_name)
  # File.exist? (File.exists? was removed in Ruby 3.2).
  if File.exist?(file)
    FileUtils.rm(file, :force => true)
  else
    puts "#{file} does not exist. Something is wrong. Did you delete the file manually when flydata was running?"
  end
end

#dump_file_path ⇒ Object



29
30
31
# File 'lib/flydata/sync_file_manager.rb', line 29

# Path of this data entry's dump file: <dump_dir>/<entry name>.dump
def dump_file_path
  base = File.join(dump_dir, @data_entry['name'])
  "#{base}.dump"
end

#dump_pos_path ⇒ Object

Path of the dump position file, used to resume a dump.



34
35
36
# File 'lib/flydata/sync_file_manager.rb', line 34

# Path of the dump position file: the dump file path with ".pos" appended.
def dump_pos_path
  "#{dump_file_path}.pos"
end

#get_new_table_list(tables, file_type) ⇒ Object



77
78
79
80
81
82
83
# File 'lib/flydata/sync_file_manager.rb', line 77

# Returns the tables that do not yet have a "<table>.<file_type>" file in the
# table positions directory, preserving input order.
def get_new_table_list(tables, file_type)
  tables.reject do |table|
    # File.exist? (File.exists? was removed in Ruby 3.2).
    File.exist?(File.join(table_positions_dir_path, "#{table}.#{file_type}"))
  end
end

#get_next_table_position(table_name) ⇒ Object



263
264
265
266
# File 'lib/flydata/sync_file_manager.rb', line 263

# Returns the sequence number that follows the table's current position,
# without saving it.
def get_next_table_position(table_name)
  increment_table_position(get_table_position(table_name))
end

#get_table_position(table_name) ⇒ Object



256
257
258
259
260
261
# File 'lib/flydata/sync_file_manager.rb', line 256

# Reads the table's current position (a String) from its cached position file
# and then rewinds the file, so the next read or write starts at offset 0.
def get_table_position(table_name)
  io = open_table_position_file(table_name)
  io.read.tap { io.rewind }
end

#get_table_source_pos(table_name) ⇒ Object



401
402
403
404
405
406
# File 'lib/flydata/sync_file_manager.rb', line 401

# Returns the table's stored source position as an object built by
# source.source_pos.create_source_pos, or nil when no raw position exists.
def get_table_source_pos(table_name)
  raw = get_table_source_raw_pos(table_name)
  source.source_pos.create_source_pos(raw) if raw
end

#get_table_source_pos_init(table_name) ⇒ Object



289
290
291
292
293
294
# File 'lib/flydata/sync_file_manager.rb', line 289

# Returns the source position recorded at initial sync for +table_name+ (the
# first line of "<table>.binlog.pos.init", converted by
# source.source_pos.create_source_pos), or nil when that file does not exist.
# Raises EOFError if the file is empty.
def get_table_source_pos_init(table_name)
  file = File.join(table_positions_dir_path, table_name + ".binlog.pos.init")
  # File.exist? (File.exists? was removed in Ruby 3.2).
  return nil unless File.exist?(file)

  # Block form closes the handle; the previous version leaked it.
  first_line = File.open(file, 'r') { |f| f.readline }
  source.source_pos.create_source_pos(first_line)
end

#get_table_source_raw_pos(table_name) ⇒ Object

Returns the raw position as a String; this is the interface used by fluentd.



408
409
410
411
412
413
# File 'lib/flydata/sync_file_manager.rb', line 408

# Returns the first line of the table's "<table>.binlog.pos" file as a raw
# String (the interface used by fluentd), or nil when the file does not exist.
# Raises EOFError if the file is empty.
def get_table_source_raw_pos(table_name) # returns String. interface for fluentd
  file = table_source_pos_file_path(table_name)
  # File.exist? (File.exists? was removed in Ruby 3.2).
  return nil unless File.exist?(file)

  # Block form closes the handle; the previous version leaked it.
  File.open(file, 'r') { |f| f.readline }
end

#increment_and_save_table_position(table_name) {|seq| ... } ⇒ Object

Reads a sequence number from the table’s position file, increments it, and passes the new number to a block. After the block returns, saves the value to the position file.

Yields:

  • (seq)


215
216
217
218
219
220
221
222
223
224
# File 'lib/flydata/sync_file_manager.rb', line 215

# Computes the table's next sequence number, yields it, and then persists it
# as the table's position. Returns the result of save_table_position.
def increment_and_save_table_position(table_name)
  next_seq = increment_table_position(get_table_position(table_name))

  # Logical transaction: the position is saved only after the block returns
  # normally; if the block raises, this call does not save anything.
  yield next_seq
  save_table_position(table_name, next_seq)
end

#increment_table_position(seq) ⇒ Object



205
206
207
208
209
# File 'lib/flydata/sync_file_manager.rb', line 205

# Returns seq.to_i + 1. When that result is 1 (an unset or zero position),
# FlydataCore::QueryJob::SYNC_FIRST_SEQ is returned instead.
def increment_table_position(seq)
  next_seq = seq.to_i + 1
  next_seq == 1 ? FlydataCore::QueryJob::SYNC_FIRST_SEQ : next_seq
end

#increment_table_rev(table_name, base_rev) ⇒ Object



318
319
320
321
322
323
324
325
# File 'lib/flydata/sync_file_manager.rb', line 318

# Writes base_rev + 1 to the table's ".rev" file, overwriting it, and returns
# the new revision.
def increment_table_rev(table_name, base_rev)
  rev_path = table_rev_file_path(table_name)
  (base_rev + 1).tap do |new_rev|
    File.open(rev_path, "w") { |io| io.write(new_rev) }
  end
end

#install_table_source_pos_files(tables) ⇒ Object



419
420
421
422
423
424
425
426
427
428
429
430
431
432
# File 'lib/flydata/sync_file_manager.rb', line 419

# Moves each table's "<table>.binlog.pos" from dump_dir into the table
# positions directory, creating that directory if needed. A copy is kept as
# "<table>.binlog.pos.init".
#
# Raises RuntimeError if a table's position file is missing from dump_dir.
def install_table_source_pos_files(tables)
  # mkdir_p is a no-op for an existing directory, so no existence check is
  # needed (Dir.exists? was removed in Ruby 3.2).
  FileUtils.mkdir_p(table_positions_dir_path)
  tables.each do |table_name|
    file_name = table_name + ".binlog.pos"
    src_file = File.join(dump_dir, file_name)
    raise "#{src_file} does not exist. Error!!" unless File.exist?(src_file)
    FileUtils.mv(src_file, table_positions_dir_path)
    # Save the position at initial sync; it is used for repair if necessary.
    installed = File.join(table_positions_dir_path, file_name)
    FileUtils.cp(installed, installed + ".init")
  end
end

#load_dump_pos ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/flydata/sync_file_manager.rb', line 45

# Loads the dump resume state. Returns {} when there is no dump position file.
# Otherwise parses its first line with dump_pos_content_to_hash and merges in
# :source_table from load_source_table_marshal_dump.
def load_dump_pos
  path = dump_pos_path
  # File.exist? (File.exists? was removed in Ruby 3.2).
  return {} unless File.exist?(path)
  # Block form closes the handle; the previous version leaked it.
  content = File.open(path, 'r') { |f| f.readline }
  source_table = load_source_table_marshal_dump

  dump_pos_content_to_hash(content).merge(source_table: source_table)
end

#load_generated_ddl(tables) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
# File 'lib/flydata/sync_file_manager.rb', line 54

# Reads the generated-DDL file of each table. Accepts a single table or an
# Array of tables; returns one entry per path, with nil for any file that
# does not exist.
def load_generated_ddl(tables)
  table_list = tables.kind_of?(Array) ? tables : [tables]
  table_ddl_file_paths(*table_list).map do |ddl_path|
    begin
      File.open(ddl_path) { |io| io.read }
    rescue Errno::ENOENT
      nil
    end
  end
end

#load_sent_source_pos(file_path = sent_source_pos_path) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
# File 'lib/flydata/sync_file_manager.rb', line 130

# Loads the sent source position from +file_path+ (by default
# sent_source_pos_path). Returns nil when the file does not exist or when
# create_source_pos raises RuntimeError for its stripped contents.
def load_sent_source_pos(file_path = sent_source_pos_path)
  # File.exist? (File.exists? was removed in Ruby 3.2).
  return nil unless File.exist?(file_path)

  # File.read, not IO.read: IO.read treats a leading "|" as a command to run.
  source_pos_str = File.read(file_path).strip
  begin
    source.source_pos.create_source_pos(source_pos_str)
  rescue RuntimeError
    # A position that create_source_pos rejects is treated like a missing one.
    nil
  end
end

#load_source_pos(file_path = source_pos_path) ⇒ Object



106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/flydata/sync_file_manager.rb', line 106

# Loads the master source position from +file_path+ (by default
# source_pos_path). Returns nil when the file does not exist or when
# create_source_pos raises RuntimeError for its stripped contents.
def load_source_pos(file_path = source_pos_path)
  # File.exist? (File.exists? was removed in Ruby 3.2).
  return nil unless File.exist?(file_path)

  # File.read, not IO.read: IO.read treats a leading "|" as a command to run.
  source_pos_str = File.read(file_path).strip
  begin
    context = source.source_pos
    context.create_source_pos(source_pos_str)
  rescue RuntimeError
    # A position that create_source_pos rejects is treated like a missing one.
    nil
  end
end

#load_stats ⇒ Object



462
463
464
465
# File 'lib/flydata/sync_file_manager.rb', line 462

# Reads the stats file (alternating tab-separated keys and values) into a Hash
# of Strings. Returns nil when the file does not exist.
def load_stats
  path = stats_path
  # File.exist? (File.exists? was removed in Ruby 3.2).
  return nil unless File.exist?(path)
  Hash[*File.read(path).split(/\t/)]
end

#load_sync_info ⇒ Object



282
283
284
285
286
287
# File 'lib/flydata/sync_file_manager.rb', line 282

# Reads the sync info file, whose first line is "<initial_sync>\t<tables>".
# Returns { initial_sync: Boolean, tables: Array<String> }, or nil when the
# file does not exist. Raises EOFError if the file is empty.
def load_sync_info
  path = sync_info_file
  # File.exist? (File.exists? was removed in Ruby 3.2).
  return nil unless File.exist?(path)
  # Block form closes the handle; the previous version leaked it.
  line = File.open(path, 'r') { |f| f.readline }
  # The negative limit keeps trailing empty fields. save_sync_info writes
  # "true\t" for an empty table list, which plain split("\t") reduced to a
  # single field, so items[1] was nil and the old code raised NoMethodError.
  items = line.split("\t", -1)
  { initial_sync: (items[0] == 'true'),
    tables: items[1].to_s.split(" ") }
end

#lock_pid_file ⇒ Object



268
269
270
# File 'lib/flydata/sync_file_manager.rb', line 268

# Returns the path of the lock pid file (the FLYDATA_LOCK constant).
def lock_pid_file
  FLYDATA_LOCK
end

#open_table_position_file(table_name) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/flydata/sync_file_manager.rb', line 226

# Returns the cached read/write File for the table's "<table>.pos" file,
# opening it (with sync enabled) on first use. If the file does not exist, it
# is created with the initial value '0' and the open is retried once.
def open_table_position_file(table_name)
  path = File.join(table_positions_dir_path, table_name + ".pos")
  created = false
  begin
    @table_position_files[table_name] ||= File.open(path, File::RDWR).tap { |io| io.sync = true }
  rescue Errno::ENOENT
    # Already created and retried once: this ENOENT cannot be fixed by
    # creating the file again, so re-raise it.
    raise if created
    # The position file does not exist yet; create it with an initial value of '0'.
    File.open(path, "w") { |io| io.write('0') }
    created = true
    retry
  end
  @table_position_files[table_name]
end

#reset_table_position_files(tables) ⇒ Object

table files



174
175
176
177
178
179
# File 'lib/flydata/sync_file_manager.rb', line 174

# Overwrites each table's "<table>.pos" file with '0', creating it if needed.
def reset_table_position_files(tables)
  tables.each do |name|
    pos_file = File.join(table_positions_dir_path, name + ".pos")
    File.write(pos_file, '0')
  end
end

#save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil) ⇒ Object



38
39
40
41
42
43
# File 'lib/flydata/sync_file_manager.rb', line 38

# Writes the dump resume state (built by dump_pos_content) to dump_pos_path,
# replacing any previous contents.
#
# Raises RuntimeError if +source_pos+ is nil or false.
def save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil)
  raise "Cannot create dump pos file because source position is unavailable." unless source_pos
  # Build the content before opening the file. Opening with 'w' truncates it,
  # so an error in dump_pos_content used to leave an empty resume file behind.
  content = dump_pos_content(status, table_name, last_pos, source_pos, state, substate)
  File.open(dump_pos_path, 'w') do |f|
    f.write(content)
  end
end

#save_generated_ddl(tables, contents = "1") ⇒ Object



66
67
68
69
70
71
72
73
74
75
# File 'lib/flydata/sync_file_manager.rb', line 66

# Writes +contents+ to "<table>.generated_ddl" in the table positions
# directory for each table (a single table or an Array), creating the
# directory if needed.
def save_generated_ddl(tables, contents = "1")
  table_list = tables.kind_of?(Array) ? tables : [tables]
  # Create the positions directory on first use.
  FileUtils.mkdir_p(table_positions_dir_path) unless File.directory?(table_positions_dir_path)
  table_list.each do |table|
    ddl_path = File.join(table_positions_dir_path, "#{table}.generated_ddl")
    File.open(ddl_path, 'w') { |io| io.write(contents) }
  end
end

#save_record_count_stat(table, record_count) ⇒ Object



456
457
458
459
460
# File 'lib/flydata/sync_file_manager.rb', line 456

# Adds +record_count+ to the table's stored record count, starting from
# +record_count+ when there is none, and saves the updated stats.
def save_record_count_stat(table, record_count)
  stats = load_stats || {}
  previous = stats[table]
  stats[table] = if previous
                   previous.to_i + record_count
                 else
                   record_count
                 end
  save_stats(stats)
end

#save_sent_source_pos(source_pos) ⇒ Object

Sent source position file (binlog.pos).



124
125
126
127
128
# File 'lib/flydata/sync_file_manager.rb', line 124

# Overwrites the sent source position file with source_pos.to_s.
def save_sent_source_pos(source_pos)
  File.open(sent_source_pos_path, 'w') { |io| io.write(source_pos.to_s) }
end

#save_source_pos(source_pos) ⇒ Object

Master binlog.pos file.



99
100
101
102
103
104
# File 'lib/flydata/sync_file_manager.rb', line 99

# Overwrites the master source position file with source_pos.to_s.
def save_source_pos(source_pos)
  File.open(source_pos_path, 'w') { |io| io.write(source_pos.to_s) }
end

#save_source_table_marshal_dump(source_table) ⇒ Object



92
93
94
95
96
# File 'lib/flydata/sync_file_manager.rb', line 92

# Serializes +source_table+ with Marshal into source_table_marshal_dump_path.
#
# Raises TypeError (from Marshal.dump) if the object cannot be marshaled; in
# that case any existing file is left untouched.
def save_source_table_marshal_dump(source_table)
  # Serialize before opening, because 'wb' truncates the existing file.
  data = Marshal.dump(source_table)
  # Marshal output is binary; binary mode avoids newline/encoding translation.
  File.open(source_table_marshal_dump_path, 'wb') do |f|
    f.write data
  end
end

#save_ssl_ca(ssl_ca_content, path = ssl_ca_path) ⇒ Object



154
155
156
157
158
# File 'lib/flydata/sync_file_manager.rb', line 154

# Writes the SSL CA content to +path+ (ssl_ca_path by default), replacing any
# previous contents.
def save_ssl_ca(ssl_ca_content, path = ssl_ca_path)
  File.open(path, 'w') { |io| io.write(ssl_ca_content) }
end

#save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) ⇒ Object



167
168
169
170
171
# File 'lib/flydata/sync_file_manager.rb', line 167

# Writes the SSL cipher content to +path+ (ssl_cipher_path by default),
# replacing any previous contents.
def save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path)
  File.open(path, 'w') { |io| io.write(ssl_cipher_content) }
end

#save_sync_info(initial_sync, tables) ⇒ Object



276
277
278
279
280
# File 'lib/flydata/sync_file_manager.rb', line 276

# Writes "<initial_sync>\t<space-separated tables>" to the sync info file.
def save_sync_info(initial_sync, tables)
  File.open(sync_info_file, "w") do |io|
    fields = [initial_sync, tables.join(" ")]
    io.write(fields.join("\t"))
  end
end

#save_table_position(table_name, seq) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/flydata/sync_file_manager.rb', line 241

# Persists +seq+ as the table's position by overwriting its cached position
# file in place, then rewinds the file.
#
# NOTE(review): the write happens at the file's current offset, which is
# assumed to be 0. open_table_position_file opens at offset 0, and both
# get_table_position and this method rewind after use; confirm that no other
# caller leaves the offset elsewhere.
def save_table_position(table_name, seq)
  f = open_table_position_file(table_name)
  prev_seq_len = f.size

  seq_to_write =  seq.to_s
  new_seq_len = seq_to_write.size

  # When the new value is shorter, first overwrite the leftover old digits
  # with spaces and only then truncate. Presumably this keeps the file
  # readable (String#to_i ignores trailing spaces) even if truncate never runs.
  if new_seq_len < prev_seq_len
    seq_to_write += " " * (prev_seq_len - new_seq_len)
  end
  f.write(seq_to_write)
  f.truncate(new_seq_len) if new_seq_len < prev_seq_len
  # Leave the offset at 0 for the next read or write.
  f.rewind
end

#save_table_source_pos(tables, source_pos, options = {}) ⇒ Object



385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
# File 'lib/flydata/sync_file_manager.rb', line 385

# Writes source_pos.to_s to "<table>.binlog.pos" for each table (a single
# table or an Array). options[:destination] == :positions targets the table
# positions directory; any other value, or none, targets dump_dir.
def save_table_source_pos(tables, source_pos, options = {})
  dest_dir = :positions == options[:destination] ? table_positions_dir_path : dump_dir

  table_list = tables.kind_of?(Array) ? tables : [tables]
  table_list.each do |table_name|
    pos_file = File.join(dest_dir, table_name + ".binlog.pos")
    File.open(pos_file, "w") { |io| io.write(source_pos.to_s) }
  end
end

#sent_source_pos_path(master_source_pos_path = source_pos_path) ⇒ Object



142
143
144
145
# File 'lib/flydata/sync_file_manager.rb', line 142

# Derives the sent-position path from the master position path by dropping
# its last four characters (expected to be ".pos") and appending ".sent.pos".
def sent_source_pos_path(master_source_pos_path = source_pos_path)
  validate_master_source_pos_path(master_source_pos_path)
  stem = master_source_pos_path[0...-4]
  "#{stem}.sent.pos"
end

#source ⇒ Object



20
21
22
# File 'lib/flydata/sync_file_manager.rb', line 20

# Source-dependent helper object passed to the constructor (may be nil).
# Other methods call source.source_pos.create_source_pos to build position
# objects.
def source
  @source
end

#source_pos_path ⇒ Object



119
120
121
# File 'lib/flydata/sync_file_manager.rb', line 119

# Path of the master source position file:
# <FLYDATA_HOME>/<entry name>.binlog.pos
def source_pos_path
  entry_name = @data_entry['name']
  File.join(FLYDATA_HOME, entry_name + ".binlog.pos")
end

#source_table_marshal_dump_path ⇒ Object

SourceTable marshal file



88
89
90
# File 'lib/flydata/sync_file_manager.rb', line 88

# Path of the marshaled SourceTable file: the dump file path with
# ".<SOURCE_TABLE_EXT>" appended.
def source_table_marshal_dump_path
  "#{dump_file_path}.#{SOURCE_TABLE_EXT}"
end

#ssl_ca_path(master_source_pos_path = source_pos_path) ⇒ Object

ssl_ca file path



148
149
150
151
152
# File 'lib/flydata/sync_file_manager.rb', line 148

# Path of the SSL CA file: <data-entry-name>.ssl_ca.pem, built by dropping the
# last 11 characters (expected to be ".binlog.pos") of the master position path.
def ssl_ca_path(master_source_pos_path = source_pos_path)
  validate_master_source_pos_path(master_source_pos_path)
  stem = master_source_pos_path[0...-11]
  "#{stem}.ssl_ca.pem"
end

#ssl_cipher_path(master_source_pos_path = source_pos_path) ⇒ Object

ssl_cipher file path



161
162
163
164
165
# File 'lib/flydata/sync_file_manager.rb', line 161

# Path of the SSL cipher file: <data-entry-name>.ssl_cipher, built by dropping
# the last 11 characters (expected to be ".binlog.pos") of the master position path.
def ssl_cipher_path(master_source_pos_path = source_pos_path)
  validate_master_source_pos_path(master_source_pos_path)
  stem = master_source_pos_path[0...-11]
  "#{stem}.ssl_cipher"
end

#stats_path ⇒ Object



452
453
454
# File 'lib/flydata/sync_file_manager.rb', line 452

# Path of this data entry's stats file: <dump_dir>/<entry name>.stats
def stats_path
  base = File.join(dump_dir, @data_entry['name'])
  "#{base}.stats"
end

#sync_info_file ⇒ Object



272
273
274
# File 'lib/flydata/sync_file_manager.rb', line 272

# Path of the sync info file: <dump_dir>/sync.info
def sync_info_file
  dir = dump_dir
  File.join(dir, "sync.info")
end

#table_ddl_file_paths(*tables) ⇒ Object



190
191
192
193
# File 'lib/flydata/sync_file_manager.rb', line 190

# Paths of "<table>.generated_ddl" files for the given tables or, when no
# tables are given, every such file in the table positions directory.
def table_ddl_file_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, '*.generated_ddl'))
  else
    tables.map { |name| File.join(table_positions_dir_path, name + '.generated_ddl') }
  end
end

#table_position_file_paths(*tables) ⇒ Object



185
186
187
188
# File 'lib/flydata/sync_file_manager.rb', line 185

# Paths of "<table>.pos" files for the given tables or, when no tables are
# given, every "*.pos" file in the table positions directory.
def table_position_file_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, '*.pos'))
  else
    tables.map { |name| File.join(table_positions_dir_path, name + '.pos') }
  end
end

#table_positions_dir_path ⇒ Object



181
182
183
# File 'lib/flydata/sync_file_manager.rb', line 181

# Returns the directory that holds per-table control files (the
# TABLE_POSITIONS_DIR constant).
def table_positions_dir_path
  TABLE_POSITIONS_DIR
end

#table_rev(table_name) ⇒ Object



305
306
307
308
309
310
311
312
313
314
315
316
# File 'lib/flydata/sync_file_manager.rb', line 305

# Returns the table's revision number from its ".rev" file. Defaults to 1
# when the file is missing or empty; otherwise returns the content's to_i.
def table_rev(table_name)
  file = table_rev_file_path(table_name)
  # File.exist? (File.exists? was removed in Ruby 3.2).
  return 1 unless File.exist?(file) # default revision is 1
  # Read-only access is enough. The previous "r+" mode needlessly required
  # write permission just to read the value.
  seq = File.read(file)
  seq.empty? ? 1 : seq.to_i
end

#table_rev_file_path(table_name) ⇒ Object



296
297
298
# File 'lib/flydata/sync_file_manager.rb', line 296

# Path of the table's revision file: <positions dir>/<table>.rev
def table_rev_file_path(table_name)
  dir = table_positions_dir_path
  File.join(dir, table_name + ".rev")
end

#table_rev_file_paths(*tables) ⇒ Object



300
301
302
303
# File 'lib/flydata/sync_file_manager.rb', line 300

# Paths of ".rev" files for the given tables (via table_rev_file_path) or,
# when no tables are given, every "*.rev" file in the positions directory.
def table_rev_file_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, "*.rev"))
  else
    tables.map { |name| table_rev_file_path(name) }
  end
end

#table_source_pos_file_path(table_name) ⇒ Object



415
416
417
# File 'lib/flydata/sync_file_manager.rb', line 415

# Path of the table's source position file: <positions dir>/<table>.binlog.pos
def table_source_pos_file_path(table_name)
  dir = table_positions_dir_path
  File.join(dir, table_name + ".binlog.pos")
end

#table_source_pos_init_paths(*tables) ⇒ Object



200
201
202
203
# File 'lib/flydata/sync_file_manager.rb', line 200

# Paths of "<table>.binlog.pos.init" files for the given tables or, when no
# tables are given, every such file in the table positions directory.
def table_source_pos_init_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos.init'))
  else
    tables.map { |name| File.join(table_positions_dir_path, name + '.binlog.pos.init') }
  end
end

#table_source_pos_paths(*tables) ⇒ Object



195
196
197
198
# File 'lib/flydata/sync_file_manager.rb', line 195

# Paths of "<table>.binlog.pos" files for the given tables or, when no tables
# are given, every "*.binlog.pos" file in the table positions directory.
def table_source_pos_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos'))
  else
    tables.map { |name| File.join(table_positions_dir_path, name + '.binlog.pos') }
  end
end

#tables_from_positions_dir ⇒ Object



340
341
342
343
344
345
346
347
348
349
# File 'lib/flydata/sync_file_manager.rb', line 340

# Returns the unique table names that have control files (*.pos,
# *.generated_ddl, *.init, *.rev) in the positions directory. A table name is
# the part of the file name before the first '.'.
def tables_from_positions_dir
  pattern = File.join(table_positions_dir_path, '*.{pos,generated_ddl,init,rev}')
  Dir.glob(pattern).map { |path| File.basename(path).split('.', 2).first }.uniq
end