Module: ElasticRecord::Index::Documents
- Included in:
- ElasticRecord::Index
- Defined in:
- lib/elastic_record/index/documents.rb
Instance Method Summary
- #bulk(options = {}) ⇒ Object
- #bulk_add(batch, index_name: alias_name) ⇒ Object
- #create_scan_search(elastic_query, options = {}) ⇒ Object
- #current_bulk_batch ⇒ Object
- #delete_by_query(query) ⇒ Object
- #delete_document(id, parent: nil, index_name: alias_name) ⇒ Object
- #explain(id, elastic_query) ⇒ Object
- #index_document(id, document, parent: nil, index_name: alias_name) ⇒ Object
- #index_record(record, index_name: alias_name) ⇒ Object
- #record_exists?(id) ⇒ Boolean
- #scroll(scroll_id, scroll_keep_alive) ⇒ Object
- #search(elastic_query, options = {}) ⇒ Object
- #update_document(id, document, parent: nil, index_name: alias_name) ⇒ Object
- #update_record(record, index_name: alias_name) ⇒ Object
Instance Method Details
#bulk(options = {}) ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 |
# File 'lib/elastic_record/index/documents.rb', line 147
#
# Runs the given block with a fresh bulk batch and, if the block queued any
# actions, sends them in a single POST to `/_bulk`.
#
# A new empty array is pushed onto `connection.bulk_stack` before yielding
# and is always popped again, even when the block raises. Nothing is posted
# when the batch is still empty after the block returns.
#
# @param options [Hash] serialized with `to_query` and appended to the
#   `/_bulk` URL as its query string
# @yield called while the new batch is the current bulk batch
# @return [Object, nil] the value of `verify_bulk_results`, or nil when the
#   batch was empty
def bulk(options = {})
  connection.bulk_stack.push []

  yield

  if current_bulk_batch.any?
    # One encoded action per newline-terminated line.
    body = current_bulk_batch.map { |action| "#{ElasticRecord::JSON.encode(action)}\n" }.join
    results = connection.json_post("/_bulk?#{options.to_query}", body)
    verify_bulk_results(results)
  end
ensure
  connection.bulk_stack.pop
end
#bulk_add(batch, index_name: alias_name) ⇒ Object
161 162 163 164 165 166 167 |
# File 'lib/elastic_record/index/documents.rb', line 161
#
# Indexes every record of +batch+ inside a single `bulk` block.
#
# @param batch [#each] records to pass to `index_record`
# @param index_name [Object] forwarded to `index_record`; defaults to `alias_name`
def bulk_add(batch, index_name: alias_name)
  bulk do
    batch.each { |record| index_record(record, index_name: index_name) }
  end
end
#create_scan_search(elastic_query, options = {}) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/elastic_record/index/documents.rb', line 130
#
# Starts a scan-type search and wraps the returned scroll id in a ScanSearch.
#
# Missing `:batch_size` / `:keep_alive` entries are filled into the passed
# +options+ hash in place (100 and `ElasticRecord::Config.scroll_keep_alive`
# respectively) before the search is issued.
#
# @param elastic_query [Object] query passed through to `search`
# @param options [Hash] may carry `:batch_size` and `:keep_alive`
# @return [ScanSearch] with `total_hits` set from the response's
#   `hits.total`
def create_scan_search(elastic_query, options = {})
  options[:batch_size] ||= 100
  options[:keep_alive] ||= ElasticRecord::Config.scroll_keep_alive

  search_options = { search_type: 'scan', size: options[:batch_size], scroll: options[:keep_alive] }
  json = search(elastic_query, search_options)

  # NOTE(review): the third argument was lost in the listing; `options` is
  # the reconstruction (it carries :batch_size / :keep_alive) — confirm
  # against ScanSearch#initialize.
  ScanSearch.new(self, json['_scroll_id'], options).tap do |scan_search|
    scan_search.total_hits = json['hits']['total']
  end
end
#current_bulk_batch ⇒ Object
169 170 171 |
# File 'lib/elastic_record/index/documents.rb', line 169
#
# Returns the last entry of `connection.bulk_stack`, i.e. the batch pushed
# by the innermost running `bulk` call (nil when the stack has no entry).
def current_bulk_batch
  stack = connection.bulk_stack
  stack.last
end
#delete_by_query(query) ⇒ Object
103 104 105 106 107 108 109 110 111 |
# File 'lib/elastic_record/index/documents.rb', line 103
#
# Deletes every document matched by +query+: a scan search yields slices of
# ids, and each slice is deleted inside its own `bulk` block.
#
# @param query [Object] query passed to `create_scan_search`
def delete_by_query(query)
  create_scan_search(query).each_slice do |ids|
    bulk { ids.each { |id| delete_document(id) } }
  end
end
#delete_document(id, parent: nil, index_name: alias_name) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/elastic_record/index/documents.rb', line 87
#
# Deletes the document with the given id.
#
# When a bulk batch is active the delete action is appended to that batch;
# otherwise a DELETE request is issued immediately through `connection`.
#
# @param id [Object] document id; must not be blank
# @param parent [Object, nil] parent id, added to the bulk instructions or
#   to the request's query string when given
# @param index_name [Object, nil] target index; falls back to `alias_name`
#   when nil
# @raise [RuntimeError] if +id+ is blank
def delete_document(id, parent: nil, index_name: alias_name)
  raise "Cannot delete document with empty id" if id.blank?
  index_name ||= alias_name

  if batch = current_bulk_batch
    instructions = { _index: index_name, _type: type, _id: id, _retry_on_conflict: 3 }
    instructions[:parent] = parent if parent
    batch << { delete: instructions }
  else
    path = "/#{index_name}/#{type}/#{id}"
    # The path has no query string yet, so the first parameter must be
    # introduced with "?" (not "&").
    path << "?parent=#{parent}" if parent

    connection.json_delete path
  end
end
#explain(id, elastic_query) ⇒ Object
126 127 128 |
# File 'lib/elastic_record/index/documents.rb', line 126
#
# Asks the `_explain` endpoint about the document with the given id.
#
# @param id [Object] id of the document to explain
# @param elastic_query [Object] query passed as the request body to `get`
# @return [Object] whatever `get` returns
def explain(id, elastic_query)
  # The explain endpoint is addressed per document, so the id has to be part
  # of the path. NOTE(review): assumes `get` resolves its path below the
  # index/type, as `record_exists?` does with `get(id)` — confirm.
  get "#{id}/_explain", elastic_query
end
#index_document(id, document, parent: nil, index_name: alias_name) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/elastic_record/index/documents.rb', line 55
#
# Indexes +document+ under +id+.
#
# Without an active bulk batch the document is PUT immediately through
# `connection`; otherwise an index action followed by the document is
# appended to the batch.
#
# @param id [Object] document id
# @param document [Object] document body
# @param parent [Object, nil] parent id, added when given
# @param index_name [Object] target index; defaults to `alias_name`
def index_document(id, document, parent: nil, index_name: alias_name)
  pending = current_bulk_batch

  unless pending
    url = "/#{index_name}/#{type}/#{id}"
    url << "?parent=#{parent}" if parent
    return connection.json_put(url, document)
  end

  action = { _index: index_name, _type: type, _id: id }
  action[:parent] = parent if parent
  pending << { index: action }
  pending << document
end
#index_record(record, index_name: alias_name) ⇒ Object
43 44 45 46 47 |
# File 'lib/elastic_record/index/documents.rb', line 43
#
# Indexes +record+ using its primary-key value as the document id and
# `record.as_search` as the document. Does nothing when `disabled` is truthy.
#
# @param record [Object] must respond to `as_search`; its class must respond
#   to `primary_key`
# @param index_name [Object] forwarded to `index_document`; defaults to `alias_name`
def index_record(record, index_name: alias_name)
  return if disabled

  document_id = record.send(record.class.primary_key)
  index_document(document_id, record.as_search, index_name: index_name)
end
#record_exists?(id) ⇒ Boolean
113 114 115 |
# File 'lib/elastic_record/index/documents.rb', line 113
#
# Fetches the document via `get(id)` and returns the response's 'found' entry.
#
# @param id [Object] document id
def record_exists?(id)
  response = get(id)
  response['found']
end
#scroll(scroll_id, scroll_keep_alive) ⇒ Object
142 143 144 145 |
# File 'lib/elastic_record/index/documents.rb', line 142
#
# Fetches the next page of a scroll via GET `/_search/scroll`.
#
# @param scroll_id [Object] sent as the `scroll_id` query parameter
# @param scroll_keep_alive [Object] sent as the `scroll` query parameter
# @return [Object] whatever `connection.json_get` returns
def scroll(scroll_id, scroll_keep_alive)
  options = { scroll_id: scroll_id, scroll: scroll_keep_alive }
  connection.json_get("/_search/scroll?#{options.to_query}")
end
#search(elastic_query, options = {}) ⇒ Object
117 118 119 120 121 122 123 124 |
# File 'lib/elastic_record/index/documents.rb', line 117
#
# Runs +elastic_query+ against the `_search` endpoint via `get`.
#
# @param elastic_query [Object] query passed as the second argument to `get`
# @param options [Hash] when non-empty, serialized with `to_query` and
#   appended to the URL as its query string
# @return [Object] whatever `get` returns
def search(elastic_query, options = {})
  url = "_search"
  url += "?#{options.to_query}" if options.any?

  get url, elastic_query
end
#update_document(id, document, parent: nil, index_name: alias_name) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/elastic_record/index/documents.rb', line 70
#
# Sends +document+ as a partial update with `doc_as_upsert: true`.
#
# Without an active bulk batch the update is POSTed immediately to the
# document's `_update` path with `retry_on_conflict=3`; otherwise an update
# action followed by the update body is appended to the batch.
#
# @param id [Object] document id
# @param document [Object] fields to update
# @param parent [Object, nil] parent id, added when given
# @param index_name [Object] target index; defaults to `alias_name`
def update_document(id, document, parent: nil, index_name: alias_name)
  payload = { doc: document, doc_as_upsert: true }
  pending = current_bulk_batch

  unless pending
    url = "/#{index_name}/#{type}/#{id}/_update?retry_on_conflict=3"
    url << "&parent=#{parent}" if parent
    return connection.json_post(url, payload)
  end

  action = { _index: index_name, _type: type, _id: id, _retry_on_conflict: 3 }
  action[:parent] = parent if parent
  pending << { update: action }
  pending << payload
end
#update_record(record, index_name: alias_name) ⇒ Object
49 50 51 52 53 |
# File 'lib/elastic_record/index/documents.rb', line 49
#
# Updates the document for +record+, using its primary-key value as the id
# and `record.as_partial_update_document` as the update body. Does nothing
# when `disabled` is truthy.
#
# @param record [Object] must respond to `as_partial_update_document`; its
#   class must respond to `primary_key`
# @param index_name [Object] forwarded to `update_document`; defaults to `alias_name`
def update_record(record, index_name: alias_name)
  return if disabled

  document_id = record.send(record.class.primary_key)
  update_document(document_id, record.as_partial_update_document, index_name: index_name)
end