Class: ContentServer::FileCopyClient

Inherits:
Object
  • Object
show all
Defined in:
lib/content_server/queue_copy.rb

Overview

class QueueCopy

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port) ⇒ FileCopyClient

Returns a new instance of FileCopyClient.



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/content_server/queue_copy.rb', line 239

def initialize(host, port)
  @local_queue = Queue.new
  @tcp_client = Networking::TCPClient.new(host, port, method(:handle_message))
  @file_receiver = FileReceiver.new(method(:done_copy),
                                    method(:abort_copy),
                                    method(:reset_copy))
  @local_thread = Thread.new do
    loop do
      pop_data = @local_queue.pop
      $process_vars.set('File Copy Client queue', @local_queue.size)
      handle(pop_data)
    end
  end
  @local_thread.abort_on_exception = true
  Log.debug3("initialize FileCopyClient  host:%s  port:%s", host, port)
end

Class Method Details

.destination_filename(folder, sha1) ⇒ Object

Creates destination filename for backup server, input is base folder and sha1. for example: folder:/mnt/hd1/bbbackup, sha1:d0be2dc421be4fcd0172e5afceea3970e2f3d940 dest filename: /mnt/hd1/bbbackup/d0/be/2d/d0be2dc421be4fcd0172e5afceea3970e2f3d940



322
323
324
# File 'lib/content_server/queue_copy.rb', line 322

def self.destination_filename(folder, sha1)
  File.join(folder, sha1[0,2], sha1[2,2], sha1)
end

Instance Method Details

#abort_copy(checksum) ⇒ Object



266
267
268
# File 'lib/content_server/queue_copy.rb', line 266

def abort_copy(checksum)
  handle_message([:ABORT_COPY, checksum])
end

#done_copy(local_file_checksum, local_path) ⇒ Object



274
275
276
277
# File 'lib/content_server/queue_copy.rb', line 274

def done_copy(local_file_checksum, local_path)
  $process_vars.inc('num_files_received')
  Log.debug1("Done copy file: path %s, checksum %s", local_path, local_file_checksum)
end

#handle(message) ⇒ Object

This is a function which receives the messages (file or ack) and return answer in case of ack. Note that it is being executed from the class thread only!



287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# File 'lib/content_server/queue_copy.rb', line 287

def handle(message)
  message_type, message_content = message
  Log.debug1("backup copy message: Type %s. message: %s", message_type, message_content)
  if message_type == :SEND_COPY_MESSAGE
    bytes_written = @tcp_client.send_obj([:COPY_MESSAGE, message_content])
    Log.debug2("Sending copy message succeeded? bytes_written: %s.", bytes_written)
  elsif message_type == :COPY_CHUNK
    if @file_receiver.receive_chunk(*message_content)
      file_checksum, offset, file_size, content, content_checksum = message_content
      @tcp_client.send_obj([:COPY_CHUNK_FROM_REMOTE, file_checksum])
    else
      file_checksum, offset, file_size, content, content_checksum = message_content
      Log.error("receive_chunk failed for chunk checksum:#{content_checksum}")
    end
  elsif message_type == :ACK_MESSAGE
    checksum, timestamp = message_content
    # check if checksum exists in final destination
    dest_path = FileReceiver.destination_filename(Params['backup_destination_folder'][0]['path'], checksum)
    need_to_copy = !File.exists?(dest_path)
    Log.debug1("Returning ack for content:'%s' timestamp:'%s' Ack:'%s'", checksum, timestamp, need_to_copy)
    @tcp_client.send_obj([:ACK_MESSAGE, [timestamp,
                                         need_to_copy,
                                         checksum]])
  elsif message_type == :ABORT_COPY
    @tcp_client.send_obj([:ABORT_COPY, message_content])
  elsif message_type == :RESET_RESUME_COPY
    @tcp_client.send_obj([:RESET_RESUME_COPY, message_content])
  else
    Log.error("Unexpected message type: #{message_type}")
  end
end

#handle_message(message) ⇒ Object



279
280
281
282
283
# File 'lib/content_server/queue_copy.rb', line 279

def handle_message(message)
  Log.debug3('QueueFileReceiver handle message')
  @local_queue.push(message)
  $process_vars.set('File Copy Client queue', @local_queue.size)
end

#request_copy(content_data) ⇒ Object



262
263
264
# File 'lib/content_server/queue_copy.rb', line 262

def request_copy(content_data)
  handle_message([:SEND_COPY_MESSAGE, content_data])
end

#reset_copy(checksum, new_offset) ⇒ Object



270
271
272
# File 'lib/content_server/queue_copy.rb', line 270

def reset_copy(checksum, new_offset)
  handle_message([:RESET_RESUME_COPY, [checksum, new_offset]])
end

#threadsObject



256
257
258
259
260
# File 'lib/content_server/queue_copy.rb', line 256

def threads
  ret = [@local_thread]
  ret << @tcp_client.tcp_thread if @tcp_client != nil
  return ret
end