Class: Flydata::SyncFileManager

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/sync_file_manager.rb

Constant Summary collapse

DUMP_DIR =
ENV['FLYDATA_DUMP'] || File.join(FLYDATA_HOME, 'dump')
BACKUP_DIR =
ENV['FLYDATA_BACKUP'] || File.join(FLYDATA_HOME, 'backup')
TABLE_POSITIONS_DIR =
ENV['FLYDATA_TABLE_POSITIONS'] || File.join(FLYDATA_HOME, 'positions')
SYNC_TABLE_POSITIONS =
0

Instance Method Summary collapse

Constructor Details

#initialize(data_entry) ⇒ SyncFileManager

Returns a new instance of SyncFileManager.



12
13
14
15
16
# File 'lib/flydata/sync_file_manager.rb', line 12

# Sets up a manager for the given data entry.
#
# data_entry - object indexed with 'name' by the path helpers of this class.
def initialize(data_entry)
  # Countdown consulted by increment_and_save_table_position before fsync.
  @sync_table_positions_count = SYNC_TABLE_POSITIONS
  # Open position-file handles, keyed by table name.
  @table_position_files = {}
  @data_entry = data_entry
end

Instance Method Details

#backup_dir ⇒ Object



406
407
408
# File 'lib/flydata/sync_file_manager.rb', line 406

# Returns the backup directory, i.e. the BACKUP_DIR constant itself.
def backup_dir
  BACKUP_DIR
end

#backup_dump_dir ⇒ Object



396
397
398
399
400
401
402
403
404
# File 'lib/flydata/sync_file_manager.rb', line 396

# Moves the dump control files into a new timestamp-named subdirectory of
# BACKUP_DIR.
#
# Only files matching *.info, *.pos, *.stats and *.mysql_table directly under
# dump_dir are moved. FileUtils.mkdir raises if the timestamped directory
# already exists (e.g. two calls within the same second).
def backup_dump_dir
  # mkdir_p is a no-op for an existing directory, so no existence check is
  # needed (the previously used Dir.exists? no longer exists in Ruby 3.2+).
  FileUtils.mkdir_p(BACKUP_DIR)
  dest_dir = File.join(BACKUP_DIR, Time.now.strftime("%Y%m%d%H%M%S"))
  FileUtils.mkdir(dest_dir)
  %w(info pos stats mysql_table).each do |ext|
    FileUtils.mv(Dir.glob("#{dump_dir}/*.#{ext}"), dest_dir)
  end
end

#binlog_path ⇒ Object



108
109
110
# File 'lib/flydata/sync_file_manager.rb', line 108

# Path of the master binlog position file: FLYDATA_HOME/<entry name>.binlog.pos
def binlog_path
  file_name = @data_entry['name'] + ".binlog.pos"
  File.join(FLYDATA_HOME, file_name)
end

#close ⇒ Object



18
19
20
21
# File 'lib/flydata/sync_file_manager.rb', line 18

# Closes every cached table position file handle and clears the cache.
def close
  @table_position_files.each_value { |handle| handle.close }
  @table_position_files = {}
end

#delete_dump_file ⇒ Object



392
393
394
# File 'lib/flydata/sync_file_manager.rb', line 392

# Removes the dump file if it is present; does nothing (returns nil) otherwise.
def delete_dump_file
  path = dump_file_path
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  FileUtils.rm(path) if File.exist?(path)
end

#delete_non_table_control_files(delete_binlog = false) ⇒ Object



328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/flydata/sync_file_manager.rb', line 328

# Deletes the control files that are not tied to a particular table.
#
# Always removes the dump file, the dump position file, the MysqlTable marshal
# dump, the sync info file and the stats file. When delete_binlog is truthy the
# master binlog position file, the sent binlog position file and the lock pid
# file are removed as well. Files that do not exist are skipped.
def delete_non_table_control_files(delete_binlog= false)
  files_to_delete = [
      dump_file_path,
      dump_pos_path,
      mysql_table_marshal_dump_path,
      sync_info_file,
      stats_path
  ]
  files_to_delete.push(binlog_path, sent_binlog_path, lock_pid_file) if delete_binlog
  files_to_delete.flatten.each do |file_to_delete|
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    FileUtils.rm(file_to_delete) if File.exist?(file_to_delete)
  end
end

#delete_table_binlog_pos(table_name) ⇒ Object



346
347
348
349
350
351
352
353
# File 'lib/flydata/sync_file_manager.rb', line 346

# Deletes <table_name>.binlog.pos from the table positions directory.
# Prints a warning to stdout when the file is not there.
def delete_table_binlog_pos(table_name)
  file = File.join(table_positions_dir_path, table_name + ".binlog.pos")
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  if File.exist?(file)
    FileUtils.rm(file, :force => true)
  else
    puts "#{file} does not exist. Something is wrong. Did you delete the file manually when flydata was running?"
  end
end

#delete_table_control_files(*tables) ⇒ Object



302
303
304
305
306
307
308
309
310
311
312
313
314
# File 'lib/flydata/sync_file_manager.rb', line 302

# Deletes the per-table control files (position, binlog position, initial
# binlog position, revision and generated-DDL files) for the given tables.
#
# Does nothing when no table is given, so it never falls through to the
# "all tables" glob of the *_paths helpers. Missing files are skipped.
def delete_table_control_files(*tables)
  # A splat parameter is always an Array, so only emptiness needs checking.
  return if tables.empty?
  files_to_delete = [
      table_position_file_paths(*tables),
      table_binlog_pos_paths(*tables),
      table_binlog_pos_init_paths(*tables),
      table_rev_file_paths(*tables),
      table_ddl_file_paths(*tables)
  ]
  files_to_delete.flatten.each do |path|
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    FileUtils.rm(path) if File.exist?(path)
  end
end

#dump_file_path ⇒ Object



23
24
25
# File 'lib/flydata/sync_file_manager.rb', line 23

# Path of the dump file: <dump_dir>/<entry name>.dump
def dump_file_path
  base_path = File.join(dump_dir, @data_entry['name'])
  base_path + ".dump"
end

#dump_pos_path ⇒ Object

dump pos file for resume



28
29
30
# File 'lib/flydata/sync_file_manager.rb', line 28

# Path of the dump position file (used for resume): the dump file path with
# ".pos" appended.
def dump_pos_path
  "#{dump_file_path}.pos"
end

#get_new_table_list(tables, file_type) ⇒ Object



73
74
75
76
77
78
79
80
# File 'lib/flydata/sync_file_manager.rb', line 73

# Returns the tables that have no "<table>.<file_type>" file yet.
#
# The directory searched is ENV['FLYDATA_TABLE_POSITIONS'] when set, otherwise
# FLYDATA_HOME/positions, resolved at call time.
#
# tables    - enumerable of table names
# file_type - file extension without the leading dot (e.g. "pos")
def get_new_table_list(tables, file_type)
  positions_dir = ENV['FLYDATA_TABLE_POSITIONS'] || File.join(FLYDATA_HOME, 'positions')
  tables.reject do |table|
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    File.exist?(File.join(positions_dir, "#{table}.#{file_type}"))
  end
end

#get_table_binlog_pos(table_name) ⇒ Object



371
372
373
374
375
# File 'lib/flydata/sync_file_manager.rb', line 371

# Returns the first line of the table's .binlog.pos file, or nil when the file
# does not exist. Raises EOFError (from readline) when the file is empty.
def get_table_binlog_pos(table_name)
  file = File.join(table_positions_dir_path, table_name + ".binlog.pos")
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return nil unless File.exist?(file)
  # Block form so the handle is closed instead of being left to the GC.
  File.open(file, 'r') { |f| f.readline }
end

#get_table_binlog_pos_init(table_name) ⇒ Object



265
266
267
268
269
# File 'lib/flydata/sync_file_manager.rb', line 265

# Returns the first line of the table's .binlog.pos.init file, or nil when the
# file does not exist. Raises EOFError (from readline) when the file is empty.
def get_table_binlog_pos_init(table_name)
  file = File.join(table_positions_dir_path, table_name + ".binlog.pos.init")
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return nil unless File.exist?(file)
  # Block form so the handle is closed instead of being left to the GC.
  File.open(file, 'r') { |f| f.readline }
end

#get_table_position(table_name) ⇒ Object



237
238
239
240
241
242
# File 'lib/flydata/sync_file_manager.rb', line 237

# Returns the full content of the table's .pos file as a String, or nil when
# the file does not exist.
def get_table_position(table_name)
  file = File.join(table_positions_dir_path, table_name + ".pos")
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return nil unless File.exist?(file)

  File.read(file)
end

#increment_and_save_table_position(table_name) {|seq| ... } ⇒ Object

Read a sequence number from the table’s position file, increment the number and pass the number to a block. After executing the block, saves the value to the position file.

Yields:

  • (seq)


193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/flydata/sync_file_manager.rb', line 193

# Reads the sequence number from the table's .pos file, increments it, yields
# the new number to the caller's block and then writes it back to the file.
#
# The File handle is opened once per table and cached in
# @table_position_files; it is released by #close.
#
# table_name - name of the table whose position is advanced
#
# Yields the new sequence number (an Integer). The file is written only after
# the block returns, so an exception raised by the block leaves the stored
# position unchanged.
def increment_and_save_table_position(table_name)
  file = File.join(table_positions_dir_path, table_name + ".pos")
  retry_count = 0
  begin
    # Open read-write (no create) with sync mode on, and cache the handle.
    @table_position_files[table_name] ||= (f = File.open(file, File::RDWR); f.sync = true; f)
  rescue Errno::ENOENT
    raise if retry_count > 0 # Already retried.  Must be a different file causing the error
    # File does not exist.  Create one with initial value of '0'
    File.open(file, "w") {|f| f.write('0') }
    retry_count += 1
    retry
  end
  f = @table_position_files[table_name]
  seq = f.read
  prev_seq_len = seq.size
  # Go back to the start so the write below overwrites the old value in place.
  f.rewind
  seq = seq.to_i + 1
  # An incremented value of 1 is replaced by the SYNC_FIRST_SEQ constant.
  seq = FlydataCore::QueryJob::SYNC_FIRST_SEQ if seq == 1
  new_seq_len = seq.to_s.size

  seq_to_write =  seq.to_s
  if new_seq_len < prev_seq_len
    # Pad with spaces so the single write covers every byte of the old value;
    # the padding is cut off by the truncate at the end.
    seq_to_write += " " * (prev_seq_len - new_seq_len)
  end
  # logical transaction starts
  yield(seq)
  f.write(seq_to_write)
  # Countdown to an explicit fsync: fsync fires when the counter is 1 and the
  # counter is then reset. When the counter is 0 neither branch runs and no
  # explicit fsync is issued.
  if @sync_table_positions_count > 1
    @sync_table_positions_count -= 1
  elsif @sync_table_positions_count == 1
    f.fsync
    @sync_table_positions_count = SYNC_TABLE_POSITIONS
  end
  # logical transaction ends
  f.truncate(new_seq_len) if new_seq_len < prev_seq_len
  # Leave the handle at the start for the next call's read.
  f.rewind
end

#increment_table_rev(table_name, base_rev) ⇒ Object



293
294
295
296
297
298
299
300
# File 'lib/flydata/sync_file_manager.rb', line 293

# Stores base_rev + 1 in the table's revision file (overwriting it) and
# returns the new revision.
def increment_table_rev(table_name, base_rev)
  rev_path = table_rev_file_path(table_name)
  (base_rev + 1).tap do |next_rev|
    File.open(rev_path, "w") { |io| io.write(next_rev) }
  end
end

#install_table_binlog_files(tables) ⇒ Object



377
378
379
380
381
382
383
384
385
386
387
388
389
390
# File 'lib/flydata/sync_file_manager.rb', line 377

# Moves each table's <table>.binlog.pos file from the dump directory into the
# table positions directory and keeps a ".init" copy next to it.
#
# Raises RuntimeError when a table's file is missing from the dump directory;
# tables processed before the failing one stay installed.
def install_table_binlog_files(tables)
  positions_dir = table_positions_dir_path
  # mkdir_p is a no-op for an existing directory, so no existence check is
  # needed (the previously used Dir.exists? no longer exists in Ruby 3.2+).
  FileUtils.mkdir_p(positions_dir)
  tables.each do |table_name|
    file_name = table_name + ".binlog.pos"
    src_file = File.join(dump_dir, file_name)
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    raise "#{src_file} does not exist. Error!!" unless File.exist?(src_file)
    FileUtils.mv(src_file, positions_dir)
    # save the position at initial sync.  this is used for repair if
    # necessary.
    installed_file = File.join(positions_dir, file_name)
    FileUtils.cp(installed_file, installed_file + ".init")
  end
end

#load_binlog(file_path = binlog_path) ⇒ Object



101
102
103
104
105
106
# File 'lib/flydata/sync_file_manager.rb', line 101

# Loads a binlog position file whose content is "<binlog file>\t<position>".
#
# Returns { binfile: String, pos: Integer }, or nil when the file is missing or
# does not contain both fields.
def load_binlog(file_path = binlog_path)
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return nil unless File.exist?(file_path)
  # File.read never treats a leading "|" as a command, unlike IO.read.
  binfile, pos = File.read(file_path).strip.split("\t")
  return nil if binfile.nil? || binfile.empty? || pos.nil?
  { binfile: binfile, pos: pos.to_i }
end

#load_dump_pos ⇒ Object



38
39
40
41
42
43
44
45
46
47
# File 'lib/flydata/sync_file_manager.rb', line 38

# Loads the dump position file (first line, tab separated).
#
# Returns {} when the file does not exist; otherwise a Hash with :status,
# :table_name, :last_pos, :binlog_pos ({binfile:, pos:}), :state, :substate
# (both nil when the line has fewer fields) and :mysql_table (result of
# load_mysql_table_marshal_dump).
#
# Raises RuntimeError when the line does not have 5 to 7 fields.
def load_dump_pos
  path = dump_pos_path
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return {} unless File.exist?(path)
  # Block form so the handle is closed instead of being left to the GC.
  items = File.open(path, 'r') { |f| f.readline }.split("\t")
  raise "Invalid dump.pos file: #{path}" unless items.length >= 5 && items.length <= 7
  mysql_table = load_mysql_table_marshal_dump
  { status: items[0], table_name: items[1], last_pos: items[2].to_i,
    binlog_pos: {binfile: items[3], pos: items[4].to_i},
    state: items[5], substate: items[6], mysql_table: mysql_table}
end

#load_generated_ddl(tables) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/flydata/sync_file_manager.rb', line 49

# Reads the .generated_ddl file of each given table.
#
# tables - a table name or an Array of table names
#
# Returns an Array with one entry per path from table_ddl_file_paths: the file
# content, or nil when that file does not exist.
def load_generated_ddl(tables)
  table_names = tables.kind_of?(Array) ? tables : [tables]
  table_ddl_file_paths(*table_names).map do |ddl_path|
    begin
      File.read(ddl_path)
    rescue Errno::ENOENT
      nil
    end
  end
end

#load_sent_binlog(file_path = sent_binlog_path) ⇒ Object



119
120
121
122
123
124
# File 'lib/flydata/sync_file_manager.rb', line 119

# Loads a sent-binlog position file whose content is
# "<binlog file>\t<position>".
#
# Returns { binfile: String, pos: Integer }, or nil when the file is missing or
# does not contain both fields.
def load_sent_binlog(file_path = sent_binlog_path)
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return nil unless File.exist?(file_path)
  # File.read never treats a leading "|" as a command, unlike IO.read.
  binfile, pos = File.read(file_path).strip.split("\t")
  return nil if binfile.nil? || binfile.empty? || pos.nil?
  { binfile: binfile, pos: pos.to_i }
end

#load_stats ⇒ Object



420
421
422
423
# File 'lib/flydata/sync_file_manager.rb', line 420

# Loads the stats file, a flat tab-separated list of alternating keys and
# values, into a Hash of String => String.
#
# Returns nil when the file does not exist. Raises ArgumentError (from Hash[])
# when the file holds an odd number of fields.
def load_stats
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return nil unless File.exist?(stats_path)
  Hash[*File.read(stats_path).split(/\t/)]
end

#load_sync_info ⇒ Object



258
259
260
261
262
263
# File 'lib/flydata/sync_file_manager.rb', line 258

# Loads the sync info file written by save_sync_info
# ("<initial_sync>\t<space separated tables>").
#
# Returns nil when the file does not exist, otherwise
# { initial_sync: true/false, tables: [String, ...] }. An empty or absent
# table field yields an empty :tables array.
def load_sync_info
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return nil unless File.exist?(sync_info_file)
  # Block form so the handle is closed instead of being left to the GC.
  items = File.open(sync_info_file, 'r') { |f| f.readline }.split("\t")
  # "true\t" (no tables) splits into a single item, so items[1] may be nil.
  { initial_sync: (items[0] == 'true'),
    tables: items[1].to_s.split(" ") }
end

#lock_pid_file ⇒ Object



244
245
246
# File 'lib/flydata/sync_file_manager.rb', line 244

# Returns the lock pid file location, i.e. the FLYDATA_LOCK constant itself.
def lock_pid_file
  FLYDATA_LOCK
end

#mysql_table_marshal_dump_path ⇒ Object

MysqlTable marshal file



83
84
85
# File 'lib/flydata/sync_file_manager.rb', line 83

# Path of the MysqlTable marshal file: the dump file path with ".mysql_table"
# appended.
def mysql_table_marshal_dump_path
  "#{dump_file_path}.mysql_table"
end

#reset_table_position_files(tables) ⇒ Object

table files



158
159
160
161
162
163
# File 'lib/flydata/sync_file_manager.rb', line 158

# Overwrites (or creates) the .pos file of every given table with '0'.
def reset_table_position_files(tables)
  tables.each do |table_name|
    pos_path = File.join(table_positions_dir_path, table_name + ".pos")
    File.open(pos_path, "w") { |io| io.write('0') }
  end
end

#save_binlog(binlog_pos) ⇒ Object

master binlog.pos file



94
95
96
97
98
99
# File 'lib/flydata/sync_file_manager.rb', line 94

# Writes binlog_content(binlog_pos) to the master binlog position file,
# replacing any previous content.
def save_binlog(binlog_pos)
  File.open(binlog_path, 'w') { |io| io.write(binlog_content(binlog_pos)) }
end

#save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil) ⇒ Object



32
33
34
35
36
# File 'lib/flydata/sync_file_manager.rb', line 32

# Writes the output of dump_pos_content for the given arguments to the dump
# position file, replacing any previous content.
def save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil)
  File.open(dump_pos_path, 'w') do |io|
    content = dump_pos_content(status, table_name, last_pos, binlog_pos, state, substate)
    io.write(content)
  end
end

#save_generated_ddl(tables, contents = "1") ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/flydata/sync_file_manager.rb', line 61

# Writes contents to "<table>.generated_ddl" for each given table.
#
# The target directory is ENV['FLYDATA_TABLE_POSITIONS'] when set, otherwise
# FLYDATA_HOME/positions, resolved at call time; it is created when missing.
#
# tables   - a table name or an Array of table names
# contents - what to write into each file (default "1")
def save_generated_ddl(tables, contents = "1")
  table_names = tables.kind_of?(Array) ? tables : [tables]
  positions_dir = ENV['FLYDATA_TABLE_POSITIONS'] || File.join(FLYDATA_HOME, 'positions')
  # Create the positions directory on first use.
  FileUtils.mkdir_p(positions_dir) unless File.directory?(positions_dir)
  table_names.each do |table_name|
    ddl_path = File.join(positions_dir, "#{table_name}.generated_ddl")
    File.open(ddl_path, 'w') { |io| io.write(contents) }
  end
end

#save_mysql_table_marshal_dump(mysql_table) ⇒ Object



87
88
89
90
91
# File 'lib/flydata/sync_file_manager.rb', line 87

# Serializes mysql_table with Marshal.dump into the MysqlTable marshal file,
# replacing any previous content.
def save_mysql_table_marshal_dump(mysql_table)
  # Marshal output is binary, so open in binary mode ('wb') to rule out
  # newline or encoding conversion on write.
  File.open(mysql_table_marshal_dump_path, 'wb') do |f|
    f.write Marshal.dump(mysql_table)
  end
end

#save_record_count_stat(table, record_count) ⇒ Object



414
415
416
417
418
# File 'lib/flydata/sync_file_manager.rb', line 414

# Adds record_count to the stored count for table (starting from record_count
# when the table has no entry yet) and hands the updated stats to save_stats.
def save_record_count_stat(table, record_count)
  counts = load_stats || {}
  previous = counts[table]
  # Loaded values are converted with to_i before adding.
  counts[table] = previous ? previous.to_i + record_count : record_count
  save_stats(counts)
end

#save_sent_binlog(binlog_pos) ⇒ Object

sent binlog.pos file



113
114
115
116
117
# File 'lib/flydata/sync_file_manager.rb', line 113

# Writes binlog_content(binlog_pos) to the sent binlog position file,
# replacing any previous content.
def save_sent_binlog(binlog_pos)
  File.open(sent_binlog_path, 'w') { |io| io.write(binlog_content(binlog_pos)) }
end

#save_ssl_ca(ssl_ca_content, path = ssl_ca_path) ⇒ Object



138
139
140
141
142
# File 'lib/flydata/sync_file_manager.rb', line 138

# Writes ssl_ca_content to path (default: ssl_ca_path), replacing any previous
# content.
def save_ssl_ca(ssl_ca_content, path = ssl_ca_path)
  File.open(path, 'w') { |io| io.write(ssl_ca_content) }
end

#save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) ⇒ Object



151
152
153
154
155
# File 'lib/flydata/sync_file_manager.rb', line 151

# Writes ssl_cipher_content to path (default: ssl_cipher_path), replacing any
# previous content.
def save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path)
  File.open(path, 'w') { |io| io.write(ssl_cipher_content) }
end

#save_sync_info(initial_sync, tables) ⇒ Object



252
253
254
255
256
# File 'lib/flydata/sync_file_manager.rb', line 252

# Writes "<initial_sync>\t<tables joined by a space>" to the sync info file,
# replacing any previous content.
def save_sync_info(initial_sync, tables)
  File.open(sync_info_file, "w") do |io|
    table_field = tables.join(" ")
    io.write([initial_sync, table_field].join("\t"))
  end
end

#save_table_binlog_pos(tables, binlog_pos, options = {}) ⇒ Object



355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
# File 'lib/flydata/sync_file_manager.rb', line 355

# Writes binlog_content(binlog_pos) to "<table>.binlog.pos" for each table.
#
# tables     - a table name or an Array of table names
# binlog_pos - passed through to binlog_content
# options    - :destination => :positions writes into the table positions
#              directory; :dump or any other value (including none) writes
#              into the dump directory
def save_table_binlog_pos(tables, binlog_pos, options = {})
  dest_dir = :positions == options[:destination] ? table_positions_dir_path : dump_dir

  table_names = tables.kind_of?(Array) ? tables : [tables]
  table_names.each do |table_name|
    pos_path = File.join(dest_dir, table_name + ".binlog.pos")
    File.open(pos_path, "w") { |io| io.write(binlog_content(binlog_pos)) }
  end
end

#save_table_position(table_name, seq) ⇒ Object



231
232
233
234
235
# File 'lib/flydata/sync_file_manager.rb', line 231

# Overwrites (or creates) the table's .pos file with seq.
def save_table_position(table_name, seq)
  pos_path = File.join(table_positions_dir_path, table_name + ".pos")
  File.open(pos_path, "w") do |io|
    io.write(seq)
  end
end

#sent_binlog_path(master_binlog_path = binlog_path) ⇒ Object



126
127
128
129
# File 'lib/flydata/sync_file_manager.rb', line 126

# Derives the sent binlog position path from the master binlog position path
# by replacing its last four characters with ".sent.pos".
def sent_binlog_path(master_binlog_path = binlog_path)
  validate_master_binlog_path(master_binlog_path)
  without_suffix = master_binlog_path[0..-5]
  "#{without_suffix}.sent.pos"
end

#ssl_ca_path(master_binlog_path = binlog_path) ⇒ Object

ssl_ca file path



132
133
134
135
136
# File 'lib/flydata/sync_file_manager.rb', line 132

# Derives the ssl_ca file path from the master binlog position path by
# replacing its last eleven characters with ".ssl_ca.pem".
def ssl_ca_path(master_binlog_path = binlog_path)
  validate_master_binlog_path(master_binlog_path)
  # <data-entry-name>.ssl_ca.pem
  entry_prefix = master_binlog_path[0..-12]
  "#{entry_prefix}.ssl_ca.pem"
end

#ssl_cipher_path(master_binlog_path = binlog_path) ⇒ Object

ssl_cipher file path



145
146
147
148
149
# File 'lib/flydata/sync_file_manager.rb', line 145

# Derives the ssl_cipher file path from the master binlog position path by
# replacing its last eleven characters with ".ssl_cipher".
def ssl_cipher_path(master_binlog_path = binlog_path)
  validate_master_binlog_path(master_binlog_path)
  # <data-entry-name>.ssl_cipher
  entry_prefix = master_binlog_path[0..-12]
  "#{entry_prefix}.ssl_cipher"
end

#stats_path ⇒ Object



410
411
412
# File 'lib/flydata/sync_file_manager.rb', line 410

# Path of the stats file: <dump_dir>/<entry name>.stats
def stats_path
  base_path = File.join(dump_dir, @data_entry['name'])
  base_path + ".stats"
end

#sync_info_file ⇒ Object



248
249
250
# File 'lib/flydata/sync_file_manager.rb', line 248

# Path of the sync info file: <dump_dir>/sync.info
def sync_info_file
  info_file_name = "sync.info"
  File.join(dump_dir, info_file_name)
end

#table_binlog_pos_init_paths(*tables) ⇒ Object



184
185
186
187
# File 'lib/flydata/sync_file_manager.rb', line 184

# Paths of the .binlog.pos.init files in the table positions directory.
#
# With table names: one path per name, whether or not the file exists.
# Without arguments: every existing file matching *.binlog.pos.init.
def table_binlog_pos_init_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos.init'))
  else
    tables.map { |name| File.join(table_positions_dir_path, name + '.binlog.pos.init') }
  end
end

#table_binlog_pos_paths(*tables) ⇒ Object



179
180
181
182
# File 'lib/flydata/sync_file_manager.rb', line 179

# Paths of the .binlog.pos files in the table positions directory.
#
# With table names: one path per name, whether or not the file exists.
# Without arguments: every existing file matching *.binlog.pos.
def table_binlog_pos_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos'))
  else
    tables.map { |name| File.join(table_positions_dir_path, name + '.binlog.pos') }
  end
end

#table_ddl_file_paths(*tables) ⇒ Object



174
175
176
177
# File 'lib/flydata/sync_file_manager.rb', line 174

# Paths of the .generated_ddl files in the table positions directory.
#
# With table names: one path per name, whether or not the file exists.
# Without arguments: every existing file matching *.generated_ddl.
def table_ddl_file_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, '*.generated_ddl'))
  else
    tables.map { |name| File.join(table_positions_dir_path, name + '.generated_ddl') }
  end
end

#table_position_file_paths(*tables) ⇒ Object



169
170
171
172
# File 'lib/flydata/sync_file_manager.rb', line 169

# Paths of the .pos files in the table positions directory.
#
# With table names: one path per name, whether or not the file exists.
# Without arguments: every existing file matching *.pos (a pattern that
# "<table>.binlog.pos" files match as well).
def table_position_file_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, '*.pos'))
  else
    tables.map { |name| File.join(table_positions_dir_path, name + '.pos') }
  end
end

#table_positions_dir_path ⇒ Object



165
166
167
# File 'lib/flydata/sync_file_manager.rb', line 165

# Returns the table positions directory, i.e. the TABLE_POSITIONS_DIR constant
# itself.
def table_positions_dir_path
  TABLE_POSITIONS_DIR
end

#table_rev(table_name) ⇒ Object



280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/flydata/sync_file_manager.rb', line 280

# Returns the table's revision as an Integer.
#
# The revision is read from the table's .rev file; it defaults to 1 when the
# file is missing or empty.
def table_rev(table_name)
  file = table_rev_file_path(table_name)
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return 1 unless File.exist?(file) # default revision is 1
  # The file is only read here, so it is not opened for writing.
  seq = File.read(file)
  seq.empty? ? 1 : seq.to_i
end

#table_rev_file_path(table_name) ⇒ Object



271
272
273
# File 'lib/flydata/sync_file_manager.rb', line 271

# Path of the table's revision file: <table positions dir>/<table_name>.rev
def table_rev_file_path(table_name)
  rev_file_name = table_name + ".rev"
  File.join(table_positions_dir_path, rev_file_name)
end

#table_rev_file_paths(*tables) ⇒ Object



275
276
277
278
# File 'lib/flydata/sync_file_manager.rb', line 275

# Paths of the .rev files in the table positions directory.
#
# With table names: one path per name (via table_rev_file_path), whether or
# not the file exists. Without arguments: every existing file matching *.rev.
def table_rev_file_paths(*tables)
  if tables.empty?
    Dir.glob(File.join(table_positions_dir_path, "*.rev"))
  else
    tables.map { |name| table_rev_file_path(name) }
  end
end

#tables_from_positions_dir ⇒ Object



316
317
318
319
320
321
322
323
324
325
326
# File 'lib/flydata/sync_file_manager.rb', line 316

# Lists the distinct table names that have at least one control file ending in
# .pos, .generated_ddl, .init or .rev in the table positions directory.
#
# The table name is taken as everything before the first '.' of the file
# name. Returns an Array of Strings (empty when nothing matches).
def tables_from_positions_dir
  # Dir.glob always returns an Array, so no nil guard is needed.
  control_files = Dir.glob(File.join(table_positions_dir_path, '*.{pos,generated_ddl,init,rev}'))
  names = control_files.map do |control_file|
    # Every match has an extension, so splitting on the first '.' always
    # leaves the table name in front.
    File.basename(control_file).split('.', 2).first
  end
  names.uniq
end