Class: Searchkick::Index

Inherits:
Object
  • Object
show all
Includes:
IndexOptions
Defined in:
lib/searchkick/index.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from IndexOptions

#index_options

Constructor Details

#initialize(name, options = {}) ⇒ Index

Returns a new instance of Index.



7
8
9
10
11
# File 'lib/searchkick/index.rb', line 7

def initialize(name, options = {})
  @name = name
  @options = options
  @klass_document_type = {} # cache
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



5
6
7
# File 'lib/searchkick/index.rb', line 5

def name
  @name
end

#optionsObject (readonly)

Returns the value of attribute options.



5
6
7
# File 'lib/searchkick/index.rb', line 5

def options
  @options
end

Instance Method Details

#alias_exists?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/searchkick/index.rb', line 29

def alias_exists?
  client.indices.exists_alias name: name
end

#all_indices(unaliased: false) ⇒ Object



182
183
184
185
186
187
188
189
190
191
# File 'lib/searchkick/index.rb', line 182

def all_indices(unaliased: false)
  indices =
    begin
      client.indices.get_aliases
    rescue Elasticsearch::Transport::Transport::Errors::NotFound
      {}
    end
  indices = indices.select { |_k, v| v.empty? || v["aliases"].empty? } if unaliased
  indices.select { |k, _v| k =~ /\A#{Regexp.escape(name)}_\d{14,17}\z/ }.keys
end

#batches_leftObject



286
287
288
# File 'lib/searchkick/index.rb', line 286

def batches_left
  Searchkick.with_redis { |r| r.scard(batches_key) }
end

#bulk_delete(records) ⇒ Object



83
84
85
# File 'lib/searchkick/index.rb', line 83

def bulk_delete(records)
  bulk_delete_helper(records)
end

#bulk_index(records) ⇒ Object Also known as: import



87
88
89
# File 'lib/searchkick/index.rb', line 87

def bulk_index(records)
  bulk_index_helper(records)
end

#bulk_update(records, method_name) ⇒ Object



92
93
94
# File 'lib/searchkick/index.rb', line 92

def bulk_update(records, method_name)
  bulk_update_helper(records, method_name)
end

#clean_indicesObject

remove old indices that start w/ index_name



194
195
196
197
198
199
200
# File 'lib/searchkick/index.rb', line 194

def clean_indices
  indices = all_indices(unaliased: true)
  indices.each do |index|
    Searchkick::Index.new(index).delete
  end
  indices
end

#create(body = {}) ⇒ Object



13
14
15
# File 'lib/searchkick/index.rb', line 13

def create(body = {})
  client.indices.create index: name, body: body
end

#create_index(index_options: nil) ⇒ Object

reindex



175
176
177
178
179
180
# File 'lib/searchkick/index.rb', line 175

def create_index(index_options: nil)
  index_options ||= self.index_options
  index = Searchkick::Index.new("#{name}_#{Time.now.strftime('%Y%m%d%H%M%S%L')}", @options)
  index.create(index_options)
  index
end

#deleteObject



17
18
19
# File 'lib/searchkick/index.rb', line 17

def delete
  client.indices.delete index: name
end

#exists?Boolean

Returns:

  • (Boolean)


21
22
23
# File 'lib/searchkick/index.rb', line 21

def exists?
  client.indices.exists index: name
end

#import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/searchkick/index.rb', line 256

def import_scope(scope, resume: false, method_name: nil, async: false, batch: false, batch_id: nil, full: false)
  # use scope for import
  scope = scope.search_import if scope.respond_to?(:search_import)

  if batch
    import_or_update scope.to_a, method_name, async
    Searchkick.with_redis { |r| r.srem(batches_key, batch_id) } if batch_id
  elsif full && async
    full_reindex_async(scope)
  elsif scope.respond_to?(:find_in_batches)
    if resume
      # use total docs instead of max id since there's not a great way
      # to get the max _id without scripting since it's a string

      # TODO use primary key and prefix with table name
      scope = scope.where("id > ?", total_docs)
    end

    scope = scope.select("id").except(:includes, :preload) if async

    scope.find_in_batches batch_size: batch_size do |batch|
      import_or_update batch, method_name, async
    end
  else
    each_batch(scope) do |items|
      import_or_update items, method_name, async
    end
  end
end

#klass_document_type(klass) ⇒ Object



296
297
298
299
300
301
302
303
304
# File 'lib/searchkick/index.rb', line 296

def klass_document_type(klass)
  @klass_document_type[klass] ||= begin
    if klass.respond_to?(:document_type)
      klass.document_type
    else
      klass.model_name.to_s.underscore
    end
  end
end

#mappingObject



33
34
35
# File 'lib/searchkick/index.rb', line 33

def mapping
  client.indices.get_mapping index: name
end

#promote(new_name, update_refresh_interval: false) ⇒ Object Also known as: swap



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/searchkick/index.rb', line 49

def promote(new_name, update_refresh_interval: false)
  if update_refresh_interval
    new_index = Searchkick::Index.new(new_name)
    settings = options[:settings] || {}
    refresh_interval = (settings[:index] && settings[:index][:refresh_interval]) || "1s"
    new_index.update_settings(index: {refresh_interval: refresh_interval})
  end

  old_indices =
    begin
      client.indices.get_alias(name: name).keys
    rescue Elasticsearch::Transport::Transport::Errors::NotFound
      {}
    end
  actions = old_indices.map { |old_name| {remove: {index: old_name, alias: name}} } + [{add: {index: new_name, alias: name}}]
  client.indices.update_aliases body: {actions: actions}
end

#record_data(r) ⇒ Object



96
97
98
99
100
101
102
103
104
# File 'lib/searchkick/index.rb', line 96

def record_data(r)
  data = {
    _index: name,
    _id: search_id(r),
    _type: document_type(r)
  }
  data[:_routing] = r.search_routing if r.respond_to?(:search_routing)
  data
end

#refreshObject



25
26
27
# File 'lib/searchkick/index.rb', line 25

def refresh
  client.indices.refresh index: name
end

#refresh_intervalObject



41
42
43
# File 'lib/searchkick/index.rb', line 41

def refresh_interval
  settings.values.first["settings"]["index"]["refresh_interval"]
end

#reindex_queueObject

queue



156
157
158
# File 'lib/searchkick/index.rb', line 156

def reindex_queue
  Searchkick::ReindexQueue.new(name)
end

#reindex_record(record) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
# File 'lib/searchkick/index.rb', line 114

def reindex_record(record)
  if record.destroyed? || !record.should_index?
    begin
      remove(record)
    rescue Elasticsearch::Transport::Transport::Errors::NotFound
      # do nothing
    end
  else
    store(record)
  end
end

#reindex_record_async(record) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
# File 'lib/searchkick/index.rb', line 126

def reindex_record_async(record)
  if Searchkick.callbacks_value.nil?
    if defined?(Searchkick::ReindexV2Job)
      Searchkick::ReindexV2Job.perform_later(record.class.name, record.id.to_s)
    else
      raise Searchkick::Error, "Active Job not found"
    end
  else
    reindex_record(record)
  end
end

#reindex_scope(scope, import: true, resume: false, retain: false, async: false, refresh_interval: nil) ⇒ Object



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/searchkick/index.rb', line 217

def reindex_scope(scope, import: true, resume: false, retain: false, async: false, refresh_interval: nil)
  if resume
    index_name = all_indices.sort.last
    raise Searchkick::Error, "No index to resume" unless index_name
    index = Searchkick::Index.new(index_name)
  else
    clean_indices unless retain

    index_options = scope.searchkick_index_options
    index_options.deep_merge!(settings: {index: {refresh_interval: refresh_interval}}) if refresh_interval
    index = create_index(index_options: index_options)
  end

  # check if alias exists
  if alias_exists?
    # import before promotion
    index.import_scope(scope, resume: resume, async: async, full: true) if import

    # get existing indices to remove
    unless async
      promote(index.name, update_refresh_interval: !refresh_interval.nil?)
      clean_indices unless retain
    end
  else
    delete if exists?
    promote(index.name, update_refresh_interval: !refresh_interval.nil?)

    # import after promotion
    index.import_scope(scope, resume: resume, async: async, full: true) if import
  end

  if async
    {index_name: index.name}
  else
    index.refresh
    true
  end
end

#remove(record) ⇒ Object



75
76
77
# File 'lib/searchkick/index.rb', line 75

def remove(record)
  bulk_delete_helper([record])
end

#retrieve(record) ⇒ Object



106
107
108
109
110
111
112
# File 'lib/searchkick/index.rb', line 106

def retrieve(record)
  client.get(
    index: name,
    type: document_type(record),
    id: search_id(record)
  )["_source"]
end

#search_model(searchkick_klass, term = "*", **options) {|query.body| ... } ⇒ Object

TODO remove in next major version

Yields:

  • (query.body)


163
164
165
166
167
168
169
170
171
# File 'lib/searchkick/index.rb', line 163

def search_model(searchkick_klass, term = "*", **options, &block)
  query = Searchkick::Query.new(searchkick_klass, term, options)
  yield(query.body) if block
  if options[:execute] == false
    query
  else
    query.execute
  end
end

#settingsObject



37
38
39
# File 'lib/searchkick/index.rb', line 37

def settings
  client.indices.get_settings index: name
end

#similar_record(record, **options) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/searchkick/index.rb', line 138

def similar_record(record, **options)
  like_text = retrieve(record).to_hash
    .keep_if { |k, _| !options[:fields] || options[:fields].map(&:to_s).include?(k) }
    .values.compact.join(" ")

  # TODO deep merge method
  options[:where] ||= {}
  options[:where][:_id] ||= {}
  options[:where][:_id][:not] = record.id.to_s
  options[:per_page] ||= 10
  options[:similar] = true

  # TODO use index class instead of record class
  search_model(record.class, like_text, options)
end

#store(record) ⇒ Object

record based use helpers for notifications



71
72
73
# File 'lib/searchkick/index.rb', line 71

def store(record)
  bulk_index_helper([record])
end

#tokens(text, options = {}) ⇒ Object

other



292
293
294
# File 'lib/searchkick/index.rb', line 292

def tokens(text, options = {})
  client.indices.analyze({text: text, index: name}.merge(options))["tokens"].map { |t| t["token"] }
end

#total_docsObject



202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/searchkick/index.rb', line 202

def total_docs
  response =
    client.search(
      index: name,
      body: {
        query: {match_all: {}},
        size: 0
      }
    )

  response["hits"]["total"]
end

#update_record(record, method_name) ⇒ Object



79
80
81
# File 'lib/searchkick/index.rb', line 79

def update_record(record, method_name)
  bulk_update_helper([record], method_name)
end

#update_settings(settings) ⇒ Object



45
46
47
# File 'lib/searchkick/index.rb', line 45

def update_settings(settings)
  client.indices.put_settings index: name, body: settings
end