Class: IndexTanked::ActiveRecordDefaults::Queue::Worker
- Inherits:
-
Object
- Object
- IndexTanked::ActiveRecordDefaults::Queue::Worker
- Defined in:
- lib/index-tanked/active_record_defaults/queue/worker.rb
Constant Summary collapse
- SLEEP =
5
Instance Method Summary collapse
- #handle_error(e) ⇒ Object
-
#initialize(options = {}) ⇒ Worker
constructor
A new instance of Worker.
- #log(message) ⇒ Object
- #process_documents(batch_size) ⇒ Object
- #send_batches_to_indextank(partitioned_documents) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Worker
Returns a new instance of Worker.
7 8 9 10 |
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 7
#
# Builds a worker and records the batch size and an identifier string that
# the other methods pass to the queue when locking and unlocking records.
#
# @param options [Hash] worker settings
# @option options [Integer] :batch_size value stored in @batch_size;
#   100 is used when the key is absent or nil
def initialize(options = {})
  @batch_size = options[:batch_size] || 100
  # Explicit form of the original `... rescue "pid:..."` modifier: any
  # StandardError while building the host-qualified identifier falls back
  # to a pid-only identifier.
  @identifier = begin
    "host:#{Socket.gethostname} pid:#{Process.pid}"
  rescue StandardError
    "pid:#{Process.pid}"
  end
end
Instance Method Details
#handle_error(e) ⇒ Object
79 80 81 82 |
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 79
#
# Logs a failure: first a summary line with the exception class and
# message, then the backtrace.
#
# @param e [Exception] the error being reported
# @return [Object] whatever the final #log call returns
def handle_error(e)
  log("something (#{e.class} - #{e.message}) got jacked, unlocking")
  # Join the frames explicitly so the logged text does not depend on how the
  # running Ruby stringifies an Array; Array() covers a nil backtrace
  # (an exception that was never raised).
  log Array(e.backtrace).join("\n")
end
#log(message) ⇒ Object
84 85 86 87 88 |
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 84
#
# Prefixes the message with this worker's identifier, prints it with `puts`
# and sends the same line to RAILS_DEFAULT_LOGGER at info level.
#
# @param message [#to_s] text to log; it is interpolated into the line
# @return [Object] the return value of RAILS_DEFAULT_LOGGER.info
def log(message)
  line = "[Index Tanked Worker: #{@identifier}] - #{message}"
  puts line
  RAILS_DEFAULT_LOGGER.info(line)
end
#process_documents(batch_size) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 37
#
# Runs one pass over the queue: clears expired locks, locks up to
# +batch_size+ records under this worker's identifier, sends them out via
# #send_batches_to_indextank and deletes them from the queue.
#
# @param batch_size [Integer] passed to Queue::Document.lock_records_for_batch
# @return [Integer] the lock count when it is zero, otherwise the value
#   returned by Queue::Document.delete_all; 0 when an error was rescued
def process_documents(batch_size)
  Queue::Document.clear_expired_locks
  locked_count = Queue::Document.lock_records_for_batch(batch_size, @identifier)
  log("#{locked_count} records locked.")
  return locked_count if locked_count.zero?

  begin
    locked_docs = Queue::Document.find_all_by_locked_by(@identifier)
    unique_docs = Queue::Document.remove_duplicate_documents(locked_docs)
    send_batches_to_indextank(Queue::Document.partition_documents_by_companion_key(unique_docs))

    removed_count = Queue::Document.delete_all(:locked_by => @identifier)
    log("#{removed_count} completed documents removed from queue.")
    removed_count
  rescue StandardError, Timeout::Error => error
    handle_error(error)
    # Cleanup runs before its results are logged, in the same order as before.
    stale_removed = Queue::Document.delete_outdated_locked_records_by_identifier(@identifier)
    released = Queue::Document.clear_locks_by_identifier(@identifier)
    log("#{stale_removed} outdated locks deleted")
    log("#{released} locks cleared")
    0 # return 0 so it sleeps
  end
end
#send_batches_to_indextank(partitioned_documents) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 60
#
# Sends each group of documents to its index with a batch insert.
#
# @param partitioned_documents [#keys, #[]] documents grouped by companion
#   key; the index name used in the log line is the text after the last
#   ' - ' in the key
# @return [Array] the keys that were iterated
# @raise [IndexTanked::IndexTank::InvalidArgument] re-raised after the
#   offending document, if it can be identified, has been logged
def send_batches_to_indextank(partitioned_documents)
  partitioned_documents.keys.each do |companion_key|
    this_batch = partitioned_documents[companion_key]
    index_name = companion_key.split(' - ').last
    log("#{this_batch.size} document(s) prepared for #{index_name}.")

    begin
      Queue::Document.index_tanked(companion_key).index.batch_insert(this_batch)
    rescue IndexTanked::IndexTank::InvalidArgument => e
      # The message is searched for "in document #N of M"; N is treated as
      # 1-based, so N - 1 is the position inside this_batch.
      reported_number = e.message[/in document #(\d+) of \d+/, 1]
      log "Bad Document: #{this_batch[reported_number.to_i - 1]}" if reported_number
      raise
    end
  end
end
#start ⇒ Object
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 12
#
# Main loop of the worker. Installs TERM/INT/QUIT handlers that log
# 'Exiting...' and set $exit, then repeatedly processes a batch while
# indexing is available. A pass that handled nothing is followed by
# SLEEP * 2 half-second naps, skipped once $exit is set.
#
# @return [nil] once $exit is set and the loop is left
def start
  log "Starting IndexTanked Queue"

  %w[TERM INT QUIT].each do |signal_name|
    trap(signal_name) { log 'Exiting...'; $exit = true }
  end

  loop do
    count =
      if IndexTanked::Configuration.index_available?
        process_documents(@batch_size)
      else
        log('Indexing is currently disabled, sleeping.')
        0
      end

    break if $exit

    # Nap in half-second slices so a signal that arrives mid-sleep is noticed.
    (SLEEP * 2).times { sleep(0.5) unless $exit } if count.zero?

    break if $exit
  end
end