Class: BackupTool
Inherits: Object
Defined in: lib/backuptool.rb

Instance Method Summary
- #buffered_download(remote, local) ⇒ Object
  Download a file from HDFS in a buffered way.
  Args: remote -> HDFS path, local -> local path.
- #cleanup(days) ⇒ Object
  Cleans up backups that are older than a given number of days.
- #create_backup_flag(date) ⇒ Object
  Creates a backup flag to signal that the backup is finished on all nodes. This is an individual command that has to be called manually after snapshots have finished.
- #delete_snapshots(node: @cassandra.node_name, date: 'ALL') ⇒ Object
- #get_backup_flags ⇒ Object
- #get_snapshot_metadata(node, date) ⇒ Object
- #get_snapshots_node(node, date) ⇒ Object
- #initialize(cassandra, hadoop, logger) ⇒ BackupTool (constructor)
  Create a new BackupTool instance.
  Args: cassandra -> Cassandra instance, hadoop -> HDFS instance, logger -> Logger.
- #list_snapshots(node: @cassandra.node_name) ⇒ Object
- #new_snapshot ⇒ Object
- #restore_snapshot(node, date, destination, keyspace: 'ALL', table: 'ALL') ⇒ Object
  Restore a snapshot from HDFS.
  Args: node -> node where the snapshot comes from, date -> snapshot date, destination -> local directory where to restore.
- #search_snapshots(node: 'ALL', date: 'ALL') ⇒ Object
  Look for snapshots.
  Args: node -> Cassandra node name, date -> snapshot date.
Constructor Details

#initialize(cassandra, hadoop, logger) ⇒ BackupTool

Create a new BackupTool instance.
Args:
- cassandra -> Cassandra instance
- hadoop -> HDFS instance
- logger -> Logger

# File 'lib/backuptool.rb', line 17
def initialize(cassandra, hadoop, logger)
  @cassandra = cassandra
  @hadoop = hadoop
  @logger = logger
  @metadir = META_DIR
end
Instance Method Details
#buffered_download(remote, local) ⇒ Object

Download a file from HDFS in a buffered way.
Args:
- remote -> HDFS path
- local -> local path

# File 'lib/backuptool.rb', line 226
def buffered_download(remote, local)
  @logger.debug("#{remote} => #{local}")

  # Create the destination directory if it does not exist
  path = File.dirname(local)
  FileUtils.mkdir_p(path) unless File.exist?(path)

  file = open(local, 'wb')
  offset = 0
  length = BUFFER_SIZE
  print '['
  while length == BUFFER_SIZE
    print '#'
    content = @hadoop.read(remote, offset: offset, length: BUFFER_SIZE)
    file.write(content)
    length = content.length
    offset += length
  end
  print "]\n"
  file.close
end
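The download loop exits as soon as a read returns fewer than BUFFER_SIZE bytes. A minimal sketch of the same pattern, with a StringIO standing in for the HDFS client and a tiny buffer for illustration (the real constant is defined elsewhere in the tool):

```ruby
require 'stringio'

BUFFER_SIZE = 4 # shrunk for illustration; the tool uses a much larger value

# Chunked copy loop mirroring #buffered_download: keep reading fixed-size
# chunks at increasing offsets until a short (or empty) read signals EOF.
def chunked_copy(source, sink)
  offset = 0
  length = BUFFER_SIZE
  while length == BUFFER_SIZE
    source.seek(offset)
    content = source.read(BUFFER_SIZE) || '' # nil at EOF becomes an empty chunk
    sink.write(content)
    length = content.length
    offset += length
  end
end

src = StringIO.new('0123456789')
dst = StringIO.new
chunked_copy(src, dst)
dst.string # => "0123456789"
```

Note the loop naturally handles files whose size is an exact multiple of the buffer: the final read returns nothing, setting length to 0 and ending the loop.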
#cleanup(days) ⇒ Object

Cleans up backups that are older than a given number of days. This function cleans data on all nodes.

# File 'lib/backuptool.rb', line 175
def cleanup(days)
  retention_date = Date.today - days
  @logger.info("Cleaning backup data on all nodes before #{retention_date}.")

  all_snapshots = search_snapshots
  @logger.info("A total of #{all_snapshots.size} snapshots were found on the Hadoop server.")
  snapshots_to_be_deleted = all_snapshots.select { |snapshot| snapshot.get_date < retention_date }
  @logger.info("A total of #{snapshots_to_be_deleted.size} snapshots will be deleted.")
  snapshots_to_be_deleted.each do |snapshot|
    delete_snapshots(node: snapshot.node, date: snapshot.date)
  end

  all_backup_flags = get_backup_flags
  @logger.info("A total of #{all_backup_flags.size} backup flags were found on the Hadoop server.")
  backup_flags_to_be_deleted = all_backup_flags.select { |flag| flag.date < retention_date }
  @logger.info("A total of #{backup_flags_to_be_deleted.size} backup flags will be deleted.")
  backup_flags_location = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name
  backup_flags_to_be_deleted.each do |flag|
    file = backup_flags_location + '/' + flag.file
    @logger.info("Deleting #{file}")
    @hadoop.delete(file)
  end
end
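The retention rule deletes anything dated strictly before today minus the given number of days. A sketch of that filter, with a stand-in Struct instead of the real CassandraSnapshot class:

```ruby
require 'date'

# Stand-in for CassandraSnapshot: only the #get_date behavior matters here
# (assumption: snapshot dates parse as calendar dates).
Snapshot = Struct.new(:node, :date) do
  def get_date
    Date.parse(date)
  end
end

# Same selection rule as #cleanup: keep snapshots newer than the cutoff,
# select the rest for deletion. `today:` is injectable for testing.
def select_expired(snapshots, days, today: Date.today)
  retention_date = today - days
  snapshots.select { |s| s.get_date < retention_date }
end

snaps = [Snapshot.new('node1', '2015-01-01'), Snapshot.new('node1', '2015-03-01')]
select_expired(snaps, 30, today: Date.new(2015, 3, 10)).map(&:date)
# => ["2015-01-01"]
```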
#create_backup_flag(date) ⇒ Object

Creates a backup flag to signal that the backup is finished on all nodes. This is an individual command that has to be called manually after snapshots have finished.

# File 'lib/backuptool.rb', line 205
def create_backup_flag(date)
  file_name = 'BACKUP_COMPLETED_' + date
  remote_file = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name + '/' + file_name
  @logger.info('Setting backup completed flag : ' + remote_file)
  @hadoop.create(remote_file, '', overwrite: true)
end
#delete_snapshots(node: @cassandra.node_name, date: 'ALL') ⇒ Object

# File 'lib/backuptool.rb', line 145
def delete_snapshots(node: @cassandra.node_name, date: 'ALL')
  snapshots = search_snapshots(node: node, date: date)
  if snapshots.empty?
    raise('No snapshot found for deletion')
  else
    snapshots.each do |snapshot|
      @logger.info("Deleting snapshot #{snapshot}")
      node_snapshots = search_snapshots(node: snapshot.node)
      # Collect the metadata of every other snapshot on this node,
      # so files still referenced elsewhere are kept
      metadata_other = Set.new
      node_snapshots.each do |s|
        metadata_other += s.metadata if s != snapshot
      end
      files = snapshot.metadata - metadata_other
      @logger.info("#{files.length} files to delete")
      files.each do |file|
        @logger.info("Deleting file #{file}")
        remote = @hadoop.base_dir + '/' + snapshot.cluster + '/' + snapshot.node + '/' + file
        @logger.debug("DELETE => #{remote}")
        @hadoop.delete(remote)
      end
      @logger.info('Deleting metadata in Hadoop')
      remote = @hadoop.base_dir + '/' + @metadir + '/' + snapshot.cluster + '/' + snapshot.node + '/cass_snap_' + snapshot.date
      @logger.debug("DELETE => #{remote}")
      @hadoop.delete(remote)
    end
  end
end
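Because snapshots are incremental, a file is removed from Hadoop only when no other snapshot on the same node still references it. A hedged sketch of that rule, with plain Sets standing in for snapshot metadata:

```ruby
require 'set'

# Safe-delete rule from #delete_snapshots: subtract the union of every other
# snapshot's metadata from the target snapshot's metadata; only the remainder
# is unreferenced and safe to delete. (Helper name is illustrative.)
def deletable_files(target_metadata, other_metadatas)
  still_referenced = other_metadatas.reduce(Set.new, :+)
  target_metadata - still_referenced
end

target = Set['f1.db', 'f2.db']
others = [Set['f2.db', 'f3.db']]
deletable_files(target, others).to_a # => ["f1.db"]
```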
#get_backup_flags ⇒ Object

# File 'lib/backuptool.rb', line 213
def get_backup_flags
  backup_flags_location = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name
  ls = @hadoop.list(backup_flags_location)
  backup_flags = ls.select { |item| item['pathSuffix'].include? 'BACKUP_COMPLETED_' }
  backup_flags.collect do |file|
    BackupFlag.new(@cassandra.cluster_name, file['pathSuffix'])
  end
end
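The flag files created by #create_backup_flag are empty markers whose name encodes the date, and #get_backup_flags finds them by their 'BACKUP_COMPLETED_' prefix. Assuming the date argument is an ISO-8601 string (an assumption; the real format is whatever the caller passed in), the date can be recovered from a flag's file name like this (the flag_date helper is hypothetical, not part of the tool):

```ruby
require 'date'

# Hypothetical helper: strip the marker prefix and parse the remainder
# as a date, mirroring the naming convention used by #create_backup_flag.
def flag_date(file_name)
  Date.parse(file_name.sub('BACKUP_COMPLETED_', ''))
end

flag_date('BACKUP_COMPLETED_2016-04-01') # parses to Date.new(2016, 4, 1)
```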
#get_snapshot_metadata(node, date) ⇒ Object

# File 'lib/backuptool.rb', line 32
def get_snapshot_metadata(node, date)
  remote = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name + '/' + node + '/cass_snap_' + date
  return @hadoop.read(remote).split("\n").to_set
rescue Exception => e
  raise("Could not read metadata : #{e.message}")
end
#get_snapshots_node(node, date) ⇒ Object

# File 'lib/backuptool.rb', line 39
def get_snapshots_node(node, date)
  result = []
  begin
    if date == 'ALL'
      ls = @hadoop.list("#{@hadoop.base_dir}/#{@metadir}/#{@cassandra.cluster_name}/#{node}")
      snapshots = ls.select { |item| item['pathSuffix'].include? 'cass_snap_' }
      snapshots.each do |item|
        date = item['pathSuffix'].gsub('cass_snap_', '')
        metadata = get_snapshot_metadata(node, date)
        snapshot = CassandraSnapshot.new(@cassandra.cluster_name, node, date, metadata)
        result.push(snapshot)
      end
    else
      metadata = get_snapshot_metadata(node, date)
      snapshot = CassandraSnapshot.new(@cassandra.cluster_name, node, date, metadata)
      result.push(snapshot)
    end
  rescue Exception => e
    @logger.warn("Could not get snapshots for node #{node} : #{e.message}")
  end
  result
end
#list_snapshots(node: @cassandra.node_name) ⇒ Object

# File 'lib/backuptool.rb', line 80
def list_snapshots(node: @cassandra.node_name)
  @logger.info('Listing available snapshots')
  snapshots = search_snapshots(node: node)
  tp(snapshots, 'cluster', 'node', 'date')
end
#new_snapshot ⇒ Object

# File 'lib/backuptool.rb', line 86
def new_snapshot
  @logger.info('Starting a new snapshot')
  snapshot = @cassandra.new_snapshot

  begin
    path = @hadoop.base_dir + '/' + snapshot.cluster + '/' + snapshot.node + '/'
    if not @hadoop.mkdir(path)
      raise("Could not create your cluster directory : #{path}")
    end
  rescue Exception => e
    raise("Could not create your cluster directory : #{e.message}")
  end

  existing = search_snapshots(node: snapshot.node)
  last = if existing.empty?
           CassandraSnapshot.new(snapshot.cluster, snapshot.node, 'never')
         else
           existing[-1]
         end

  @logger.info('Uploading tables to Hadoop')
  # Only upload files that were not part of the last snapshot
  files = snapshot.metadata - last.metadata
  @logger.info("#{files.length} files to upload")

  index = 0
  number_of_files = files.size
  total_file_size = 0
  files.each do |file|
    index += 1
    local = @cassandra.data_path + '/' + file
    local_file_size = File.size(local)
    total_file_size += local_file_size
    pretty_size = Filesize.from("#{local_file_size} B").pretty
    @logger.info("Sending file #{index}/#{number_of_files} #{file} having size #{pretty_size} to Hadoop")
    remote = @hadoop.base_dir + '/' + snapshot.cluster + '/' + snapshot.node + '/' + file
    @logger.debug("#{local} => #{remote}")
    f = File.open(local, 'r')
    retries = 3
    begin
      @hadoop.create(remote, f, overwrite: true)
    rescue
      @logger.info('Hadoop write failed - retrying in 1s')
      sleep 1
      retry if (retries -= 1) > 0
    end
    f.close
  end
  total_file_size_pretty = Filesize.from("#{total_file_size} B").pretty
  @logger.info("Total size of uploaded files is #{total_file_size_pretty}")

  @logger.info('Sending metadata to Hadoop')
  remote = @hadoop.base_dir + '/' + @metadir + '/' + snapshot.cluster + '/' + snapshot.node + '/cass_snap_' + snapshot.date
  @logger.debug("metadata => #{remote}")
  @hadoop.create(remote, snapshot.metadata.to_a * "\n", overwrite: true)

  @cassandra.delete_snapshot(snapshot)
  @logger.info('Success !')
end
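The upload is incremental: the new snapshot's metadata is compared against the most recent existing snapshot, and only files missing from it are sent to Hadoop. A minimal sketch of that selection (file paths invented for illustration):

```ruby
require 'set'

# Metadata is a Set of relative SSTable paths; set difference yields exactly
# the files the previous snapshot did not already cover.
previous = Set['ks/t1/file1.db', 'ks/t1/file2.db']
current  = Set['ks/t1/file2.db', 'ks/t1/file3.db']

files_to_upload = current - previous
files_to_upload.to_a # => ["ks/t1/file3.db"]
```

This is also why the first snapshot uploads everything: with no prior snapshot, the tool diffs against an empty 'never' snapshot, so the difference is the full file set.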
#restore_snapshot(node, date, destination, keyspace: 'ALL', table: 'ALL') ⇒ Object

Restore a snapshot from HDFS.
Args:
- node -> node where the snapshot comes from
- date -> snapshot date
- destination -> local directory where to restore

# File 'lib/backuptool.rb', line 255
def restore_snapshot(node, date, destination, keyspace: 'ALL', table: 'ALL')
  # Search for the snapshot matching node and date
  snapshots = search_snapshots(node: node, date: date)
  if snapshots.empty?
    raise('No snapshot found for restore')
  elsif snapshots.length > 1
    raise('More than one candidate snapshot to restore')
  else
    snapshot = snapshots[0]
    @logger.info("Restoring snapshot #{snapshot}")
    @logger.info("Snapshot has #{snapshot.metadata.length} files")
    files_to_be_restored = snapshot.metadata.select do |item|
      filename = File.basename(item)
      matches_keyspace = keyspace == 'ALL' || (filename.include? keyspace)
      matches_table = table == 'ALL' || (filename.include? table)
      matches_keyspace && matches_table
    end
    @logger.info("Found #{files_to_be_restored.length} files to be restored that match keyspace #{keyspace} and table #{table}")
    # For each file in the list
    files_to_be_restored.each do |file|
      @logger.info("Restoring file #{file}")
      local = destination + '/' + file
      remote = @hadoop.base_dir + '/' + snapshot.cluster + '/' + snapshot.node + '/' + file
      # Download the file from HDFS
      buffered_download(remote, local)
    end
    @logger.info('Success !')
  end
end
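As the source shows, keyspace and table matching is a plain substring test on each file's base name, with 'ALL' acting as a wildcard. A sketch under that assumption, with invented file names:

```ruby
# Same filter as #restore_snapshot: a file is restored when its base name
# contains the requested keyspace and table (or when 'ALL' is given).
def matches?(file, keyspace, table)
  filename = File.basename(file)
  (keyspace == 'ALL' || filename.include?(keyspace)) &&
    (table == 'ALL' || filename.include?(table))
end

files = ['ks1/users/ks1-users-ka-1-Data.db', 'ks2/events/ks2-events-ka-1-Data.db']
files.select { |f| matches?(f, 'ks1', 'ALL') }
# => ["ks1/users/ks1-users-ka-1-Data.db"]
```

Being a substring test, a keyspace name that happens to appear inside another keyspace's file names would also match; exact-name matching would need the path components instead of the base name.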
#search_snapshots(node: 'ALL', date: 'ALL') ⇒ Object

Look for snapshots.
Args:
- node -> Cassandra node name
- date -> snapshot date

# File 'lib/backuptool.rb', line 29
def search_snapshots(node: 'ALL', date: 'ALL')
  result = []

  def get_snapshot_metadata(node, date)
    remote = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name + '/' + node + '/cass_snap_' + date
    return @hadoop.read(remote).split("\n").to_set
  rescue Exception => e
    raise("Could not read metadata : #{e.message}")
  end

  def get_snapshots_node(node, date)
    result = []
    begin
      if date == 'ALL'
        ls = @hadoop.list("#{@hadoop.base_dir}/#{@metadir}/#{@cassandra.cluster_name}/#{node}")
        snapshots = ls.select { |item| item['pathSuffix'].include? 'cass_snap_' }
        snapshots.each do |item|
          date = item['pathSuffix'].gsub('cass_snap_', '')
          metadata = get_snapshot_metadata(node, date)
          snapshot = CassandraSnapshot.new(@cassandra.cluster_name, node, date, metadata)
          result.push(snapshot)
        end
      else
        metadata = get_snapshot_metadata(node, date)
        snapshot = CassandraSnapshot.new(@cassandra.cluster_name, node, date, metadata)
        result.push(snapshot)
      end
    rescue Exception => e
      @logger.warn("Could not get snapshots for node #{node} : #{e.message}")
    end
    result
  end

  if node == 'ALL'
    begin
      ls = @hadoop.list("#{@hadoop.base_dir}/#{@metadir}/#{@cassandra.cluster_name}")
      ls_nodes = ls.select { |item| item['type'].casecmp('DIRECTORY') == 0 }
      ls_nodes.each do |item|
        n = item['pathSuffix']
        result += get_snapshots_node(n, date)
      end
    rescue Exception => e
      @logger.warn("Could not get snapshots for cluster #{@cassandra.cluster_name} : #{e.message}")
    end
  else
    result = get_snapshots_node(node, date)
  end

  result.sort
end
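To discover snapshots, the per-node metadata directory is scanned for files carrying the 'cass_snap_' prefix, and the snapshot date is whatever follows that prefix. A sketch with mocked-up directory entries in the shape of WebHDFS LISTSTATUS output (the date format shown is illustrative):

```ruby
# Mocked listing entries; only 'pathSuffix' matters for the filter,
# mirroring what @hadoop.list returns from WebHDFS LISTSTATUS.
ls = [
  { 'pathSuffix' => 'cass_snap_2016_04_01', 'type' => 'FILE' },
  { 'pathSuffix' => 'some_other_file',      'type' => 'FILE' }
]

# Same prefix filter and date extraction as #get_snapshots_node.
dates = ls.select { |item| item['pathSuffix'].include?('cass_snap_') }
          .map    { |item| item['pathSuffix'].gsub('cass_snap_', '') }
# => ["2016_04_01"]
```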