Class: Chewy::Type::Syncer

Inherits:
Object
Defined in:
lib/chewy/type/syncer.rb

Overview

Note:

In Rails 4.0, times are converted to JSON with second precision (no milliseconds), so the outdated check is less precise there.

ATTENTION: synchronization may be slow when the synchronized tables lack a compound index on the primary key and outdated_sync_field.

This class finds missing and outdated documents in ES by comparing ids from the data source with ids from the ES index. In addition, if outdated_sync_field exists in the index definition, it compares the value of this field for each source object with the value in the corresponding ES document. Usually this field is updated_at; if its value in the source is not equal to the value in the index, the document is outdated and should be reindexed.

To fetch the necessary data from the source it uses the adapter method Adapter::Base#import_fields. When the Object adapter is used, it makes sense to read the corresponding documentation.

If the parallel option is passed to the initializer, it fetches source and index data in parallel and then calculates the outdated objects in parallel processes. Any further import (if required) is performed in parallel as well.

Constant Summary

DEFAULT_SYNC_BATCH_SIZE =
20_000
ISO_DATETIME =
/\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/
OUTDATED_IDS_WORKER =
lambda do |outdated_sync_field_type, source_data_hash, type, total, index_data|
  ::Process.setproctitle("chewy [#{type}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if type
  index_data.each_with_object([]) do |(id, index_sync_value), result|
    next unless source_data_hash[id]

    outdated = if outdated_sync_field_type == 'date'
      !Chewy::Type::Syncer.dates_equal(typecast_date(source_data_hash[id]), Time.iso8601(index_sync_value))
    else
      source_data_hash[id] != index_sync_value
    end

    result.push(id) if outdated
  end
end
SOURCE_OR_INDEX_DATA_WORKER =
lambda do |syncer, type, kind|
  ::Process.setproctitle("chewy [#{type}]: sync fetching data (#{kind})")
  result = case kind
  when :source
    syncer.send(:fetch_source_data)
  when :index
    syncer.send(:fetch_index_data)
  end
  {kind => result}
end

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(type, parallel: nil) ⇒ Syncer

Returns a new instance of Syncer.

Parameters:

  • type (Chewy::Type)

    chewy type

  • parallel (true, Integer, Hash) (defaults to: nil)

    options for parallel execution or the number of processes



# File 'lib/chewy/type/syncer.rb', line 81

def initialize(type, parallel: nil)
  @type = type
  @parallel = if !parallel || parallel.is_a?(Hash)
    parallel
  elsif parallel.is_a?(Integer)
    {in_processes: parallel}
  else
    {}
  end
end
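
The way the constructor normalizes the parallel option can be sketched in isolation. The helper name normalize_parallel below is hypothetical (it is not part of Chewy); it only mirrors the branches shown above.

```ruby
# Hypothetical standalone helper mirroring the constructor's handling of
# the `parallel` option. It is an illustration, not part of Chewy's API.
def normalize_parallel(parallel)
  if !parallel || parallel.is_a?(Hash)
    parallel                 # nil/false or an options hash pass through unchanged
  elsif parallel.is_a?(Integer)
    {in_processes: parallel} # an integer is read as the number of processes
  else
    {}                       # e.g. `true`: parallel execution with default options
  end
end

disabled   = normalize_parallel(nil)             # nil
by_count   = normalize_parallel(4)               # {in_processes: 4}
defaults   = normalize_parallel(true)            # {}
by_options = normalize_parallel({in_threads: 2}) # {in_threads: 2}
```

An empty hash is truthy in Ruby, so passing true still enables the parallel branch in code that checks `if @parallel`.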

Class Method Details

.dates_equal(one, two) ⇒ Object

Compares two times with millisecond precision.



# File 'lib/chewy/type/syncer.rb', line 67

def self.dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end
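
A small sketch of what millisecond precision means here. The method body is copied from above into a local method so the snippet runs without Chewy; the sample timestamps are made up.

```ruby
# Local copy of the comparison shown above, so the snippet runs without Chewy.
def dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end

base = Time.utc(2017, 3, 1, 12, 30, 45).to_i

a = Time.at(base, 123_456) # the second argument is microseconds
b = Time.at(base, 123_999) # same millisecond (123), different microseconds
c = Time.at(base, 124_000) # next millisecond

same_ms = dates_equal(a, b) # true: sub-millisecond differences are ignored
next_ms = dates_equal(a, c) # false: the millisecond part differs
```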

.typecast_date(string) ⇒ Object



# File 'lib/chewy/type/syncer.rb', line 56

def self.typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    microsec = (match[7].to_r * 1_000_000).to_i
    date = "#{match[1]}-#{match[2]}-#{match[3]}T#{match[4]}:#{match[5]}:#{match[6]}.#{format('%06d', microsec)}+00:00"
    Time.iso8601(date)
  else
    string
  end
end
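
The conversion can be illustrated with a standalone sketch. The helper parse_db_datetime is hypothetical and only follows the same steps as the method above; as a deliberate difference, it prepends "0" to the captured fraction before calling to_r, so that the parsed string always has a leading digit.

```ruby
require 'time' # provides Time.iso8601

# Hypothetical helper following the same steps as typecast_date.
# Assumed input shape: "YYYY-MM-DD HH:MM:SS[.fraction]", interpreted as UTC.
def parse_db_datetime(value)
  pattern = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/
  match = value.is_a?(String) && pattern.match(value)
  return value unless match # anything else is returned unchanged

  # "0" is prepended so the fraction reads e.g. "0.5"; a missing fraction gives 0.
  microsec = ("0#{match[7]}".to_r * 1_000_000).to_i
  iso = format('%s-%s-%sT%s:%s:%s.%06d+00:00', *match.captures.first(6), microsec)
  Time.iso8601(iso)
end

with_fraction    = parse_db_datetime('2017-03-01 12:30:45.123456')
without_fraction = parse_db_datetime('2017-03-01 12:30:45')
untouched        = parse_db_datetime(42)
```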

Instance Method Details

#missing_ids ⇒ Array<String>

Finds the ids of all objects that are either not indexed yet or already deleted from the source.

Returns:

  • (Array<String>)

    an array of missing ids from both sides



# File 'lib/chewy/type/syncer.rb', line 105

def missing_ids
  return [] if source_data.blank?

  @missing_ids ||= begin
    source_data_ids = data_ids(source_data)
    index_data_ids = data_ids(index_data)

    (source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids)
  end
end
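
The id comparison amounts to a symmetric difference of the two id lists. A minimal sketch with made-up ids, using plain arrays instead of the data returned by the adapter and the index:

```ruby
# Made-up sample ids, compared as strings.
source_ids = %w[1 2 3 4] # ids present in the data source
index_ids  = %w[3 4 5]   # ids present in the ES index

not_indexed_yet     = source_ids - index_ids # ["1", "2"]
deleted_from_source = index_ids - source_ids # ["5"]

missing = not_indexed_yet + deleted_from_source # ["1", "2", "5"]
```

Because of the early return in the method above, an empty array is returned when the source data is blank, so ids that exist only in the index are not reported in that case.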

#outdated_ids ⇒ Array<String>

If the type supports outdated sync, it compares the value of the type's outdated_sync_field for each source object and its indexed document, and returns the ids of the entities whose values differ.

Returns:

  • (Array<String>)

    an array of outdated ids



# File 'lib/chewy/type/syncer.rb', line 123

def outdated_ids
  return [] if source_data.blank? || index_data.blank? || !@type.supports_outdated_sync?
  @outdated_ids ||= begin
    if @parallel
      parallel_outdated_ids
    else
      linear_outdated_ids
    end
  end
end
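
The bodies of linear_outdated_ids and parallel_outdated_ids are not shown here. As a rough sketch, assuming they apply the same per-id comparison as the non-date branch of OUTDATED_IDS_WORKER, the calculation looks like this with made-up data:

```ruby
# Made-up sample data: id => value of the sync field (plain strings here).
source_data = {'1' => '2017-03-01', '2' => '2017-03-05', '3' => '2017-03-07'}
index_data  = {'1' => '2017-03-01', '2' => '2017-03-02', '4' => '2017-03-01'}

outdated = index_data.each_with_object([]) do |(id, index_value), result|
  next unless source_data[id] # ids absent from the source are missing, not outdated

  result.push(id) if source_data[id] != index_value
end
# outdated now holds ["2"]: only id "2" exists on both sides with different values
```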

#perform ⇒ Integer?

Finds all the missing and outdated ids and performs import for them.

Returns:

  • (Integer, nil)

    the number of missing and outdated documents reindexed, nil in case of errors



# File 'lib/chewy/type/syncer.rb', line 95

def perform
  ids = missing_ids | outdated_ids
  return 0 if ids.blank?
  @type.import(ids, parallel: @parallel) && ids.count
end
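
How the two id lists are combined can be shown with plain arrays. The import call is replaced by a stand-in boolean here (an assumption for illustration), since the real call needs a Chewy type and a running ES.

```ruby
# Made-up sample ids; "2" appears in both lists.
missing_ids  = %w[1 2 5]
outdated_ids = %w[2 7]

ids = missing_ids | outdated_ids # union without duplicates: ["1", "2", "5", "7"]

import_ok = true # stand-in for the result of @type.import(ids, parallel: @parallel)
result = import_ok && ids.count # 4
```

With `&&`, a falsy result from the import call is returned as is instead of the count.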