Class: Chewy::Type::Syncer
Overview
In Rails 4.0, times are converted to JSON with second precision (milliseconds are dropped), so the outdated check is less precise there.
ATTENTION: synchronization may be slow if the synchronized tables are missing a compound index on the primary key and outdated_sync_field.
This class finds missing and outdated documents in ES by comparing the ids from the data source with the ids in the ES index. Also, if outdated_sync_field exists in the index definition, it compares the value of this field for each source object and the corresponding ES document. Usually this field is updated_at, and if its value in the source is not equal to the value in the index, the document is outdated and should be reindexed.
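A minimal sketch of the outdated check described above, using plain Ruby hashes in place of the real source and index data. The `find_outdated` helper and the `id => updated_at string` layout are hypothetical, chosen only to illustrate the comparison; the real class also typecasts date fields before comparing.

```ruby
# Hypothetical helper: source_values and index_values map a document id
# to its outdated_sync_field value (e.g. updated_at) on each side.
def find_outdated(source_values, index_values)
  index_values.each_with_object([]) do |(id, index_value), outdated|
    # Ids absent from the source are reported by the missing-ids check instead.
    next unless source_values.key?(id)

    outdated << id if source_values[id] != index_value
  end
end

source = { '1' => '2017-03-10T12:00:00Z', '2' => '2017-03-11T08:30:00Z' }
index  = { '1' => '2017-03-10T12:00:00Z', '2' => '2017-03-09T08:30:00Z',
           '3' => '2017-01-01T00:00:00Z' }

find_outdated(source, index) # => ["2"]
```

Document `3` is not reported here because it no longer exists in the source; that case belongs to `missing_ids`.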
To fetch the necessary data from the source, it uses the adapter method Adapter::Base#import_fields; when the Object adapter is used, it is worth reading the corresponding documentation.
If the parallel option is passed to the initializer, it fetches the source and index data in parallel and then calculates the outdated objects in parallel processes. Any further import (if required) is performed in parallel as well.
Constant Summary
- DEFAULT_SYNC_BATCH_SIZE = 20_000
- ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/
- OUTDATED_IDS_WORKER =
    lambda do |outdated_sync_field_type, source_data_hash, type, total, index_data|
      ::Process.setproctitle("chewy [#{type}]: sync outdated calculation (#{::Parallel.worker_number + 1}/#{total})") if type
      index_data.each_with_object([]) do |(id, index_sync_value), result|
        next unless source_data_hash[id]
        outdated = if outdated_sync_field_type == 'date'
          !Chewy::Type::Syncer.dates_equal(typecast_date(source_data_hash[id]), Time.iso8601(index_sync_value))
        else
          source_data_hash[id] != index_sync_value
        end
        result.push(id) if outdated
      end
    end
- SOURCE_OR_INDEX_DATA_WORKER =
    lambda do |syncer, type, kind|
      ::Process.setproctitle("chewy [#{type}]: sync fetching data (#{kind})")
      result = case kind
      when :source
        syncer.send(:fetch_source_data)
      when :index
        syncer.send(:fetch_index_data)
      end
      {kind => result}
    end
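`OUTDATED_IDS_WORKER` receives one slice of the index data per worker and returns the outdated ids found in that slice; the per-worker results are then combined. The sketch below imitates that split-and-merge shape with plain Ruby threads instead of the `Parallel` processes used by the real worker. `WORKER` and `outdated_in_parallel` are hypothetical names, and only plain value comparison is done (no date typecasting). Under MRI's global VM lock, threads will not speed up this CPU-bound comparison, which is one reason to prefer processes in practice.

```ruby
# Hypothetical worker: same contract as OUTDATED_IDS_WORKER minus the
# process title and date handling. index_slice is an array of [id, value].
WORKER = lambda do |source_hash, index_slice|
  index_slice.each_with_object([]) do |(id, index_value), result|
    next unless source_hash[id]

    result.push(id) if source_hash[id] != index_value
  end
end

# Split the index pairs into roughly equal slices, run one worker per slice,
# then merge the results in slice order.
def outdated_in_parallel(source_hash, index_pairs, workers: 2)
  return [] if index_pairs.empty?

  slice_size = (index_pairs.size.to_f / workers).ceil
  threads = index_pairs.each_slice(slice_size).map do |slice|
    Thread.new { WORKER.call(source_hash, slice) }
  end
  # Thread#value waits for the thread and returns the lambda's result.
  threads.flat_map(&:value)
end

source = { 1 => 'a', 2 => 'b', 3 => 'c', 4 => 'd' }
index  = [[1, 'a'], [2, 'x'], [3, 'c'], [4, 'y'], [5, 'z']]

outdated_in_parallel(source, index) # => [2, 4]
```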
Class Method Summary
- .dates_equal(one, two) ⇒ Object
  Compares times with ms precision.
- .typecast_date(string) ⇒ Object
Instance Method Summary
- #initialize(type, parallel: nil) ⇒ Syncer (constructor)
  A new instance of Syncer.
- #missing_ids ⇒ Array<String>
  Finds the ids of all objects that are not indexed yet, and of indexed documents whose objects have already been deleted from the source.
- #outdated_ids ⇒ Array<String>
  If the type supports outdated sync, compares the values of the type's outdated_sync_field for each object in the source and its document in the index, and returns the ids of entities whose values differ.
- #perform ⇒ Integer?
  Finds all the missing and outdated ids and performs import for them.
Constructor Details
#initialize(type, parallel: nil) ⇒ Syncer
Returns a new instance of Syncer.
# File 'lib/chewy/type/syncer.rb', line 81
def initialize(type, parallel: nil)
  @type = type
  @parallel = if !parallel || parallel.is_a?(Hash)
    parallel
  elsif parallel.is_a?(Integer)
    {in_processes: parallel}
  else
    {}
  end
end
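The constructor normalizes `parallel:` into the options later passed to `import`: `nil` or `false` are kept as they are (no parallelism), a `Hash` is passed through unchanged, an `Integer` becomes `{in_processes: n}`, and any other truthy value (such as `true`) becomes an empty options hash. The standalone function below copies that branch logic so it can be tried without Chewy; `normalize_parallel` is a hypothetical name, and `in_threads:` is just an example option hash.

```ruby
# Hypothetical standalone copy of the branch in Syncer#initialize.
def normalize_parallel(parallel)
  if !parallel || parallel.is_a?(Hash)
    parallel                   # nil/false: sequential; Hash: passed through
  elsif parallel.is_a?(Integer)
    { in_processes: parallel } # a bare number means "this many processes"
  else
    {}                         # e.g. true: parallel with an empty options hash
  end
end

normalize_parallel(nil)                 # => nil
normalize_parallel(4)                   # => { in_processes: 4 }
normalize_parallel(true)                # => {}
normalize_parallel({ in_threads: 2 })   # => { in_threads: 2 }
```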
Class Method Details
.dates_equal(one, two) ⇒ Object
Compares times with ms precision.
# File 'lib/chewy/type/syncer.rb', line 67
def self.dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end
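Because the comparison uses `[to_i, usec / 1000]`, two times that differ only below the millisecond are treated as equal (the sub-millisecond part is truncated, not rounded). The local `dates_equal` method below is a copy for experimentation, not the Chewy class method itself.

```ruby
# Local copy of Syncer.dates_equal: compare whole seconds and milliseconds only.
def dates_equal(one, two)
  [one.to_i, one.usec / 1000] == [two.to_i, two.usec / 1000]
end

# Time.at(seconds, microseconds)
base    = Time.at(1_500_000_000, 123_456) # 123 ms + 456 us
same_ms = Time.at(1_500_000_000, 123_999) # still within millisecond 123
next_ms = Time.at(1_500_000_000, 124_000) # millisecond 124

dates_equal(base, same_ms) # => true
dates_equal(base, next_ms) # => false
```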
.typecast_date(string) ⇒ Object
# File 'lib/chewy/type/syncer.rb', line 56
def self.typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    microsec = (match[7].to_r * 1_000_000).to_i
    date = "#{match[1]}-#{match[2]}-#{match[3]}T#{match[4]}:#{match[5]}:#{match[6]}.#{format('%06d', microsec)}+00:00"
    Time.iso8601(date)
  else
    string
  end
end
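`typecast_date` only converts strings in the space-separated, zone-less format matched by `ISO_DATETIME` (for example `2017-03-10 12:34:56.789`), reading them as UTC (`+00:00`) and returning a `Time`; any other value is returned unchanged. The copy below reuses the same regex and body so the behaviour can be checked in isolation; outside Rails, `require 'time'` is needed for `Time.iso8601`.

```ruby
require 'time' # Time.iso8601 comes from the 'time' standard library

ISO_DATETIME = /\A(\d{4})-(\d\d)-(\d\d) (\d\d):(\d\d):(\d\d)(\.\d+)?\z/

# Local copy of Syncer.typecast_date.
def typecast_date(string)
  if string.is_a?(String) && (match = ISO_DATETIME.match(string))
    # match[7] is the optional fraction such as ".789"; nil.to_r is 0.
    microsec = (match[7].to_r * 1_000_000).to_i
    date = "#{match[1]}-#{match[2]}-#{match[3]}T#{match[4]}:#{match[5]}:#{match[6]}.#{format('%06d', microsec)}+00:00"
    Time.iso8601(date)
  else
    string # not in the expected format: leave it untouched
  end
end

t = typecast_date('2017-03-10 12:34:56.789')
t.usec       # => 789000
t.utc_offset # => 0
typecast_date('2017-03-10T12:34:56Z') # => "2017-03-10T12:34:56Z" (no match, unchanged)
```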
Instance Method Details
#missing_ids ⇒ Array<String>
Finds the ids of all objects that are not indexed yet, and of indexed documents whose objects have already been deleted from the source.
# File 'lib/chewy/type/syncer.rb', line 105
def missing_ids
  return [] if source_data.blank?

  @missing_ids ||= begin
    source_data_ids = data_ids(source_data)
    index_data_ids = data_ids(index_data)

    (source_data_ids - index_data_ids).concat(index_data_ids - source_data_ids)
  end
end
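`missing_ids` is the symmetric difference of the two id lists: ids present only in the source (not indexed yet), followed by ids present only in the index (already deleted from the source). A plain-array sketch with hypothetical inputs:

```ruby
# Hypothetical id lists; in Syncer they come from data_ids(source_data)
# and data_ids(index_data).
source_ids = %w[1 2 3 4]
index_ids  = %w[3 4 5]

# Source-only ids first, then index-only ids, in the same order missing_ids builds them.
missing = (source_ids - index_ids).concat(index_ids - source_ids)
missing # => ["1", "2", "5"]
```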
#outdated_ids ⇒ Array<String>
If the type supports outdated sync, compares the values of the type's outdated_sync_field for each object in the source and its document in the index, and returns the ids of entities whose values differ.
# File 'lib/chewy/type/syncer.rb', line 123
def outdated_ids
  return [] if source_data.blank? || index_data.blank? || !@type.supports_outdated_sync?

  @outdated_ids ||= begin
    if @parallel
      parallel_outdated_ids
    else
      linear_outdated_ids
    end
  end
end
#perform ⇒ Integer?
Finds all the missing and outdated ids and performs import for them.
# File 'lib/chewy/type/syncer.rb', line 95
def perform
  ids = missing_ids | outdated_ids
  return 0 if ids.blank?

  @type.import(ids, parallel: @parallel) && ids.count
end
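`perform` merges both lists with `Array#|`, so an id that is both missing and outdated is imported only once. It returns `0` without calling import when there is nothing to sync; otherwise it returns the id count when the import call returns a truthy value, and the import's falsy result otherwise. In this sketch, `perform_sync` and the `importer` lambda are hypothetical stand-ins for `Syncer#perform` and `@type.import`, and `empty?` replaces ActiveSupport's `blank?` so it runs without Rails.

```ruby
# Hypothetical stand-in for Syncer#perform; `importer` plays the role of
# @type.import and should return a truthy value on success.
def perform_sync(missing_ids, outdated_ids, importer)
  ids = missing_ids | outdated_ids # union without duplicates
  return 0 if ids.empty?

  importer.call(ids) && ids.count
end

imported = []
importer = ->(ids) { imported.concat(ids); true }

perform_sync(%w[1 2], %w[2 7], importer)    # => 3, imports "1", "2", "7"
perform_sync([], [], importer)              # => 0, importer is not called
perform_sync(%w[9], [], ->(_ids) { false }) # => false
```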