Class: BackupTool
- Inherits: Object
- Defined in: lib/backuptool.rb
Instance Method Summary

- #buffered_download(remote, local) ⇒ Object
  Download a file from HDFS in a buffered way.
  Args: remote -> HDFS path, local -> local path.
- #cleanup(days) ⇒ Object
  Cleans up backups that are older than a given number of days.
- #create_backup_flag(date) ⇒ Object
  Creates a backup flag to signal that the backup is finished on all nodes. This is an individual command that has to be called manually after snapshots have finished.
- #delete_snapshots(node: @cassandra.node_name, date: 'ALL') ⇒ Object
- #get_backup_flags ⇒ Object
- #get_snapshot_metadata(node, date) ⇒ Object
- #get_snapshots_node(node, date) ⇒ Object
- #initialize(cassandra, hadoop, logger) ⇒ BackupTool (constructor)
  Create a new BackupTool instance.
  Args: cassandra -> Cassandra instance, hadoop -> HDFS instance, logger -> Logger.
- #list_snapshots(node: @cassandra.node_name) ⇒ Object
- #new_snapshot ⇒ Object
- #restore_snapshot(node, date, destination) ⇒ Object
  Restore a snapshot from HDFS.
  Args: node -> node where the snapshot comes from, date -> snapshot date, destination -> local directory to restore into.
- #search_snapshots(node: 'ALL', date: 'ALL') ⇒ Object
  Look for snapshots.
  Args: node -> Cassandra node name ('ALL' to search all nodes), date -> snapshot date ('ALL' to match any date).
Constructor Details
#initialize(cassandra, hadoop, logger) ⇒ BackupTool
Create a new BackupTool instance.
Args:
- cassandra -> Cassandra instance
- hadoop -> HDFS instance
- logger -> Logger
# File 'lib/backuptool.rb', line 16

def initialize(cassandra, hadoop, logger)
  @cassandra = cassandra
  @hadoop = hadoop
  @logger = logger
  @metadir = META_DIR
end
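For orientation, building a BackupTool might look like the sketch below. Only the BackupTool.new signature comes from this page; the Cassandra and Hadoop wrapper constructors shown are hypothetical stand-ins.

require 'logger'

logger    = Logger.new(STDOUT)
cassandra = Cassandra.new('my_cluster', logger)        # hypothetical constructor
hadoop    = Hadoop.new('namenode.example.com', 50_070) # hypothetical constructor

tool = BackupTool.new(cassandra, hadoop, logger)

The later examples on this page reuse this tool instance.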
Instance Method Details
#buffered_download(remote, local) ⇒ Object
Download a file from HDFS in a buffered way.
Args:
- remote -> HDFS path
- local -> local path
# File 'lib/backuptool.rb', line 199

def buffered_download(remote, local)
  @logger.debug("#{remote} => #{local}")

  # Create the destination directory if it does not exist
  path = File.dirname(local)
  FileUtils.mkdir_p(path) unless File.exist?(path)

  file = open(local, 'wb')
  offset = 0
  length = BUFFER_SIZE
  print '['
  while length == BUFFER_SIZE
    print '#'
    content = @hadoop.read(remote, offset: offset, length: BUFFER_SIZE)
    file.write(content)
    length = content.length
    offset += length
  end
  print "]\n"
  file.close
end
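The loop above stops when a read returns fewer than BUFFER_SIZE bytes; a file whose size is an exact multiple of BUFFER_SIZE simply costs one extra, empty read. A usage sketch with invented paths:

# Download one file from HDFS to the local disk (paths are illustrative).
tool.buffered_download(
  '/cassandra/my_cluster/node1/keyspace1/table1-Data.db',
  '/tmp/restore/keyspace1/table1-Data.db'
)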
#cleanup(days) ⇒ Object
Cleans up backups that are older than a given number of days. This function cleans data on all nodes.
# File 'lib/backuptool.rb', line 148

def cleanup(days)
  retention_date = Date.today - days
  @logger.info("Cleaning backup data on all nodes before #{retention_date}.")

  all_snapshots = search_snapshots
  @logger.info("A total of #{all_snapshots.size} snapshots were found on Hadoop server.")
  snapshots_to_be_deleted = all_snapshots.select { |snapshot| snapshot.get_date < retention_date }
  @logger.info("A total of #{snapshots_to_be_deleted.size} snapshots will be deleted.")
  snapshots_to_be_deleted.each do |snapshot|
    delete_snapshots(node: snapshot.node, date: snapshot.date)
  end

  all_backup_flags = get_backup_flags
  @logger.info("A total of #{all_backup_flags.size} backup flags were found on Hadoop server.")
  backup_flags_to_be_deleted = all_backup_flags.select { |flag| flag.date < retention_date }
  @logger.info("A total of #{backup_flags_to_be_deleted.size} backup flags will be deleted.")

  backup_flags_location = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name
  backup_flags_to_be_deleted.each do |flag|
    file = backup_flags_location + '/' + flag.file
    @logger.info("Deleting #{file}")
    @hadoop.delete(file)
  end
end
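Keeping, say, a 30-day retention window is then a single call:

# Delete every snapshot and backup flag older than 30 days, on all nodes.
tool.cleanup(30)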
#create_backup_flag(date) ⇒ Object
Creates a backup flag to signal that the backup is finished on all nodes. This is an individual command that has to be called manually after snapshots have finished.
# File 'lib/backuptool.rb', line 178

def create_backup_flag(date)
  file_name = 'BACKUP_COMPLETED_' + date
  remote_file = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name + '/' + file_name

  @logger.info('Setting backup completed flag : ' + remote_file)
  @hadoop.create(remote_file, '', overwrite: true)
end
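The date argument is embedded verbatim in the flag file name, so the caller chooses the format. A sketch (the format below is an assumption, not something the method mandates):

require 'date'

# Creates e.g. <base_dir>/<metadir>/<cluster>/BACKUP_COMPLETED_2016_04_22
tool.create_backup_flag(Date.today.strftime('%Y_%m_%d'))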
#delete_snapshots(node: @cassandra.node_name, date: 'ALL') ⇒ Object
# File 'lib/backuptool.rb', line 118

def delete_snapshots(node: @cassandra.node_name, date: 'ALL')
  snapshots = search_snapshots(node: node, date: date)
  if snapshots.empty?
    raise('No snapshot found for deletion')
  else
    snapshots.each do |snapshot|
      @logger.info("Deleting snapshot #{snapshot}")

      # Merge the metadata of every other snapshot on this node,
      # so files shared with surviving snapshots are kept.
      node_snapshots = search_snapshots(node: snapshot.node)
      merged_metadata = Set.new
      node_snapshots.each do |s|
        merged_metadata += s.metadata if s != snapshot
      end

      # Only files referenced by no other snapshot are deleted.
      files = snapshot.metadata - merged_metadata
      @logger.info("#{files.length} files to delete")
      files.each do |file|
        @logger.info("Deleting file #{file}")
        remote = @hadoop.base_dir + '/' + snapshot.cluster + '/' + snapshot.node + '/' + file
        @logger.debug("DELETE => #{remote}")
        @hadoop.delete(remote)
      end

      @logger.info('Deleting metadata in Hadoop')
      remote = @hadoop.base_dir + '/' + @metadir + '/' + snapshot.cluster + '/' + snapshot.node + '/cass_snap_' + snapshot.date
      @logger.debug("DELETE => #{remote}")
      @hadoop.delete(remote)
    end
  end
end
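Because uploads are incremental, one file on HDFS can be shared by several snapshots of the same node; the set difference above ensures only files referenced by no surviving snapshot are removed. The retention logic, illustrated with plain Sets rather than the real classes:

require 'set'

doomed    = Set['a-Data.db', 'b-Data.db', 'c-Data.db'] # metadata of the snapshot being deleted
remaining = Set['b-Data.db', 'c-Data.db']              # merged metadata of all other snapshots

puts (doomed - remaining).to_a.inspect # => ["a-Data.db"] -- only the unshared file goes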
#get_backup_flags ⇒ Object
# File 'lib/backuptool.rb', line 186

def get_backup_flags
  backup_flags_location = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name
  ls = @hadoop.list(backup_flags_location)
  backup_flags = ls.select { |item| item['pathSuffix'].include? 'BACKUP_COMPLETED_' }
  backup_flags.collect do |file|
    BackupFlag.new(@cassandra.cluster_name, file['pathSuffix'])
  end
end
#get_snapshot_metadata(node, date) ⇒ Object
# File 'lib/backuptool.rb', line 31

def get_snapshot_metadata(node, date)
  remote = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name + '/' + node + '/cass_snap_' + date
  return @hadoop.read(remote).split("\n").to_set
rescue Exception => e
  raise("Could not read metadata : #{e.message}")
end
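The metadata file is a plain newline-separated list of file names, read back as a Set. One subtlety: this method is created by a nested def inside #search_snapshots (note the overlapping line ranges), so it only exists after #search_snapshots has run at least once. A sketch with an illustrative date string:

tool.search_snapshots # defines the nested helpers as a side effect
files = tool.get_snapshot_metadata('node1', '2016_04_22')
files.each { |f| puts f } # one backed-up file name per line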
#get_snapshots_node(node, date) ⇒ Object
# File 'lib/backuptool.rb', line 38

def get_snapshots_node(node, date)
  result = []
  begin
    if date == 'ALL'
      # List every metadata file for this node and build one snapshot per file
      ls = @hadoop.list("#{@hadoop.base_dir}/#{@metadir}/#{@cassandra.cluster_name}/#{node}")
      metadata_files = ls.select { |item| item['pathSuffix'].include? 'cass_snap_' }
      metadata_files.each do |item|
        date = item['pathSuffix'].gsub('cass_snap_', '')
        metadata = get_snapshot_metadata(node, date)
        snapshot = CassandraSnapshot.new(@cassandra.cluster_name, node, date, metadata)
        result.push(snapshot)
      end
    else
      metadata = get_snapshot_metadata(node, date)
      snapshot = CassandraSnapshot.new(@cassandra.cluster_name, node, date, metadata)
      result.push(snapshot)
    end
  rescue Exception => e
    @logger.warn("Could not get snapshots for node #{node} : #{e.message}")
  end
  result
end
#list_snapshots(node: @cassandra.node_name) ⇒ Object
# File 'lib/backuptool.rb', line 79

def list_snapshots(node: @cassandra.node_name)
  @logger.info('Listing available snapshots')
  snapshots = search_snapshots(node: node)
  tp(snapshots, 'cluster', 'node', 'date')
end
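tp here is presumably the tabular printer from the table_print gem, so this prints one row per snapshot with cluster, node, and date columns:

tool.list_snapshots                # table for the current node
tool.list_snapshots(node: 'node1') # table for an explicit node (illustrative name)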
#new_snapshot ⇒ Object
# File 'lib/backuptool.rb', line 85

def new_snapshot
  @logger.info('Starting a new snapshot')
  snapshot = @cassandra.new_snapshot

  # Find the most recent existing snapshot for this node, if any
  existing = search_snapshots(node: snapshot.node)
  last = if existing.empty?
           CassandraSnapshot.new(snapshot.cluster, snapshot.node, 'never')
         else
           existing[-1]
         end

  @logger.info('Uploading tables to Hadoop')
  # Only upload files that are not already part of the last snapshot
  files = snapshot.metadata - last.metadata
  @logger.info("#{files.length} files to upload")
  files.each do |file|
    @logger.info("Sending file #{file} to Hadoop")
    local = @cassandra.data_path + '/' + file
    remote = @hadoop.base_dir + '/' + snapshot.cluster + '/' + snapshot.node + '/' + file
    @logger.debug("#{local} => #{remote}")
    f = File.open(local, 'r')
    @hadoop.create(remote, f, overwrite: true)
    f.close
  end

  @logger.info('Sending metadata to Hadoop')
  remote = @hadoop.base_dir + '/' + @metadir + '/' + snapshot.cluster + '/' + snapshot.node + '/cass_snap_' + snapshot.date
  @logger.debug("metadata => #{remote}")
  @hadoop.create(remote, snapshot.metadata.to_a * "\n", overwrite: true)

  @cassandra.delete_snapshot(snapshot)
  @logger.info('Success !')
end
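The set difference against the most recent snapshot makes uploads incremental: files already present in HDFS are skipped, while the metadata file still records the complete file list, and the local Cassandra snapshot is dropped once the upload succeeds. Triggering one backup is a single call:

# Snapshot the local node, uploading only files new since the last snapshot.
tool.new_snapshot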
#restore_snapshot(node, date, destination) ⇒ Object
Restore a snapshot from HDFS.
Args:
- node -> node where the snapshot comes from
- date -> snapshot date
- destination -> local directory to restore into
# File 'lib/backuptool.rb', line 228

def restore_snapshot(node, date, destination)
  # Search for the snapshot matching node and date
  snapshots = search_snapshots(node: node, date: date)
  if snapshots.empty?
    raise('No snapshot found for restore')
  elsif snapshots.length > 1
    raise('More than one candidate snapshot to restore')
  else
    snapshot = snapshots[0]
    @logger.info("Restoring snapshot #{snapshot}")
    @logger.info("#{snapshot.metadata.length} files to restore")
    # Download each file listed in the snapshot metadata
    snapshot.metadata.each do |file|
      @logger.info("Restoring file #{file}")
      local = destination + '/' + file
      remote = @hadoop.base_dir + '/' + snapshot.cluster + '/' + snapshot.node + '/' + file
      buffered_download(remote, local)
    end
    @logger.info('Success !')
  end
end
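A usage sketch (node name, date, and destination directory are illustrative):

# Restore one node's snapshot into a local directory.
tool.restore_snapshot('node1', '2016_04_22', '/var/lib/cassandra/data')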
#search_snapshots(node: 'ALL', date: 'ALL') ⇒ Object
Look for snapshots.
Args:
- node -> Cassandra node name ('ALL' to search all nodes)
- date -> snapshot date ('ALL' to match any date)
# File 'lib/backuptool.rb', line 28

def search_snapshots(node: 'ALL', date: 'ALL')
  result = []

  # Nested helper: read the metadata file of one snapshot
  def get_snapshot_metadata(node, date)
    remote = @hadoop.base_dir + '/' + @metadir + '/' + @cassandra.cluster_name + '/' + node + '/cass_snap_' + date
    return @hadoop.read(remote).split("\n").to_set
  rescue Exception => e
    raise("Could not read metadata : #{e.message}")
  end

  # Nested helper: build the list of snapshots for one node
  def get_snapshots_node(node, date)
    result = []
    begin
      if date == 'ALL'
        ls = @hadoop.list("#{@hadoop.base_dir}/#{@metadir}/#{@cassandra.cluster_name}/#{node}")
        metadata_files = ls.select { |item| item['pathSuffix'].include? 'cass_snap_' }
        metadata_files.each do |item|
          date = item['pathSuffix'].gsub('cass_snap_', '')
          metadata = get_snapshot_metadata(node, date)
          snapshot = CassandraSnapshot.new(@cassandra.cluster_name, node, date, metadata)
          result.push(snapshot)
        end
      else
        metadata = get_snapshot_metadata(node, date)
        snapshot = CassandraSnapshot.new(@cassandra.cluster_name, node, date, metadata)
        result.push(snapshot)
      end
    rescue Exception => e
      @logger.warn("Could not get snapshots for node #{node} : #{e.message}")
    end
    result
  end

  if node == 'ALL'
    begin
      # Every directory under the cluster metadata dir is a node
      ls = @hadoop.list("#{@hadoop.base_dir}/#{@metadir}/#{@cassandra.cluster_name}")
      ls_nodes = ls.select { |item| item['type'].casecmp('DIRECTORY') == 0 }
      ls_nodes.each do |item|
        n = item['pathSuffix']
        result += get_snapshots_node(n, date)
      end
    rescue Exception => e
      @logger.warn("Could not get snapshots for cluster #{@cassandra.cluster_name} : #{e.message}")
    end
  else
    result = get_snapshots_node(node, date)
  end

  result.sort
end
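Both filters default to 'ALL', so the same method covers everything from one exact snapshot to the whole cluster:

tool.search_snapshots                                     # every snapshot in the cluster
tool.search_snapshots(node: 'node1')                      # every snapshot of one node
tool.search_snapshots(node: 'node1', date: '2016_04_22')  # one exact snapshot (illustrative)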