Module: ElasticRecord::Index::Documents

Included in:
ElasticRecord::Index
Defined in:
lib/elastic_record/index/documents.rb

Instance Method Summary

Instance Method Details

#build_scroll_enumerator(search: nil, scroll_id: nil, batch_size: 100, keep_alive: ElasticRecord::Config.scroll_keep_alive) ⇒ Object



155
156
157
# File 'lib/elastic_record/index/documents.rb', line 155

# Builds a ScrollEnumerator bound to this index.
#
# @param search [Object, nil] forwarded to the enumerator as `search:`
# @param scroll_id [Object, nil] forwarded to the enumerator as `scroll_id:`
# @param batch_size [Integer] forwarded to the enumerator as `batch_size:` (defaults to 100)
# @param keep_alive [Object] forwarded as `keep_alive:`; defaults to
#   `ElasticRecord::Config.scroll_keep_alive`, read at call time
# @return [ScrollEnumerator] a new enumerator constructed with `self` as its first argument
def build_scroll_enumerator(search: nil, scroll_id: nil, batch_size: 100, keep_alive: ElasticRecord::Config.scroll_keep_alive)
  enumerator_options = {
    search: search,
    scroll_id: scroll_id,
    batch_size: batch_size,
    keep_alive: keep_alive
  }

  ScrollEnumerator.new(self, **enumerator_options)
end

#bulk(options = {}, &block) ⇒ Object



170
171
172
173
174
175
176
# File 'lib/elastic_record/index/documents.rb', line 170

# Runs the given block in the context of a bulk batch.
#
# When `current_bulk_batch` is truthy the block is simply yielded to;
# otherwise `options` and the block are handed to `start_new_bulk_batch`.
#
# @param options [Hash] forwarded to `start_new_bulk_batch` when a new batch is started
# @yield the work to run
# @return [Object] the block's value when a batch is already present, else
#   whatever `start_new_bulk_batch` returns
def bulk(options = {}, &block)
  return yield if current_bulk_batch

  start_new_bulk_batch(options, &block)
end

#bulk_add(batch, index_name: alias_name) ⇒ Object



178
179
180
181
182
183
184
# File 'lib/elastic_record/index/documents.rb', line 178

# Indexes every record of `batch` inside a single `bulk` block.
#
# @param batch [#each] the records to index; each one is passed to `index_record`
# @param index_name [Object] forwarded to `index_record`; defaults to `alias_name`
# @return [Object] whatever `bulk` returns
def bulk_add(batch, index_name: alias_name)
  bulk do
    batch.each { |item| index_record(item, index_name: index_name) }
  end
end

#current_bulk_batch ⇒ Object



186
187
188
# File 'lib/elastic_record/index/documents.rb', line 186

# Reads the bulk batch currently held by the connection.
#
# @return [Object, nil] whatever `connection.bulk_actions` returns; callers in
#   this module treat a falsy value as "no batch in progress"
def current_bulk_batch
  active_connection = connection
  active_connection.bulk_actions
end

#delete_by_query(query) ⇒ Object



128
129
130
131
132
133
134
135
136
# File 'lib/elastic_record/index/documents.rb', line 128

# Deletes every document matched by `query`.
#
# A scroll enumerator is built for the query; each slice of hits it yields is
# deleted inside its own `bulk` block, one `delete_document` call per hit,
# using the hit's '_id'.
#
# @param query [Object] passed to `build_scroll_enumerator` as `search:`
# @return [Object] whatever the enumerator's `each_slice` returns
def delete_by_query(query)
  build_scroll_enumerator(search: query).each_slice do |hits|
    bulk do
      hits.each do |hit|
        delete_document(hit['_id'])
      end
    end
  end
end

#delete_document(id, parent: nil, index_name: alias_name) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/elastic_record/index/documents.rb', line 112

# Deletes a single document by id.
#
# When a bulk batch is in progress (`current_bulk_batch` is truthy) a delete
# instruction is appended to it; otherwise the delete is issued immediately
# through `connection.json_delete`.
#
# @param id [Object] the document id; must not be blank
# @param parent [Object, nil] optional parent id, added to the bulk instruction
#   or to the request's query string
# @param index_name [Object, nil] target index; falls back to `alias_name` when nil
# @return [Object] the batch when queued in bulk, else the result of `json_delete`
# @raise [RuntimeError] when `id` is blank
def delete_document(id, parent: nil, index_name: alias_name)
  raise "Cannot delete document with empty id" if id.blank?

  # An explicit `index_name: nil` still resolves to the alias.
  index_name ||= alias_name
  batch = current_bulk_batch

  if batch
    instructions = { _index: index_name, _type: mapping_type, _id: id, retry_on_conflict: 3 }
    instructions[:parent] = parent if parent
    batch << { delete: instructions }
  else
    path = "/#{index_name}/#{mapping_type}/#{id}"
    # The path carries no query string yet, so the parent parameter has to be
    # introduced with `?` (an `&` here would become part of the document id).
    path << "?parent=#{parent}" if parent

    connection.json_delete path
  end
end

#explain(id, elastic_query) ⇒ Object



151
152
153
# File 'lib/elastic_record/index/documents.rb', line 151

# Requests the `_explain` endpoint for one document.
#
# @param id [Object] id of the document to explain; interpolated into the path
# @param elastic_query [Object] passed through to `get` as its second argument
# @return [Object] whatever `get` returns
def explain(id, elastic_query)
  # The explain endpoint is scoped to one document, so the id must be part of
  # the path. Assumes `get` prefixes the index and mapping type, as its other
  # callers in this module rely on — TODO confirm against `get`.
  get "#{id}/_explain", elastic_query
end

#index_document(id, document, parent: nil, index_name: alias_name) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/elastic_record/index/documents.rb', line 75

# Indexes `document` under `id`.
#
# With a bulk batch in progress, an index instruction followed by the document
# is appended to the batch. Otherwise the document is sent right away: via
# `json_put` when an id is given, via `json_post` when it is not.
#
# @param id [Object, nil] the document id; nil selects the POST form
# @param document [Object] the document payload
# @param parent [Object, nil] optional parent id
# @param index_name [Object] target index; defaults to `alias_name`
# @return [Object] the batch when queued in bulk, else the connection's response
def index_document(id, document, parent: nil, index_name: alias_name)
  bulk_batch = current_bulk_batch

  if bulk_batch
    action_meta = { _index: index_name, _type: mapping_type, _id: id }
    action_meta[:parent] = parent if parent

    bulk_batch << { index: action_meta }
    return bulk_batch << document
  end

  parent_query = parent ? "?parent=#{parent}" : ""
  path = "/#{index_name}/#{mapping_type}/#{id}#{parent_query}"

  id ? connection.json_put(path, document) : connection.json_post(path, document)
end

#index_record(record, index_name: alias_name) ⇒ Object



55
56
57
58
59
60
61
62
63
# File 'lib/elastic_record/index/documents.rb', line 55

# Indexes the search document of `record`, unless indexing is `disabled`.
#
# @param record [Object] must respond to `try` and `as_search_document`;
#   its id is read with `try(:id)`
# @param index_name [Object] forwarded to `index_document`; defaults to `alias_name`
# @return [Object, nil] the result of `index_document`, or nil when `disabled`
def index_record(record, index_name: alias_name)
  return if disabled

  document_id = record.try(:id)
  search_document = record.as_search_document

  index_document(document_id, search_document, index_name: index_name)
end

#record_exists?(id) ⇒ Boolean

Returns:

  • (Boolean)


138
139
140
# File 'lib/elastic_record/index/documents.rb', line 138

# Reports whether a document with the given id exists.
#
# @param id [Object] the document id, passed to `get`
# @return [Object] the 'found' entry of the response returned by `get`
def record_exists?(id)
  response = get(id)
  response['found']
end

#scroll(scroll_id, scroll_keep_alive) ⇒ Object



159
160
161
162
163
164
165
166
167
168
# File 'lib/elastic_record/index/documents.rb', line 159

# Fetches the next page of a scroll from `/_search/scroll`.
#
# Connection errors are translated by status code: '400' becomes
# ElasticRecord::InvalidScrollError and '404' becomes
# ElasticRecord::ExpiredScrollError (both carrying the original message);
# any other connection error is re-raised unchanged.
#
# @param scroll_id [Object] sent as the `scroll_id` query parameter
# @param scroll_keep_alive [Object] sent as the `scroll` query parameter
# @return [Object] the result of `connection.json_get`
# @raise [ElasticRecord::InvalidScrollError] on a '400' connection error
# @raise [ElasticRecord::ExpiredScrollError] on a '404' connection error
# @raise [ElasticRecord::ConnectionError] on any other connection error
def scroll(scroll_id, scroll_keep_alive)
  query_string = { scroll_id: scroll_id, scroll: scroll_keep_alive }.to_query
  connection.json_get("/_search/scroll?#{query_string}")
rescue ElasticRecord::ConnectionError => error
  status = error.status_code
  raise ElasticRecord::InvalidScrollError, error.message if status == '400'
  raise ElasticRecord::ExpiredScrollError, error.message if status == '404'

  raise error
end

#search(elastic_query, options = {}) ⇒ Object



142
143
144
145
146
147
148
149
# File 'lib/elastic_record/index/documents.rb', line 142

# Runs a search against the `_search` endpoint.
#
# Note that `elastic_query` is modified in place: its '_source' entry is set
# to `load_from_source` before the request is made.
#
# @param elastic_query [#update] the query; receives the '_source' entry
# @param options [Hash] when non-empty, serialized with `to_query` and
#   appended to the path as a query string
# @return [Object] whatever `get` returns
def search(elastic_query, options = {})
  query_string = options.any? ? "?#{options.to_query}" : ""
  request_body = elastic_query.update('_source' => load_from_source)

  get "_search#{query_string}", request_body
end

#update_document(id, document, parent: nil, index_name: alias_name) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/elastic_record/index/documents.rb', line 94

# Partially updates the document `id`, sending `document` as `doc` together
# with `doc_as_upsert: true`.
#
# With a bulk batch in progress, an update instruction (with
# `retry_on_conflict: 3`) and the payload are appended to the batch.
# Otherwise the update is posted immediately to the document's `_update`
# path with `retry_on_conflict=3`.
#
# @param id [Object] the document id; must not be blank
# @param document [Object] the partial document
# @param parent [Object, nil] optional parent id
# @param index_name [Object] target index; defaults to `alias_name`
# @return [Object] the batch when queued in bulk, else the result of `json_post`
# @raise [RuntimeError] when `id` is blank
def update_document(id, document, parent: nil, index_name: alias_name)
  raise "Cannot update a document with empty id" if id.blank?

  upsert_body = { doc: document, doc_as_upsert: true }
  bulk_batch = current_bulk_batch

  if bulk_batch
    action_meta = { _index: index_name, _type: mapping_type, _id: id, retry_on_conflict: 3 }
    action_meta[:parent] = parent if parent

    bulk_batch << { update: action_meta }
    return bulk_batch << upsert_body
  end

  parent_param = parent ? "&parent=#{parent}" : ""
  connection.json_post "/#{index_name}/#{mapping_type}/#{id}/_update?retry_on_conflict=3#{parent_param}", upsert_body
end

#update_record(record, index_name: alias_name) ⇒ Object



65
66
67
68
69
70
71
72
73
# File 'lib/elastic_record/index/documents.rb', line 65

# Sends a partial update for `record`, unless indexing is `disabled`.
#
# @param record [Object] must respond to `id` and `as_partial_update_document`
# @param index_name [Object] forwarded to `update_document`; defaults to `alias_name`
# @return [Object, nil] the result of `update_document`, or nil when `disabled`
def update_record(record, index_name: alias_name)
  return if disabled

  document_id = record.id
  partial_document = record.as_partial_update_document

  update_document(document_id, partial_document, index_name: index_name)
end