Class: Sunspot::IndexQueue::Entry::DataMapperImpl

Inherits:
Object
  • Object
show all
Includes:
DataMapper::Resource, Sunspot::IndexQueue::Entry
Defined in:
lib/sunspot/index_queue/entry/data_mapper_impl.rb

Overview

Implementation of an indexing queue backed by DataMapper.

To create the table, you can use dm-migrations and run auto_migrate! on this class.

This implementation assumes the primary key of the records being indexed is an integer since that works with most data models and is very efficient. If this is not the case, you can subclass this class and change the data type of the record_id property.

Instance Attribute Summary

Attributes included from Sunspot::IndexQueue::Entry

#processed

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Sunspot::IndexQueue::Entry

enqueue, implementation, implementation=, load_all_records, #processed?, #record

Class Method Details

.add(klass, id, delete, priority) ⇒ Object

Implementation of the add method.



76
77
78
79
80
81
82
83
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 76

# Implementation of the add method.
#
# Finds the unlocked entry (+lock+ is +nil+) for the given record class name and id,
# or builds a new one when none is found. The entry is then flagged as a delete or
# update, given the requested priority if that is higher than its current one,
# scheduled to run at the current UTC time, and saved.
#
# @param klass [#name] class of the record; only its +name+ is stored
# @param id [Object] identifier stored as +record_id+
# @param delete [Boolean] value assigned to +is_delete+
# @param priority [Integer] requested priority; a higher existing priority is kept
# @return [Object] whatever +save!+ returns
def add(klass, id, delete, priority)
  lookup = {:record_id => id, :record_class_name => klass.name, :lock => nil}
  entry = first(:conditions => lookup)
  entry ||= new(lookup.merge(:priority => priority))
  entry.is_delete = delete
  # Never lower the priority of an entry that is already queued.
  entry.priority = priority if priority > entry.priority
  entry.run_at = Time.now.utc
  entry.save!
end

.delete_entries(entries) ⇒ Object

Implementation of the delete_entries method.



86
87
88
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 86

# Implementation of the delete_entries method.
#
# Collects the +id+ of every given entry and calls +destroy!+ on the
# collection of rows matching those ids.
#
# @param entries [Enumerable] objects responding to +id+
# @return [Object] whatever +destroy!+ returns
def delete_entries(entries)
  entry_ids = entries.map { |entry| entry.id }
  all(:id => entry_ids).destroy!
end

.error_count(queue) ⇒ Object

Implementation of the error_count method.



44
45
46
47
48
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 44

# Implementation of the error_count method.
#
# Counts the entries whose +error+ is not +nil+. When the queue lists class
# names, only entries for those record classes are counted.
#
# @param queue [#class_names] queue whose +class_names+ optionally restrict the count
# @return [Object] whatever +count+ returns for the built conditions
def error_count(queue)
  filters = {:error.not => nil}
  unless queue.class_names.empty?
    filters[:record_class_name] = queue.class_names
  end
  count(filters)
end

.errors(queue, limit, offset) ⇒ Object

Implementation of the errors method.



51
52
53
54
55
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 51

# Implementation of the errors method.
#
# Fetches a page of entries whose +error+ is not +nil+, ordered by +id+.
# When the queue lists class names, only entries for those record classes
# are included.
#
# @param queue [#class_names] queue whose +class_names+ optionally restrict the result
# @param limit [Integer] passed through as the +:limit+ option
# @param offset [Integer] passed through as the +:offset+ option
# @return [Object] whatever +all+ returns for the built conditions
def errors(queue, limit, offset)
  filters = {:error.not => nil}
  filters[:record_class_name] = queue.class_names unless queue.class_names.empty?
  paging = {:limit => limit, :offset => offset, :order => :id}
  all(filters.merge(paging))
end

.next_batch!(queue) ⇒ Object

Implementation of the next_batch! method.



64
65
66
67
68
69
70
71
72
73
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 64

# Implementation of the next_batch! method.
#
# Selects the ids of up to +queue.batch_size+ entries whose +run_at+ is not in
# the future (highest priority first, then oldest +run_at+), optionally limited
# to +queue.class_names+. Those rows are then stamped with a random lock value,
# their +run_at+ is pushed out by +queue.retry_interval+ and their +error+ is
# cleared. Finally the rows that carry this call's lock value are returned.
#
# @param queue [Object] responds to +class_names+, +batch_size+ and +retry_interval+
# @return [Object] an empty Array when nothing is ready, otherwise whatever +all+
#   returns for the selected ids and lock value
def next_batch!(queue)
  ready = {:run_at.lte => Time.now.utc}
  ready[:record_class_name] = queue.class_names unless queue.class_names.empty?
  selection = {:fields => [:id], :limit => queue.batch_size, :order => [:priority.desc, :run_at]}
  ids = all(ready.merge(selection)).map(&:id)
  return [] if ids.empty?

  token = rand(0x7FFFFFFF)
  claimed = all(:id => ids)
  claimed.update!(:run_at => Time.now.utc + queue.retry_interval, :lock => token, :error => nil)
  # Re-query by lock value so only rows still holding this call's token are returned.
  all(:id => ids, :lock => token)
end

.ready_count(queue) ⇒ Object

Implementation of the ready_count method.



37
38
39
40
41
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 37

# Implementation of the ready_count method.
#
# Counts the entries whose +run_at+ is at or before the current UTC time.
# When the queue lists class names, only entries for those record classes
# are counted.
#
# @param queue [#class_names] queue whose +class_names+ optionally restrict the count
# @return [Object] whatever +count+ returns for the built conditions
def ready_count(queue)
  due = {:run_at.lte => Time.now.utc}
  unless queue.class_names.empty?
    due[:record_class_name] = queue.class_names
  end
  count(due)
end

.reset!(queue) ⇒ Object

Implementation of the reset! method.



58
59
60
61
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 58

# Implementation of the reset! method.
#
# Bulk-updates the entries (all of them, or only those for +queue.class_names+
# when that list is not empty) so that they run now: +run_at+ becomes the current
# UTC time, +attempts+ is zeroed, and +error+ and +lock+ are cleared.
#
# @param queue [#class_names] queue whose +class_names+ optionally restrict the update
# @return [Object] whatever +update!+ returns
def reset!(queue)
  scope = {}
  scope[:record_class_name] = queue.class_names unless queue.class_names.empty?
  all(scope).update!(:run_at => Time.now.utc, :attempts => 0, :error => nil, :lock => nil)
end

.total_count(queue) ⇒ Object

Implementation of the total_count method.



31
32
33
34
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 31

# Implementation of the total_count method.
#
# Counts every entry, or only the entries for +queue.class_names+ when that
# list is not empty.
#
# @param queue [#class_names] queue whose +class_names+ optionally restrict the count
# @return [Object] whatever +count+ returns for the built conditions
def total_count(queue)
  filters =
    if queue.class_names.empty?
      {}
    else
      {:record_class_name => queue.class_names}
    end
  count(filters)
end

Instance Method Details

#reset! ⇒ Object

Implementation of the reset! method.



108
109
110
111
112
113
114
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 108

# Implementation of the reset! method.
#
# Makes this entry runnable again by zeroing +attempts+, clearing +error+ and
# +lock+, and setting +run_at+ to the current UTC time. A failure while saving
# is not propagated; it is written as a warning to +DataMapper.logger+ when a
# logger is configured.
#
# @return [Object] whatever +update!+ returns, or the result of logging when
#   the update raised
def reset!
  update!(:attempts => 0, :error => nil, :lock => nil, :run_at => Time.now.utc)
rescue => e
  # Guard against a missing logger, as set_error! does, so that the rescue
  # handler itself cannot raise.
  logger = DataMapper.logger
  logger.warn(e) if logger
end

#set_error!(error, retry_interval = nil) ⇒ Object

Implementation of the set_error! method.



92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/sunspot/index_queue/entry/data_mapper_impl.rb', line 92

# Implementation of the set_error! method.
#
# Records a failed attempt on this entry: increments +attempts+, optionally
# pushes +run_at+ out by +retry_interval+ multiplied by the new attempt count,
# stores the error class, message and backtrace (the backtrace portion is cut
# to 4000 characters) in +error+, and clears +lock+. A failure while saving is
# not propagated; both the original error and the save error are written as
# warnings to +DataMapper.logger+ when a logger is configured.
#
# @param error [Exception] the error to record; its backtrace may be +nil+
# @param retry_interval [Numeric, nil] base delay added to the current UTC time
#   once per attempt; +run_at+ is left unchanged when +nil+
# @return [Object] whatever +save!+ returns, or the result of the rescue branch
def set_error!(error, retry_interval = nil)
  self.attempts += 1
  self.run_at = Time.now.utc + (retry_interval * attempts) if retry_interval
  # Exception#backtrace is nil for an exception that was built but never raised.
  trace = Array(error.backtrace).join("\n")[0, 4000]
  self.error = "#{error.class.name}: #{error.message}\n#{trace}"
  self.lock = nil
  begin
    save!
  rescue => e
    if DataMapper.logger
      DataMapper.logger.warn(error)
      DataMapper.logger.warn(e)
    end
  end
end