Module: ElasticRecord::Index::Documents

Included in:
ElasticRecord::Index
Defined in:
lib/elastic_record/index/documents.rb

Instance Method Summary

Instance Method Details

#build_scroll_enumerator(search: nil, scroll_id: nil, batch_size: 100, keep_alive: ElasticRecord::Config.scroll_keep_alive) ⇒ Object



156
157
158
# File 'lib/elastic_record/index/documents.rb', line 156

# Builds a ScrollEnumerator for this index.
#
# The receiver is passed as the enumerator's first argument and every keyword
# argument is forwarded to ScrollEnumerator.new unchanged.
#
# @param search [Object, nil] forwarded as +search+
# @param scroll_id [Object, nil] forwarded as +scroll_id+
# @param batch_size [Integer] forwarded as +batch_size+ (default 100)
# @param keep_alive [Object] forwarded as +keep_alive+
#   (default ElasticRecord::Config.scroll_keep_alive)
# @return [ScrollEnumerator]
def build_scroll_enumerator(search: nil, scroll_id: nil, batch_size: 100, keep_alive: ElasticRecord::Config.scroll_keep_alive)
  enumerator_options = {
    search: search,
    scroll_id: scroll_id,
    batch_size: batch_size,
    keep_alive: keep_alive
  }

  ScrollEnumerator.new(self, **enumerator_options)
end

#bulk(options = {}) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/elastic_record/index/documents.rb', line 165

# Collects the actions queued while the given block runs and sends them in a
# single bulk request.
#
# A fresh batch is pushed onto the connection's bulk stack before yielding.
# If the batch holds any actions afterwards, each one is JSON-encoded on its
# own line, the result is POSTed to "/_bulk" and the response is handed to
# verify_bulk_results. The batch is always popped again, even when the block
# or the request raises.
#
# @param options [Hash] turned into the query string of the bulk request
# @yield runs with the new batch as the current bulk batch
# @return [Object, nil] the result of verify_bulk_results, or nil when nothing
#   was queued
def bulk(options = {})
  connection.bulk_stack.push([])

  yield

  pending = current_bulk_batch
  return unless pending.any?

  # Bulk payloads are newline-delimited: one encoded action per line.
  payload = pending.map { |action| "#{ElasticRecord::JSON.encode(action)}\n" }.join
  response = connection.json_post("/_bulk?#{options.to_query}", payload)
  verify_bulk_results(response)
ensure
  connection.bulk_stack.pop
end

#bulk_add(batch, index_name: alias_name) ⇒ Object



179
180
181
182
183
184
185
# File 'lib/elastic_record/index/documents.rb', line 179

# Indexes every record of +batch+ inside a single bulk block.
#
# @param batch [#each] records to pass to index_record
# @param index_name [Object] index each record is written to
#   (default: alias_name)
# @return [Object] whatever bulk returns
def bulk_add(batch, index_name: alias_name)
  bulk do
    batch.each { |record| index_record(record, index_name: index_name) }
  end
end

#current_bulk_batch ⇒ Object



187
188
189
# File 'lib/elastic_record/index/documents.rb', line 187

# Returns the batch on top of the connection's bulk stack.
#
# @return [Object, nil] the last element of connection.bulk_stack; nil when
#   the stack is empty
def current_bulk_batch
  stack = connection.bulk_stack
  stack.last
end

#delete_by_query(query) ⇒ Object



129
130
131
132
133
134
135
136
137
# File 'lib/elastic_record/index/documents.rb', line 129

# Deletes the documents matched by +query+.
#
# A scroll enumerator is built for the query; each slice it yields is treated
# as a list of ids, and those ids are deleted together in one bulk block.
#
# @param query [Object] search passed to build_scroll_enumerator
# @return [Object] whatever the enumerator's each_slice returns
def delete_by_query(query)
  build_scroll_enumerator(search: query).each_slice do |ids|
    bulk { ids.each { |id| delete_document(id) } }
  end
end

#delete_document(id, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/elastic_record/index/documents.rb', line 113

# Deletes a single document, either immediately or as part of the current
# bulk batch.
#
# When a bulk batch is active, a +delete+ action is appended to it and no
# request is made. Otherwise a DELETE request is issued for
# "/<index_name>/<doctype.name>/<id>".
#
# @param id [Object] id of the document to delete; must not be blank
# @param doctype [#name] document type (default: model.doctype)
# @param parent [Object, nil] parent id, added to the action / query string
#   when given
# @param index_name [Object, nil] target index; nil falls back to alias_name
# @return [Object] the batch (bulk mode) or the result of
#   connection.json_delete
# @raise [RuntimeError] if +id+ is blank
def delete_document(id, doctype: model.doctype, parent: nil, index_name: alias_name)
  raise "Cannot delete document with empty id" if id.blank?
  # Callers may pass index_name: nil explicitly; treat that like the default.
  index_name ||= alias_name

  if (batch = current_bulk_batch)
    instructions = { _index: index_name, _type: doctype.name, _id: id, _retry_on_conflict: 3 }
    instructions[:parent] = parent if parent
    batch << { delete: instructions }
  else
    path = "/#{index_name}/#{doctype.name}/#{id}"
    # The path has no query string yet, so the first parameter starts with "?"
    # (it was previously appended with "&", producing a malformed URL).
    path << "?parent=#{parent}" if parent

    connection.json_delete path
  end
end

#explain(id, elastic_query) ⇒ Object



152
153
154
# File 'lib/elastic_record/index/documents.rb', line 152

# Requests an explanation of how +elastic_query+ applies to the document
# with the given id.
#
# Previously +id+ was ignored and the query was passed in the position where
# record_exists? and search pass the doctype. The request now targets
# "<id>/_explain" for the model's doctype, with the query as the body.
#
# NOTE(review): assumes get(end_path, doctype, json), matching how
# record_exists? and search call it in this module — confirm against the
# definition of get.
#
# @param id [Object] id of the document to explain
# @param elastic_query [Object] query sent as the request body
# @return [Object] whatever get returns
def explain(id, elastic_query)
  get "#{id}/_explain", model.doctype, elastic_query
end

#index_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/elastic_record/index/documents.rb', line 76

# Indexes a document, either immediately or as part of the current bulk batch.
#
# With an active bulk batch, an +index+ action followed by the document is
# appended to the batch. Otherwise the document is sent to
# "/<index_name>/<doctype.name>/<id>": via PUT when an id is given, via POST
# when it is not.
#
# @param id [Object, nil] document id
# @param document [Object] document body
# @param doctype [#name] document type (default: model.doctype)
# @param parent [Object, nil] parent id, added to the action / query string
#   when given
# @param index_name [Object] target index (default: alias_name)
# @return [Object] the batch (bulk mode) or the result of the PUT/POST
def index_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name)
  pending = current_bulk_batch

  unless pending
    path = "/#{index_name}/#{doctype.name}/#{id}"
    path << "?parent=#{parent}" if parent

    return id ? connection.json_put(path, document) : connection.json_post(path, document)
  end

  action = { _index: index_name, _type: doctype.name, _id: id }
  action[:parent] = parent if parent

  # A bulk index entry is two consecutive items: the action, then the source.
  pending << { index: action }
  pending << document
end

#index_record(record, index_name: alias_name) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/elastic_record/index/documents.rb', line 54

# Indexes the search document of +record+, unless indexing is disabled.
#
# @param record [Object] must respond to as_search_document and doctype; its
#   id is read via try(:id)
# @param index_name [Object] target index (default: alias_name)
# @return [Object, nil] the result of index_document, or nil when disabled
def index_record(record, index_name: alias_name)
  return if disabled

  index_document(record.try(:id), record.as_search_document, doctype: record.doctype, index_name: index_name)
end

#record_exists?(id) ⇒ Boolean

Returns:

  • (Boolean)


139
140
141
# File 'lib/elastic_record/index/documents.rb', line 139

# Reports whether a document with the given id exists for the model's doctype.
#
# @param id [Object] document id to look up
# @return [Object] the 'found' entry of the response returned by get
def record_exists?(id)
  response = get(id, model.doctype)
  response['found']
end

#scroll(scroll_id, scroll_keep_alive) ⇒ Object



160
161
162
163
# File 'lib/elastic_record/index/documents.rb', line 160

# Fetches the next page of a scroll.
#
# @param scroll_id [Object] sent as the +scroll_id+ query parameter
# @param scroll_keep_alive [Object] sent as the +scroll+ query parameter
# @return [Object] the result of the GET on "/_search/scroll"
def scroll(scroll_id, scroll_keep_alive)
  query_string = { scroll_id: scroll_id, scroll: scroll_keep_alive }.to_query
  connection.json_get("/_search/scroll?#{query_string}")
end

#search(elastic_query, options = {}) ⇒ Object



143
144
145
146
147
148
149
150
# File 'lib/elastic_record/index/documents.rb', line 143

# Runs a search against the model's doctype.
#
# @param elastic_query [Object] query passed to get as the request body
# @param options [Hash] when non-empty, appended to "_search" as a query
#   string
# @return [Object] whatever get returns
def search(elastic_query, options = {})
  url = options.any? ? "_search?#{options.to_query}" : "_search"

  get url, model.doctype, elastic_query
end

#update_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/elastic_record/index/documents.rb', line 95

# Partially updates a document (creating it when missing), either immediately
# or as part of the current bulk batch.
#
# The request body is { doc: document, doc_as_upsert: true }. With an active
# bulk batch, an +update+ action and that body are appended to the batch.
# Otherwise the body is POSTed to
# "/<index_name>/<doctype.name>/<id>/_update?retry_on_conflict=3".
#
# @param id [Object] document id; must not be blank
# @param document [Object] fields to update
# @param doctype [#name] document type (default: model.doctype)
# @param parent [Object, nil] parent id, added to the action / query string
#   when given
# @param index_name [Object] target index (default: alias_name)
# @return [Object] the batch (bulk mode) or the result of the POST
# @raise [RuntimeError] if +id+ is blank
def update_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name)
  raise "Cannot update a document with empty id" if id.blank?

  payload = { doc: document, doc_as_upsert: true }
  pending = current_bulk_batch

  unless pending
    path = "/#{index_name}/#{doctype.name}/#{id}/_update?retry_on_conflict=3"
    # retry_on_conflict already opened the query string, hence "&" here.
    path << "&parent=#{parent}" if parent

    return connection.json_post(path, payload)
  end

  action = { _index: index_name, _type: doctype.name, _id: id, _retry_on_conflict: 3 }
  action[:parent] = parent if parent

  pending << { update: action }
  pending << payload
end

#update_record(record, index_name: alias_name) ⇒ Object



65
66
67
68
69
70
71
72
73
74
# File 'lib/elastic_record/index/documents.rb', line 65

# Sends the partial-update document of +record+, unless indexing is disabled.
#
# @param record [Object] must respond to id, as_partial_update_document and
#   doctype
# @param index_name [Object] target index (default: alias_name)
# @return [Object, nil] the result of update_document, or nil when disabled
def update_record(record, index_name: alias_name)
  return if disabled

  update_document(record.id, record.as_partial_update_document, doctype: record.doctype, index_name: index_name)
end