Class: Flydata::SyncFileManager
- Inherits:
-
Object
- Object
- Flydata::SyncFileManager
- Defined in:
- lib/flydata/sync_file_manager.rb
Constant Summary collapse
- DUMP_DIR =
ENV['FLYDATA_DUMP'] || File.join(FLYDATA_HOME, 'dump')
- BACKUP_DIR =
ENV['FLYDATA_BACKUP'] || File.join(FLYDATA_HOME, 'backup')
- TABLE_POSITIONS_DIR =
FLYDATA_TABLE_POSITIONS_DIR
- SOURCE_TABLE_EXT =
"mysql_table"
Instance Method Summary collapse
- #backup_dir ⇒ Object
- #backup_dump_dir ⇒ Object
- #close ⇒ Object
- #delete_dump_file ⇒ Object
- #delete_dump_files ⇒ Object
- #delete_master_position_files ⇒ Object
- #delete_table_ddl_files(*tables) ⇒ Object
- #delete_table_position_files(*tables) ⇒ Object
- #delete_table_rev_files(*tables) ⇒ Object
- #delete_table_source_pos(table_name) ⇒ Object
- #dump_file_path ⇒ Object
-
#dump_pos_path ⇒ Object
dump pos file for resume.
- #get_new_table_list(tables, file_type) ⇒ Object
- #get_next_table_position(table_name) ⇒ Object
- #get_table_position(table_name) ⇒ Object
- #get_table_source_pos(table_name) ⇒ Object
- #get_table_source_pos_init(table_name) ⇒ Object
-
#get_table_source_raw_pos(table_name) ⇒ Object
returns String.
-
#increment_and_save_table_position(table_name) {|seq| ... } ⇒ Object
Read a sequence number from the table’s position file, increment the number and pass the number to a block.
- #increment_table_position(seq) ⇒ Object
- #increment_table_rev(table_name, base_rev) ⇒ Object
-
#initialize(data_entry, source = nil) ⇒ SyncFileManager
constructor
A new instance of SyncFileManager.
- #install_table_source_pos_files(tables) ⇒ Object
- #load_dump_pos ⇒ Object
- #load_generated_ddl(tables) ⇒ Object
- #load_sent_source_pos(file_path = sent_source_pos_path) ⇒ Object
- #load_source_pos(file_path = source_pos_path) ⇒ Object
- #load_stats ⇒ Object
- #load_sync_info ⇒ Object
- #lock_pid_file ⇒ Object
- #open_table_position_file(table_name) ⇒ Object
-
#reset_table_position_files(tables) ⇒ Object
table files.
- #save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil) ⇒ Object
- #save_generated_ddl(tables, contents = "1") ⇒ Object
- #save_record_count_stat(table, record_count) ⇒ Object
-
#save_sent_source_pos(source_pos) ⇒ Object
sent source pos file (binlog.pos).
-
#save_source_pos(source_pos) ⇒ Object
master binlog.pos file.
- #save_source_table_marshal_dump(source_table) ⇒ Object
- #save_ssl_ca(ssl_ca_content, path = ssl_ca_path) ⇒ Object
- #save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) ⇒ Object
- #save_sync_info(initial_sync, tables) ⇒ Object
- #save_table_position(table_name, seq) ⇒ Object
- #save_table_source_pos(tables, source_pos, options = {}) ⇒ Object
- #sent_source_pos_path(master_source_pos_path = source_pos_path) ⇒ Object
- #source ⇒ Object
- #source_pos_path ⇒ Object
-
#source_table_marshal_dump_path ⇒ Object
SourceTable marshal file.
-
#ssl_ca_path(master_source_pos_path = source_pos_path) ⇒ Object
ssl_ca file path.
-
#ssl_cipher_path(master_source_pos_path = source_pos_path) ⇒ Object
ssl_cipher file path.
- #stats_path ⇒ Object
- #sync_info_file ⇒ Object
- #table_ddl_file_paths(*tables) ⇒ Object
- #table_position_file_paths(*tables) ⇒ Object
- #table_positions_dir_path ⇒ Object
- #table_rev(table_name) ⇒ Object
- #table_rev_file_path(table_name) ⇒ Object
- #table_rev_file_paths(*tables) ⇒ Object
- #table_source_pos_file_path(table_name) ⇒ Object
- #table_source_pos_init_paths(*tables) ⇒ Object
- #table_source_pos_paths(*tables) ⇒ Object
- #tables_from_positions_dir ⇒ Object
Constructor Details
#initialize(data_entry, source = nil) ⇒ SyncFileManager
Returns a new instance of SyncFileManager.
14 15 16 17 18 |
# File 'lib/flydata/sync_file_manager.rb', line 14 def initialize(data_entry, source = nil) @data_entry = data_entry @source = source #for Source dependent objects @table_position_files = {} # File objects keyed by table name end |
Instance Method Details
#backup_dir ⇒ Object
460 461 462 |
# File 'lib/flydata/sync_file_manager.rb', line 460 def backup_dir BACKUP_DIR end |
#backup_dump_dir ⇒ Object
450 451 452 453 454 455 456 457 458 |
# File 'lib/flydata/sync_file_manager.rb', line 450 def backup_dump_dir backup_dir = BACKUP_DIR.dup FileUtils.mkdir_p(backup_dir) unless Dir.exists?(backup_dir) dest_dir = File.join(backup_dir, Time.now.strftime("%Y%m%d%H%M%S")) FileUtils.mkdir(dest_dir) ['info', 'pos', 'stats', SOURCE_TABLE_EXT].each do |ext| FileUtils.mv(Dir.glob("#{dump_dir}/*.#{ext}"), dest_dir) end end |
#close ⇒ Object
24 25 26 27 |
# File 'lib/flydata/sync_file_manager.rb', line 24 def close @table_position_files.values.each {|f| f.close } @table_position_files = {} end |
#delete_dump_file ⇒ Object
446 447 448 |
# File 'lib/flydata/sync_file_manager.rb', line 446 def delete_dump_file FileUtils.rm(dump_file_path) if File.exists?(dump_file_path) end |
#delete_dump_files ⇒ Object
363 364 365 366 367 368 369 370 371 372 373 374 |
# File 'lib/flydata/sync_file_manager.rb', line 363 def delete_dump_files files_to_delete = [ dump_file_path, dump_pos_path, source_table_marshal_dump_path, sync_info_file, stats_path ] files_to_delete.flatten.each do |file_to_delete| FileUtils.rm(file_to_delete) if File.exists?(file_to_delete) end end |
#delete_master_position_files ⇒ Object
376 377 378 379 380 381 382 383 384 385 |
# File 'lib/flydata/sync_file_manager.rb', line 376 def delete_master_position_files files_to_delete = [ source_pos_path, sent_source_pos_path, lock_pid_file, ] files_to_delete.flatten.each do |file_to_delete| FileUtils.rm(file_to_delete) if File.exists?(file_to_delete) end end |
#delete_table_ddl_files(*tables) ⇒ Object
345 346 347 348 349 350 |
# File 'lib/flydata/sync_file_manager.rb', line 345 def delete_table_ddl_files(*tables) files_to_delete = table_ddl_file_paths(*tables) files_to_delete.each do |path| FileUtils.rm(path) if File.exists?(path) end end |
#delete_table_position_files(*tables) ⇒ Object
327 328 329 330 331 332 333 334 335 336 |
# File 'lib/flydata/sync_file_manager.rb', line 327 def delete_table_position_files(*tables) files_to_delete = [ table_position_file_paths(*tables), table_source_pos_paths(*tables), table_source_pos_init_paths(*tables), ] files_to_delete.flatten.each do |path| FileUtils.rm(path) if File.exists?(path) end end |
#delete_table_rev_files(*tables) ⇒ Object
338 339 340 341 342 343 |
# File 'lib/flydata/sync_file_manager.rb', line 338 def delete_table_rev_files(*tables) files_to_delete = table_rev_file_paths(*tables) files_to_delete.each do |path| FileUtils.rm(path) if File.exists?(path) end end |
#delete_table_source_pos(table_name) ⇒ Object
388 389 390 391 392 393 394 395 |
# File 'lib/flydata/sync_file_manager.rb', line 388 def delete_table_source_pos(table_name) file = File.join(table_positions_dir_path, table_name + ".binlog.pos") if File.exists?(file) FileUtils.rm(file, :force => true) else puts "#{file} does not exist. Something is wrong. Did you delete the file manually when flydata was running?" end end |
#dump_file_path ⇒ Object
29 30 31 |
# File 'lib/flydata/sync_file_manager.rb', line 29 def dump_file_path File.join(dump_dir, @data_entry['name']) + ".dump" end |
#dump_pos_path ⇒ Object
dump pos file for resume
34 35 36 |
# File 'lib/flydata/sync_file_manager.rb', line 34 def dump_pos_path dump_file_path + ".pos" end |
#get_new_table_list(tables, file_type) ⇒ Object
77 78 79 80 81 82 83 |
# File 'lib/flydata/sync_file_manager.rb', line 77 def get_new_table_list(tables, file_type) new_tables = [] tables.each do |table| new_tables << table unless File.exists?(File.join(table_positions_dir_path, "#{table}.#{file_type}")) end new_tables end |
#get_next_table_position(table_name) ⇒ Object
263 264 265 266 |
# File 'lib/flydata/sync_file_manager.rb', line 263 def get_next_table_position(table_name) seq = get_table_position(table_name) increment_table_position(seq) end |
#get_table_position(table_name) ⇒ Object
256 257 258 259 260 261 |
# File 'lib/flydata/sync_file_manager.rb', line 256 def get_table_position(table_name) f = open_table_position_file(table_name) seq = f.read f.rewind seq end |
#get_table_source_pos(table_name) ⇒ Object
413 414 415 416 417 418 |
# File 'lib/flydata/sync_file_manager.rb', line 413 def get_table_source_pos(table_name) source_pos_str = get_table_source_raw_pos(table_name) return nil unless source_pos_str source.source_pos.create_source_pos( source_pos_str ) end |
#get_table_source_pos_init(table_name) ⇒ Object
289 290 291 292 293 294 |
# File 'lib/flydata/sync_file_manager.rb', line 289 def get_table_source_pos_init(table_name) file = File.join(table_positions_dir_path, table_name + ".binlog.pos.init") return nil unless File.exists?(file) source.source_pos.create_source_pos( File.open(file, 'r').readline ) end |
#get_table_source_raw_pos(table_name) ⇒ Object
returns String. interface for fluentd
420 421 422 423 424 425 |
# File 'lib/flydata/sync_file_manager.rb', line 420 def get_table_source_raw_pos(table_name) #returns String. interface for fluentd file = table_source_pos_file_path(table_name) return nil unless File.exists?(file) File.open(file, 'r').readline end |
#increment_and_save_table_position(table_name) {|seq| ... } ⇒ Object
Read a sequence number from the table’s position file, increment the number and pass the number to a block. After executing the block, saves the value to the position file.
215 216 217 218 219 220 221 222 223 224 |
# File 'lib/flydata/sync_file_manager.rb', line 215 def increment_and_save_table_position(table_name) seq = get_table_position(table_name) seq = increment_table_position(seq) # logical transaction starts yield(seq) save_table_position(table_name, seq) # logical transaction ends end |
#increment_table_position(seq) ⇒ Object
205 206 207 208 209 |
# File 'lib/flydata/sync_file_manager.rb', line 205 def increment_table_position(seq) seq = seq.to_i + 1 seq = FlydataCore::QueryJob::SYNC_FIRST_SEQ if seq == 1 seq end |
#increment_table_rev(table_name, base_rev) ⇒ Object
318 319 320 321 322 323 324 325 |
# File 'lib/flydata/sync_file_manager.rb', line 318 def increment_table_rev(table_name, base_rev) file = table_rev_file_path(table_name) new_rev = base_rev + 1 File.open(file, "w") do |f| f.write(new_rev) end new_rev end |
#install_table_source_pos_files(tables) ⇒ Object
431 432 433 434 435 436 437 438 439 440 441 442 443 444 |
# File 'lib/flydata/sync_file_manager.rb', line 431 def install_table_source_pos_files(tables) FileUtils.mkdir_p(table_positions_dir_path) unless Dir.exists?(table_positions_dir_path) tables.each do |table_name| file_name = table_name + ".binlog.pos" src_file = File.join(dump_dir, file_name) if ! File.exists?(src_file) raise "#{src_file} does not exist. Error!!" end FileUtils.mv(src_file, table_positions_dir_path) # save the position at initial sync. this is used for repair if # necessary. FileUtils.cp(File.join(table_positions_dir_path, file_name), File.join(table_positions_dir_path, file_name + ".init")) end end |
#load_dump_pos ⇒ Object
45 46 47 48 49 50 51 52 |
# File 'lib/flydata/sync_file_manager.rb', line 45 def load_dump_pos path = dump_pos_path return {} unless File.exists?(path) content = File.open(path, 'r').readline source_table = load_source_table_marshal_dump dump_pos_content_to_hash(content).merge( { source_table: source_table} ) end |
#load_generated_ddl(tables) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/flydata/sync_file_manager.rb', line 54 def load_generated_ddl(tables) tables = [ tables ] unless tables.kind_of?(Array) paths = table_ddl_file_paths(*tables) paths.collect{|path| begin File.open(path) {|f| f.read } rescue Errno::ENOENT nil end } end |
#load_sent_source_pos(file_path = sent_source_pos_path) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/flydata/sync_file_manager.rb', line 130 def load_sent_source_pos(file_path = sent_source_pos_path) return nil unless File.exists?(file_path) source_pos_str = IO.read(file_path).strip begin source_pos = source.source_pos.create_source_pos(source_pos_str) rescue RuntimeError return nil end source_pos end |
#load_source_pos(file_path = source_pos_path) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/flydata/sync_file_manager.rb', line 106 def load_source_pos(file_path = source_pos_path) return nil unless File.exists?(file_path) source_pos_str = IO.read(file_path).strip begin context = source.source_pos source_pos = context.create_source_pos(source_pos_str) rescue RuntimeError return nil end source_pos end |
#load_stats ⇒ Object
474 475 476 477 |
# File 'lib/flydata/sync_file_manager.rb', line 474 def load_stats return nil unless File.exists?(stats_path) Hash[*File.read(stats_path).split(/\t/)] end |
#load_sync_info ⇒ Object
282 283 284 285 286 287 |
# File 'lib/flydata/sync_file_manager.rb', line 282 def load_sync_info return nil unless File.exists?(sync_info_file) items = File.open(sync_info_file, 'r').readline.split("\t") { initial_sync: (items[0] == 'true'), tables: items[1].split(" ") } end |
#lock_pid_file ⇒ Object
268 269 270 |
# File 'lib/flydata/sync_file_manager.rb', line 268 def lock_pid_file FLYDATA_LOCK end |
#open_table_position_file(table_name) ⇒ Object
226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
# File 'lib/flydata/sync_file_manager.rb', line 226 def open_table_position_file(table_name) file = File.join(table_positions_dir_path, table_name + ".pos") retry_count = 0 begin @table_position_files[table_name] ||= (f = File.open(file, File::RDWR); f.sync = true; f) rescue Errno::ENOENT raise if retry_count > 0 # Already retried. Must be a differentfile causing the error # File not exist. Create one with initial value of '0' File.open(file, "w") {|f| f.write('0') } retry_count += 1 retry end @table_position_files[table_name] end |
#reset_table_position_files(tables) ⇒ Object
table files
174 175 176 177 178 179 |
# File 'lib/flydata/sync_file_manager.rb', line 174 def reset_table_position_files(tables) tables.each do |table_name| file = File.join(table_positions_dir_path, table_name + ".pos") File.open(file, "w") {|f| f.write('0') } end end |
#save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil) ⇒ Object
38 39 40 41 42 43 |
# File 'lib/flydata/sync_file_manager.rb', line 38 def save_dump_pos(status, table_name, last_pos, source_pos, state = nil, substate = nil) raise "Cannot create dump pos file because source position is unavailable." unless source_pos File.open(dump_pos_path, 'w') do |f| f.write(dump_pos_content(status, table_name, last_pos, source_pos, state, substate)) end end |
#save_generated_ddl(tables, contents = "1") ⇒ Object
66 67 68 69 70 71 72 73 74 75 |
# File 'lib/flydata/sync_file_manager.rb', line 66 def save_generated_ddl(tables, contents = "1") tables = [ tables ] unless tables.kind_of?(Array) #Create positions if dir does not exist unless File.directory?(table_positions_dir_path) FileUtils.mkdir_p(table_positions_dir_path) end tables.each do |tab| File.open(File.join(table_positions_dir_path, "#{tab}.generated_ddl"), 'w') {|f| f.write(contents) } end end |
#save_record_count_stat(table, record_count) ⇒ Object
468 469 470 471 472 |
# File 'lib/flydata/sync_file_manager.rb', line 468 def save_record_count_stat(table, record_count) stats = load_stats || Hash.new stats[table] = stats[table] ? stats[table].to_i + record_count : record_count save_stats(stats) end |
#save_sent_source_pos(source_pos) ⇒ Object
sent source pos file (binlog.pos)
124 125 126 127 128 |
# File 'lib/flydata/sync_file_manager.rb', line 124 def save_sent_source_pos(source_pos) File.open(sent_source_pos_path, 'w') do |f| f.write(source_pos.to_s) end end |
#save_source_pos(source_pos) ⇒ Object
master binlog.pos file
99 100 101 102 103 104 |
# File 'lib/flydata/sync_file_manager.rb', line 99 def save_source_pos(source_pos) path = source_pos_path File.open(path, 'w') do |f| f.write(source_pos.to_s) end end |
#save_source_table_marshal_dump(source_table) ⇒ Object
92 93 94 95 96 |
# File 'lib/flydata/sync_file_manager.rb', line 92 def save_source_table_marshal_dump(source_table) File.open(source_table_marshal_dump_path, 'w') do |f| f.write Marshal.dump(source_table) end end |
#save_ssl_ca(ssl_ca_content, path = ssl_ca_path) ⇒ Object
154 155 156 157 158 |
# File 'lib/flydata/sync_file_manager.rb', line 154 def save_ssl_ca(ssl_ca_content, path = ssl_ca_path) File.open(path, 'w') do |f| f.write(ssl_ca_content) end end |
#save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) ⇒ Object
167 168 169 170 171 |
# File 'lib/flydata/sync_file_manager.rb', line 167 def save_ssl_cipher(ssl_cipher_content, path = ssl_cipher_path) File.open(path, 'w') do |f| f.write(ssl_cipher_content) end end |
#save_sync_info(initial_sync, tables) ⇒ Object
276 277 278 279 280 |
# File 'lib/flydata/sync_file_manager.rb', line 276 def save_sync_info(initial_sync, tables) File.open(sync_info_file, "w") do |f| f.write([initial_sync, tables.join(" ")].join("\t")) end end |
#save_table_position(table_name, seq) ⇒ Object
241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/flydata/sync_file_manager.rb', line 241 def save_table_position(table_name, seq) f = open_table_position_file(table_name) prev_seq_len = f.size seq_to_write = seq.to_s new_seq_len = seq_to_write.size if new_seq_len < prev_seq_len seq_to_write += " " * (prev_seq_len - new_seq_len) end f.write(seq_to_write) f.truncate(new_seq_len) if new_seq_len < prev_seq_len f.rewind end |
#save_table_source_pos(tables, source_pos, options = {}) ⇒ Object
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 |
# File 'lib/flydata/sync_file_manager.rb', line 397 def save_table_source_pos(tables, source_pos, = {}) dest_dir = case [:destination] when :positions; table_positions_dir_path when :dump; dump_dir else dump_dir end tables = [ tables ] unless tables.kind_of?(Array) tables.each do |table_name| file = File.join(dest_dir, table_name + ".binlog.pos") File.open(file, "w") do |f| f.write(source_pos.to_s) end end end |
#sent_source_pos_path(master_source_pos_path = source_pos_path) ⇒ Object
142 143 144 145 |
# File 'lib/flydata/sync_file_manager.rb', line 142 def sent_source_pos_path(master_source_pos_path = source_pos_path) validate_master_source_pos_path(master_source_pos_path) "#{master_source_pos_path[0..-5]}.sent.pos" end |
#source ⇒ Object
20 21 22 |
# File 'lib/flydata/sync_file_manager.rb', line 20 def source @source end |
#source_pos_path ⇒ Object
119 120 121 |
# File 'lib/flydata/sync_file_manager.rb', line 119 def source_pos_path File.join(FLYDATA_HOME, @data_entry['name'] + ".binlog.pos") end |
#source_table_marshal_dump_path ⇒ Object
SourceTable marshal file
88 89 90 |
# File 'lib/flydata/sync_file_manager.rb', line 88 def source_table_marshal_dump_path dump_file_path + ".#{SOURCE_TABLE_EXT}" end |
#ssl_ca_path(master_source_pos_path = source_pos_path) ⇒ Object
ssl_ca file path
148 149 150 151 152 |
# File 'lib/flydata/sync_file_manager.rb', line 148 def ssl_ca_path(master_source_pos_path = source_pos_path) validate_master_source_pos_path(master_source_pos_path) # <data-entry-name>.ssl_ca.pem "#{master_source_pos_path[0..-12]}.ssl_ca.pem" end |
#ssl_cipher_path(master_source_pos_path = source_pos_path) ⇒ Object
ssl_cipher file path
161 162 163 164 165 |
# File 'lib/flydata/sync_file_manager.rb', line 161 def ssl_cipher_path(master_source_pos_path = source_pos_path) validate_master_source_pos_path(master_source_pos_path) # <data-entry-name>.ssl_cipher "#{master_source_pos_path[0..-12]}.ssl_cipher" end |
#stats_path ⇒ Object
464 465 466 |
# File 'lib/flydata/sync_file_manager.rb', line 464 def stats_path File.join(dump_dir, @data_entry['name']) + ".stats" end |
#sync_info_file ⇒ Object
272 273 274 |
# File 'lib/flydata/sync_file_manager.rb', line 272 def sync_info_file File.join(dump_dir, "sync.info") end |
#table_ddl_file_paths(*tables) ⇒ Object
190 191 192 193 |
# File 'lib/flydata/sync_file_manager.rb', line 190 def table_ddl_file_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.generated_ddl')) : tables.map{|table| File.join(table_positions_dir_path, table + '.generated_ddl')} end |
#table_position_file_paths(*tables) ⇒ Object
185 186 187 188 |
# File 'lib/flydata/sync_file_manager.rb', line 185 def table_position_file_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.pos')) : tables.map{|table| File.join(table_positions_dir_path, table + '.pos')} end |
#table_positions_dir_path ⇒ Object
181 182 183 |
# File 'lib/flydata/sync_file_manager.rb', line 181 def table_positions_dir_path TABLE_POSITIONS_DIR end |
#table_rev(table_name) ⇒ Object
305 306 307 308 309 310 311 312 313 314 315 316 |
# File 'lib/flydata/sync_file_manager.rb', line 305 def table_rev(table_name) file = table_rev_file_path(table_name) return 1 unless File.exists?(file) #default revision is 1 File.open(file, "r+") do |f| seq = f.read if seq.empty? return 1 else return seq.to_i end end end |
#table_rev_file_path(table_name) ⇒ Object
296 297 298 |
# File 'lib/flydata/sync_file_manager.rb', line 296 def table_rev_file_path(table_name) File.join(table_positions_dir_path, table_name + ".rev") end |
#table_rev_file_paths(*tables) ⇒ Object
300 301 302 303 |
# File 'lib/flydata/sync_file_manager.rb', line 300 def table_rev_file_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, "*.rev")) : tables.map{|table| table_rev_file_path(table)} end |
#table_source_pos_file_path(table_name) ⇒ Object
427 428 429 |
# File 'lib/flydata/sync_file_manager.rb', line 427 def table_source_pos_file_path(table_name) File.join(table_positions_dir_path, table_name + ".binlog.pos") end |
#table_source_pos_init_paths(*tables) ⇒ Object
200 201 202 203 |
# File 'lib/flydata/sync_file_manager.rb', line 200 def table_source_pos_init_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos.init')) : tables.map{|table| File.join(table_positions_dir_path, table + '.binlog.pos.init')} end |
#table_source_pos_paths(*tables) ⇒ Object
195 196 197 198 |
# File 'lib/flydata/sync_file_manager.rb', line 195 def table_source_pos_paths(*tables) tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.binlog.pos')) : tables.map{|table| File.join(table_positions_dir_path, table + '.binlog.pos')} end |
#tables_from_positions_dir ⇒ Object
352 353 354 355 356 357 358 359 360 361 |
# File 'lib/flydata/sync_file_manager.rb', line 352 def tables_from_positions_dir all_table_control_files = Dir.glob(File.join(table_positions_dir_path, '*.{pos,generated_ddl,init,rev}')) tables = Set.new all_table_control_files.each do |control_file| file_name = File.basename(control_file) file_name = file_name.slice(0...(file_name.index('.'))) tables << file_name end tables.to_a end |