Class: ContentServer::FileStreamer
- Inherits:
-
Object
- Object
- ContentServer::FileStreamer
- Defined in:
- lib/content_server/file_streamer.rb
Instance Attribute Summary collapse
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Instance Method Summary collapse
- #abort_streaming(checksum) ⇒ Object
- #copy_another_chuck(checksum) ⇒ Object
- #handle(message) ⇒ Object
-
#initialize(send_chunk_clb, abort_streaming_clb = nil) ⇒ FileStreamer
constructor
A new instance of FileStreamer.
- #reset_stream(checksum, path, offset) ⇒ Object
- #reset_streaming(checksum, new_offset) ⇒ Object
- #run ⇒ Object
- #start_streaming(checksum, path) ⇒ Object
Constructor Details
#initialize(send_chunk_clb, abort_streaming_clb = nil) ⇒ FileStreamer
Returns a new instance of FileStreamer.
48 49 50 51 52 53 54 55 56 |
# File 'lib/content_server/file_streamer.rb', line 48 def initialize(send_chunk_clb, abort_streaming_clb=nil) @send_chunk_clb = send_chunk_clb @abort_streaming_clb = abort_streaming_clb @stream_queue = Queue.new # Used from internal thread only. @streams = {} @thread = run end |
Instance Attribute Details
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
41 42 43 |
# File 'lib/content_server/file_streamer.rb', line 41 def thread @thread end |
Instance Method Details
#abort_streaming(checksum) ⇒ Object
72 73 74 75 76 77 |
# File 'lib/content_server/file_streamer.rb', line 72 def abort_streaming(checksum) @stream_queue << [:ABORT_STREAM, checksum] if Params['enable_monitoring'] ::ContentServer::Globals.process_vars.set('File Streamer queue', @stream_queue.size) end end |
#copy_another_chuck(checksum) ⇒ Object
58 59 60 61 62 63 |
# File 'lib/content_server/file_streamer.rb', line 58 def copy_another_chuck(checksum) @stream_queue << [:COPY_CHUNK, checksum] if Params['enable_monitoring'] ::ContentServer::Globals.process_vars.set('File Streamer queue', @stream_queue.size) end end |
#handle(message) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/content_server/file_streamer.rb', line 98 def handle() type, content = if type == :NEW_STREAM checksum, path = content reset_stream(checksum, path, 0) @stream_queue << [:COPY_CHUNK, checksum] if @streams.key?(checksum) if Params['enable_monitoring'] ::ContentServer::Globals.process_vars.set('File Streamer queue', @stream_queue.size) end elsif type == :ABORT_STREAM checksum = content Stream.close_delete_stream(checksum, @streams) elsif type == :RESET_STREAM checksum, new_offset = content reset_stream(checksum, nil, new_offset) @stream_queue << [:COPY_CHUNK, checksum] if @streams.key?(checksum) if Params['enable_monitoring'] ::ContentServer::Globals.process_vars.set('File Streamer queue', @stream_queue.size) end elsif type == :COPY_CHUNK checksum = content if @streams.key?(checksum) offset = @streams[checksum].file.pos Log.debug1("Sending chunk for #{checksum}, offset #{offset}.") chunk = @streams[checksum].file.read(Params['streaming_chunk_size']) if chunk.nil? # No more to read, send end of file. @send_chunk_clb.call(checksum, offset, @streams[checksum].size, nil, nil) Stream.close_delete_stream(checksum, @streams) else chunk_checksum = FileIndexing::IndexAgent.get_content_checksum(chunk) @send_chunk_clb.call(checksum, offset, @streams[checksum].size, chunk, chunk_checksum) end else Log.debug1("No checksum found to copy chunk. #{checksum}.") end end end |
#reset_stream(checksum, path, offset) ⇒ Object
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/content_server/file_streamer.rb', line 138 def reset_stream(checksum, path, offset) if !@streams.key? checksum begin file = File.new(path, 'rb') if offset > 0 file.seek(offset) end Log.debug1("File streamer: #{file.to_s}.") rescue IOError => e Log.warning("Could not stream local file #{path}. #{e.to_s}") end @streams[checksum] = Stream.new(checksum, path, file, file.size) else @streams[checksum].file.seek(offset) end end |
#reset_streaming(checksum, new_offset) ⇒ Object
79 80 81 82 83 84 |
# File 'lib/content_server/file_streamer.rb', line 79 def reset_streaming(checksum, new_offset) @stream_queue << [:RESET_STREAM, [checksum, new_offset]] if Params['enable_monitoring'] ::ContentServer::Globals.process_vars.set('File Streamer queue', @stream_queue.size) end end |
#run ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/content_server/file_streamer.rb', line 86 def run return Thread.new do loop { stream_pop = @stream_queue.pop if Params['enable_monitoring'] ::ContentServer::Globals.process_vars.set('File Streamer queue', @stream_queue.size) end checksum = handle(stream_pop) } end end |
#start_streaming(checksum, path) ⇒ Object
65 66 67 68 69 70 |
# File 'lib/content_server/file_streamer.rb', line 65 def start_streaming(checksum, path) @stream_queue << [:NEW_STREAM, [checksum, path]] if Params['enable_monitoring'] ::ContentServer::Globals.process_vars.set('File Streamer queue', @stream_queue.size) end end |