Class: ThinkingSphinx::Deltas::ResqueDelta::DeltaJob

Inherits:
Object
  • Object
show all
Extended by:
Resque::Plugins::LockTimeout
Defined in:
lib/thinking_sphinx/deltas/resque_delta/delta_job.rb

Overview

A simple job class that processes a given index.

Direct Known Subclasses

FlyingSphinx::ResqueDelta::DeltaJob

Class Method Summary collapse

Class Method Details

.around_perform_lock1(*args) ⇒ Object

This allows us to have a concurrency safe version of ts-delayed-delta’s duplicates_exist:

github.com/freelancing-god/ts-delayed-delta/blob/master/lib/thinkin g_sphinx/deltas/delayed_delta/job.rb#L47

The name of this method ensures that it runs within around_perform_lock.

We’ve leveraged resque-lock-timeout to ensure that only one DeltaJob is running at a time. Now, this around filter essentially ensures that only one DeltaJob of each index type can sit at the queue at once. If the queue has more than one, lrem will clear the rest off.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 69

def self.around_perform_lock1(*args)
  # Remove all other instances of this job (with the same args) from the
  # queue. Uses LREM (http://code.google.com/p/redis/wiki/LremCommand) which
  # takes the form: "LREM key count value" and if count == 0 removes all
  # instances of value from the list.
  redis_job_value = Resque.encode(:class => self.to_s, :args => args)
  Resque.redis.lrem("queue:#{@queue}", 0, redis_job_value)

  # Grab the subset of flag as deleted document ids to work on
  core_index = ThinkingSphinx::Deltas::ResqueDelta::IndexUtils.delta_to_core(*args)
  ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.get_subset_for_processing(core_index)

  yield

  # Clear processing set
  ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.clear_processing(core_index)
end

.lock_failed(*args) ⇒ Object

Try again later if lock is in use.



47
48
49
# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 47

def self.lock_failed(*args)
  Resque.enqueue(self, *args)
end

.perform(index) ⇒ Object

Runs Sphinx’s indexer tool to process the index. Currently assumes Sphinx is running.

Parameters:

  • index (String)

    the name of the Sphinx index



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/thinking_sphinx/deltas/resque_delta/delta_job.rb', line 16

def self.perform(index)
  return if skip?(index)

  config = ThinkingSphinx::Configuration.instance

  # Delta Index
  output = `#{config.bin_path}#{config.indexer_binary_name} --config #{config.config_file} --rotate #{index}`
  puts output unless ThinkingSphinx.suppress_delta_output?

  # Flag As Deleted
  return unless ThinkingSphinx.sphinx_running?

  index = ThinkingSphinx::Deltas::ResqueDelta::IndexUtils.delta_to_core(index)

  # Get the document ids we've saved
  flag_as_deleted_ids = ThinkingSphinx::Deltas::ResqueDelta::FlagAsDeletedSet.processing_members(index)

  unless flag_as_deleted_ids.empty?
    # Filter out the ids that aren't present in sphinx
    flag_as_deleted_ids = filter_flag_as_deleted_ids(flag_as_deleted_ids, index)

    unless flag_as_deleted_ids.empty?
      # Each hash element should be of the form { id => [1] }
      flag_hash = Hash[*flag_as_deleted_ids.collect {|id| [id, [1]] }.flatten(1)]

      config.client.update(index, ['sphinx_deleted'], flag_hash)
    end
  end
end