Module: ElasticRecord::Index::Documents
- Included in:
- ElasticRecord::Index
- Defined in:
- lib/elastic_record/index/documents.rb
Instance Method Summary collapse
- #build_scroll_enumerator(search: nil, scroll_id: nil, batch_size: 100, keep_alive: ElasticRecord::Config.scroll_keep_alive) ⇒ Object
- #bulk(options = {}) ⇒ Object
- #bulk_add(batch, index_name: alias_name) ⇒ Object
- #current_bulk_batch ⇒ Object
- #delete_by_query(query) ⇒ Object
- #delete_document(id, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object
- #explain(id, elastic_query) ⇒ Object
- #index_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object
- #index_record(record, index_name: alias_name) ⇒ Object
- #record_exists?(id) ⇒ Boolean
- #scroll(scroll_id, scroll_keep_alive) ⇒ Object
- #search(elastic_query, options = {}) ⇒ Object
- #update_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object
- #update_record(record, index_name: alias_name) ⇒ Object
Instance Method Details
#build_scroll_enumerator(search: nil, scroll_id: nil, batch_size: 100, keep_alive: ElasticRecord::Config.scroll_keep_alive) ⇒ Object
156 157 158 |
# File 'lib/elastic_record/index/documents.rb', line 156 def build_scroll_enumerator(search: nil, scroll_id: nil, batch_size: 100, keep_alive: ElasticRecord::Config.scroll_keep_alive) ScrollEnumerator.new(self, search: search, scroll_id: scroll_id, batch_size: batch_size, keep_alive: keep_alive) end |
#bulk(options = {}) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 176 177 |
# File 'lib/elastic_record/index/documents.rb', line 165 def bulk( = {}) connection.bulk_stack.push [] yield if current_bulk_batch.any? body = current_bulk_batch.map { |action| "#{ElasticRecord::JSON.encode(action)}\n" }.join results = connection.json_post("/_bulk?#{options.to_query}", body) verify_bulk_results(results) end ensure connection.bulk_stack.pop end |
#bulk_add(batch, index_name: alias_name) ⇒ Object
179 180 181 182 183 184 185 |
# File 'lib/elastic_record/index/documents.rb', line 179 def bulk_add(batch, index_name: alias_name) bulk do batch.each do |record| index_record(record, index_name: index_name) end end end |
#current_bulk_batch ⇒ Object
187 188 189 |
# File 'lib/elastic_record/index/documents.rb', line 187 def current_bulk_batch connection.bulk_stack.last end |
#delete_by_query(query) ⇒ Object
129 130 131 132 133 134 135 136 137 |
# File 'lib/elastic_record/index/documents.rb', line 129 def delete_by_query(query) scroll_enumerator = build_scroll_enumerator search: query scroll_enumerator.each_slice do |ids| bulk do ids.each { |id| delete_document(id) } end end end |
#delete_document(id, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/elastic_record/index/documents.rb', line 113 def delete_document(id, doctype: model.doctype, parent: nil, index_name: alias_name) raise "Cannot delete document with empty id" if id.blank? index_name ||= alias_name if batch = current_bulk_batch instructions = { _index: index_name, _type: doctype.name, _id: id, _retry_on_conflict: 3 } instructions[:parent] = parent if parent batch << { delete: instructions } else path = "/#{index_name}/#{doctype.name}/#{id}" path << "&parent=#{parent}" if parent connection.json_delete path end end |
#explain(id, elastic_query) ⇒ Object
152 153 154 |
# File 'lib/elastic_record/index/documents.rb', line 152 def explain(id, elastic_query) get "_explain", elastic_query end |
#index_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/elastic_record/index/documents.rb', line 76 def index_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name) if batch = current_bulk_batch instructions = { _index: index_name, _type: doctype.name, _id: id } instructions[:parent] = parent if parent batch << { index: instructions } batch << document else path = "/#{index_name}/#{doctype.name}/#{id}" path << "?parent=#{parent}" if parent if id connection.json_put path, document else connection.json_post path, document end end end |
#index_record(record, index_name: alias_name) ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/elastic_record/index/documents.rb', line 54 def index_record(record, index_name: alias_name) unless disabled index_document( record.try(:id), record.as_search_document, doctype: record.doctype, index_name: index_name ) end end |
#record_exists?(id) ⇒ Boolean
139 140 141 |
# File 'lib/elastic_record/index/documents.rb', line 139 def record_exists?(id) get(id, model.doctype)['found'] end |
#scroll(scroll_id, scroll_keep_alive) ⇒ Object
160 161 162 163 |
# File 'lib/elastic_record/index/documents.rb', line 160 def scroll(scroll_id, scroll_keep_alive) = {scroll_id: scroll_id, scroll: scroll_keep_alive} connection.json_get("/_search/scroll?#{options.to_query}") end |
#search(elastic_query, options = {}) ⇒ Object
143 144 145 146 147 148 149 150 |
# File 'lib/elastic_record/index/documents.rb', line 143 def search(elastic_query, = {}) url = "_search" if .any? url += "?#{options.to_query}" end get url, model.doctype, elastic_query end |
#update_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/elastic_record/index/documents.rb', line 95 def update_document(id, document, doctype: model.doctype, parent: nil, index_name: alias_name) raise "Cannot update a document with empty id" if id.blank? params = {doc: document, doc_as_upsert: true} if batch = current_bulk_batch instructions = { _index: index_name, _type: doctype.name, _id: id, _retry_on_conflict: 3 } instructions[:parent] = parent if parent batch << { update: instructions } batch << params else path = "/#{index_name}/#{doctype.name}/#{id}/_update?retry_on_conflict=3" path << "&parent=#{parent}" if parent connection.json_post path, params end end |
#update_record(record, index_name: alias_name) ⇒ Object
65 66 67 68 69 70 71 72 73 74 |
# File 'lib/elastic_record/index/documents.rb', line 65 def update_record(record, index_name: alias_name) unless disabled update_document( record.id, record.as_partial_update_document, doctype: record.doctype, index_name: index_name ) end end |