Class: Chewy::Type::Syncer
Overview
In rails 4.0 time converted to json with the precision of seconds without milliseconds used, so outdated check is not so precise there.
ATTENTION: synchronization may be slow in case when synchronized tables
are missing compound index on primary key and outdated_sync_field.
This class is able to find missing and outdated documents in the ES
comparing ids from the data source and the ES index. Also, if outdated_sync_field
existss in the index definition, it performs comparison of this field
values for each source object and corresponding ES document. Usually,
this field is updated_at and if its value in the source is not equal
to the value in the index - this means that this document outdated and
should be reindexed.
To fetch necessary data from the source it uses adapter method Adapter::Base#import_fields, in case when the Object adapter is used it makes sense to read corresponding documentation.
If parallel option is passed to the initializer - it will fetch surce and
index data in parallel and then perform outdated objects calculation in
parallel processes. Also, further import (if required) will be performed
in parallel as well.
Constant Summary collapse
- DEFAULT_SYNC_BATCH_SIZE =
20_000- ISO_DATETIME =
/\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/- OUTDATED_IDS_WORKER =
lambda do |outdated_sync_field_type, source_data_hash, index_data| index_data.each_with_object([]) do |(id, index_sync_value), result| next unless source_data_hash[id] outdated = if outdated_sync_field_type == 'date' !Chewy::Type::Syncer.dates_equal(typecast_date(source_data_hash[id]), DateTime.iso8601(index_sync_value)) else source_data_hash[id] != index_sync_value end result.push(id) if outdated end end
- SOURCE_OR_INDEX_DATA_WORKER =
lambda do |syncer, type| result = case type when :source syncer.send(:fetch_source_data) when :index syncer.send(:fetch_index_data) end {type => result} end
Class Method Summary collapse
-
.dates_equal(one, two) ⇒ Object
Compares times with ms precision.
- .typecast_date(string) ⇒ Object
Instance Method Summary collapse
-
#initialize(type, parallel: nil) ⇒ Syncer
constructor
A new instance of Syncer.
-
#missing_ids ⇒ Array<String>
Finds ids of all the objects that are not indexed yet or deleted from the source already.
-
#outdated_ids ⇒ Array<String>
If type supports outdated sync, it compares for the values of the type
outdated_sync_fieldfor each object and document in the source and index and returns the ids of entities which which are having different values there. -
#perform ⇒ Integer?
Finds all the missing and outdated ids and performs import for them.
Constructor Details
#initialize(type, parallel: nil) ⇒ Syncer
Returns a new instance of Syncer.
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/chewy/type/syncer.rb', line 79 def initialize(type, parallel: nil) @type = type @parallel = if !parallel || parallel.is_a?(Hash) parallel elsif parallel.is_a?(Integer) {in_processes: parallel} else {} end end |
Class Method Details
.dates_equal(one, two) ⇒ Object
Compares times with ms precision.
65 66 67 |
# File 'lib/chewy/type/syncer.rb', line 65 def self.dates_equal(one, two) [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000] end |
.typecast_date(string) ⇒ Object
54 55 56 57 58 59 60 61 62 |
# File 'lib/chewy/type/syncer.rb', line 54 def self.typecast_date(string) if string.is_a?(String) && (match = ISO_DATETIME.match(string)) microsec = (match[7].to_r * 1_000_000).to_i date = "#{match[1]}-#{match[2]}-#{match[3]}T#{match[4]}:#{match[5]}:#{match[6]}.#{format('%06d', microsec)}+00:00" DateTime.iso8601(date) else string end end |
Instance Method Details
#missing_ids ⇒ Array<String>
Finds ids of all the objects that are not indexed yet or deleted from the source already.
103 104 105 106 107 108 109 110 111 112 |
# File 'lib/chewy/type/syncer.rb', line 103 def missing_ids return [] if source_data.blank? @missing_ids ||= begin source_data_ids = data_ids(source_data) index_data_ids = data_ids(index_data) (source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids) end end |
#outdated_ids ⇒ Array<String>
If type supports outdated sync, it compares for the values of the
type outdated_sync_field for each object and document in the source
and index and returns the ids of entities which which are having
different values there.
121 122 123 124 125 126 127 128 129 130 |
# File 'lib/chewy/type/syncer.rb', line 121 def outdated_ids return [] if source_data.blank? || index_data.blank? || !@type.supports_outdated_sync? @outdated_ids ||= begin if @parallel parallel_outdated_ids else linear_outdated_ids end end end |
#perform ⇒ Integer?
Finds all the missing and outdated ids and performs import for them.
93 94 95 96 97 |
# File 'lib/chewy/type/syncer.rb', line 93 def perform ids = missing_ids | outdated_ids return 0 if ids.blank? @type.import(ids, parallel: @parallel) && ids.count end |