Class: Sunspot::IndexQueue::Entry::ActiveRecordImpl

Inherits:
ActiveRecord::Base
  • Object
show all
Includes:
Sunspot::IndexQueue::Entry
Defined in:
lib/sunspot/index_queue/entry/active_record_impl.rb

Overview

Implementation of an indexing queue backed by ActiveRecord.

To create the table, you should have a migration containing the following:

def self.up
  Sunspot::IndexQueue::Entry::ActiveRecordImpl.create_table
end

def self.down
  drop_table Sunspot::IndexQueue::Entry::ActiveRecordImpl.table_name
end

The default setup uses an integer for the record_id column type since it is the most efficient and works with most data models. If your records use a string as the primary key, you can add additional statements to the migration to change the record_id column type accordingly.

Instance Attribute Summary

Attributes included from Sunspot::IndexQueue::Entry

#processed

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Sunspot::IndexQueue::Entry

enqueue, implementation, implementation=, load_all_records, #processed?, #record

Class Method Details

.add(klass, id, delete, priority) ⇒ Object

Implementation of the add method.


85
86
87
88
89
90
91
92
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 85

# Implementation of the add method.
#
# Looks up the first entry matching the record id, the record class name and
# a nil lock; when none is found a new entry is built from those attributes
# plus the requested priority. The entry is then flagged as a delete or not,
# its priority is raised (never lowered) to the requested priority, its
# run_at is set to the current UTC time, and it is saved.
#
# klass    - class of the record; only its name is stored.
# id       - id of the record.
# delete   - value stored in is_delete.
# priority - requested priority for the entry.
#
# Returns the result of save! on the entry.
def add(klass, id, delete, priority)
  key_attributes = {:record_id => id, :record_class_name => klass.name, :lock => nil}

  entry = first(:conditions => key_attributes)
  entry ||= new(key_attributes.merge(:priority => priority))

  entry.is_delete = delete
  # An existing entry keeps its priority unless the new one is higher.
  if priority > entry.priority
    entry.priority = priority
  end
  entry.run_at = Time.now.utc
  entry.save!
end

.create_table ⇒ Object

Create the table used to store the queue in the database.


100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 100

# Create the table used to store the queue in the database.
#
# Defines the queue columns on the table named by table_name and then adds
# two indexes: one on record_id and one on (run_at, record_class_name,
# priority) named "<table_name>_run_at".
#
# Returns the result of the last add_index call.
def create_table
  connection.create_table(table_name) do |table|
    # Identity of the queued record.
    table.string(:record_class_name, :null => false)
    table.integer(:record_id, :null => false)
    # Whether the entry represents a delete.
    table.boolean(:is_delete, :null => false, :default => false)
    # Scheduling information.
    table.datetime(:run_at, :null => false)
    table.integer(:priority, :null => false, :default => 0)
    # Processing state.
    table.integer(:lock, :null => true)
    table.string(:error, :null => true, :limit => 4000)
    table.integer(:attempts, :null => false, :default => 0)
  end

  connection.add_index(table_name, :record_id)

  run_at_index_name = "#{table_name}_run_at"
  connection.add_index(table_name, [:run_at, :record_class_name, :priority], :name => run_at_index_name)
end

.delete_entries(entries) ⇒ Object

Implementation of the delete_entries method.


95
96
97
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 95

# Implementation of the delete_entries method.
#
# Passes the given entries to delete_all as the :id condition and returns
# whatever delete_all returns.
def delete_entries(entries)
  id_condition = {:id => entries}
  delete_all(id_condition)
end

.error_count(queue) ⇒ Object

Implementation of the error_count method.


44
45
46
47
48
49
50
51
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 44

# Implementation of the error_count method.
#
# Counts the entries whose error column is not NULL. When the queue lists
# class names, the count is further restricted to entries whose
# record_class_name is one of them.
def error_count(queue)
  clauses = ["#{connection.quote_column_name('error')} IS NOT NULL"]
  bind_values = []

  class_names = queue.class_names
  unless class_names.empty?
    clauses << "#{connection.quote_column_name('record_class_name')} IN (?)"
    bind_values << class_names
  end

  # Single SQL fragment first, followed by its bind values.
  count(:conditions => [clauses.join(" AND ")] + bind_values)
end

.errors(queue, limit, offset) ⇒ Object

Implementation of the errors method.


54
55
56
57
58
59
60
61
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 54

# Implementation of the errors method.
#
# Loads entries whose error column is not NULL, ordered by id, using the
# given limit and offset. When the queue lists class names, only entries
# whose record_class_name is one of them are loaded.
def errors(queue, limit, offset)
  sql_parts = ["#{connection.quote_column_name('error')} IS NOT NULL"]
  conditions = []

  class_names = queue.class_names
  if !class_names.empty?
    sql_parts.push("#{connection.quote_column_name('record_class_name')} IN (?)")
    conditions.push(class_names)
  end
  # The SQL fragment must come first, ahead of its bind values.
  conditions.unshift(sql_parts.join(" AND "))

  finder_options = {:conditions => conditions, :limit => limit, :offset => offset, :order => :id}
  all(finder_options)
end

.next_batch!(queue) ⇒ Object

Implementation of the next_batch! method.


70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 70

# Implementation of the next_batch! method.
#
# Steps:
# 1. Select the ids of up to queue.batch_size entries whose run_at is not in
#    the future (optionally restricted to the queue's class names), ordered
#    by priority descending and then run_at.
# 2. Stamp those rows with a random lock value, push their run_at out by
#    queue.retry_interval and clear their error.
# 3. Reload and return only the selected rows that carry this lock value.
#
# Returns an empty array when no entry is ready.
def next_batch!(queue)
  ready_sql = "#{connection.quote_column_name('run_at')} <= ?"
  bind_values = [Time.now.utc]

  class_names = queue.class_names
  unless class_names.empty?
    ready_sql += " AND #{connection.quote_column_name('record_class_name')} IN (?)"
    bind_values.push(class_names)
  end

  candidates = all(
    :select => "id",
    :conditions => [ready_sql, *bind_values],
    :limit => queue.batch_size,
    :order => 'priority DESC, run_at'
  )
  candidate_ids = candidates.map { |candidate| candidate.id }
  return [] if candidate_ids.empty?

  # NOTE(review): the random lock value presumably lets a worker tell which
  # rows it stamped itself — confirm against the callers.
  lock = rand(0x7FFFFFFF)
  lock_attributes = {:run_at => queue.retry_interval.from_now.utc, :lock => lock, :error => nil}
  update_all(lock_attributes, :id => candidate_ids)

  all(:conditions => {:id => candidate_ids, :lock => lock})
end

.ready_count(queue) ⇒ Object

Implementation of the ready_count method.


34
35
36
37
38
39
40
41
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 34

# Implementation of the ready_count method.
#
# Counts the entries whose run_at is at or before the current UTC time. When
# the queue lists class names, the count is further restricted to entries
# whose record_class_name is one of them.
def ready_count(queue)
  sql = "#{connection.quote_column_name('run_at')} <= ?"
  bind_values = [Time.now.utc]

  class_names = queue.class_names
  unless class_names.empty?
    sql += " AND #{connection.quote_column_name('record_class_name')} IN (?)"
    bind_values.push(class_names)
  end

  count(:conditions => [sql, *bind_values])
end

.reset!(queue) ⇒ Object

Implementation of the reset! method.


64
65
66
67
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 64

# Implementation of the reset! method.
#
# Sets run_at to the current UTC time, attempts to 0, and error and lock to
# nil via update_all. All entries are updated when the queue lists no class
# names; otherwise only entries whose record_class_name is one of them.
def reset!(queue)
  class_names = queue.class_names
  scope = {}
  scope[:record_class_name] = class_names unless class_names.empty?

  cleared_state = {:run_at => Time.now.utc, :attempts => 0, :error => nil, :lock => nil}
  update_all(cleared_state, scope)
end

.total_count(queue) ⇒ Object

Implementation of the total_count method.


28
29
30
31
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 28

# Implementation of the total_count method.
#
# Counts all entries, or only those whose record_class_name is one of the
# queue's class names when the queue lists any.
def total_count(queue)
  class_names = queue.class_names
  conditions = {}
  conditions[:record_class_name] = class_names unless class_names.empty?
  count(:conditions => conditions)
end

Instance Method Details

#reset! ⇒ Object

Implementation of the reset! method.


134
135
136
137
138
139
140
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 134

# Implementation of the reset! method.
#
# Clears the processing state of this entry: attempts is set to 0, error and
# lock are set to nil, and run_at is set to the current UTC time.
#
# This is best effort: a failure while saving is rescued and logged as a
# warning rather than raised. The logger is checked before use so that a
# missing logger does not raise a NoMethodError from the rescue handler.
#
# Returns the result of update_attributes! on success; on failure, the result
# of the logger call, or nil when there is no logger.
def reset!
  update_attributes!(:attempts => 0, :error => nil, :lock => nil, :run_at => Time.now.utc)
rescue => e
  logger.warn(e) if logger
end

#set_error!(error, retry_interval = nil) ⇒ Object

Implementation of the set_error! method.


118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/sunspot/index_queue/entry/active_record_impl.rb', line 118

# Implementation of the set_error! method.
#
# Records a failure on this entry: increments attempts, optionally pushes
# run_at into the future, stores a description of the error, clears the lock
# and saves the entry.
#
# error          - the exception to record. Its class name, message and
#                  backtrace (when it has one) are stored in the error
#                  attribute.
# retry_interval - optional interval; when given, run_at is set to
#                  (retry_interval * attempts) from now, in UTC, so the delay
#                  grows with the number of attempts.
#
# This is best effort: a failure while saving is rescued and, when a logger
# is available, both the original error and the save failure are logged as
# warnings.
def set_error!(error, retry_interval = nil)
  self.attempts += 1
  self.run_at = (retry_interval * attempts).from_now.utc if retry_interval

  # An exception that was never raised has a nil backtrace; treat it as empty.
  backtrace = Array(error.backtrace).join("\n")
  # Truncate the whole description, not just the backtrace, to 4000 characters
  # (the limit create_table declares for the error column).
  self.error = "#{error.class.name}: #{error.message}\n#{backtrace}"[0, 4000]

  self.lock = nil
  begin
    save!
  rescue => e
    if logger
      logger.warn(error)
      logger.warn(e)
    end
  end
end