Class: Searchkick::Index

Inherits:
Object
  • Object
show all
Includes:
IndexOptions
Defined in:
lib/searchkick/index.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from IndexOptions

#index_options

Constructor Details

#initialize(name, options = {}) ⇒ Index

Returns a new instance of Index.



7
8
9
10
11
# File 'lib/searchkick/index.rb', line 7

# Builds an index wrapper.
#
# @param name the index (or alias) name, stored in @name
# @param options [Hash] index options, stored in @options (defaults to {})
def initialize(name, options = {})
  @name, @options = name, options
  # per-instance memo used by klass_document_type
  @klass_document_type = {}
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/searchkick/index.rb', line 5

# Reader for the @name instance variable.
#
# @return the value currently held in @name
def name
  @name
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



5
6
7
# File 'lib/searchkick/index.rb', line 5

# Reader for the @options instance variable.
#
# @return the value currently held in @options
def options
  @options
end

Instance Method Details

#alias_exists? ⇒ Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/searchkick/index.rb', line 35

# Asks the client whether an alias called +name+ exists.
#
# @return the result of the client's exists_alias call
def alias_exists?
  client.indices.exists_alias(name: name)
end

#all_indices(unaliased: false) ⇒ Object



188
189
190
191
192
193
194
195
196
197
# File 'lib/searchkick/index.rb', line 188

# Lists the physical indices whose names are this index's name followed by
# an underscore and a 14-17 digit suffix.
#
# @param unaliased [Boolean] when true, only entries that are empty or whose
#   "aliases" value is empty are considered
# @return [Array<String>] the matching index names
def all_indices(unaliased: false)
  indices =
    begin
      client.indices.get_aliases
    rescue Elasticsearch::Transport::Transport::Errors::NotFound
      # the client reported NotFound: behave as if there were no indices
      {}
    end
  indices = indices.select { |_k, v| v.empty? || v["aliases"].empty? } if unaliased

  # build the pattern once rather than re-interpolating it for every key
  pattern = /\A#{Regexp.escape(name)}_\d{14,17}\z/
  indices.select { |k, _v| k =~ pattern }.keys
end

#batches_left ⇒ Object



311
312
313
# File 'lib/searchkick/index.rb', line 311

# Returns the cardinality (SCARD) of the Redis set stored at batches_key.
#
# @return the value produced by Searchkick.with_redis for the scard call
def batches_left
  Searchkick.with_redis do |redis|
    redis.scard(batches_key)
  end
end

#bulk_delete(records) ⇒ Object



89
90
91
# File 'lib/searchkick/index.rb', line 89

# Hands +records+ to bulk_delete_helper and returns whatever it returns.
#
# @param records the records passed through to bulk_delete_helper
def bulk_delete(records)
  bulk_delete_helper(records)
end

#bulk_index(records) ⇒ Object Also known as: import



93
94
95
# File 'lib/searchkick/index.rb', line 93

# Hands +records+ to bulk_index_helper and returns whatever it returns.
#
# @param records the records passed through to bulk_index_helper
def bulk_index(records)
  bulk_index_helper(records)
end

#bulk_update(records, method_name) ⇒ Object



98
99
100
# File 'lib/searchkick/index.rb', line 98

# Hands +records+ and +method_name+ to bulk_update_helper and returns its result.
#
# @param records the records passed through to bulk_update_helper
# @param method_name passed through unchanged to bulk_update_helper
def bulk_update(records, method_name)
  bulk_update_helper(records, method_name)
end

#clean_indices ⇒ Object

Removes old, unaliased indices whose names start with this index's name.



200
201
202
203
204
205
206
# File 'lib/searchkick/index.rb', line 200

# Deletes every index reported by all_indices(unaliased: true).
#
# @return the list of index names that were deleted
def clean_indices
  all_indices(unaliased: true).each do |stale_name|
    Searchkick::Index.new(stale_name).delete
  end
end

#create(body = {}) ⇒ Object



13
14
15
# File 'lib/searchkick/index.rb', line 13

# Creates an index called +name+ through the client.
#
# @param body [Hash] request body sent with the create call (defaults to {})
# @return the client's response
def create(body = {})
  client.indices.create(index: name, body: body)
end

#create_index(index_options: nil) ⇒ Object

reindex



181
182
183
184
185
186
# File 'lib/searchkick/index.rb', line 181

# Creates a new index named "<name>_<timestamp>" (timestamp format
# %Y%m%d%H%M%S%L) and returns its wrapper.
#
# @param index_options body used for creation; falls back to
#   self.index_options when nil or false
# @return [Searchkick::Index] the wrapper for the newly created index
def create_index(index_options: nil)
  creation_body = index_options || self.index_options
  stamp = Time.now.strftime('%Y%m%d%H%M%S%L')
  fresh = Searchkick::Index.new("#{name}_#{stamp}", @options)
  fresh.create(creation_body)
  fresh
end

#delete ⇒ Object



17
18
19
20
21
22
23
24
25
# File 'lib/searchkick/index.rb', line 17

# Deletes this index through the client.
#
# When the server is not below 6.0.0-alpha1 and +name+ is an alias, the
# indices behind the alias are deleted instead, because delete can't be
# called directly on aliases in ES 6.
#
# @return the client's response to the delete call
def delete
  via_alias = !Searchkick.server_below?("6.0.0-alpha1") && alias_exists?
  target = via_alias ? client.indices.get_alias(name: name).keys : name
  client.indices.delete(index: target)
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


27
28
29
# File 'lib/searchkick/index.rb', line 27

# Asks the client whether an index called +name+ exists.
#
# @return the result of the client's exists call
def exists?
  client.indices.exists(index: name)
end

#import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false) ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/searchkick/index.rb', line 281

# Imports the records of +scope+ into this index, picking a strategy from the
# keyword flags. The branches are tried in this order: batch, full && async,
# find_in_batches, each_batch.
#
# @param scope the collection to import; replaced by scope.search_import when
#   it responds to that method
# @param resume [Boolean] only used on the find_in_batches path: restricts the
#   scope to rows whose id is greater than total_docs
# @param method_name passed through to import_or_update
# @param async passed through to import_or_update; together with +full+ it
#   selects full_reindex_async, and on the find_in_batches path it narrows the
#   scope to the id column
# @param batch [Boolean] when true, imports scope.to_a in a single call
# @param batch_id when given together with +batch+, removed from the Redis set
#   at batches_key after the import
# @param full [Boolean] together with +async+, hands the scope to
#   full_reindex_async
def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
  # use scope for import
  scope = scope.search_import if scope.respond_to?(:search_import)

  if batch
    import_or_update scope.to_a, method_name, async
    # mark this batch as finished by dropping its id from the set
    Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id
  elsif full && async
    full_reindex_async(scope)
  elsif scope.respond_to?(:find_in_batches)
    if resume
      # use total docs instead of max id since there's not a great way
      # to get the max _id without scripting since it's a string

      # TODO use primary key and prefix with table name
      scope = scope.where("id > ?", total_docs)
    end

    # async only needs ids, so drop eager loading and select just that column
    scope = scope.select("id").except(:includes, :preload) if async

    scope.find_in_batches batch_size: batch_size do |items|
      import_or_update items, method_name, async
    end
  else
    # scope has no find_in_batches: fall back to the each_batch helper
    each_batch(scope) do |items|
      import_or_update items, method_name, async
    end
  end
end

#klass_document_type(klass, ignore_type = false) ⇒ Object



321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/searchkick/index.rb', line 321

# Resolves, and memoizes per [klass, ignore_type] pair, the document type
# for +klass+.
#
# Resolution order:
# 1. klass.document_type, when klass responds to it
# 2. the :_type searchkick option (called first if it responds to call),
#    unless +ignore_type+ is set
# 3. the underscored model name
#
# @param klass the model class to resolve a type for
# @param ignore_type [Boolean] skip the :_type option when true
# @return the resolved document type
def klass_document_type(klass, ignore_type = false)
  cache_key = [klass, ignore_type]
  @klass_document_type[cache_key] ||=
    if klass.respond_to?(:document_type)
      klass.document_type
    elsif !ignore_type && klass.searchkick_klass.searchkick_options[:_type]
      configured = klass.searchkick_klass.searchkick_options[:_type]
      configured.respond_to?(:call) ? configured.call : configured
    else
      klass.model_name.to_s.underscore
    end
end

#mapping ⇒ Object



39
40
41
# File 'lib/searchkick/index.rb', line 39

# Fetches the mapping for the index called +name+ through the client.
#
# @return the client's get_mapping response
def mapping
  client.indices.get_mapping(index: name)
end

#promote(new_name, update_refresh_interval: false) ⇒ Object Also known as: swap



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/searchkick/index.rb', line 55

# Points the alias +name+ at +new_name+, detaching it from every index it
# currently points to, in a single update_aliases request.
#
# @param new_name the index that should receive the alias
# @param update_refresh_interval [Boolean] when true, first sets the new
#   index's refresh_interval to options[:settings][:index][:refresh_interval],
#   or "1s" when that is not configured
# @return the client's update_aliases response
def promote(new_name, update_refresh_interval: false)
  if update_refresh_interval
    new_index = Searchkick::Index.new(new_name)
    settings = options[:settings] || {}
    refresh_interval = (settings[:index] && settings[:index][:refresh_interval]) || "1s"
    new_index.update_settings(index: {refresh_interval: refresh_interval})
  end

  old_indices =
    begin
      client.indices.get_alias(name: name).keys
    rescue Elasticsearch::Transport::Transport::Errors::NotFound
      # no alias yet: nothing to detach (an Array, like the success path)
      []
    end
  removals = old_indices.map { |old_name| {remove: {index: old_name, alias: name}} }
  actions = removals + [{add: {index: new_name, alias: name}}]
  client.indices.update_aliases body: {actions: actions}
end

#record_data(r) ⇒ Object



102
103
104
105
106
107
108
109
110
# File 'lib/searchkick/index.rb', line 102

# Builds the bulk-request metadata hash for record +r+.
#
# @param r the record to describe
# @return [Hash] :_index, :_id and :_type, plus :_routing when +r+ responds
#   to search_routing
def record_data(r)
  {_index: name, _id: search_id(r), _type: document_type(r)}.tap do |meta|
    meta[:_routing] = r.search_routing if r.respond_to?(:search_routing)
  end
end

#refresh ⇒ Object



31
32
33
# File 'lib/searchkick/index.rb', line 31

# Issues a refresh for the index called +name+ through the client.
#
# @return the client's refresh response
def refresh
  client.indices.refresh(index: name)
end

#refresh_interval ⇒ Object



47
48
49
# File 'lib/searchkick/index.rb', line 47

# Reads "settings" -> "index" -> "refresh_interval" from the first entry of
# the settings response.
#
# @return the refresh interval stored for that entry
def refresh_interval
  first_entry = settings.values.first
  index_settings = first_entry["settings"]["index"]
  index_settings["refresh_interval"]
end

#reindex_queue ⇒ Object

queue



162
163
164
# File 'lib/searchkick/index.rb', line 162

# Builds a Searchkick::ReindexQueue for this index's name.
#
# @return [Searchkick::ReindexQueue] a new queue object on every call
def reindex_queue
  Searchkick::ReindexQueue.new(name)
end

#reindex_record(record) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
# File 'lib/searchkick/index.rb', line 120

# Brings the index in line with a single record: stores it when it is alive
# and should be indexed, otherwise removes it.
#
# @param record responds to destroyed? and should_index?
# @return the result of store or remove; nil when remove raised NotFound
def reindex_record(record)
  return store(record) if !record.destroyed? && record.should_index?

  begin
    remove(record)
  rescue Elasticsearch::Transport::Transport::Errors::NotFound
    # do nothing
  end
end

#reindex_record_async(record) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
# File 'lib/searchkick/index.rb', line 132

# Reindexes +record+ through a background job when Searchkick.callbacks_value
# is nil; for any other value the record is reindexed inline.
#
# @param record the record to reindex
# @raise [Searchkick::Error] when Searchkick::ReindexV2Job is not defined
# @return the result of perform_later or of reindex_record
def reindex_record_async(record)
  return reindex_record(record) unless Searchkick.callbacks_value.nil?
  raise Searchkick::Error, "Active Job not found" unless defined?(Searchkick::ReindexV2Job)

  Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s)
end

#reindex_scope(scope, import: true, resume: false, retain: false, async: false, refresh_interval: nil) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/searchkick/index.rb', line 223

# Rebuilds the index for +scope+: creates (or resumes) a timestamped index,
# imports into it, and points the alias +name+ at it.
#
# @param scope the model scope; must respond to searchkick_index_options
#   unless +resume+ is set
# @param import [Boolean] when false, no records are imported
# @param resume [Boolean] reuse the last name (in sort order) returned by
#   all_indices instead of creating a new index
# @param retain [Boolean] when true, clean_indices is never called
# @param async false, a truthy value, or a Hash; a Hash with a truthy :wait
#   makes this method poll until Searchkick.reindex_status reports completion
# @param refresh_interval when given, merged into the new index's settings,
#   and promote is asked to update the refresh interval
# @raise [Searchkick::Error] when +resume+ is set and all_indices is empty
# @return [Hash, true] {index_name: ...} when +async+ is truthy, otherwise
#   true (after refreshing the new index)
def reindex_scope(scope, import: true, resume: false, retain: false, async: false, refresh_interval: nil)
  if resume
    index_name = all_indices.sort.last
    raise Searchkick::Error, "No index to resume" unless index_name
    index = Searchkick::Index.new(index_name)
  else
    clean_indices unless retain

    index_options = scope.searchkick_index_options
    # NOTE(review): deep_merge! mutates the hash returned by
    # searchkick_index_options - confirm that hash is not shared or cached
    index_options.deep_merge!(settings: {index: {refresh_interval: refresh_interval}}) if refresh_interval
    index = create_index(index_options: index_options)
  end

  # check if alias exists
  alias_exists = alias_exists?
  if alias_exists
    # import before promotion
    index.import_scope(scope, resume: resume, async: async, full: true) if import

    # promote right away unless async; the async + wait case promotes below,
    # once the queued jobs have finished
    unless async
      promote(index.name, update_refresh_interval: !refresh_interval.nil?)
      clean_indices unless retain
    end
  else
    # no alias yet: a plain index may be occupying the alias name
    delete if exists?
    promote(index.name, update_refresh_interval: !refresh_interval.nil?)

    # import after promotion
    index.import_scope(scope, resume: resume, async: async, full: true) if import
  end

  if async
    if async.is_a?(Hash) && async[:wait]
      puts "Created index: #{index.name}"
      puts "Jobs queued. Waiting..."
      # poll every 3 seconds until the status reports completion
      loop do
        sleep 3
        status = Searchkick.reindex_status(index.name)
        break if status[:completed]
        puts "Batches left: #{status[:batches_left]}"
      end
      # already promoted if alias didn't exist
      if alias_exists
        puts "Jobs complete. Promoting..."
        promote(index.name, update_refresh_interval: !refresh_interval.nil?)
      end
      clean_indices unless retain
      puts "SUCCESS!"
    end

    {index_name: index.name}
  else
    index.refresh
    true
  end
end

#remove(record) ⇒ Object



81
82
83
# File 'lib/searchkick/index.rb', line 81

# Removes a single record by passing it, wrapped in an array, to
# bulk_delete_helper.
#
# @param record the record to remove
def remove(record)
  bulk_delete_helper([record])
end

#retrieve(record) ⇒ Object



112
113
114
115
116
117
118
# File 'lib/searchkick/index.rb', line 112

# Fetches the stored document for +record+ and returns its "_source".
#
# @param record the record whose document type and search id identify the
#   document
# @return the "_source" entry of the client's get response
def retrieve(record)
  response = client.get(index: name, type: document_type(record), id: search_id(record))
  response["_source"]
end

#search_model(searchkick_klass, term = "*", **options) {|query.body| ... } ⇒ Object

TODO remove in next major version

Yields:

  • (query.body)


169
170
171
172
173
174
175
176
177
# File 'lib/searchkick/index.rb', line 169

# Builds a Searchkick::Query for +searchkick_klass+ and, unless told
# otherwise, executes it.
#
# TODO remove in next major version
#
# @param searchkick_klass the model class to search
# @param term [String] the search term (defaults to "*")
# @param options keyword options forwarded to Searchkick::Query.new; pass
#   execute: false to get the unexecuted query back
# @yield [body] the query body, before execution, when a block is given
# @return the query itself when options[:execute] == false, otherwise the
#   result of query.execute
def search_model(searchkick_klass, term = "*", **options, &block)
  # splat the options so they arrive as keywords; a positional Hash is no
  # longer converted to keywords on Ruby 3
  query = Searchkick::Query.new(searchkick_klass, term, **options)
  yield(query.body) if block
  if options[:execute] == false
    query
  else
    query.execute
  end
end

#settings ⇒ Object



43
44
45
# File 'lib/searchkick/index.rb', line 43

# Fetches the settings for the index called +name+ through the client.
#
# @return the client's get_settings response
def settings
  client.indices.get_settings(index: name)
end

#similar_record(record, **options) ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/searchkick/index.rb', line 144

# Searches for records similar to +record+, using the text of its stored
# document as the search term and excluding the record itself.
#
# @param record the record to find neighbours for; its class is searched
# @param options keyword options forwarded to search_model. :fields limits
#   which stored fields contribute text, :where gets an _id/not clause for
#   this record added, :per_page defaults to 10, and :similar is forced to true
# @return the result of search_model
def similar_record(record, **options)
  # normalise the field list once instead of once per stored key
  wanted_fields = options[:fields] && options[:fields].map(&:to_s)

  # select (not keep_if) so the retrieved source is left untouched
  like_text = retrieve(record).to_hash
    .select { |k, _| !wanted_fields || wanted_fields.include?(k) }
    .values.compact.join(" ")

  # copy the where/_id hashes before extending them so the hashes the
  # caller passed in are not modified
  where = (options[:where] || {}).dup
  where[:_id] = (where[:_id] || {}).merge(not: record.id.to_s)
  options[:where] = where
  options[:per_page] ||= 10
  options[:similar] = true

  # TODO use index class instead of record class
  # splatted: search_model takes keywords, and Ruby 3 does not convert a
  # positional Hash
  search_model(record.class, like_text, **options)
end

#store(record) ⇒ Object

Record-based operations; these use the helpers for notifications.



77
78
79
# File 'lib/searchkick/index.rb', line 77

# Indexes a single record by passing it, wrapped in an array, to
# bulk_index_helper.
#
# @param record the record to index
def store(record)
  bulk_index_helper([record])
end

#tokens(text, options = {}) ⇒ Object

other



317
318
319
# File 'lib/searchkick/index.rb', line 317

# Runs +text+ through the analyze API of this index and returns the token
# strings.
#
# @param text the text to analyze
# @param options [Hash] merged into the request body next to :text
# @return [Array] the "token" value of every entry in the response's "tokens"
def tokens(text, options = {})
  request_body = {text: text}.merge(options)
  analysis = client.indices.analyze(body: request_body, index: name)
  analysis["tokens"].map { |entry| entry["token"] }
end

#total_docs ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/searchkick/index.rb', line 208

# Counts the documents in this index with a size-0 match_all search.
#
# @return the hit total: hits.total itself when it is a plain value, or its
#   "value" entry when it is a Hash
def total_docs
  response =
    client.search(
      index: name,
      body: {
        query: {match_all: {}},
        size: 0
      }
    )

  total = response["hits"]["total"]
  # Elasticsearch 7+ reports the total as {"value" => n, "relation" => ...}
  # NOTE(review): without track_total_hits that value may be a lower bound -
  # confirm whether callers need an exact count
  total.is_a?(Hash) ? total["value"] : total
end

#update_record(record, method_name) ⇒ Object



85
86
87
# File 'lib/searchkick/index.rb', line 85

# Updates a single record by passing it, wrapped in an array, to
# bulk_update_helper together with +method_name+.
#
# @param record the record to update
# @param method_name passed through unchanged to bulk_update_helper
def update_record(record, method_name)
  bulk_update_helper([record], method_name)
end

#update_settings(settings) ⇒ Object



51
52
53
# File 'lib/searchkick/index.rb', line 51

# Sends +settings+ as the body of a put_settings call for the index called
# +name+.
#
# @param settings the settings body to apply
# @return the client's put_settings response
def update_settings(settings)
  client.indices.put_settings(index: name, body: settings)
end