Class: ContentServer::FileCopyClient
- Inherits:
-
Object
- Object
- ContentServer::FileCopyClient
- Defined in:
- lib/content_server/queue_copy.rb
Overview
class QueueCopy
Class Method Summary collapse
-
.destination_filename(folder, sha1) ⇒ Object
Creates destination filename for backup server, input is base folder and sha1.
Instance Method Summary collapse
- #abort_copy(checksum) ⇒ Object
- #done_copy(local_file_checksum, local_path) ⇒ Object
-
#handle(message) ⇒ Object
Receives a message (file chunk or ack) and returns an answer in the case of an ack.
- #handle_message(message) ⇒ Object
-
#initialize(host, port) ⇒ FileCopyClient
constructor
A new instance of FileCopyClient.
- #request_copy(content_data) ⇒ Object
- #reset_copy(checksum, new_offset) ⇒ Object
- #threads ⇒ Object
Constructor Details
#initialize(host, port) ⇒ FileCopyClient
Returns a new instance of FileCopyClient.
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/content_server/queue_copy.rb', line 237
# Builds a client bound to +host+:+port+.
#
# Sets up, in this order: the local message queue, the TCP client (which is
# handed #handle_message as its third argument -- presumably the receive
# callback; confirm against Networking::TCPClient), the FileReceiver wired to
# #done_copy / #abort_copy / #reset_copy, and a worker thread that pops
# messages off the queue and passes each one to #handle.
#
# host - remote host passed straight to Networking::TCPClient.new
# port - remote port passed straight to Networking::TCPClient.new
def initialize(host, port)
  @local_queue = Queue.new
  incoming_callback = method(:handle_message)
  @tcp_client = Networking::TCPClient.new(host, port, incoming_callback)
  @file_receiver = FileReceiver.new(method(:done_copy),
                                    method(:abort_copy),
                                    method(:reset_copy))
  @local_thread = Thread.new do
    loop do
      queued_message = @local_queue.pop
      # Publish the queue depth after each pop, before the message is handled.
      $process_vars.set('File Copy Client queue', @local_queue.size)
      handle(queued_message)
    end
  end
  # An exception escaping the worker thread aborts the whole process.
  @local_thread.abort_on_exception = true
  Log.debug3("initialize FileCopyClient host:#{host} port:#{port}")
end
Class Method Details
.destination_filename(folder, sha1) ⇒ Object
Creates the destination filename for the backup server; the input is the base folder and the sha1. The first two pairs of sha1 characters become two nested sub-directories. For example: folder:/mnt/hd1/bbbackup, sha1:d0be2dc421be4fcd0172e5afceea3970e2f3d940 dest filename: /mnt/hd1/bbbackup/d0/be/d0be2dc421be4fcd0172e5afceea3970e2f3d940
320 321 322 |
# File 'lib/content_server/queue_copy.rb', line 320
# Builds the path under +folder+ at which content with checksum +sha1+ is stored.
#
# Characters 0-1 and 2-3 of +sha1+ become two nested directory names, and the
# full +sha1+ is the file name, e.g.
#   folder: /mnt/hd1/bbbackup, sha1: d0be2dc4...
#   result: /mnt/hd1/bbbackup/d0/be/d0be2dc4...
#
# folder - base folder path
# sha1   - checksum string
#
# Returns the joined path String.
def self.destination_filename(folder, sha1)
  first_level = sha1[0, 2]
  second_level = sha1[2, 2]
  File.join(folder, first_level, second_level, sha1)
end
Instance Method Details
#abort_copy(checksum) ⇒ Object
264 265 266 |
# File 'lib/content_server/queue_copy.rb', line 264
# Queues an :ABORT_COPY message for +checksum+ by passing it to #handle_message.
#
# This method is given to FileReceiver.new as its second argument in the
# constructor.
#
# checksum - checksum of the content whose copy is to be aborted
def abort_copy(checksum)
  handle_message([:ABORT_COPY, checksum])
end
#done_copy(local_file_checksum, local_path) ⇒ Object
272 273 274 275 |
# File 'lib/content_server/queue_copy.rb', line 272
# Records that one file has been received: bumps the 'num_files_received'
# counter and writes a debug log line naming the file.
#
# This method is given to FileReceiver.new as its first argument in the
# constructor.
#
# local_file_checksum - checksum of the received file
# local_path          - path of the received file
def done_copy(local_file_checksum, local_path)
  $process_vars.inc('num_files_received')
  summary = "Done copy file: #{local_path}, #{local_file_checksum}"
  Log.debug1(summary)
end
#handle(message) ⇒ Object
Receives a message (file chunk or ack) and returns an answer in the case of an ack. Note that it is executed only from the class's own worker thread!
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/content_server/queue_copy.rb', line 285
# Handles one queued message; called from the worker thread started in the
# constructor.
#
# message - two-element array of [message_type, message_content]. Handled types:
#   :SEND_COPY_MESSAGE - forwards the content as a :COPY_MESSAGE
#   :COPY_CHUNK        - content is [file_checksum, offset, file_size, content,
#                        content_checksum]; it is passed to the file receiver,
#                        and on success a :COPY_CHUNK_FROM_REMOTE for
#                        file_checksum is sent; on failure an error is logged
#   :ACK_MESSAGE       - content is [checksum, timestamp]; replies with
#                        [timestamp, need_to_copy, checksum], where need_to_copy
#                        is true when no file exists at the destination path
#   :ABORT_COPY        - forwards the content unchanged
#   :RESET_RESUME_COPY - forwards the content unchanged
# Any other type is logged as an error.
def handle(message)
  message_type, message_content = message
  Log.debug1("backup copy message: Type #{message_type}. message: #{message_content}")

  case message_type
  when :SEND_COPY_MESSAGE
    bytes_written = @tcp_client.send_obj([:COPY_MESSAGE, message_content])
    Log.debug2("Sending copy message succeeded? bytes_written: #{bytes_written}.")
  when :COPY_CHUNK
    file_checksum, _offset, _file_size, _content, content_checksum = message_content
    if @file_receiver.receive_chunk(*message_content)
      @tcp_client.send_obj([:COPY_CHUNK_FROM_REMOTE, file_checksum])
    else
      Log.error("receive_chunk failed for chunk checksum:#{content_checksum}")
    end
  when :ACK_MESSAGE
    checksum, timestamp = message_content
    # Check whether the content already exists at its final destination.
    dest_path = FileReceiver.destination_filename(Params['backup_destination_folder'][0]['path'],
                                                  checksum)
    # File.exist? rather than File.exists?: the latter was removed in Ruby 3.2.
    need_to_copy = !File.exist?(dest_path)
    Log.debug1("Returning ack for content:'#{checksum}' timestamp:'#{timestamp}' Ack:'#{need_to_copy}'")
    @tcp_client.send_obj([:ACK_MESSAGE, [timestamp, need_to_copy, checksum]])
  when :ABORT_COPY
    @tcp_client.send_obj([:ABORT_COPY, message_content])
  when :RESET_RESUME_COPY
    @tcp_client.send_obj([:RESET_RESUME_COPY, message_content])
  else
    Log.error("Unexpected message type: #{message_type}")
  end
end
#handle_message(message) ⇒ Object
277 278 279 280 281 |
# File 'lib/content_server/queue_copy.rb', line 277
# Pushes +message+ onto the local queue and publishes the new queue depth.
# Processing happens later, when the worker thread started in the constructor
# pops the message and passes it to #handle.
#
# This method is given to Networking::TCPClient.new in the constructor.
#
# message - the message to enqueue; #handle destructures it as
#           [message_type, message_content]
def handle_message(message)
  Log.debug3('QueueFileReceiver handle message')
  @local_queue.push(message)
  $process_vars.set('File Copy Client queue', @local_queue.size)
end
#request_copy(content_data) ⇒ Object
260 261 262 |
# File 'lib/content_server/queue_copy.rb', line 260
# Queues a :SEND_COPY_MESSAGE carrying +content_data+ by passing it to
# #handle_message. When #handle later processes it, the data is sent over the
# TCP client as a :COPY_MESSAGE.
#
# content_data - the content description to request a copy of
def request_copy(content_data)
  handle_message([:SEND_COPY_MESSAGE, content_data])
end
#reset_copy(checksum, new_offset) ⇒ Object
268 269 270 |
# File 'lib/content_server/queue_copy.rb', line 268
# Queues a :RESET_RESUME_COPY message carrying [checksum, new_offset] by
# passing it to #handle_message.
#
# This method is given to FileReceiver.new as its third argument in the
# constructor.
#
# checksum   - checksum of the content being copied
# new_offset - offset to send along with the reset request
def reset_copy(checksum, new_offset)
  handle_message([:RESET_RESUME_COPY, [checksum, new_offset]])
end
#threads ⇒ Object
254 255 256 257 258 |
# File 'lib/content_server/queue_copy.rb', line 254
# Lists the threads this client owns: the local worker thread first, followed
# by the TCP client's thread when a TCP client is present.
#
# Returns an Array.
def threads
  owned = [@local_thread]
  owned.push(@tcp_client.tcp_thread) unless @tcp_client.nil?
  owned
end