Class: ContentServer::FileCopyClient

Inherits:
Object
  • Object
show all
Defined in:
lib/content_server/queue_copy.rb

Overview

class QueueCopy

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, dynamic_content_data) ⇒ FileCopyClient

Returns a new instance of FileCopyClient.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/content_server/queue_copy.rb', line 123

def initialize(host, port, dynamic_content_data)
  @local_queue = Queue.new
  start_process_var_monitoring
  @dynamic_content_data = dynamic_content_data
  @tcp_client = Networking::TCPClient.new(host, port, method(:handle_message))
  @file_receiver = FileReceiver.new(method(:done_copy),
                                    method(:abort_copy),
                                    method(:reset_copy))
  @local_thread = Thread.new do
    loop do
      handle(@local_queue.pop)
    end
  end
  @local_thread.abort_on_exception = true
  Log.debug3("initialize FileCopyClient  host:#{host}  port:#{port}")
end

Class Method Details

.destination_filename(folder, sha1) ⇒ Object

Creates destination filename for backup server, input is base folder and sha1. for example: folder:/mnt/hd1/bbbackup, sha1:d0be2dc421be4fcd0172e5afceea3970e2f3d940 dest filename: /mnt/hd1/bbbackup/d0/be/2d/d0be2dc421be4fcd0172e5afceea3970e2f3d940



219
220
221
# File 'lib/content_server/queue_copy.rb', line 219

def self.destination_filename(folder, sha1)
  File.join(folder, sha1[0,2], sha1[2,2], sha1)
end

Instance Method Details

#abort_copy(checksum) ⇒ Object



166
167
168
# File 'lib/content_server/queue_copy.rb', line 166

def abort_copy(checksum)
  handle_message([:ABORT_COPY, checksum])
end

#done_copy(local_file_checksum, local_path) ⇒ Object



174
175
176
177
# File 'lib/content_server/queue_copy.rb', line 174

def done_copy(local_file_checksum, local_path)
  Params['process_vars'].inc('num_files_received')
  Log.debug1("Done copy file: #{local_path}, #{local_file_checksum}")
end

#handle(message) ⇒ Object

This is a function which receives the messages (file or ack) and return answer in case of ack. Note that it is being executed from the class thread only!



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/content_server/queue_copy.rb', line 186

def handle(message)
  message_type, message_content = message
  if message_type == :SEND_COPY_MESSAGE
    Log.debug1("Requesting file (content data) to copy.")
    Log.debug2("File requested: #{message_content.to_s}")
    bytes_written = @tcp_client.send_obj([:COPY_MESSAGE, message_content])
    Log.debug2("Sending copy message succeeded? bytes_written: #{bytes_written}.")
  elsif message_type == :COPY_CHUNK
    Log.debug1('Chunk received.')
    if @file_receiver.receive_chunk(*message_content)
      file_checksum, offset, file_size, content, content_checksum = message_content
      @tcp_client.send_obj([:COPY_CHUNK_FROM_REMOTE, file_checksum])
    end
  elsif message_type == :ACK_MESSAGE
    checksum, timestamp = message_content
    # Here we should check file existence
    Log.info("Returning ack for content: #{checksum}, timestamp: #{timestamp}")
    Log.debug1("Ack: #{!@dynamic_content_data.exists?(checksum)}")
    @tcp_client.send_obj([:ACK_MESSAGE, [timestamp,
                                         !@dynamic_content_data.exists?(checksum),
                                         checksum]])
  elsif message_type == :ABORT_COPY
    @tcp_client.send_obj([:ABORT_COPY, message_content])
  elsif message_type == :RESET_RESUME_COPY
    @tcp_client.send_obj([:RESET_RESUME_COPY, message_content])
  else
    Log.error("Unexpected message type: #{message_type}")
  end
end

#handle_message(message) ⇒ Object



179
180
181
182
# File 'lib/content_server/queue_copy.rb', line 179

def handle_message(message)
  Log.debug3('QueueFileReceiver handle message')
  @local_queue.push(message)
end

#request_copy(content_data) ⇒ Object



162
163
164
# File 'lib/content_server/queue_copy.rb', line 162

def request_copy(content_data)
  handle_message([:SEND_COPY_MESSAGE, content_data])
end

#reset_copy(checksum, new_offset) ⇒ Object



170
171
172
# File 'lib/content_server/queue_copy.rb', line 170

def reset_copy(checksum, new_offset)
  handle_message([:RESET_RESUME_COPY, [checksum, new_offset]])
end

#start_process_var_monitoringObject



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/content_server/queue_copy.rb', line 140

def start_process_var_monitoring
  if Params['enable_monitoring']
    @process_var_thread = Thread.new do
      last_data_flush_time = nil
      while true do
        if last_data_flush_time.nil? || last_data_flush_time + Params['process_vars_delay'] < Time.now
          Log.info("process_vars:FileCopyClient queue size:#{@local_queue.size}")
          Params['process_vars'].set('File Copy Client queue', @local_queue.size)
          last_data_flush_time = Time.now
        end
        sleep(0.3)
      end
    end
  end
end

#threadsObject



156
157
158
159
160
# File 'lib/content_server/queue_copy.rb', line 156

def threads
  ret = [@local_thread]
  ret << @tcp_client.tcp_thread if @tcp_client != nil
  return ret
end