Class: Chewy::Index::Syncer

Inherits:
Object
Defined in:
lib/chewy/index/syncer.rb

Overview

Note:

In Rails 4.0, times are converted to JSON with second precision (milliseconds are dropped), so the outdated check is less precise there.

ATTENTION: synchronization may be slow if the synchronized tables lack a compound index on the primary key and outdated_sync_field.

This class finds missing and outdated documents in ES by comparing the ids from the data source with the ids in the ES index. If outdated_sync_field is set in the index definition, it also compares this field's value for each source object with the value in the corresponding ES document. Usually this field is updated_at: if its value in the source differs from the value in the index, the document is outdated and should be reindexed.

To fetch the necessary data from the source, it uses the adapter method Adapter::Base#import_fields. If the Object adapter is used, read the corresponding documentation.

If the parallel option is passed to the initializer, source and index data are fetched in parallel, and the outdated-object calculation runs in parallel processes. Any subsequent import is performed in parallel as well.

Constant Summary

DEFAULT_SYNC_BATCH_SIZE =
20_000
ISO_DATETIME =
/\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/.freeze
OUTDATED_IDS_WORKER =
lambda do |outdated_sync_field_type, source_data_hash, index, total, index_data|
  ::Process.setproctitle("chewy [#{index}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if index
  index_data.each_with_object([]) do |(id, index_sync_value), result|
    next unless source_data_hash[id]

    outdated = if outdated_sync_field_type == 'date'
      !Chewy::Index::Syncer.dates_equal(typecast_date(source_data_hash[id]), Time.iso8601(index_sync_value))
    else
      source_data_hash[id] != index_sync_value
    end

    result.push(id) if outdated
  end
end
SOURCE_OR_INDEX_DATA_WORKER =
lambda do |syncer, index, kind|
  ::Process.setproctitle("chewy [#{index}]: sync fetching data (#{kind})")
  result = case kind
  when :source
    syncer.send(:fetch_source_data)
  when :index
    syncer.send(:fetch_index_data)
  end
  {kind => result}
end
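The argument order of OUTDATED_IDS_WORKER (fixed settings first, the batch of index data last) suggests it is meant to be curried and then mapped over slices of the index data. The Ruby sketch below assumes exactly that. It uses a simplified, hypothetical copy of the worker (plain equality only, no process titles, no Parallel gem), a sequential map where a parallel map would go, and made-up sample data.

```ruby
# Simplified stand-in for OUTDATED_IDS_WORKER (assumption: non-date sync field,
# no process titles, no Parallel gem). The batch of index data comes last so it
# can be supplied after currying.
worker = lambda do |source_data_hash, _total, index_data|
  index_data.each_with_object([]) do |(id, index_sync_value), result|
    next unless source_data_hash[id] # ids absent from the source are "missing", not "outdated"

    result.push(id) if source_data_hash[id] != index_sync_value
  end
end

# Hypothetical data: id => sync value from the source, [id, value] pairs from the index.
source = { '1' => 'v1', '2' => 'v2', '3' => 'v3', '4' => 'v4' }
index  = [['1', 'v1'], ['2', 'old'], ['3', 'v3'], ['4', 'old'], ['5', 'v5']]

batches = index.each_slice(2).to_a
curried = worker.curry[source, batches.size] # fix everything except the batch

# A parallel map could replace #map here; flatten(1) merges the per-batch results.
outdated = batches.map { |batch| curried[batch] }.flatten(1)
p outdated # => ["2", "4"]
```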

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(index, parallel: nil) ⇒ Syncer

Returns a new instance of Syncer.

Parameters:

  • index (Chewy::Index)

    Chewy index

  • parallel (true, Integer, Hash) (defaults to: nil)

    options for parallel execution (a Hash), the number of processes (an Integer), or true to use default parallel options



# File 'lib/chewy/index/syncer.rb', line 76

def initialize(index, parallel: nil)
  @index = index
  @parallel = if !parallel || parallel.is_a?(Hash)
    parallel
  elsif parallel.is_a?(Integer)
    {in_processes: parallel}
  else
    {}
  end
end
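The branching in #initialize can be exercised in isolation. The sketch below copies it into a standalone method, normalize_parallel (a hypothetical name, not part of Chewy), to show what each accepted value becomes; in_threads is used only as an example of a Hash that is passed through untouched.

```ruby
# Standalone copy of the normalization performed in Syncer#initialize.
def normalize_parallel(parallel)
  if !parallel || parallel.is_a?(Hash)
    parallel                   # nil/false disables parallelism; a Hash is used as-is
  elsif parallel.is_a?(Integer)
    { in_processes: parallel } # an Integer is the number of processes
  else
    {}                         # anything else (e.g. true) means default parallel options
  end
end

normalize_parallel(nil)                # returns nil
normalize_parallel(4)                  # returns { in_processes: 4 }
normalize_parallel(true)               # returns {}
normalize_parallel({ in_threads: 2 })  # returns { in_threads: 2 }
```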

Class Method Details

.dates_equal(one, two) ⇒ Boolean

Compares two times with millisecond precision; digits below one millisecond are ignored.



# File 'lib/chewy/index/syncer.rb', line 70

def self.dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end
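A quick check of the millisecond comparison, using a standalone copy of dates_equal and hypothetical timestamps. The last case mimics the Rails 4.0 situation from the note in the overview: a value serialized without milliseconds no longer matches a source time that has them.

```ruby
require 'time'

# Same comparison as Syncer.dates_equal: whole seconds plus truncated milliseconds.
def dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end

a = Time.iso8601('2024-01-01T12:00:00.123456+00:00')
b = Time.iso8601('2024-01-01T12:00:00.123999+00:00') # differs only below 1 ms
c = Time.iso8601('2024-01-01T12:00:00.124000+00:00') # crosses a millisecond boundary
d = Time.iso8601('2024-01-01T12:00:00+00:00')        # milliseconds lost in serialization

dates_equal(a, b) # => true
dates_equal(a, c) # => false
dates_equal(a, d) # => false
```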

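.typecast_date(string) ⇒ Object

Parses a zone-less datetime string matching ISO_DATETIME (YYYY-MM-DD HH:MM:SS with an optional fraction) as a Time with a zero UTC offset; any other value is returned unchanged.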


# File 'lib/chewy/index/syncer.rb', line 56

def self.typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    microsec = (match[7].to_r * 1_000_000).to_i
    day = "#{match[1]}-#{match[2]}-#{match[3]}"
    time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
    microseconds = format('%06d', microsec)
    date = "#{day}T#{time_with_seconds}.#{microseconds}+00:00"
    Time.iso8601(date)
  else
    string
  end
end
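The sketch below copies typecast_date and ISO_DATETIME into a standalone script to show the conversion on sample strings (the inputs are hypothetical). Values that do not match ISO_DATETIME, such as an ISO 8601 string with a T separator, come back unchanged.

```ruby
require 'time'

ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/.freeze

# Mirrors Syncer.typecast_date: rebuild the string as ISO 8601 with an explicit
# +00:00 offset and microsecond fraction, then parse it.
def typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    microsec = (match[7].to_r * 1_000_000).to_i # nil.to_r is 0 when there is no fraction
    day = "#{match[1]}-#{match[2]}-#{match[3]}"
    time_with_seconds = "#{match[4]}:#{match[5]}:#{match[6]}"
    microseconds = format('%06d', microsec)
    Time.iso8601("#{day}T#{time_with_seconds}.#{microseconds}+00:00")
  else
    string # non-matching values are returned as-is
  end
end

t = typecast_date('2024-03-05 07:08:09.25')
t.usec        # => 250000
t.utc_offset  # => 0
typecast_date('2024-03-05T07:08:09Z') # => "2024-03-05T07:08:09Z" (unchanged)
```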

Instance Method Details

#missing_ids ⇒ Array<String>

Finds the ids of all objects that are either not indexed yet or already deleted from the source.

Returns:

  • (Array<String>)

    an array of missing ids from both sides



# File 'lib/chewy/index/syncer.rb', line 101

def missing_ids
  return [] if source_data.blank?

  @missing_ids ||= begin
    source_data_ids = data_ids(source_data)
    index_data_ids = data_ids(index_data)

    (source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids)
  end
end
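missing_ids is the symmetric difference of the two id lists. A minimal sketch, assuming source and index data are arrays of [id, sync_value] pairs and using a hypothetical data_ids helper that keeps only the ids (the real helper is not shown on this page):

```ruby
# Hypothetical data: [id, sync_value] pairs from the source and from the index.
source_data = [['1', 't1'], ['2', 't2'], ['3', 't3']]
index_data  = [['2', 't2'], ['3', 't3'], ['4', 't4']]

# Illustrative helper: extract ids from [id, value] pairs.
def data_ids(data)
  data.map(&:first)
end

source_ids = data_ids(source_data)
index_ids  = data_ids(index_data)

# Not yet indexed ("1") followed by already deleted from the source ("4").
missing = (source_ids - index_ids).concat(index_ids - source_ids)
p missing # => ["1", "4"]
```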

#outdated_ids ⇒ Array<String>

If the index supports outdated sync, compares the outdated_sync_field values of each source object and its corresponding index document, and returns the ids of the entities whose values differ.

Returns:

  • (Array<String>)

    an array of outdated ids

See Also:



# File 'lib/chewy/index/syncer.rb', line 118

def outdated_ids
  return [] if source_data.blank? || index_data.blank? || !@index.supports_outdated_sync?

  @outdated_ids ||= if @parallel
    parallel_outdated_ids
  else
    linear_outdated_ids
  end
end
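linear_outdated_ids is not shown on this page. The sketch below assumes it applies the same per-document logic as OUTDATED_IDS_WORKER, run sequentially, for a date-typed outdated_sync_field. Source values are Time objects (which typecast_date passes through unchanged) and index values are ISO 8601 strings; all data is hypothetical.

```ruby
require 'time'

# Millisecond comparison, as in Syncer.dates_equal.
def dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end

# Hypothetical data: source updated_at values keyed by id, index values as pairs.
source_data_hash = {
  '1' => Time.utc(2024, 1, 1, 10, 0, 0, 500_000),
  '2' => Time.utc(2024, 1, 2, 10, 0, 0),
  '3' => Time.utc(2024, 1, 3, 10, 0, 0) # not in the index: a missing id, never visited here
}
index_data = [
  ['1', '2024-01-01T10:00:00.500Z'], # same instant to the millisecond
  ['2', '2024-01-01T10:00:00.000Z'], # stale: the source row changed afterwards
  ['4', '2024-01-04T10:00:00.000Z']  # not in the source: skipped
]

outdated = index_data.each_with_object([]) do |(id, index_value), result|
  next unless source_data_hash[id]

  result.push(id) unless dates_equal(source_data_hash[id], Time.iso8601(index_value))
end
p outdated # => ["2"]
```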

#perform ⇒ Integer?

Finds all the missing and outdated ids and performs import for them.

Returns:

  • (Integer, nil)

    the number of missing and outdated documents reindexed, or a falsy value if the import fails



# File 'lib/chewy/index/syncer.rb', line 90

def perform
  ids = missing_ids | outdated_ids
  return 0 if ids.blank?

  @index.import(ids, parallel: @parallel) && ids.count
end
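perform combines both id lists with Array#| (a union without duplicates) and returns the count only if the import succeeds. In the sketch below, FakeIndex is a hypothetical stand-in whose import returns a boolean (an assumption for illustration), and empty? replaces ActiveSupport's blank? so the script runs without Rails.

```ruby
# Hypothetical stand-in for a Chewy index: #import reports success as a boolean.
class FakeIndex
  def initialize(succeeds)
    @succeeds = succeeds
  end

  def import(_ids, parallel: nil)
    @succeeds
  end
end

# Same shape as Syncer#perform, with the id lists passed in explicitly.
def perform(index, missing_ids, outdated_ids, parallel: nil)
  ids = missing_ids | outdated_ids # union, duplicates removed
  return 0 if ids.empty?

  index.import(ids, parallel: parallel) && ids.count # count only when the import succeeded
end

perform(FakeIndex.new(true), %w[1 4], %w[2 4])  # => 3
perform(FakeIndex.new(false), %w[1 4], %w[2 4]) # => false
perform(FakeIndex.new(true), [], [])            # => 0
```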