Class: BBFS::ContentServer::QueueIndexer
- Inherits:
- 
      Object
      
        - Object
- BBFS::ContentServer::QueueIndexer
 
- Defined in:
- lib/content_server/queue_indexer.rb
Overview
Simple indexer: gets input events (files to index) and outputs content data updates into the output queue.
Instance Method Summary collapse
- 
  
    
      #initialize(input_queue, output_queue, content_data_path)  ⇒ QueueIndexer 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    A new instance of QueueIndexer. 
- #run ⇒ Object
- 
  
    
      #shallow_check(instance)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    Check file existence, check its size and modification date. 
Constructor Details
#initialize(input_queue, output_queue, content_data_path) ⇒ QueueIndexer
Returns a new instance of QueueIndexer.
# File 'lib/content_server/queue_indexer.rb', line 12

# Stores the two queues and the content data path used by #run.
#
# @param input_queue [#pop, #push] queue of index events; #run pops
#   [state, is_dir, path] events from it and pushes re-index events to it.
# @param output_queue [#push] queue that receives a copy of the content data
#   after every processed event.
# @param content_data_path [String] file that #run reads the content data
#   from at startup and writes it back to after every event.
# @return [QueueIndexer] a new instance of QueueIndexer.
def initialize(input_queue, output_queue, content_data_path)
  @input_queue = input_queue
  @output_queue = output_queue
  @content_data_path = content_data_path
end
Instance Method Details
#run ⇒ Object
# File 'lib/content_server/queue_indexer.rb', line 18

# Loads the persisted content data, shallow checks it, then starts a worker
# thread that applies index events taken from the input queue.
#
# Startup: every instance read from @content_data_path that passes
# #shallow_check is kept (together with its content); every other instance is
# queued on the input queue as a STABLE file event so it gets re-indexed.
#
# Worker loop, for each [state, is_dir, path] event:
# * STABLE file: index the path and merge the indexed content.
# * NON_EXISTING or CHANGED file: remove the known instance, but only if it
#   fails the shallow check.
# * NON_EXISTING directory: remove the directory from the content data.
# * anything else: only logged.
# After every event the content data is written to @content_data_path and a
# copy of it is pushed to the output queue.
#
# @return [Thread] the worker thread; it loops forever and is not joined here.
def run
  server_content_data = ContentData::ContentData.new
  # Shallow check content data files.
  tmp_content_data = ContentData::ContentData.new
  # File.exist? is used because File.exists? is deprecated and removed in Ruby 3.2.
  tmp_content_data.from_file(@content_data_path) if File.exist?(@content_data_path)
  Log.info('Shallow checking content data:')
  tmp_content_data.instances.each_value do |instance|
    # Skip instances (files) which did not pass the shallow check.
    if shallow_check(instance)
      Log.info("exists: #{instance.full_path}")
      server_content_data.add_content(tmp_content_data.contents[instance.checksum])
      server_content_data.add_instance(instance)
    else
      Log.info("changed: #{instance.full_path}")
      # Add non existing and changed files to index queue. The event has to be
      # the same [state, is_dir, path] triple the worker below destructures;
      # with only two elements the path would land in is_dir and path would be
      # nil, so the file would never be re-indexed.
      @input_queue.push([FileMonitoring::FileStatEnum::STABLE, false, instance.full_path])
    end
  end

  # Start indexing on demand and write changes to queue.
  Thread.new do
    loop do
      Log.info 'Waiting on index input queue.'
      state, is_dir, path = @input_queue.pop
      Log.info "event: #{state}, #{is_dir}, #{path}."

      # index files and add to copy queue
      # delete directory with its sub files
      # delete file
      if state == FileMonitoring::FileStatEnum::STABLE && !is_dir
        Log.info "Indexing content #{path}."
        index_agent = FileIndexing::IndexAgent.new
        indexer_patterns = FileIndexing::IndexerPatterns.new
        indexer_patterns.add_pattern(path)
        index_agent.index(indexer_patterns, server_content_data)
        unless index_agent.failed_files.empty?
          Log.info("Failed files: #{index_agent.failed_files.to_a.join(',')}.")
        end
        Log.info("indexed content #{index_agent.indexed_content}.")
        server_content_data.merge index_agent.indexed_content
      elsif (state == FileMonitoring::FileStatEnum::NON_EXISTING ||
             state == FileMonitoring::FileStatEnum::CHANGED) && !is_dir
        # If file content changed, we should remove old instance.
        key = FileIndexing::IndexAgent.global_path(path)
        # Check if deleted file exists at content data.
        Log.info("Instance to remove: #{key}")
        if server_content_data.instances.key?(key)
          instance_to_remove = server_content_data.instances[key]
          # Remove file from content data only if it does not pass the shallow
          # check, i.e., content has changed/removed.
          unless shallow_check(instance_to_remove)
            content_to_remove = server_content_data.contents[instance_to_remove.checksum]
            # Build a content data holding only the instance to remove.
            content_data_to_remove = ContentData::ContentData.new
            content_data_to_remove.add_content(content_to_remove)
            content_data_to_remove.add_instance(instance_to_remove)
            # Remove the file.
            server_content_data = ContentData::ContentData.remove_instances(
              content_data_to_remove, server_content_data
            )
          end
        end
      elsif state == FileMonitoring::FileStatEnum::NON_EXISTING && is_dir
        Log.info("NonExisting/Changed: #{path}")
        # Remove directory but only when non-existing.
        Log.info("Directory to remove: #{path}")
        global_dir = FileIndexing::IndexAgent.global_path(path)
        server_content_data = ContentData::ContentData.remove_directory(
          server_content_data, global_dir
        )
      else
        Log.info("This case should not be handled: #{state}, #{is_dir}, #{path}.")
      end

      # TODO(kolman): Don't write to file each change?
      Log.info "Writing server content data to #{@content_data_path}."
      server_content_data.to_file(@content_data_path)

      Log.info 'Adding server content data to queue.'
      @output_queue.push(ContentData::ContentData.new(server_content_data))
    end
  end
end
#shallow_check(instance) ⇒ Object
Checks that the file exists and that its size and modification date are unchanged. If something is wrong, the caller reindexes the file and updates the content data.
# File 'lib/content_server/queue_indexer.rb', line 101

# Compares a known instance against a fresh shallow instance of the same path.
#
# This method only reports the result; it does not reindex or modify any
# content data itself.
#
# @param instance [#full_path, #size, #modification_time] the known instance.
# @return [Boolean] false when no shallow instance could be created for the
#   path (presumably the file is missing - confirm against
#   IndexAgent.create_shallow_instance), otherwise whether both size and
#   modification time equal those of the given instance.
def shallow_check(instance)
  shallow_instance = FileIndexing::IndexAgent.create_shallow_instance(instance.full_path)
  return false unless shallow_instance

  shallow_instance.size == instance.size &&
    shallow_instance.modification_time == instance.modification_time
end