Class: ContentServer::FileCopyClient

Inherits:
Object
  • Object
show all
Defined in:
lib/content_server/queue_copy.rb

Overview

class QueueCopy

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, dynamic_content_data) ⇒ FileCopyClient

Returns a new instance of FileCopyClient.



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/content_server/queue_copy.rb', line 127

# Builds the client: creates the local message queue, the TCP client, the file
# receiver, and starts the worker thread that drains the queue.
#
# @param host [String] remote host handed to Networking::TCPClient
# @param port [Integer] remote port handed to Networking::TCPClient
# @param dynamic_content_data [#exists?] content index consulted by #handle when
#   answering ack requests
def initialize(host, port, dynamic_content_data)
  @local_queue = Queue.new
  @dynamic_content_data = dynamic_content_data
  # Incoming TCP messages are handed to #handle_message, which only enqueues them.
  @tcp_client = Networking::TCPClient.new(host, port, method(:handle_message))
  @file_receiver = FileReceiver.new(method(:done_copy),
                                    method(:abort_copy),
                                    method(:reset_copy))
  @local_thread = Thread.new do
    loop do
      # Queue#pop blocks until a message is available.
      message = @local_queue.pop
      if Params['enable_monitoring']
        ::ContentServer::Globals.process_vars.set('File Copy Client queue', @local_queue.size)
      end
      # Handle the message that was just dequeued. Popping a second time here
      # would silently drop this message and process only every other one.
      handle(message)
    end
  end
  # A failure in the worker thread should not go unnoticed.
  @local_thread.abort_on_exception = true
  Log.debug3("initialize FileCopyClient  host:#{host}  port:#{port}")
end

Class Method Details

.destination_filename(folder, sha1) ⇒ Object

Creates the destination filename for the backup server from a base folder and a sha1. For example: folder: /mnt/hd1/bbbackup, sha1: d0be2dc421be4fcd0172e5afceea3970e2f3d940, dest filename: /mnt/hd1/bbbackup/d0/be/d0be2dc421be4fcd0172e5afceea3970e2f3d940



215
216
217
# File 'lib/content_server/queue_copy.rb', line 215

# Builds the destination path for a content file under +folder+.
# The first two and the next two characters of +sha1+ become nested
# sub-directories, and the full +sha1+ is the file name.
#
# @param folder [String] base folder
# @param sha1 [String] content checksum
# @return [String] folder/<sha1[0,2]>/<sha1[2,2]>/<sha1>
def self.destination_filename(folder, sha1)
  first_level = sha1[0, 2]
  second_level = sha1[2, 2]
  File.join(folder, first_level, second_level, sha1)
end

Instance Method Details

#abort_copy(checksum) ⇒ Object



157
158
159
# File 'lib/content_server/queue_copy.rb', line 157

# Enqueues an :ABORT_COPY message for the given checksum via #handle_message.
#
# @param checksum [Object] checksum of the file whose copy is aborted
# @return [Object] whatever #handle_message returns
def abort_copy(checksum)
  abort_message = [:ABORT_COPY, checksum]
  handle_message(abort_message)
end

#done_copy(local_file_checksum, local_path) ⇒ Object



165
166
167
168
169
170
# File 'lib/content_server/queue_copy.rb', line 165

# Records a finished file copy: bumps the 'num_files_received' process
# variable when monitoring is enabled, then logs the path and checksum.
#
# @param local_file_checksum [Object] checksum of the received file
# @param local_path [Object] local path of the received file
# @return [Object] result of the Log.debug1 call
def done_copy(local_file_checksum, local_path)
  monitoring_enabled = Params['enable_monitoring']
  ::ContentServer::Globals.process_vars.inc('num_files_received') if monitoring_enabled
  Log.debug1("Done copy file: #{local_path}, #{local_file_checksum}")
end

#handle(message) ⇒ Object

Receives a message (file chunk or ack) and, in the case of an ack, sends back an answer. Note that it is executed from the class thread only!



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/content_server/queue_copy.rb', line 182

# Processes one queued message and, where needed, sends a reply through the
# TCP client. Per the file's own note, this runs on the class thread only.
#
# Recognised message types (first element of +message+):
# * :SEND_COPY_MESSAGE  - forwards the content as a :COPY_MESSAGE
# * :COPY_CHUNK         - passes the chunk to the file receiver; when the
#   receiver returns a truthy value, asks for more with :COPY_CHUNK_FROM_REMOTE
# * :ACK_MESSAGE        - replies with [timestamp, ack, checksum], where ack
#   is true when the checksum is NOT in the local content data
# * :ABORT_COPY / :RESET_RESUME_COPY - forwarded unchanged
# Any other type is logged as an error.
#
# @param message [Array] pair of [message_type, message_content]
# @return [Object] result of the last call made in the matched branch
def handle(message)
  message_type, message_content = message
  case message_type
  when :SEND_COPY_MESSAGE
    Log.debug1("Requesting files to copy.")
    Log.debug2("Files requested: #{message_content}")
    bytes_written = @tcp_client.send_obj([:COPY_MESSAGE, message_content])
    Log.debug2("Sending copy message succeeded? bytes_written: #{bytes_written}.")
  when :COPY_CHUNK
    Log.debug1('Chunk received.')
    if @file_receiver.receive_chunk(*message_content)
      # Only the file checksum (first element of the chunk) is needed for the reply.
      file_checksum = message_content.first
      @tcp_client.send_obj([:COPY_CHUNK_FROM_REMOTE, file_checksum])
    end
  when :ACK_MESSAGE
    checksum, timestamp = message_content
    # Here we should check file existence
    Log.info("Returning ack for content: #{checksum}, timestamp: #{timestamp}")
    # Look the checksum up once so the logged ack and the ack actually sent
    # can never disagree.
    ack = !@dynamic_content_data.exists?(checksum)
    Log.debug1("Ack: #{ack}")
    @tcp_client.send_obj([:ACK_MESSAGE, [timestamp, ack, checksum]])
  when :ABORT_COPY
    @tcp_client.send_obj([:ABORT_COPY, message_content])
  when :RESET_RESUME_COPY
    @tcp_client.send_obj([:RESET_RESUME_COPY, message_content])
  else
    Log.error("Unexpected message type: #{message_type}")
  end
end

#handle_message(message) ⇒ Object



172
173
174
175
176
177
178
# File 'lib/content_server/queue_copy.rb', line 172

# Enqueues +message+ on the local queue; when monitoring is enabled, also
# publishes the current queue size as the 'File Copy Client queue' variable.
#
# @param message [Object] message to enqueue (other methods pass [type, content] pairs)
# @return [Object, nil] result of the monitoring update, or nil when monitoring is off
def handle_message(message)
  Log.debug3('QueueFileReceiver handle message')
  @local_queue.push(message)
  return unless Params['enable_monitoring']

  queue_size = @local_queue.size
  ::ContentServer::Globals.process_vars.set('File Copy Client queue', queue_size)
end

#request_copy(content_data) ⇒ Object



153
154
155
# File 'lib/content_server/queue_copy.rb', line 153

# Enqueues a :SEND_COPY_MESSAGE carrying +content_data+ via #handle_message.
#
# @param content_data [Object] description of the content to request
# @return [Object] whatever #handle_message returns
def request_copy(content_data)
  copy_request = [:SEND_COPY_MESSAGE, content_data]
  handle_message(copy_request)
end

#reset_copy(checksum, new_offset) ⇒ Object



161
162
163
# File 'lib/content_server/queue_copy.rb', line 161

# Enqueues a :RESET_RESUME_COPY message for +checksum+ at +new_offset+
# via #handle_message.
#
# @param checksum [Object] checksum of the file being copied
# @param new_offset [Object] offset to carry in the message
# @return [Object] whatever #handle_message returns
def reset_copy(checksum, new_offset)
  resume_point = [checksum, new_offset]
  handle_message([:RESET_RESUME_COPY, resume_point])
end

#threads ⇒ Object



147
148
149
150
151
# File 'lib/content_server/queue_copy.rb', line 147

# Lists the threads owned by this client: the local worker thread, followed
# by the TCP client's thread when a TCP client is present.
#
# @return [Array] [@local_thread] or [@local_thread, @tcp_client.tcp_thread]
def threads
  return [@local_thread] if @tcp_client.nil?

  [@local_thread, @tcp_client.tcp_thread]
end