Class: Sunspot::IndexQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/sunspot/index_queue.rb,
lib/sunspot/index_queue/batch.rb,
lib/sunspot/index_queue/entry.rb,
lib/sunspot/index_queue/session_proxy.rb,
lib/sunspot/index_queue/entry/mongo_impl.rb,
lib/sunspot/index_queue/entry/redis_impl.rb,
lib/sunspot/index_queue/entry/data_mapper_impl.rb,
lib/sunspot/index_queue/entry/active_record_impl.rb

Overview

Implementation of an asynchronous queue for indexing records with Solr. Entries are added to the queue defining which records should be indexed or removed. The queue will then process those entries and send them to the Solr server in batches. This has two advantages over just updating in place. First, problems with Solr will not cause your application to stop functioning. Second, batching the commits to Solr is more efficient and it should be able to handle more throughput when you have a lot of records to index.

Defined Under Namespace

Modules: Entry Classes: Batch, SessionProxy, SolrNotResponding

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ IndexQueue

Create a new IndexQueue. Available options:

:retry_interval - The number of seconds to wait between to retry indexing when an attempt fails (defaults to 1 minute). If an entry fails multiple times, it will be delayed for the interval times the number of failures. For example, if the interval is 1 minute and it has failed twice, the record won’t be attempted again for 2 minutes.

:batch_size - The maximum number of records to try submitting to solr at one time (defaults to 100).

:class_names - A list of class names that the queue will process. This can be used to have different queues process different classes of records when they need to different configurations.

:session - The Sunspot::Session object to use for communicating with Solr (defaults to a session with the default config).



51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/sunspot/index_queue.rb', line 51

def initialize(options = {})
  @retry_interval = options[:retry_interval] || 60
  @batch_size = options[:batch_size] || 100
  @batch_handler = nil
  @class_names = []
  if options[:class_names].is_a?(Array)
    @class_names.concat(options[:class_names].collect{|name| name.to_s})
  elsif options[:class_names]
    @class_names << options[:class_names].to_s
  end
  @session = options[:session] || Sunspot::Session.new
end

Instance Attribute Details

#batch_sizeObject

Returns the value of attribute batch_size.



17
18
19
# File 'lib/sunspot/index_queue.rb', line 17

def batch_size
  @batch_size
end

#class_namesObject (readonly)

Returns the value of attribute class_names.



18
19
20
# File 'lib/sunspot/index_queue.rb', line 18

def class_names
  @class_names
end

#retry_intervalObject

Returns the value of attribute retry_interval.



17
18
19
# File 'lib/sunspot/index_queue.rb', line 17

def retry_interval
  @retry_interval
end

#sessionObject (readonly)

Returns the value of attribute session.



18
19
20
# File 'lib/sunspot/index_queue.rb', line 18

def session
  @session
end

Class Method Details

.default_priorityObject

Get the default indexing priority. Defaults to zero.



33
34
35
# File 'lib/sunspot/index_queue.rb', line 33

def default_priority
  Thread.current[:sunspot_index_queue_priority] || 0
end

.set_priority(priority, &block) ⇒ Object

Set the default priority for indexing items within a block. Higher priority items will be processed first.



22
23
24
25
26
27
28
29
30
# File 'lib/sunspot/index_queue.rb', line 22

def set_priority(priority, &block)
  save_val = Thread.current[:sunspot_index_queue_priority]
  begin
    Thread.current[:sunspot_index_queue_priority] = priority.to_i
    yield
  ensure
    Thread.current[:sunspot_index_queue_priority] = save_val
  end
end

Instance Method Details

#batch_handler(&block) ⇒ Object

Provide a block that will handle submitting batches of records. The block will take a Batch object and must call submit! on it. This can be useful for doing things such as providing an identity map for records in the batch. Example:

# Use the ActiveRecord identity map for each batch submitted to reduce database activity.
queue.batch_handler do
  ActiveRecord::Base.cache do |batch|
    batch.submit!
  end
end


74
75
76
# File 'lib/sunspot/index_queue.rb', line 74

def batch_handler(&block)
  @batch_handler = block
end

#error_countObject

Get the number of entries that have errors in the queue.



117
118
119
# File 'lib/sunspot/index_queue.rb', line 117

def error_count
  Entry.error_count(self)
end

#errors(options = {}) ⇒ Object

Get the entries in the queue that have errors. Supported options are :limit (default 50) and :offset (default 0).



122
123
124
125
# File 'lib/sunspot/index_queue.rb', line 122

def errors(options = {})
  limit = options[:limit] ? options[:limit].to_i : 50
  Entry.errors(self, limit, options[:offset].to_i)
end

#index(record_or_hash, options = {}) ⇒ Object

Add a record to be indexed to the queue. The record can be specified as either an indexable object or as as hash with :class and :id keys. The priority to be indexed can be passed in the options as :priority (defaults to 0).



81
82
83
84
# File 'lib/sunspot/index_queue.rb', line 81

def index(record_or_hash, options = {})
  klass, id = class_and_id(record_or_hash)
  Entry.enqueue(self, klass, id, false, options[:priority] || self.class.default_priority)
end

#index_all(klass, ids, options = {}) ⇒ Object

Add a list of records to be indexed to the queue. The priority to be indexed can be passed in the options as :priority (defaults to 0).



96
97
98
# File 'lib/sunspot/index_queue.rb', line 96

def index_all(klass, ids, options = {})
  Entry.enqueue(self, klass, ids, false, options[:priority] || self.class.default_priority)
end

#processObject

Process the queue. Exits when there are no more entries to process at the current time. Returns the number of entries processed.

If any errors are encountered while processing the queue, they will be logged with the errors so they can be fixed and tried again later. However, if Solr is refusing connections, the processing is stopped right away and a Sunspot::IndexQueue::SolrNotResponding exception is raised.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/sunspot/index_queue.rb', line 138

def process
  count = 0
  loop do
    entries = Entry.next_batch!(self)
    if entries.nil? || entries.empty?
      break if Entry.ready_count(self) == 0
    else
      batch = Batch.new(self, entries)
      if defined?(@batch_handler) && @batch_handler
        @batch_handler.call(batch)
      else
        batch.submit!
      end
      count += entries.select{|e| e.processed? }.size
    end
  end
  count
end

#ready_countObject

Get the number of entries in the queue that are ready to be processed.



112
113
114
# File 'lib/sunspot/index_queue.rb', line 112

def ready_count
  Entry.ready_count(self)
end

#remove(record_or_hash, options = {}) ⇒ Object

Add a record to be removed to the queue. The record can be specified as either an indexable object or as as hash with :class and :id keys. The priority to be indexed can be passed in the options as :priority (defaults to 0).



89
90
91
92
# File 'lib/sunspot/index_queue.rb', line 89

def remove(record_or_hash, options = {})
  klass, id = class_and_id(record_or_hash)
  Entry.enqueue(self, klass, id, true, options[:priority] || self.class.default_priority)
end

#remove_all(klass, ids, options = {}) ⇒ Object

Add a list of records to be removed to the queue. The priority to be indexed can be passed in the options as :priority (defaults to 0).



102
103
104
# File 'lib/sunspot/index_queue.rb', line 102

def remove_all(klass, ids, options = {})
  Entry.enqueue(self, klass, ids, true, options[:priority] || self.class.default_priority)
end

#reset!Object

Reset all entries in the queue to clear errors and set them to be indexed immediately.



128
129
130
# File 'lib/sunspot/index_queue.rb', line 128

def reset!
  Entry.reset!(self)
end

#total_countObject

Get the number of entries to be processed in the queue.



107
108
109
# File 'lib/sunspot/index_queue.rb', line 107

def total_count
  Entry.total_count(self)
end