Module: ElasticRecord::Index::Documents
- Included in:
- ElasticRecord::Index
- Defined in:
- lib/elastic_record/index/documents.rb
Instance Method Summary collapse
- #bulk(options = {}) ⇒ Object
- #bulk_add(batch, index_name: alias_name) ⇒ Object
- #create_scan_search(elastic_query, options = {}) ⇒ Object
- #current_bulk_batch ⇒ Object
- #delete_by_query(query) ⇒ Object
- #delete_document(id, parent: nil, index_name: alias_name) ⇒ Object
- #explain(id, elastic_query) ⇒ Object
- #index_document(id, document, parent: nil, index_name: alias_name) ⇒ Object
- #index_record(record, index_name: alias_name) ⇒ Object
- #record_exists?(id) ⇒ Boolean
- #scroll(scroll_id, scroll_keep_alive) ⇒ Object
- #search(elastic_query, options = {}) ⇒ Object
- #update_document(id, document, parent: nil, index_name: alias_name) ⇒ Object
- #update_record(record, index_name: alias_name) ⇒ Object
Instance Method Details
#bulk(options = {}) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/elastic_record/index/documents.rb', line 140 def bulk( = {}) connection.bulk_stack.push [] yield if current_bulk_batch.any? body = current_bulk_batch.map { |action| "#{ElasticRecord::JSON.encode(action)}\n" }.join results = connection.json_post("/_bulk?#{.to_query}", body) verify_bulk_results(results) end ensure connection.bulk_stack.pop end |
#bulk_add(batch, index_name: alias_name) ⇒ Object
154 155 156 157 158 159 160 |
# File 'lib/elastic_record/index/documents.rb', line 154 def bulk_add(batch, index_name: alias_name) bulk do batch.each do |record| index_record(record, index_name: index_name) end end end |
#create_scan_search(elastic_query, options = {}) ⇒ Object
123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/elastic_record/index/documents.rb', line 123 def create_scan_search(elastic_query, = {}) [:batch_size] ||= 100 [:keep_alive] ||= ElasticRecord::Config.scroll_keep_alive = {search_type: 'scan', size: [:batch_size], scroll: [:keep_alive]} json = search(elastic_query, ) ScanSearch.new(self, json['_scroll_id'], ).tap do |scan_search| scan_search.total_hits = json['hits']['total'] end end |
#current_bulk_batch ⇒ Object
162 163 164 |
# File 'lib/elastic_record/index/documents.rb', line 162 def current_bulk_batch connection.bulk_stack.last end |
#delete_by_query(query) ⇒ Object
96 97 98 99 100 101 102 103 104 |
# File 'lib/elastic_record/index/documents.rb', line 96 def delete_by_query(query) scan_search = create_scan_search query scan_search.each_slice do |ids| bulk do ids.each { |id| delete_document(id) } end end end |
#delete_document(id, parent: nil, index_name: alias_name) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/elastic_record/index/documents.rb', line 80 def delete_document(id, parent: nil, index_name: alias_name) raise "Cannot delete document with empty id" if id.blank? index_name ||= alias_name if batch = current_bulk_batch instructions = { _index: index_name, _type: type, _id: id, _retry_on_conflict: 3 } instructions[:parent] = parent if parent batch << { delete: instructions } else path = "/#{index_name}/#{type}/#{id}" path << "&parent=#{parent}" if parent connection.json_delete path end end |
#explain(id, elastic_query) ⇒ Object
119 120 121 |
# File 'lib/elastic_record/index/documents.rb', line 119 def explain(id, elastic_query) get "_explain", elastic_query end |
#index_document(id, document, parent: nil, index_name: alias_name) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/elastic_record/index/documents.rb', line 48 def index_document(id, document, parent: nil, index_name: alias_name) if batch = current_bulk_batch instructions = { _index: index_name, _type: type, _id: id } instructions[:parent] = parent if parent batch << { index: instructions } batch << document else path = "/#{index_name}/#{type}/#{id}" path << "?parent=#{parent}" if parent connection.json_put path, document end end |
#index_record(record, index_name: alias_name) ⇒ Object
36 37 38 39 40 |
# File 'lib/elastic_record/index/documents.rb', line 36 def index_record(record, index_name: alias_name) unless disabled index_document(record.send(record.class.primary_key), record.as_search, index_name: index_name) end end |
#record_exists?(id) ⇒ Boolean
106 107 108 |
# File 'lib/elastic_record/index/documents.rb', line 106 def record_exists?(id) get(id)['found'] end |
#scroll(scroll_id, scroll_keep_alive) ⇒ Object
135 136 137 138 |
# File 'lib/elastic_record/index/documents.rb', line 135 def scroll(scroll_id, scroll_keep_alive) = {scroll_id: scroll_id, scroll: scroll_keep_alive} connection.json_get("/_search/scroll?#{.to_query}") end |
#search(elastic_query, options = {}) ⇒ Object
110 111 112 113 114 115 116 117 |
# File 'lib/elastic_record/index/documents.rb', line 110 def search(elastic_query, = {}) url = "_search" if .any? url += "?#{.to_query}" end get url, elastic_query end |
#update_document(id, document, parent: nil, index_name: alias_name) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/elastic_record/index/documents.rb', line 63 def update_document(id, document, parent: nil, index_name: alias_name) params = {doc: document, doc_as_upsert: true} if batch = current_bulk_batch instructions = { _index: index_name, _type: type, _id: id, _retry_on_conflict: 3 } instructions[:parent] = parent if parent batch << { update: instructions } batch << params else path = "/#{index_name}/#{type}/#{id}/_update?retry_on_conflict=3" path << "&parent=#{parent}" if parent connection.json_post path, params end end |
#update_record(record, index_name: alias_name) ⇒ Object
42 43 44 45 46 |
# File 'lib/elastic_record/index/documents.rb', line 42 def update_record(record, index_name: alias_name) unless disabled update_document(record.send(record.class.primary_key), record.as_partial_update_document, index_name: index_name) end end |