Class: IndexTanked::ActiveRecordDefaults::Queue::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/index-tanked/active_record_defaults/queue/worker.rb

Constant Summary collapse

SLEEP =
5

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Worker

Returns a new instance of Worker.



7
8
9
10
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 7

# Builds a worker.
#
# @param options [Hash] optional settings; only +:batch_size+ is read here
#   (falls back to 100 when absent or nil).
#
# The identifier combines hostname and pid; it is later passed to the
# Queue::Document locking calls. If building the host-qualified form raises
# any StandardError, the pid-only form is used instead.
def initialize(options = {})
  @batch_size = options[:batch_size] || 100
  @identifier =
    begin
      "host:#{Socket.gethostname} pid:#{Process.pid}"
    rescue StandardError
      "pid:#{Process.pid}"
    end
end

Instance Method Details

#handle_error(e) ⇒ Object



79
80
81
82
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 79

# Reports an exception through #log: first a one-line summary containing the
# exception class and message, then the backtrace.
#
# @param e [Exception] the error being reported
# @return [Object] whatever the final #log call returns
def handle_error(e)
  summary = "something (#{e.class} - #{e.message}) got jacked, unlocking"
  log(summary)
  log(e.backtrace)
end

#log(message) ⇒ Object



84
85
86
87
88
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 84

# Writes a message, prefixed with this worker's identifier, to standard
# output and to RAILS_DEFAULT_LOGGER at info level.
#
# @param message [Object] interpolated into the log line
# @return [Object] the result of RAILS_DEFAULT_LOGGER.info
def log(message)
  tagged_line = "[Index Tanked Worker: #{@identifier}] - #{message}"
  puts tagged_line
  RAILS_DEFAULT_LOGGER.info(tagged_line)
end

#process_documents(batch_size) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 37

# Runs one pass over the queue: clears expired locks, locks up to
# +batch_size+ records under this worker's identifier, sends them via
# #send_batches_to_indextank, and deletes the records it had locked.
#
# @param batch_size [Integer] passed to Queue::Document.lock_records_for_batch
# @return [Integer] the lock count when it is zero, the value returned by
#   Queue::Document.delete_all on success, or 0 when an error was rescued
def process_documents(batch_size)
  Queue::Document.clear_expired_locks
  locked_count = Queue::Document.lock_records_for_batch(batch_size, @identifier)
  log("#{locked_count} records locked.")
  # Nothing was locked, so there is nothing to send.
  return locked_count if locked_count.zero?

  begin
    locked_docs = Queue::Document.find_all_by_locked_by(@identifier)
    unique_docs = Queue::Document.remove_duplicate_documents(locked_docs)
    batches_by_key = Queue::Document.partition_documents_by_companion_key(unique_docs)
    send_batches_to_indextank(batches_by_key)

    removed_count = Queue::Document.delete_all(:locked_by => @identifier)
    log("#{removed_count} completed documents removed from queue.")
    removed_count
  rescue StandardError, Timeout::Error => e
    # Timeout::Error is listed explicitly alongside StandardError.
    handle_error(e)
    stale_deleted = Queue::Document.delete_outdated_locked_records_by_identifier(@identifier)
    released = Queue::Document.clear_locks_by_identifier(@identifier)
    log("#{stale_deleted} outdated locks deleted")
    log("#{released} locks cleared")
    0 # return 0 so it sleeps
  end
end

#send_batches_to_indextank(partitioned_documents) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 60

# Sends each group of documents to its index with a single batch_insert call.
#
# @param partitioned_documents [Hash] maps a companion key (a String whose
#   last ' - '-separated segment is used as the index name in log output) to
#   the documents for that key
# @return [Array] the companion keys that were iterated
# @raise [IndexTanked::IndexTank::InvalidArgument] re-raised after logging the
#   offending document when the error message identifies one
def send_batches_to_indextank(partitioned_documents)
  partitioned_documents.keys.each do |companion_key|
    this_batch = partitioned_documents[companion_key]
    index_name = companion_key.split(' - ').last
    log("#{this_batch.size} document(s) prepared for #{index_name}.")

    begin
      Queue::Document.index_tanked(companion_key).index.batch_insert(this_batch)
    rescue IndexTanked::IndexTank::InvalidArgument => e
      # The message position is reduced by one before indexing into the batch.
      reported_position = e.message[/in document #(\d+) of \d+/, 1]
      bad_index = reported_position && (reported_position.to_i - 1)
      log "Bad Document: #{this_batch[bad_index]}" if bad_index
      raise
    end
  end
end

#start ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/index-tanked/active_record_defaults/queue/worker.rb', line 12

# Runs the worker loop until a TERM, INT or QUIT signal sets the global
# $exit flag.
#
# Each iteration calls #process_documents with the configured batch size when
# IndexTanked::Configuration.index_available? is true; otherwise it logs that
# indexing is disabled. When nothing was processed, the worker pauses for
# SLEEP seconds in half-second slices so a shutdown request is noticed
# promptly.
#
# @return [nil]
def start
  log "Starting IndexTanked Queue"

  # The handlers only flip the flag. Logging from inside a signal handler is
  # unsafe: loggers guarded by a mutex cannot take that lock in trap context
  # on Ruby 2.0+, so the message would be lost or an error raised mid-batch.
  # "Exiting..." is logged once the loop has wound down instead.
  %w[TERM INT QUIT].each do |signal_name|
    trap(signal_name) { $exit = true }
  end

  loop do
    count =
      if IndexTanked::Configuration.index_available?
        process_documents(@batch_size)
      else
        log('Indexing is currently disabled, sleeping.')
        0
      end

    break if $exit

    # Idle pause, cut short as soon as a shutdown is requested.
    (SLEEP * 2).times { sleep(0.5) unless $exit } if count.zero?

    break if $exit
  end

  log 'Exiting...'
  nil
end