Class: Flydata::FileUtil::SyncFileManager

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/sync_file_manager.rb

Constant Summary collapse

DUMP_DIR =
ENV['FLYDATA_DUMP'] || File.join(FLYDATA_HOME, 'dump')
BACKUP_DIR =
ENV['FLYDATA_BACKUP'] || File.join(FLYDATA_HOME, 'backup')
TABLE_POSITIONS_DIR =
ENV['FLYDATA_TABLE_POSITIONS'] || File.join(FLYDATA_HOME, 'positions')

Instance Method Summary collapse

Constructor Details

#initialize(data_entry) ⇒ SyncFileManager

Returns a new instance of SyncFileManager.



7
8
9
10
# File 'lib/flydata/sync_file_manager.rb', line 7

def initialize(data_entry)
  @data_entry = data_entry
  @table_position_files = {} # File objects keyed by table name
end

Instance Method Details

#backup_dump_dirObject



196
197
198
199
200
201
202
# File 'lib/flydata/sync_file_manager.rb', line 196

def backup_dump_dir
  backup_dir = BACKUP_DIR.dup
  FileUtils.mkdir_p(backup_dir) unless Dir.exists?(backup_dir)
  dest_dir = File.join(backup_dir, Time.now.strftime("%Y%m%d%H%M%S"))
  FileUtils.mkdir(dest_dir)
  FileUtils.mv(dump_dir, dest_dir)
end

#binlog_pathObject



62
63
64
# File 'lib/flydata/sync_file_manager.rb', line 62

def binlog_path
  File.join(FLYDATA_HOME, @data_entry['name'] + ".binlog.pos")
end

#closeObject



12
13
14
15
# File 'lib/flydata/sync_file_manager.rb', line 12

def close
  @table_position_files.values.each {|f| f.close }
  @table_position_files = {}
end

#delete_dump_fileObject



192
193
194
# File 'lib/flydata/sync_file_manager.rb', line 192

def delete_dump_file
  FileUtils.rm(dump_file_path) if File.exists?(dump_file_path)
end

#delete_table_binlog_pos(table_name) ⇒ Object



163
164
165
166
167
168
169
170
# File 'lib/flydata/sync_file_manager.rb', line 163

def delete_table_binlog_pos(table_name)
  file = File.join(table_positions_dir_path, table_name + ".binlog.pos")
  if File.exists?(file)
    FileUtils.rm(file, :force => true)
  else
    puts "#{file} does not exist. Something is wrong. Did you delete the file manually when flydata was running?"
  end
end

#dump_file_pathObject



17
18
19
# File 'lib/flydata/sync_file_manager.rb', line 17

def dump_file_path
  File.join(dump_dir, @data_entry['name']) + ".dump"
end

#dump_pos_pathObject

dump pos file for resume



22
23
24
# File 'lib/flydata/sync_file_manager.rb', line 22

def dump_pos_path
  dump_file_path + ".pos"
end

#get_table_binlog_pos(table_name) ⇒ Object



126
127
128
129
130
# File 'lib/flydata/sync_file_manager.rb', line 126

def get_table_binlog_pos(table_name)
  file = File.join(table_positions_dir_path, table_name + ".binlog.pos")
  return nil unless File.exists?(file)
  File.open(file, 'r').readline
end

#increment_and_save_table_position(table_name) {|seq| ... } ⇒ Object

Read a sequence number from the table’s position file, increment the number and pass the number to a block. After executing the block, saves the value to the position file.

Yields:

  • (seq)


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/flydata/sync_file_manager.rb', line 86

def increment_and_save_table_position(table_name)
  file = File.join(table_positions_dir_path, table_name + ".pos")
  retry_count = 0
  begin
    @table_position_files[table_name] ||= File.open(file, "r+")
  rescue Errno::ENOENT
    raise if retry_count > 0 # Already retried.  Must be a differentfile causing the error
    # File not exist.  Create one with initial value of '0'
    File.open(file, "w") {|f| f.write('0') }
    retry_count += 1
    retry
  end
  f = @table_position_files[table_name]
  seq = f.read
  seq = seq.to_i + 1
  yield(seq)
  f.rewind
  f.truncate(0)
  f.write(seq)
  f.flush
  f.rewind
end

#increment_table_rev(table_name, base_rev) ⇒ Object



154
155
156
157
158
159
160
161
# File 'lib/flydata/sync_file_manager.rb', line 154

def increment_table_rev(table_name, base_rev)
  file = table_rev_file_path(table_name)
  new_rev = base_rev + 1
  File.open(file, "w") do |f|
    f.write(new_rev)
  end
  new_rev
end

#load_dump_posObject



32
33
34
35
36
37
38
39
40
41
# File 'lib/flydata/sync_file_manager.rb', line 32

def load_dump_pos
  path = dump_pos_path
  return {} unless File.exists?(path)
  items = File.open(path, 'r').readline.split("\t")
  raise "Invalid dump.pos file: #{path}" unless items.length >= 5 && items.length <= 7
  mysql_table = load_mysql_table_marshal_dump
  { status: items[0], table_name: items[1], last_pos: items[2].to_i,
    binlog_pos: {binfile: items[3], pos: items[4].to_i},
    state: items[5], substate: items[6], mysql_table: mysql_table}
end

#load_sync_infoObject



119
120
121
122
123
124
# File 'lib/flydata/sync_file_manager.rb', line 119

def load_sync_info
  return nil unless File.exists?(sync_info_file)
  items = File.open(sync_info_file, 'r').readline.split("\t")
  { initial_sync: (items[0] == 'true'),
    tables: items[1] }
end

#move_table_binlog_files(tables) ⇒ Object



181
182
183
184
185
186
187
188
189
190
# File 'lib/flydata/sync_file_manager.rb', line 181

def move_table_binlog_files(tables)
  FileUtils.mkdir_p(table_positions_dir_path) unless Dir.exists?(table_positions_dir_path)
  tables.each do |table_name|
    file = File.join(dump_dir, table_name + ".binlog.pos")
    if ! File.exists?(file)
      raise "#{file} does not exist. Error!!"
    end
    FileUtils.mv(file, table_positions_dir_path)
  end
end

#mysql_table_marshal_dump_pathObject

MysqlTable marshal file



44
45
46
# File 'lib/flydata/sync_file_manager.rb', line 44

def mysql_table_marshal_dump_path
  dump_file_path + ".mysql_table"
end

#reset_table_position_files(tables) ⇒ Object



66
67
68
69
70
71
# File 'lib/flydata/sync_file_manager.rb', line 66

def reset_table_position_files(tables)
  tables.each do |table_name|
    file = File.join(table_positions_dir_path, table_name + ".pos")
    File.open(file, "w") {|f| f.write('0') }
  end
end

#save_binlog(binlog_pos) ⇒ Object

binlog.pos file



55
56
57
58
59
60
# File 'lib/flydata/sync_file_manager.rb', line 55

def save_binlog(binlog_pos)
  path = binlog_path
  File.open(path, 'w') do |f|
    f.write(binlog_content(binlog_pos))
  end
end

#save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil) ⇒ Object



26
27
28
29
30
# File 'lib/flydata/sync_file_manager.rb', line 26

def save_dump_pos(status, table_name, last_pos, binlog_pos, state = nil, substate = nil)
  File.open(dump_pos_path, 'w') do |f|
    f.write(dump_pos_content(status, table_name, last_pos, binlog_pos, state, substate))
  end
end

#save_mysql_table_marshal_dump(mysql_table) ⇒ Object



48
49
50
51
52
# File 'lib/flydata/sync_file_manager.rb', line 48

def save_mysql_table_marshal_dump(mysql_table)
  File.open(mysql_table_marshal_dump_path, 'w') do |f|
    f.write Marshal.dump(mysql_table)
  end
end

#save_sync_info(initial_sync, tables) ⇒ Object



113
114
115
116
117
# File 'lib/flydata/sync_file_manager.rb', line 113

def save_sync_info(initial_sync, tables)
  File.open(sync_info_file, "w") do |f|
    f.write([initial_sync, tables].join("\t"))
  end
end

#save_table_binlog_pos(tables, binlog_pos) ⇒ Object



172
173
174
175
176
177
178
179
# File 'lib/flydata/sync_file_manager.rb', line 172

def save_table_binlog_pos(tables, binlog_pos)
  tables.split(" ").each do |table_name|
    file = File.join(dump_dir, table_name + ".binlog.pos")
    File.open(file, "w") do |f|
      f.write(binlog_content(binlog_pos))
    end
  end
end

#sync_info_fileObject



109
110
111
# File 'lib/flydata/sync_file_manager.rb', line 109

def sync_info_file
  File.join(dump_dir, "sync.info")
end

#table_position_file_paths(*tables) ⇒ Object



77
78
79
80
# File 'lib/flydata/sync_file_manager.rb', line 77

def table_position_file_paths(*tables)
  tables.empty? ? Dir.glob(File.join(table_positions_dir_path, '*.pos')) :
    tables.map{|table| File.join(table_positions_dir_path, table + '.pos')}
end

#table_positions_dir_pathObject



73
74
75
# File 'lib/flydata/sync_file_manager.rb', line 73

def table_positions_dir_path
  TABLE_POSITIONS_DIR
end

#table_rev(table_name) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/flydata/sync_file_manager.rb', line 141

def table_rev(table_name)
  file = table_rev_file_path(table_name)
  return 1 unless File.exists?(file) #default revision is 1
  File.open(file, "r+") do |f|
    seq = f.read
    if seq.empty?
      return 1
    else
      return seq.to_i
    end
  end
end

#table_rev_file_path(table_name) ⇒ Object



132
133
134
# File 'lib/flydata/sync_file_manager.rb', line 132

def table_rev_file_path(table_name)
  File.join(table_positions_dir_path, table_name + ".rev")
end

#table_rev_file_paths(*tables) ⇒ Object



136
137
138
139
# File 'lib/flydata/sync_file_manager.rb', line 136

def table_rev_file_paths(*tables)
  tables.empty? ? Dir.glob(File.join(table_positions_dir_path, "*.rev")) :
    tables.map{|table| table_rev_file_path(table)}
end