Module: Sunspot::IndexQueue::Entry

Included in:
ActiveRecordImpl, DataMapperImpl, MongoImpl, RedisImpl
Defined in:
lib/sunspot/index_queue/entry.rb,
lib/sunspot/index_queue/entry/mongo_impl.rb,
lib/sunspot/index_queue/entry/redis_impl.rb,
lib/sunspot/index_queue/entry/data_mapper_impl.rb,
lib/sunspot/index_queue/entry/active_record_impl.rb

Overview

Abstract queue entry interface. All the gory details on actually handling the queue are handled by a specific implementation class. The default implementation will use ActiveRecord as the backing queue.

Implementing classes must define attribute readers for id, record_class_name, record_id, error, attempts, and is_delete?.

Defined Under Namespace

Classes: ActiveRecordImpl, DataMapperImpl, MongoImpl, RedisImpl

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#processed=(value) ⇒ Object (writeonly)

Sets the attribute processed

Parameters:

  • value

    the value to set the attribute processed to.


13
14
15
# File 'lib/sunspot/index_queue/entry.rb', line 13

def processed=(value)
  @processed = value
end

Class Method Details

.add(klass, id, delete, options = {}) ⇒ Object

Add an entry the queue. is_delete will be true if the entry is a delete. Implementations must implement this method.

Raises:

  • (NotImplementedError)

76
77
78
# File 'lib/sunspot/index_queue/entry.rb', line 76

def add(klass, id, delete, options = {})
  raise NotImplementedError.new("add")
end

.delete_entries(entries) ⇒ Object

Delete entries from the queue. Implementations must implement this method.


97
98
99
# File 'lib/sunspot/index_queue/entry.rb', line 97

def delete_entries(entries)
  implementation.delete_entries(entries)
end

.enqueue(queue, klass, ids, delete, priority) ⇒ Object

Add multiple entries to the queue. delete will be true if the entry is a delete.


81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/sunspot/index_queue/entry.rb', line 81

def enqueue(queue, klass, ids, delete, priority)
  klass = Sunspot::Util.full_const_get(klass.to_s) unless klass.is_a?(Class)
  unless queue.class_names.empty? || queue.class_names.include?(klass.name)
    raise ArgumentError.new("Class #{klass.name} is not in the class names allowed for the queue")
  end
  priority = priority.to_i
  if ids.is_a?(Array)
    ids.each do |id|
      implementation.add(klass, id, delete, priority)
    end
  else
    implementation.add(klass, ids, delete, priority)
  end
end

.error_count(queue) ⇒ Object

Get a count of the error entries for an IndexQueue. Implementations must implement this method.


56
57
58
# File 'lib/sunspot/index_queue/entry.rb', line 56

def error_count(queue)
  implementation.error_count(queue)
end

.errors(queue, limit, offset) ⇒ Object

Get the specified number of error entries for an IndexQueue. Implementations must implement this method.


61
62
63
# File 'lib/sunspot/index_queue/entry.rb', line 61

def errors(queue, limit, offset)
  implementation.errors(queue, limit, offset)
end

.implementationObject

The implementation class used for the queue.


41
42
43
# File 'lib/sunspot/index_queue/entry.rb', line 41

def implementation
  @implementation ||= ActiveRecordImpl
end

.implementation=(klass) ⇒ Object

Set the implementation class to use for the queue. This can be set as either a class object, full class name, or a symbol representing one of the default implementations.

# These all set the implementation to use the default ActiveRecord queue.
Sunspot::IndexQueue::Entry.implementation = :active_record
Sunspot::IndexQueue::Entry.implementation = "Sunspot::IndexQueue::Entry::ActiveRecordImpl"
Sunspot::IndexQueue::Entry.implementation = Sunspot::IndexQueue::Entry::ActiveRecordImpl

Implementations should support pulling entries in batches by a priority where higher priority entries are processed first. Errors should be automatically retried after an interval specified by the IndexQueue. The batch size set by the IndexQueue should also be honored.


27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/sunspot/index_queue/entry.rb', line 27

def implementation=(klass)
  unless klass.is_a?(Class) || klass.nil?
    class_name = klass.to_s
    class_name = Sunspot::Util.camel_case(class_name).gsub('/', '::') unless class_name.include?('::')
    if class_name.include?('::') || !const_defined?("#{class_name}Impl")
      klass = Sunspot::Util.full_const_get(class_name)
    else
      klass = const_get("#{class_name}Impl")
    end
  end
  @implementation = klass
end

.load_all_records(entries) ⇒ Object

Load all records in an array of entries. This can be faster than calling load on each DataAccessor depending on the implementation


103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/sunspot/index_queue/entry.rb', line 103

def load_all_records(entries)
  #First create maps for classes => ids and  "Classname id" => entry
  class_id_map = {}
  entry_map = {}
  entries.each do |entry|
    classname = Sunspot::Util.full_const_get(entry.record_class_name)
    class_id_map[classname] = [] if class_id_map[classname].nil?
    class_id_map[classname] << entry.record_id
    entry_map["#{classname} #{entry.record_id}"] = entry
  end
  class_id_map.each do |klass, ids|
    adapter = Sunspot::Adapters::DataAccessor.create(klass)
    if klass.respond_to?(:sunspot_options) && klass.sunspot_options && klass.sunspot_options[:include] && adapter.respond_to?(:include=)
      adapter.include = klass.sunspot_options[:include]
    end
    adapter.load_all(ids).each do |record|
      entry = entry_map["#{klass} #{Sunspot::Adapters::InstanceAdapter.adapt(record).id}"]
      entry.instance_variable_set(:@record, record) if entry
    end
  end
end

.next_batch!(queue) ⇒ Object

Get the next batch of entries to process for IndexQueue. Implementations must implement this method.


66
67
68
# File 'lib/sunspot/index_queue/entry.rb', line 66

def next_batch!(queue)
  implementation.next_batch!(queue)
end

.ready_count(queue) ⇒ Object

Get a count of the entries ready to be processed for an IndexQueue. Implementations must implement this method.


51
52
53
# File 'lib/sunspot/index_queue/entry.rb', line 51

def ready_count(queue)
  implementation.ready_count(queue)
end

.reset!(queue) ⇒ Object

Reset the entries in the queue to be excuted again immediately and clear any errors.


71
72
73
# File 'lib/sunspot/index_queue/entry.rb', line 71

def reset!(queue)
  implementation.reset!(queue)
end

.total_count(queue) ⇒ Object

Get a count of the queue entries for an IndexQueue. Implementations must implement this method.


46
47
48
# File 'lib/sunspot/index_queue/entry.rb', line 46

def total_count(queue)
  implementation.total_count(queue)
end

Instance Method Details

#processed?Boolean

Returns:

  • (Boolean)

127
128
129
130
# File 'lib/sunspot/index_queue/entry.rb', line 127

def processed?
  @processed = false unless defined?(@processed)
  @processed
end

#recordObject

Get the record represented by this entry.


133
134
135
# File 'lib/sunspot/index_queue/entry.rb', line 133

def record
  @record ||= Sunspot::Adapters::DataAccessor.create(Sunspot::Util.full_const_get(record_class_name)).load_all([record_id]).first
end

#reset!Object

Reset an entry to be executed again immediatel and clear any errors. Implementations must implement this method.

Raises:

  • (NotImplementedError)

143
144
145
# File 'lib/sunspot/index_queue/entry.rb', line 143

def reset!
  raise NotImplementedError.new("reset!")
end

#set_error!(error, retry_interval = nil) ⇒ Object

Set the error message on an entry. Implementations must implement this method.

Raises:

  • (NotImplementedError)

138
139
140
# File 'lib/sunspot/index_queue/entry.rb', line 138

def set_error!(error, retry_interval = nil)
  raise NotImplementedError.new("set_error!")
end