Class: ThinkingSphinx::Deltas::ResqueDelta::DeltaJob

Inherits:
Object
Extended by:
Resque::Plugins::LockTimeout
Defined in:
lib/thinking_sphinx/deltas/resque_delta/delta_job.rb

Overview

A simple job class that processes a given index.

Direct Known Subclasses

FlyingSphinx::ResqueDelta::DeltaJob

Class Method Details

.around_perform_lock1(*args) ⇒ Object

This gives us a concurrency-safe version of ts-delayed-delta’s duplicates_exist:

github.com/freelancing-god/ts-delayed-delta/blob/master/lib/thinking_sphinx/deltas/delayed_delta/job.rb#L47

The name of this method ensures that it runs within around_perform_lock.

We use resque-lock-timeout to ensure that only one DeltaJob runs at a time. This around filter additionally ensures that only one DeltaJob per index sits in the queue at once: if the queue holds more than one job with the same arguments, LREM removes the extras.



# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 69

def self.around_perform_lock1(*args)
  # Remove all other instances of this job (with the same args) from the
  # queue. Uses LREM (http://code.google.com/p/redis/wiki/LremCommand) which
  # takes the form: "LREM key count value" and if count == 0 removes all
  # instances of value from the list.
  redis_job_value = Resque.encode(:class => self.to_s, :args => args)
  Resque.redis.lrem("queue:#{@queue}", 0, redis_job_value)

  # Grab the subset of flag as deleted document ids to work on
  core_index = ThinkingSphinx::Deltas::ResqueDelta::IndexUtils.delta_to_core(*args)
  ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.get_subset_for_processing(core_index)

  yield

  # Clear processing set
  ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.clear_processing(core_index)
end
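
The de-duplication step above can be illustrated without a Redis server. The following is a minimal sketch, not the library's code: FakeQueue and encode_job are hypothetical stand-ins for the Redis list and for Resque.encode (assumed here to produce a JSON payload), and only the count == 0 form of LREM is imitated.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the Redis list behind a Resque queue.
class FakeQueue
  attr_reader :items

  def initialize
    @items = []
  end

  def rpush(value)
    @items.push(value)
  end

  # Imitates "LREM key 0 value": removes every element equal to value
  # and returns how many were removed.
  def lrem(count, value)
    raise ArgumentError, 'only count == 0 is sketched' unless count.zero?

    before = @items.size
    @items.delete(value) # Array#delete drops all matching elements
    before - @items.size
  end
end

# Hypothetical helper standing in for Resque.encode.
def encode_job(klass, *args)
  JSON.generate('class' => klass, 'args' => args)
end

queue = FakeQueue.new
3.times { queue.rpush(encode_job('DeltaJob', 'foo_delta')) }
queue.rpush(encode_job('DeltaJob', 'bar_delta'))

# The job being performed clears its queued duplicates before indexing.
removed = queue.lrem(0, encode_job('DeltaJob', 'foo_delta'))
remaining = queue.items.map { |payload| JSON.parse(payload)['args'] }
```

Because the match is on the whole encoded payload, jobs for a different index (here 'bar_delta') stay in the queue.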

.lock_failed(*args) ⇒ Object

Re-enqueues the job so that it is tried again later if the lock is in use.



# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 47

def self.lock_failed(*args)
  Resque.enqueue(self, *args)
end
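
The retry behaviour can be pictured with a toy model. This is only a sketch under stated assumptions: ToyWorker and all of its methods are hypothetical and do not come from Resque or resque-lock-timeout; the point is just that a job which cannot take the lock goes back onto the queue instead of being dropped.

```ruby
# Hypothetical toy model; none of these names exist in Resque.
class ToyWorker
  attr_reader :queue

  def initialize
    @queue = []
    @lock_held = false
  end

  def enqueue(index)
    @queue.push(index)
  end

  # Pretend another DeltaJob currently holds the lock.
  def hold_lock!
    @lock_held = true
  end

  # Takes one job off the queue. If the lock is taken, the job is pushed
  # back (the same idea as lock_failed) rather than performed.
  def work_one
    index = @queue.shift
    return :idle if index.nil?

    if @lock_held
      enqueue(index)
      :requeued
    else
      :performed
    end
  end
end

worker = ToyWorker.new
worker.enqueue('foo_delta')
worker.hold_lock!
result = worker.work_one
```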

.perform(index) ⇒ Object

Runs Sphinx’s indexer tool to process the index. Currently assumes Sphinx is running.



# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 16

def self.perform(index)
  return if skip?(index)

  config = ThinkingSphinx::Configuration.instance

  # Delta Index
  output = `#{config.bin_path}#{config.indexer_binary_name} --config #{config.config_file} --rotate #{index}`
  puts output unless ThinkingSphinx.suppress_delta_output?

  # Flag As Deleted
  return unless ThinkingSphinx.sphinx_running?

  index = ThinkingSphinx::Deltas::ResqueDelta::IndexUtils.delta_to_core(index)

  # Get the document ids we've saved
  flag_as_deleted_ids = ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.processing_members(index)

  unless flag_as_deleted_ids.empty?
    # Filter out the ids that aren't present in sphinx
    flag_as_deleted_ids = filter_flag_as_deleted_ids(flag_as_deleted_ids, index)

    unless flag_as_deleted_ids.empty?
      # Each hash element should be of the form { id => [1] }
      flag_hash = Hash[*flag_as_deleted_ids.collect {|id| [id, [1]] }.flatten(1)]

      config.client.update(index, ['sphinx_deleted'], flag_hash)
    end
  end
end
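
The flag_hash expression is dense, so here is a small standalone illustration of what it builds. The ids are made-up sample values; the second form is simply an equivalent spelling for comparison, not something the library uses.

```ruby
# Made-up sample document ids.
flag_as_deleted_ids = [3, 7, 42]

# Same expression as in perform: build [id, [1]] pairs, flatten one level
# into id, [1], id, [1], ... and hand them to Hash[] as key/value arguments.
flag_hash = Hash[*flag_as_deleted_ids.collect { |id| [id, [1]] }.flatten(1)]

# Equivalent spelling: convert the array of pairs directly.
same_hash = flag_as_deleted_ids.map { |id| [id, [1]] }.to_h
```

Each document id maps to [1], the value written to the sphinx_deleted attribute for that document.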