Class: ContentServer::FileCopyManager
- Inherits:
-
Object
- Object
- ContentServer::FileCopyManager
- Defined in:
- lib/content_server/queue_copy.rb
Instance Method Summary collapse
-
#add_content(checksum, path) ⇒ Object
Add content to copy process.
-
#clean_time_out_thread ⇒ Object
clean timed out contents.
-
#initialize(copy_input_queue, file_streamer) ⇒ FileCopyManager
constructor
A new instance of FileCopyManager.
- #receive_ack(checksum) ⇒ Object
- #remove_content(checksum) ⇒ Object
Constructor Details
#initialize(copy_input_queue, file_streamer) ⇒ FileCopyManager
Returns a new instance of FileCopyManager.
25 26 27 28 29 30 31 32 33 34 |
# File 'lib/content_server/queue_copy.rb', line 25 def initialize(copy_input_queue, file_streamer) @copy_input_queue = copy_input_queue @file_streamer = file_streamer @max_contents_under_copy = Params['max_copy_streams'] @contents_under_copy = {} @contents_to_copy = {} @contents_to_copy_queue = Queue.new @keeper = Mutex.new @clean_time_out_thread = clean_time_out_thread end |
Instance Method Details
#add_content(checksum, path) ⇒ Object
Add content to copy process. If already in copy process or waiting for copy then skip. If no open places for copy then put in waiting list
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/content_server/queue_copy.rb', line 38 def add_content(checksum, path) Log.debug2("Try to add content:%s to copy waiting list", checksum) @keeper.synchronize{ # if content is being copied or waiting then skip it if !@contents_under_copy[checksum] if !@contents_to_copy[checksum] if @contents_under_copy.size < @max_contents_under_copy @contents_under_copy[checksum] = [path, false, Time.now] $process_vars.set('contents under copy', @contents_under_copy.size) @copy_input_queue.push([:SEND_ACK_MESSAGE, checksum]) $process_vars.set('Copy File Queue Size', @copy_input_queue.size) else # no place in copy streams. Add to waiting list Log.debug2("add content:%s to copy waiting list", checksum) @contents_to_copy[checksum] = true # replace with a set @contents_to_copy_queue.push([checksum, path]) $process_vars.set('contents to copy queue', @contents_to_copy_queue.size) end else Log.debug2("content:%s already in waiting list. skipping.", checksum) end else Log.debug2("content:%s is being copied. skipping.", checksum) end } end |
#clean_time_out_thread ⇒ Object
clean timed out contents
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 |
# File 'lib/content_server/queue_copy.rb', line 103 def clean_time_out_thread @thread = Thread.new do loop { sleep 10 @keeper.synchronize{ # clean timed out contents time_now = Time.now new_contents_under_copy = {} @contents_under_copy.each_key { |checksum| if time_now - @contents_under_copy[checksum][2] > Params['local_timeout'] @contents_under_copy.delete(checksum) $process_vars.set('contents under copy', @contents_under_copy.size) Log.warning("Content:#{checksum} has timed out on copy process") @file_streamer.abort_streaming(checksum) if (@contents_to_copy_queue.size > 0) new_content = @contents_to_copy_queue.pop $process_vars.set('contents to copy queue', @contents_to_copy_queue.size) @contents_to_copy.delete(new_content[0]) new_contents_under_copy[new_content[0]] = new_content[1] end end } new_contents_under_copy.each_key { |checksum| @contents_under_copy[checksum.clone] = [new_contents_under_copy[checksum].clone, false, time_now] $process_vars.set('contents under copy', @contents_under_copy.size) @copy_input_queue.push([:SEND_ACK_MESSAGE, checksum]) $process_vars.set('Copy File Queue Size', @copy_input_queue.size) } } } end end |
#receive_ack(checksum) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/content_server/queue_copy.rb', line 65 def receive_ack(checksum) @keeper.synchronize{ content_record = @contents_under_copy[checksum] if content_record if !content_record[1] path = content_record[0] Log.debug1("Streaming to backup server. content: %s path:%s.", checksum, path) @file_streamer.start_streaming(checksum, path) # updating Ack content_record[1] = true else Log.warning("File already received ack: %s", checksum) end else Log.warning("File was aborted or copied: %s", checksum) end } end |
#remove_content(checksum) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/content_server/queue_copy.rb', line 84 def remove_content(checksum) @keeper.synchronize{ Log.debug3("removing checksum:%s from contents under copy", checksum) @contents_under_copy.delete(checksum) $process_vars.set('contents under copy', @contents_under_copy.size) #1 place is became available. Put another file in copy process if waiting if (@contents_to_copy_queue.size > 0) new_content = @contents_to_copy_queue.pop $process_vars.set('contents to copy queue', @contents_to_copy_queue.size) @contents_to_copy.delete(new_content[0]) @contents_under_copy[new_content[0]] = [new_content[1], false, Time.now] $process_vars.set('contents under copy', @contents_under_copy.size) @copy_input_queue.push([:SEND_ACK_MESSAGE, new_content[0]]) $process_vars.set('Copy File Queue Size', @copy_input_queue.size) end } end |