Class: ContentServer::FileCopyClient

Inherits:
Object
  • Object
show all
Defined in:
lib/content_server/queue_copy.rb

Overview

class QueueCopy

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ FileCopyClient

Returns a new instance of FileCopyClient.



237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/content_server/queue_copy.rb', line 237

# Sets up the client: a local message queue, a TCP client whose incoming
# messages are routed to #handle_message, a file receiver wired to the
# done/abort/reset callbacks, and a worker thread that drains the queue.
#
# @param host [Object] host passed through to Networking::TCPClient
# @param port [Object] port passed through to Networking::TCPClient
def initialize(host, port)
  @local_queue = Queue.new
  on_message = method(:handle_message)
  @tcp_client = Networking::TCPClient.new(host, port, on_message)
  @file_receiver = FileReceiver.new(method(:done_copy), method(:abort_copy), method(:reset_copy))
  # Worker thread: blocks on the queue, publishes the remaining queue size,
  # then passes the popped message to #handle.
  @local_thread = Thread.new do
    loop do
      next_message = @local_queue.pop
      $process_vars.set('File Copy Client queue', @local_queue.size)
      handle(next_message)
    end
  end
  # An exception raised inside the worker thread is re-raised in the main thread.
  @local_thread.abort_on_exception = true
  Log.debug3("initialize FileCopyClient  host:#{host}  port:#{port}")
end

Class Method Details

.destination_filename(folder, sha1) ⇒ Object

Creates the destination filename on the backup server from a base folder and a SHA1. The first two pairs of SHA1 characters become nested sub-directories. For example: folder: /mnt/hd1/bbbackup, sha1: d0be2dc421be4fcd0172e5afceea3970e2f3d940, destination filename: /mnt/hd1/bbbackup/d0/be/d0be2dc421be4fcd0172e5afceea3970e2f3d940



320
321
322
# File 'lib/content_server/queue_copy.rb', line 320

# Builds the destination path for a content file under +folder+.
#
# The first two characters of +sha1+ name the first sub-directory and the
# next two characters name the second one; the file itself is named by the
# full +sha1+. Example: folder '/mnt/hd1/bbbackup' and sha1 'd0be2d...' give
# '/mnt/hd1/bbbackup/d0/be/d0be2d...'.
#
# @param folder [String] base folder
# @param sha1 [String] content checksum
# @return [String] the joined path
def self.destination_filename(folder, sha1)
  first_level = sha1[0, 2]
  second_level = sha1[2, 2]
  File.join(folder, first_level, second_level, sha1)
end

Instance Method Details

#abort_copy(checksum) ⇒ Object



264
265
266
# File 'lib/content_server/queue_copy.rb', line 264

# Queues an :ABORT_COPY message for the given checksum via #handle_message.
#
# @param checksum [Object] checksum of the file whose copy is aborted
def abort_copy(checksum)
  abort_request = [:ABORT_COPY, checksum]
  handle_message(abort_request)
end

#done_copy(local_file_checksum, local_path) ⇒ Object



272
273
274
275
# File 'lib/content_server/queue_copy.rb', line 272

# Callback for a finished file copy: increments the 'num_files_received'
# process variable and logs the path and checksum of the received file.
#
# @param local_file_checksum [Object] checksum of the received file
# @param local_path [Object] path of the received file
def done_copy(local_file_checksum, local_path)
  $process_vars.inc('num_files_received')
  summary = "Done copy file: #{local_path}, #{local_file_checksum}"
  Log.debug1(summary)
end

#handle(message) ⇒ Object

Receives a message (file or ack) and sends back an answer in the case of an ack. Note that it is executed from the class thread only!



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/content_server/queue_copy.rb', line 285

# Processes one queued message according to its type.
#
# Handled types:
# * :SEND_COPY_MESSAGE - sends the content as a :COPY_MESSAGE to the peer.
# * :COPY_CHUNK        - passes the chunk to the file receiver; on success
#                        sends :COPY_CHUNK_FROM_REMOTE with the file checksum,
#                        otherwise logs an error.
# * :ACK_MESSAGE       - answers whether the content still needs to be
#                        copied, i.e. its destination file does not exist.
# * :ABORT_COPY, :RESET_RESUME_COPY - forwarded to the peer unchanged.
# Any other type is logged as an error.
#
# @param message [Array] pair of (message type, message content)
def handle(message)
  message_type, message_content = message
  Log.debug1("backup copy message: Type #{message_type}.  message: #{message_content}")
  case message_type
  when :SEND_COPY_MESSAGE
    bytes_written = @tcp_client.send_obj([:COPY_MESSAGE, message_content])
    Log.debug2("Sending copy message succeeded? bytes_written: #{bytes_written}.")
  when :COPY_CHUNK
    chunk_accepted = @file_receiver.receive_chunk(*message_content)
    # Chunk layout: file checksum, offset, file size, content, content checksum.
    file_checksum, _offset, _file_size, _content, content_checksum = message_content
    if chunk_accepted
      @tcp_client.send_obj([:COPY_CHUNK_FROM_REMOTE, file_checksum])
    else
      Log.error("receive_chunk failed for chunk checksum:#{content_checksum}")
    end
  when :ACK_MESSAGE
    checksum, timestamp = message_content
    # Check whether the checksum already exists in the final destination.
    dest_path = FileReceiver.destination_filename(Params['backup_destination_folder'][0]['path'], checksum)
    # File.exist? rather than File.exists?: the latter was removed in Ruby 3.2.
    need_to_copy = !File.exist?(dest_path)
    Log.debug1("Returning ack for content:'#{checksum}' timestamp:'#{timestamp}' Ack:'#{need_to_copy}'")
    @tcp_client.send_obj([:ACK_MESSAGE, [timestamp, need_to_copy, checksum]])
  when :ABORT_COPY, :RESET_RESUME_COPY
    # Forwarded as-is, under the same message type.
    @tcp_client.send_obj([message_type, message_content])
  else
    Log.error("Unexpected message type: #{message_type}")
  end
end

#handle_message(message) ⇒ Object



277
278
279
280
281
# File 'lib/content_server/queue_copy.rb', line 277

# Enqueues a message on the local queue and publishes the resulting queue
# size as the 'File Copy Client queue' process variable.
#
# @param message [Object] message to enqueue
def handle_message(message)
  Log.debug3('QueueFileReceiver handle message')
  @local_queue << message
  pending = @local_queue.size
  $process_vars.set('File Copy Client queue', pending)
end

#request_copy(content_data) ⇒ Object



260
261
262
# File 'lib/content_server/queue_copy.rb', line 260

# Queues a :SEND_COPY_MESSAGE carrying the given content data via
# #handle_message.
#
# @param content_data [Object] content data to request a copy for
def request_copy(content_data)
  copy_request = [:SEND_COPY_MESSAGE, content_data]
  handle_message(copy_request)
end

#reset_copy(checksum, new_offset) ⇒ Object



268
269
270
# File 'lib/content_server/queue_copy.rb', line 268

# Queues a :RESET_RESUME_COPY message with the checksum and new offset via
# #handle_message.
#
# @param checksum [Object] checksum of the file being copied
# @param new_offset [Object] offset to include in the message
def reset_copy(checksum, new_offset)
  resume_point = [checksum, new_offset]
  handle_message([:RESET_RESUME_COPY, resume_point])
end

#threads ⇒ Object



254
255
256
257
258
# File 'lib/content_server/queue_copy.rb', line 254

# Lists the threads used by this client: the local worker thread, followed
# by the TCP client's thread when a TCP client is set.
#
# @return [Array] the collected threads
def threads
  active = [@local_thread]
  active.push(@tcp_client.tcp_thread) unless @tcp_client.nil?
  active
end