Module: Chewy::RakeHelper

Defined in:
lib/chewy/rake_helper.rb

Constant Summary collapse

# Subscriber for import notifications. The first argument is the IO to write
# to; the remaining five are the notification arguments, of which only the
# start/finish values and the payload are used.
#
# Prints one summary line (index, rounded-up duration, per-action counters)
# and, when the payload carries :errors, every error with the documents it hit.
IMPORT_CALLBACK =
lambda do |output, _name, start, finish, _id, payload|
  seconds = (finish - start).ceil
  counters = payload.fetch(:import, {}).map { |action, total| "#{action} #{total}" }
  output.puts "  Imported #{payload[:index]} in #{human_duration(seconds)}, stats: #{counters.join(', ')}"

  failures = payload[:errors]
  # Nothing more to report when the payload has no :errors entry.
  next if failures.nil?

  failures.each do |action, messages|
    output.puts "    #{action.to_s.humanize} errors:"
    messages.each do |message, documents|
      output.puts "      `#{message}`"
      output.puts "        on #{documents.count} documents: #{documents}"
    end
  end
end
# Subscriber for journal-apply notifications. The first argument is the IO to
# write to; of the remaining five only the payload (last) is used.
#
# Prints the targets found in payload[:groups] (ordered by derivable_name),
# the total number of grouped entries and the current payload[:stage].
JOURNAL_CALLBACK =
lambda do |output, _name, _start, _finish, _id, payload|
  groups = payload[:groups]
  entries = groups.values.sum(&:size)
  indexes = groups.keys.sort_by(&:derivable_name)
  output.puts "  Applying journal to #{indexes}, #{entries} entries, stage #{payload[:stage]}"
end
# Names of the ENV keys that delete_by_query_options_from_env reads.
DELETE_BY_QUERY_OPTIONS = [
  'WAIT_FOR_COMPLETION',
  'REQUESTS_PER_SECOND',
  'SCROLL_SIZE'
].freeze
# Lower-cased WAIT_FOR_COMPLETION values that are interpreted as false.
FALSE_VALUES = ['0', 'f', 'false', 'off'].freeze

Class Method Summary collapse

Class Method Details

.all_indexesArray<Chewy::Index>

Eager loads and returns all the indexes defined in the application except Chewy::Stash::Specification and Chewy::Stash::Journal.

Returns:



204
205
206
207
# File 'lib/chewy/rake_helper.rb', line 204

# Calls Chewy.eager_load! and returns every Chewy::Index descendant except
# the two stash indexes (Chewy::Stash::Journal, Chewy::Stash::Specification).
#
# @return [Array<Chewy::Index>]
def all_indexes
  Chewy.eager_load!
  stash_indexes = [Chewy::Stash::Journal, Chewy::Stash::Specification]
  Chewy::Index.descendants.reject { |index| stash_indexes.include?(index) }
end

.delete_by_query_options_from_env(env) ⇒ Object

Reads the options required to run the journal cleanup asynchronously from an ENV-like hash.

Examples:

Chewy::RakeHelper.delete_by_query_options_from_env({'WAIT_FOR_COMPLETION' => 'false','REQUESTS_PER_SECOND' => '10','SCROLL_SIZE' => '5000'})
# => { wait_for_completion: false, requests_per_second: 10.0, scroll_size: 5000 }

See Also:



245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/chewy/rake_helper.rb', line 245

# Builds delete-by-query options from an ENV-like hash.
#
# Only the keys listed in DELETE_BY_QUERY_OPTIONS are read. Each one becomes
# a lower-cased symbol key whose value is converted from its string form.
#
# @param env [#slice] hash-like object with string keys and string values
# @return [Hash{Symbol => Boolean, Float, Integer}]
def delete_by_query_options_from_env(env)
  env.slice(*DELETE_BY_QUERY_OPTIONS).each_with_object({}) do |(name, raw), options|
    option = name.downcase.to_sym
    options[option] =
      case option
      # Any value not listed in FALSE_VALUES counts as true.
      when :wait_for_completion then !FALSE_VALUES.include?(raw.downcase)
      when :requests_per_second then raw.to_f
      when :scroll_size then raw.to_i
      end
  end
end

.journal_apply(time: nil, only: nil, except: nil, output: $stdout) ⇒ Array<Chewy::Index>

Applies changes that were done after the specified time for the specified indexes or all of them.

Examples:

Chewy::RakeHelper.journal_apply(time: 1.minute.ago) # applies entries created for the last minute
Chewy::RakeHelper.journal_apply(time: 1.minute.ago, only: 'places') # applies only PlacesIndex entries created for the last minute
Chewy::RakeHelper.journal_apply(time: 1.minute.ago, except: PlacesIndex) # applies everything, but PlacesIndex, entries created for the last minute

Parameters:

  • time (Time, DateTime) (defaults to: nil)

    use only journal entries created after this time

  • only (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    indexes to synchronize; if nothing is passed - uses all the indexes defined in the app

  • except (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    indexes to exclude from processing

  • output (IO) (defaults to: $stdout)

    output io for logging

Returns:

  • (Array<Chewy::Index>)

    indexes that were actually updated

Raises:

  • (ArgumentError)


163
164
165
166
167
168
169
170
171
# File 'lib/chewy/rake_helper.rb', line 163

# Applies journal entries created after +time+ for the selected indexes,
# logging progress to +output+.
#
# @param time [Time, DateTime] lower bound for the journal entries; required
# @param only [Array, Object, nil] forwarded to journal_indexes_from
# @param except [Array, Object, nil] forwarded to journal_indexes_from
# @param output [IO] output io for logging
# @raise [ArgumentError] when +time+ is not given
def journal_apply(time: nil, only: nil, except: nil, output: $stdout)
  raise ArgumentError, 'Please specify the time to start with' unless time

  subscribed_task_stats(output) do
    output.puts "Applying journal entries created after #{time}"
    journal = Chewy::Journal.new(journal_indexes_from(only: only, except: except))
    applied = journal.apply(time)
    output.puts 'No journal entries were created after the specified time' if applied.zero?
  end
end

.journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout) ⇒ Array<Chewy::Index>

Removes journal records created before the specified timestamp for the specified indexes or all of them.

Examples:

Chewy::RakeHelper.journal_clean # cleans everything
Chewy::RakeHelper.journal_clean(time: 1.minute.ago) # leaves only entries created for the last minute
Chewy::RakeHelper.journal_clean(only: 'places') # cleans only PlacesIndex entries
Chewy::RakeHelper.journal_clean(except: PlacesIndex) # cleans everything, but PlacesIndex entries

Parameters:

  • time (Time, DateTime) (defaults to: nil)

    clean all the journal entries created before this time

  • only (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    indexes whose journal entries are cleaned; if nothing is passed - uses all the indexes defined in the app

  • except (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    indexes to exclude from processing

  • output (IO) (defaults to: $stdout)

    output io for logging

Returns:

  • (Array<Chewy::Index>)

    indexes that were actually updated



187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/chewy/rake_helper.rb', line 187

# Cleans journal entries for the selected indexes and reports the outcome on
# +output+: either the id of the created cleanup task or the number of
# deleted entries.
#
# @param time [Time, DateTime, nil] passed to Chewy::Journal#clean; also
#   logged when given
# @param only [Array, Object, nil] forwarded to journal_indexes_from
# @param except [Array, Object, nil] forwarded to journal_indexes_from
# @param delete_by_query_options [Hash] forwarded to Chewy::Journal#clean
# @param output [IO] output io for logging
def journal_clean(time: nil, only: nil, except: nil, delete_by_query_options: {}, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Cleaning journal entries created before #{time}" if time
    journal = Chewy::Journal.new(journal_indexes_from(only: only, except: except))
    response = journal.clean(time, delete_by_query_options: delete_by_query_options)
    report =
      if response.key?('task')
        "Task to cleanup the journal has been created, #{response['task']}"
      else
        # The count is read from 'deleted' first, then from the nested '_indices' entry.
        deleted = response['deleted'] || response['_indices']['_all']['deleted']
        "Cleaned up #{deleted} journal entries"
      end
    output.puts report
  end
end

.normalize_index(identifier) ⇒ Object



262
263
264
265
266
# File 'lib/chewy/rake_helper.rb', line 262

# Resolves an identifier to an index class.
#
# A class that inherits from Chewy::Index is returned untouched; anything
# else is stringified, camelized, suffixed with "Index" and constantized.
#
# @param identifier [Class, String, Symbol]
# @return [Class] the resolved constant
def normalize_index(identifier)
  if identifier.is_a?(Class) && identifier < Chewy::Index
    identifier
  else
    class_name = "#{identifier.to_s.camelize}Index"
    class_name.constantize
  end
end

.normalize_indexes(*identifiers) ⇒ Object



258
259
260
# File 'lib/chewy/rake_helper.rb', line 258

# Resolves every identifier through normalize_index. Array arguments are
# flattened by one level first, so both lists and splats are accepted.
#
# @param identifiers [Array] identifiers or arrays of identifiers
# @return [Array] one normalize_index result per identifier, in order
def normalize_indexes(*identifiers)
  flattened = identifiers.flatten(1)
  flattened.each_with_object([]) do |identifier, normalized|
    normalized << normalize_index(identifier)
  end
end

.reindex(source:, dest:, output: $stdout) ⇒ Object

Reindex data from source index to destination index

Examples:

Chewy::RakeHelper.reindex(source: 'users_index', dest: 'cities_index') # reindexes data from the 'users_index' index into the 'cities_index' index

Parameters:

  • source (String)

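    name of the index to reindex data from

  • dest (String)

    name of the index to reindex data into

  • output (IO) (defaults to: $stdout)

    output io for logging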



215
216
217
218
219
220
221
# File 'lib/chewy/rake_helper.rb', line 215

# Reindexes data from +source+ into +dest+ through Chewy::Index.reindex and
# logs both index names before, and a confirmation after, the call.
#
# @param source [String] source index
# @param dest [String] destination index
# @param output [IO] output io for logging
def reindex(source:, dest:, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Source index is #{source}\nDestination index is #{dest}"
    endpoints = { source: source, dest: dest }
    Chewy::Index.reindex(**endpoints)
    output.puts "#{source} index successfully reindexed with #{dest} index data"
  end
end

.reset(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>

Performs zero-downtime reindexing of all documents for the specified indexes

Examples:

Chewy::RakeHelper.reset # resets everything
Chewy::RakeHelper.reset(only: 'cities') # resets only CitiesIndex
Chewy::RakeHelper.reset(only: ['cities', CountriesIndex]) # resets CitiesIndex and CountriesIndex
Chewy::RakeHelper.reset(except: CitiesIndex) # resets everything, but CitiesIndex
Chewy::RakeHelper.reset(only: ['cities', 'countries'], except: CitiesIndex) # resets only CountriesIndex

Parameters:

  • only (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    index or indexes to reset; if nothing is passed - uses all the indexes defined in the app

  • except (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    index or indexes to exclude from processing

  • parallel (true, Integer, Hash) (defaults to: nil)

    any acceptable parallel options for import

  • output (IO) (defaults to: $stdout)

    output io for logging

Returns:



40
41
42
43
44
45
46
47
48
# File 'lib/chewy/rake_helper.rb', line 40

# Resets every selected index via reset_one, after calling
# warn_missing_index with the output.
#
# @param only [Array, Object, nil] forwarded to indexes_from
# @param except [Array, Object, nil] forwarded to indexes_from
# @param parallel [true, Integer, Hash, nil] forwarded to reset_one
# @param output [IO] output io for logging
def reset(only: nil, except: nil, parallel: nil, output: $stdout)
  warn_missing_index(output)

  subscribed_task_stats(output) do
    targets = indexes_from(only: only, except: except)
    targets.each { |index| reset_one(index, output, parallel: parallel) }
  end
end

.subscribed_task_stats(output = $stdout, &block) ⇒ Object



268
269
270
271
272
273
274
# File 'lib/chewy/rake_helper.rb', line 268

# Runs the block while JOURNAL_CALLBACK and IMPORT_CALLBACK (both curried
# with +output+) are subscribed to the 'apply_journal.chewy' and
# 'import_objects.chewy' notifications, then prints the total elapsed time.
#
# @param output [IO] receives the callbacks' output and the "Total: ..." line
# @yield the task to run while the subscriptions are active
# @return [Object] the value returned by the block, so that wrappers such as
#   sync, update and upgrade can hand their result back to the caller
def subscribed_task_stats(output = $stdout, &block)
  start = Time.now
  # Captured explicitly: the trailing `puts` below would otherwise make the
  # method always return nil and drop the block's result.
  result = nil
  ActiveSupport::Notifications.subscribed(JOURNAL_CALLBACK.curry[output], 'apply_journal.chewy') do
    ActiveSupport::Notifications.subscribed(IMPORT_CALLBACK.curry[output], 'import_objects.chewy') do
      result = block.call
    end
  end
  output.puts "Total: #{human_duration(Time.now - start)}"
  result
end

.sync(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>

Performs synchronization for each passed index if it exists.

Examples:

Chewy::RakeHelper.sync # synchronizes everything
Chewy::RakeHelper.sync(only: 'places') # synchronizes only PlacesIndex
Chewy::RakeHelper.sync(except: PlacesIndex) # synchronizes everything, but PlacesIndex

Parameters:

  • only (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    indexes to synchronize; if nothing is passed - uses all the indexes defined in the app

  • except (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    indexes to exclude from processing

  • parallel (true, Integer, Hash) (defaults to: nil)

    any acceptable parallel options for sync

  • output (IO) (defaults to: $stdout)

    output io for logging

Returns:

  • (Array<Chewy::Index>)

    indexes that were actually updated



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/chewy/rake_helper.rb', line 129

# Synchronizes every selected index and logs the outcome per index: missing
# and outdated documents when something was synced, a skip notice when the
# reported count is not positive, or a failure notice when index.sync
# returned nothing.
#
# @param only [Array, Object, nil] forwarded to indexes_from
# @param except [Array, Object, nil] forwarded to indexes_from
# @param parallel [true, Integer, Hash, nil] forwarded to index.sync
# @param output [IO] output io for logging
def sync(only: nil, except: nil, parallel: nil, output: $stdout)
  subscribed_task_stats(output) do
    synced = []

    indexes_from(only: only, except: except).each do |index|
      output.puts "Synchronizing #{index}"
      output.puts "  #{index} doesn't support outdated synchronization" unless index.supports_outdated_sync?

      started_at = Time.now
      result = index.sync(parallel: parallel)

      if result && result[:count].positive?
        output.puts "  Missing documents: #{result[:missing]}" if result[:missing].present?
        output.puts "  Outdated documents: #{result[:outdated]}" if result[:outdated].present?
        synced << index
      elsif result
        output.puts "  Skipping #{index}, up to date"
      else
        output.puts "  Something went wrong with the #{index} synchronization"
      end

      output.puts "  Took #{human_duration(Time.now - started_at)}"
    end

    # Block value: the indexes that had something to synchronize.
    synced
  end
end

.update(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>

Performs a full update for each passed index if it exists.

Examples:

Chewy::RakeHelper.update # updates everything
Chewy::RakeHelper.update(only: 'places') # updates only PlacesIndex
Chewy::RakeHelper.update(except: PlacesIndex) # updates everything, but PlacesIndex

Parameters:

  • only (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    indexes to update; if nothing is passed - uses all the indexes defined in the app

  • except (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    indexes to exclude from processing

  • parallel (true, Integer, Hash) (defaults to: nil)

    any acceptable parallel options for import

  • output (IO) (defaults to: $stdout)

    output io for logging

Returns:

  • (Array<Chewy::Index>)

    indexes that were actually updated



103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/chewy/rake_helper.rb', line 103

# Imports every selected index that exists; indexes that do not exist are
# skipped with a hint to run the reset task instead.
#
# @param only [Array, Object, nil] forwarded to indexes_from
# @param except [Array, Object, nil] forwarded to indexes_from
# @param parallel [true, Integer, Hash, nil] forwarded to index.import
# @param output [IO] output io for logging
def update(only: nil, except: nil, parallel: nil, output: $stdout)
  subscribed_task_stats(output) do
    # The block result decides whether the index counts as updated.
    indexes_from(only: only, except: except).select do |index|
      if index.exists?
        output.puts "Updating #{index}"
        index.import(parallel: parallel)
        true
      else
        output.puts "Skipping #{index}, it does not exists (use rake chewy:reset[#{index.derivable_name}] to create and update it)"
        false
      end
    end
  end
end

.update_mapping(name:, output: $stdout) ⇒ Object

Adds new fields to an existing data stream or index. Also changes the search settings of existing fields.

Examples:

Chewy::RakeHelper.update_mapping(name: 'cities') # updates the mapping of CitiesIndex

Parameters:

  • name (String)

    name of the index whose mapping is updated, e.g. 'cities' for CitiesIndex

  • output (IO) (defaults to: $stdout)

    output io for logging



230
231
232
233
234
235
236
# File 'lib/chewy/rake_helper.rb', line 230

# Resolves +name+ with normalize_index and calls update_mapping on the
# resulting index, logging the name before and a confirmation after.
#
# @param name [String] identifier accepted by normalize_index
# @param output [IO] output io for logging
def update_mapping(name:, output: $stdout)
  subscribed_task_stats(output) do
    output.puts "Index name is #{name}"
    index = normalize_index(name)
    index.update_mapping
    output.puts "#{name} index successfully updated"
  end
end

.upgrade(only: nil, except: nil, parallel: nil, output: $stdout) ⇒ Array<Chewy::Index>

Performs zero-downtime reindexing of all documents for the specified indexes only if a particular index specification was changed.

Examples:

Chewy::RakeHelper.upgrade # resets every index whose specification changed
Chewy::RakeHelper.upgrade(only: 'cities') # resets CitiesIndex if its specification changed
Chewy::RakeHelper.upgrade(only: ['cities', CountriesIndex]) # resets CitiesIndex and CountriesIndex, whichever of them changed
Chewy::RakeHelper.upgrade(except: CitiesIndex) # resets every changed index, but CitiesIndex
Chewy::RakeHelper.upgrade(only: ['cities', 'countries'], except: CitiesIndex) # resets CountriesIndex if its specification changed

Parameters:

  • only (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    index or indexes to reset; if nothing is passed - uses all the indexes defined in the app

  • except (Array<Chewy::Index, String>, Chewy::Index, String) (defaults to: nil)

    index or indexes to exclude from processing

  • parallel (true, Integer, Hash) (defaults to: nil)

    any acceptable parallel options for import

  • output (IO) (defaults to: $stdout)

    output io for logging

Returns:

  • (Array<Chewy::Index>)

    indexes that were actually reset



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/chewy/rake_helper.rb', line 65

# Resets, via reset_one, only those selected indexes whose specification
# reports a change; unchanged ones are logged as skipped. When nothing
# changed at all, a single notice is printed instead.
#
# @param only [Array, Object, nil] forwarded to indexes_from
# @param except [Array, Object, nil] forwarded to indexes_from
# @param parallel [true, Integer, Hash, nil] forwarded to reset_one
# @param output [IO] output io for logging
def upgrade(only: nil, except: nil, parallel: nil, output: $stdout)
  warn_missing_index(output)

  subscribed_task_stats(output) do
    candidates = indexes_from(only: only, except: except)
    changed, _unchanged = candidates.partition { |index| index.specification.changed? }

    if changed.empty?
      output.puts 'No index specification was changed'
    else
      # Walk the full list so the log keeps the original index order.
      candidates.each do |index|
        if changed.include?(index)
          reset_one(index, output, parallel: parallel)
        else
          output.puts "Skipping #{index}, the specification didn't change"
        end
      end
    end

    # Block value: the indexes whose specification changed.
    changed
  end
end