Class: Flydata::FileUtil::SyncFileManager
- Inherits:
-
Object
- Object
- Flydata::FileUtil::SyncFileManager
- Defined in:
- lib/flydata/sync_file_manager.rb
Constant Summary collapse
- DUMP_DIR =
ENV['FLYDATA_DUMP'] || File.join(FLYDATA_HOME, 'dump')
- TABLE_POSITIONS_DIR =
ENV['FLYDATA_TABLE_POSITIONS'] || File.join(FLYDATA_HOME, 'positions')
Instance Method Summary collapse
- #binlog_path ⇒ Object
- #dump_file_path ⇒ Object
-
#dump_pos_path ⇒ Object
Path of the dump position (.pos) file used for resuming.
-
#increment_and_save_table_position(table_name) ⇒ Object
Read a sequence number from the table’s position file, increment the number and pass the number to a block.
-
#initialize(data_entry) ⇒ SyncFileManager
constructor
A new instance of SyncFileManager.
- #load_dump_pos ⇒ Object
-
#mysql_table_marshal_dump_path ⇒ Object
MysqlTable marshal file.
-
#save_binlog(binlog_pos) ⇒ Object
Writes the given binlog position to the binlog.pos file.
- #save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil) ⇒ Object
- #save_mysql_table_marshal_dump(mysql_table) ⇒ Object
- #table_position_file_paths ⇒ Object
- #table_positions_dir_path ⇒ Object
Constructor Details
#initialize(data_entry) ⇒ SyncFileManager
Returns a new instance of SyncFileManager.
6 7 8 |
# File 'lib/flydata/sync_file_manager.rb', line 6
# Builds a manager bound to one data entry.
#
# @param data_entry [Hash] entry that the path helpers index with string
#   keys such as 'name'
def initialize(data_entry)
  @data_entry = data_entry
end
Instance Method Details
#binlog_path ⇒ Object
66 67 68 |
# File 'lib/flydata/sync_file_manager.rb', line 66
# Location of this data entry's binlog position file.
#
# @return [String] "<FLYDATA_HOME>/<entry name>.binlog.pos"
def binlog_path
  base_dir = FLYDATA_HOME
  file_name = @data_entry['name'] + ".binlog.pos"
  File.join(base_dir, file_name)
end
#dump_file_path ⇒ Object
10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/flydata/sync_file_manager.rb', line 10
# Returns the path of the dump file for this data entry, creating the
# containing directory when it does not exist yet.
#
# The directory is taken from the entry's 'mysqldump_dir' preference (a
# leading "~" is replaced by the home directory) and falls back to DUMP_DIR
# when the preference is not set.
#
# @return [String] "<dump dir>/<entry name>.dump"
# @raise [RuntimeError] if the configured path exists but is not a directory
def dump_file_path
  dump_dir = @data_entry['mysql_data_entry_preference']['mysqldump_dir']
  if dump_dir
    dump_dir = dump_dir.dup
    # \A and \z anchor the whole string; ^ and $ would also match at the
    # start of any later line of a multi-line value.
    dump_dir[0] = (ENV['HOME'] || Dir.home) if dump_dir =~ %r{\A~(?:/|\z)}
  else
    dump_dir = DUMP_DIR.dup
  end
  # File.exist?/Dir.exist? replace the `exists?` aliases removed in Ruby 3.2.
  if File.exist?(dump_dir) && !Dir.exist?(dump_dir)
    raise "'mysqldump_dir'(#{dump_dir}) must be a directory."
  end
  FileUtils.mkdir_p(dump_dir) unless Dir.exist?(dump_dir)
  File.join(dump_dir, @data_entry['name']) + ".dump"
end
#dump_pos_path ⇒ Object
Path of the dump position (.pos) file used for resuming
26 27 28 |
# File 'lib/flydata/sync_file_manager.rb', line 26
# Position file that sits next to the dump file.
#
# @return [String] the dump file path with ".pos" appended
def dump_pos_path
  dump_path = dump_file_path
  dump_path + ".pos"
end
#increment_and_save_table_position(table_name) ⇒ Object
Read a sequence number from the table’s position file, increment the number and pass the number to a block. After executing the block, saves the value to the position file.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/flydata/sync_file_manager.rb', line 82
# Reads the sequence number stored in the table's position file, increments
# it, yields the new number to the block, and then saves the new number back
# to the position file.
#
# A missing position file is created with the initial value '0', so the
# first number yielded for a table is 1. The file is only rewritten after
# the block returns; if the block raises, the stored value is left untouched
# and the error propagates to the caller.
#
# @param table_name [String] table whose "<table_name>.pos" file is updated
# @yieldparam seq [Integer] the incremented sequence number
# @raise [Errno::ENOENT] if the positions directory does not exist, or if
#   the block itself raises it
def increment_and_save_table_position(table_name)
  file = File.join(table_positions_dir_path, table_name + ".pos")
  # Create the file up front instead of rescuing Errno::ENOENT around the
  # whole operation: a rescue that also covers the yield would mistake an
  # ENOENT raised by the block (for a different file) for a missing position
  # file, reset the saved position to 0 and run the block a second time.
  File.open(file, "w") { |f| f.write('0') } unless File.exist?(file)
  File.open(file, "r+") do |f|
    seq = f.read.to_i + 1
    yield(seq)
    f.rewind
    f.truncate(0)
    f.write(seq)
  end
end
#load_dump_pos ⇒ Object
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/flydata/sync_file_manager.rb', line 36
# Loads the saved dump position, if any.
#
# The first line of the dump position file is split on tabs into 5 to 7
# fields: status, table name, last position, binlog file, binlog position
# and, optionally, state and substate.
#
# @return [Hash, nil] nil when the position file does not exist; otherwise a
#   hash with :status, :table_name, :last_pos, :binlog_pos ({binfile:, pos:}),
#   :state, :substate and :mysql_table (the value returned by
#   load_mysql_table_marshal_dump)
# @raise [RuntimeError] if the line does not contain 5 to 7 fields
def load_dump_pos
  path = dump_pos_path
  # File.exist? replaces the `exists?` alias removed in Ruby 3.2.
  return nil unless File.exist?(path)
  # Block form closes the file handle as soon as the line has been read.
  items = File.open(path, 'r') { |f| f.readline }.split("\t")
  raise "Invalid dump.pos file: #{path}" unless items.length >= 5 && items.length <= 7
  mysql_table = load_mysql_table_marshal_dump
  { status: items[0], table_name: items[1], last_pos: items[2].to_i,
    binlog_pos: { binfile: items[3], pos: items[4].to_i },
    state: items[5], substate: items[6], mysql_table: mysql_table }
end
#mysql_table_marshal_dump_path ⇒ Object
MysqlTable marshal file
48 49 50 |
# File 'lib/flydata/sync_file_manager.rb', line 48
# Marshal file for the MysqlTable, stored next to the dump file.
#
# @return [String] the dump file path with ".mysql_table" appended
def mysql_table_marshal_dump_path
  dump_path = dump_file_path
  dump_path + ".mysql_table"
end
#save_binlog(binlog_pos) ⇒ Object
Writes the given binlog position to the binlog.pos file
59 60 61 62 63 64 |
# File 'lib/flydata/sync_file_manager.rb', line 59
# Overwrites the binlog position file with the content produced by
# binlog_content for the given position.
#
# @param binlog_pos [Object] position passed through to binlog_content
def save_binlog(binlog_pos)
  File.open(binlog_path, 'w') { |out| out.write(binlog_content(binlog_pos)) }
end
#save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil) ⇒ Object
30 31 32 33 34 |
# File 'lib/flydata/sync_file_manager.rb', line 30
# Overwrites the dump position file with the content produced by
# dump_pos_content for the given arguments.
#
# All parameters are forwarded unchanged to dump_pos_content; state and
# substate are optional and default to nil.
def save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil)
  File.open(dump_pos_path, 'w') do |out|
    content = dump_pos_content(status, table_name, last_pos, binlog_pos, state, substate)
    out.write(content)
  end
end
#save_mysql_table_marshal_dump(mysql_table) ⇒ Object
52 53 54 55 56 |
# File 'lib/flydata/sync_file_manager.rb', line 52
# Serializes the given object with Marshal and writes it to the MysqlTable
# marshal file, replacing any previous content.
#
# @param mysql_table [Object] object to serialize; must be Marshal-dumpable
def save_mysql_table_marshal_dump(mysql_table)
  # Marshal output is binary data, so open in binary mode ('wb'): text mode
  # may apply newline or encoding conversion and corrupt the dump.
  File.open(mysql_table_marshal_dump_path, 'wb') do |f|
    f.write Marshal.dump(mysql_table)
  end
end
#table_position_file_paths ⇒ Object
74 75 76 |
# File 'lib/flydata/sync_file_manager.rb', line 74
# Lists every "*.pos" file found directly in the table positions directory.
#
# @return [Array<String>] matching file paths (empty when none match)
def table_position_file_paths
  pattern = File.join(table_positions_dir_path, '*.pos')
  Dir.glob(pattern)
end
#table_positions_dir_path ⇒ Object
70 71 72 |
# File 'lib/flydata/sync_file_manager.rb', line 70
# Directory that holds the per-table position files.
#
# @return [String] the value of the TABLE_POSITIONS_DIR constant
def table_positions_dir_path
  TABLE_POSITIONS_DIR
end