Class: ContentServer::FileCopyClient

Inherits:
Object
  • Object
show all
Defined in:
lib/content_server/queue_copy.rb

Overview

class QueueCopy

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, dynamic_content_data, process_variables) ⇒ FileCopyClient



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/content_server/queue_copy.rb', line 122

# Builds a client that asks a remote server for file copies and receives the
# chunks it sends back.
#
# Messages from the TCP client and locally generated requests are both pushed
# onto @local_queue (via #handle_message) and drained by one worker thread,
# which passes each message to #handle.
#
# All instance state is assigned BEFORE the TCP client and the worker thread
# are started: the worker may begin handling a message immediately, and the
# callbacks it triggers (e.g. #done_copy) read @process_variables.
#
# @param host passed to Networking::TCPClient.new
# @param port passed to Networking::TCPClient.new
# @param dynamic_content_data object queried with #exists?(checksum) when
#   answering ack messages
# @param process_variables object whose #inc is called for every file received
def initialize(host, port, dynamic_content_data, process_variables)
  @local_queue = Queue.new
  @dynamic_content_data = dynamic_content_data
  @process_variables = process_variables
  @tcp_client = Networking::TCPClient.new(host, port, method(:handle_message))
  @file_receiver = FileReceiver.new(method(:done_copy),
                                    method(:abort_copy),
                                    method(:reset_copy))
  @local_thread = Thread.new do
    # Set from inside the thread so the flag is in effect before the first
    # message is handled, not some time after the thread has started.
    Thread.current.abort_on_exception = true
    loop do
      handle(@local_queue.pop)
    end
  end
  Log.debug3("initialize FileCopyClient  host:#{host}  port:#{port}")
end

Class Method Details

.destination_filename(folder, sha1) ⇒ Object

Creates the destination filename on the backup server from a base folder and a sha1. The first two pairs of sha1 characters become nested directories. For example: folder: /mnt/hd1/bbbackup, sha1: d0be2dc421be4fcd0172e5afceea3970e2f3d940, dest filename: /mnt/hd1/bbbackup/d0/be/d0be2dc421be4fcd0172e5afceea3970e2f3d940



206
207
208
# File 'lib/content_server/queue_copy.rb', line 206

# Builds the path under which a content with the given sha1 is stored.
#
# The first two pairs of sha1 characters are used as nested directory names,
# followed by the full sha1 as the file name, e.g.
#   folder: /mnt/hd1/bbbackup, sha1: d0be2dc4...
#   result: /mnt/hd1/bbbackup/d0/be/d0be2dc4...
#
# @param folder base folder of the backup
# @param sha1 checksum of the content
# @return [String] the joined destination path
def self.destination_filename(folder, sha1)
  first_level = sha1[0, 2]
  second_level = sha1[2, 2]
  path_parts = [folder, first_level, second_level, sha1]
  File.join(*path_parts)
end

Instance Method Details

#abort_copy(checksum) ⇒ Object



149
150
151
# File 'lib/content_server/queue_copy.rb', line 149

# Queues an :ABORT_COPY message for the given checksum.
#
# The message goes through #handle_message, so it is processed later by
# #handle rather than sent from the caller's context.
#
# @param checksum checksum of the file whose copy should be aborted
def abort_copy(checksum)
  abort_request = [:ABORT_COPY, checksum]
  handle_message(abort_request)
end

#add_process_variables_info ⇒ Object



162
163
164
# File 'lib/content_server/queue_copy.rb', line 162

# Increments the 'num_files_received' counter on @process_variables.
def add_process_variables_info
  counter_name = 'num_files_received'
  @process_variables.inc(counter_name)
end

#done_copy(local_file_checksum, local_path) ⇒ Object



157
158
159
160
# File 'lib/content_server/queue_copy.rb', line 157

# Callback for a completed copy (registered with FileReceiver in #initialize):
# updates the received-files counter and logs the finished file.
#
# @param local_file_checksum checksum of the file that was written
# @param local_path path the file was written to
def done_copy(local_file_checksum, local_path)
  add_process_variables_info
  summary = "Done copy file: #{local_path}, #{local_file_checksum}"
  Log.debug1(summary)
end

#handle(message) ⇒ Object

Receives a message (copy request, file chunk, ack, abort or reset) and acts on it; for an ack message it sends an answer back. Note that it is executed from the class's own thread only!



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/content_server/queue_copy.rb', line 173

# Processes one message taken from the local queue.
#
# A message is a `[message_type, message_content]` pair. Depending on the
# type, the content is forwarded to the remote side through @tcp_client,
# handed to @file_receiver, or answered with an ack. Unknown types are logged
# as errors and otherwise ignored.
#
# The worker thread created in #initialize calls this for every queued message.
#
# @param message [Array] pair of message type and message content
def handle(message)
  message_type, message_content = message
  case message_type
  when :SEND_COPY_MESSAGE
    Log.debug1("Requesting file (content data) to copy.")
    Log.debug2("File requested: #{message_content}")
    bytes_written = @tcp_client.send_obj([:COPY_MESSAGE, message_content])
    Log.debug2("Sending copy message succeeded? bytes_written: #{bytes_written}.")
  when :COPY_CHUNK
    Log.debug1('Chunk received.')
    if @file_receiver.receive_chunk(*message_content)
      # Only the file checksum is needed for the follow-up request; the other
      # chunk fields are named here just to show the message layout.
      file_checksum, _offset, _file_size, _content, _content_checksum = message_content
      @tcp_client.send_obj([:COPY_CHUNK_FROM_REMOTE, file_checksum])
    end
  when :ACK_MESSAGE
    checksum, timestamp = message_content
    Log.info("Returning ack for content: #{checksum}, timestamp: #{timestamp}")
    # The ack is true when the content is NOT already known locally.
    # Evaluate it once so the logged value is the one actually sent.
    ack = !@dynamic_content_data.exists?(checksum)
    Log.debug1("Ack: #{ack}")
    @tcp_client.send_obj([:ACK_MESSAGE, [timestamp, ack, checksum]])
  when :ABORT_COPY
    @tcp_client.send_obj([:ABORT_COPY, message_content])
  when :RESET_RESUME_COPY
    @tcp_client.send_obj([:RESET_RESUME_COPY, message_content])
  else
    Log.error("Unexpected message type: #{message_type}")
  end
end

#handle_message(message) ⇒ Object



166
167
168
169
# File 'lib/content_server/queue_copy.rb', line 166

# Enqueues a message on @local_queue; it is processed later by #handle.
#
# This is the callback given to the TCP client in #initialize and is also
# used by #request_copy, #abort_copy and #reset_copy.
#
# @param message the message to enqueue
def handle_message(message)
  Log.debug3('QueueFileReceiver handle message')
  @local_queue << message
end

#request_copy(content_data) ⇒ Object



145
146
147
# File 'lib/content_server/queue_copy.rb', line 145

# Queues a :SEND_COPY_MESSAGE request for the given content data.
#
# The message goes through #handle_message, so it is processed later by
# #handle.
#
# @param content_data description of the content to request
def request_copy(content_data)
  copy_request = [:SEND_COPY_MESSAGE, content_data]
  handle_message(copy_request)
end

#reset_copy(checksum, new_offset) ⇒ Object



153
154
155
# File 'lib/content_server/queue_copy.rb', line 153

# Queues a :RESET_RESUME_COPY message carrying the checksum and the new offset.
#
# The message goes through #handle_message, so it is processed later by
# #handle.
#
# @param checksum checksum of the file being copied
# @param new_offset offset included in the message
def reset_copy(checksum, new_offset)
  resume_details = [checksum, new_offset]
  handle_message([:RESET_RESUME_COPY, resume_details])
end

#threads ⇒ Object



139
140
141
142
143
# File 'lib/content_server/queue_copy.rb', line 139

# Lists the threads owned by this client.
#
# @return [Array] the local worker thread, followed by the TCP client's
#   thread when a TCP client is set
def threads
  return [@local_thread] if @tcp_client.nil?

  [@local_thread, @tcp_client.tcp_thread]
end