Class: Elastic::Core::Connector
- Inherits:
-
Object
- Object
- Elastic::Core::Connector
- Defined in:
- lib/elastic/core/connector.rb
Constant Summary collapse
- DEFAULT_TYPE =
'_doc'
Instance Method Summary collapse
- #bulk_index(_documents) ⇒ Object
-
#copy_to(_to, batch_size: nil) ⇒ Object
Copies every document of this index into the index `_to`, in scrolled batches.
- #count(query: nil) ⇒ Object
- #delete(_document) ⇒ Object
- #drop ⇒ Object
- #find(_id) ⇒ Object
- #index(_document) ⇒ Object
- #index_name ⇒ Object
-
#initialize(_name, _mapping, settling_time: 10.seconds) ⇒ Connector
constructor
A new instance of Connector.
- #migrate(batch_size: nil) ⇒ Object
- #query(query: nil) ⇒ Object
- #refresh ⇒ Object
- #remap ⇒ Object
- #remove_orphaned_indices ⇒ Object
-
#rollover(&_block) ⇒ Object
Creates a new index, runs the given block against it, then moves the aliases to it and deletes the previous index.
- #status ⇒ Object
Constructor Details
#initialize(_name, _mapping, settling_time: 10.seconds) ⇒ Connector
Returns a new instance of Connector.
5 6 7 8 9 |
# File 'lib/elastic/core/connector.rb', line 5
#
# Builds a connector; the arguments are only stored, nothing is contacted here.
#
# @param _name [Object] stored in @name (interpolated into the index name by #index_name)
# @param _mapping [Object] stored in @mapping
# @param settling_time [Object] stored in @settling_time; defaults to 10.seconds
def initialize(_name, _mapping, settling_time: 10.seconds)
  @name, @mapping = _name, _mapping
  @settling_time = settling_time
end
Instance Method Details
#bulk_index(_documents) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/elastic/core/connector.rb', line 65
#
# Indexes a batch of documents into every index returned by write_indices,
# issuing one bulk request per index.
#
# Does nothing when Elastic.config.disable_indexing is set or when there are
# no documents to send.
#
# @param _documents [Enumerable<Hash>] documents; each one is merged with
#   '_type' => DEFAULT_TYPE and wrapped in an 'index' bulk action
# @return [Object, nil] nil when skipped, otherwise the value of write_indices.each
def bulk_index(_documents)
  return if Elastic.config.disable_indexing

  # TODO: validate documents type
  body = _documents.map { |doc| { 'index' => doc.merge('_type' => DEFAULT_TYPE) } }

  # A bulk request without any action is rejected by Elasticsearch, so an
  # empty batch is a no-op instead of an error on every write index.
  return if body.empty?

  write_indices.each do |write_index|
    # retry_on_temporary_error is defined elsewhere; presumably it re-runs the
    # block on transient failures -- confirm against its definition.
    retry_on_temporary_error('bulk indexing') do
      api.bulk(index: write_index, body: body)
    end
  end
end
#copy_to(_to, batch_size: nil) ⇒ Object
Copies every document of this index into the index `_to`, in scrolled batches.
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/elastic/core/connector.rb', line 137
#
# Copies the documents found under index_name into the index `_to`.
#
# The source index is refreshed first, then read with a scrolled search
# sorted by '_doc'. Each page of hits is converted with
# transform_hit_to_create and written to `_to` in one bulk request.
#
# NOTE(review): the scroll context is never cleared explicitly here; it is
# left to expire after the '5m' keep-alive.
#
# @param _to [Object] index passed as `index:` to every bulk request
# @param batch_size [Integer, nil] page size; default_batch_size is used when nil
# @return [nil]
def copy_to(_to, batch_size: nil) # rubocop:disable Metrics/AbcSize
  api.indices.refresh index: index_name

  page = api.search(
    index: index_name,
    body: { sort: ['_doc'] },
    scroll: '5m',
    size: batch_size || default_batch_size
  )

  copied = 0
  until (hits = page['hits']['hits']).empty?
    copied += hits.count
    Elastic.logger.info "Copied #{copied} docs"

    api.bulk(index: _to, body: hits.map { |hit| transform_hit_to_create(hit) })

    # Fetch the next page using the scroll id of the page just processed.
    page = api.scroll scroll: '5m', scroll_id: page['_scroll_id']
  end
end
#count(query: nil) ⇒ Object
106 107 108 |
# File 'lib/elastic/core/connector.rb', line 106
#
# Asks the count API how many documents under index_name match the query.
#
# @param query [Object, nil] passed through unchanged as the request body
# @return [Object] the 'count' entry of the response
def count(query: nil)
  response = api.count(index: index_name, body: query)
  response['count']
end
#delete(_document) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/elastic/core/connector.rb', line 78
#
# Deletes a document from the write index and, when one is present, from the
# rolling index as well, using a single bulk request.
#
# @param _document [Hash] must have a truthy '_id'; it is merged with
#   '_index' and '_type' to build each 'delete' action
# @return [Object, nil] nil when indexing is disabled, otherwise the bulk response
# @raise [ArgumentError] if the document has no '_id'
def delete(_document)
  raise ArgumentError, 'document must provide an id' unless _document['_id']
  return if Elastic.config.disable_indexing

  primary, rolling = write_indices

  # The first index is always targeted; the second only when it exists.
  targets = [primary]
  targets << rolling if rolling

  operations = targets.map do |target|
    { 'delete' => _document.merge('_index' => target, '_type' => DEFAULT_TYPE) }
  end

  api.bulk(body: operations)
end
#drop ⇒ Object
25 26 27 28 |
# File 'lib/elastic/core/connector.rb', line 25
#
# Deletes every index matching "<index_name>:*".
#
# @return [nil] always, regardless of the API response
def drop
  pattern = "#{index_name}:*"
  api.indices.delete(index: pattern)
  nil
end
#find(_id) ⇒ Object
102 103 104 |
# File 'lib/elastic/core/connector.rb', line 102
#
# Fetches a single document by id through index_name.
#
# @param _id [Object] document id, passed as `id:` to the get API
# @return [Object] whatever api.get returns
def find(_id)
  source = index_name
  api.get(index: source, id: _id)
end
#index(_document) ⇒ Object
54 55 56 57 58 59 60 61 62 63 |
# File 'lib/elastic/core/connector.rb', line 54
#
# Indexes one document into every index returned by write_indices, using a
# single bulk request that carries one 'index' action per target.
#
# @param _document [Hash] merged with '_index' and '_type' for each action
# @return [Object, nil] nil when indexing is disabled, otherwise the bulk response
def index(_document)
  return if Elastic.config.disable_indexing

  # TODO: validate document type
  operations = write_indices.each_with_object([]) do |target, actions|
    actions << { 'index' => _document.merge('_index' => target, '_type' => DEFAULT_TYPE) }
  end

  api.bulk(body: operations)
end
#index_name ⇒ Object
11 12 13 |
# File 'lib/elastic/core/connector.rb', line 11
#
# Name used to address this connector's index: the configured
# Elastic.config.index and @name joined by an underscore. The value is
# computed once and cached in @index_name.
#
# @return [String]
def index_name
  return @index_name if @index_name

  @index_name = "#{Elastic.config.index}_#{@name}"
end
#migrate(batch_size: nil) ⇒ Object
45 46 47 48 49 50 51 52 |
# File 'lib/elastic/core/connector.rb', line 45
#
# Brings the index up to date: tries #remap first and, only when that
# returns a falsy value, performs a #rollover that copies the documents
# into the new index.
#
# @param batch_size [Integer, nil] forwarded to #copy_to
# @return [nil]
def migrate(batch_size: nil)
  return if remap

  rollover { |new_index| copy_to(new_index, batch_size: batch_size) }
  nil
end
#query(query: nil) ⇒ Object
110 111 112 |
# File 'lib/elastic/core/connector.rb', line 110
#
# Runs a search against index_name.
#
# @param query [Object, nil] passed through unchanged as the request body
# @return [Object] whatever api.search returns
def query(query: nil)
  source = index_name
  api.search(index: source, body: query)
end
#refresh ⇒ Object
98 99 100 |
# File 'lib/elastic/core/connector.rb', line 98
#
# Issues an indices refresh for index_name.
#
# @return [Object] whatever the refresh call returns
def refresh
  indices = api.indices
  indices.refresh(index: index_name)
end
#remap ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/elastic/core/connector.rb', line 30
#
# Tries to make the index usable without copying data, based on #status:
# - :not_available    -> create_from_scratch
# - :not_synchronized -> setup_index_mapping on the resolved actual index
# - anything else     -> nothing to do
#
# @return [Boolean] false only when setup_index_mapping raises a BadRequest
#   error; true in every other case
def remap
  current = status

  if current == :not_available
    create_from_scratch
  elsif current == :not_synchronized
    # Only the mapping update is guarded; errors from the other branches propagate.
    begin
      setup_index_mapping(resolve_actual_index_name)
    rescue Elasticsearch::Transport::Transport::Errors::BadRequest
      return false
    end
  end

  true
end
#remove_orphaned_indices ⇒ Object
159 160 161 162 163 164 165 166 |
# File 'lib/elastic/core/connector.rb', line 159
#
# Deletes the second index reported by resolve_write_indices (the rolling
# index), when there is one, logging its name first.
#
# @return [Object, nil] nil when there is no rolling index, otherwise the
#   result of the delete call
def remove_orphaned_indices
  _current, orphan = resolve_write_indices
  return if orphan.nil?

  Elastic.logger.info "Removing orphan index #{orphan}"
  api.indices.delete(index: orphan)
end
#rollover(&_block) ⇒ Object
Creates a new index, runs the given block against it, then moves the aliases to it and deletes the previous index.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/elastic/core/connector.rb', line 114
#
# Replaces the current index with a freshly created one.
#
# Steps: create a new index, point the write alias at it, wait, run the
# given block through perform_optimized_write_on, move the index_name alias
# and the write alias away from the old index, wait again, and finally
# delete the old index.
#
# If anything fails up to and including the alias swap, the new index is
# deleted and the error is re-raised. Failures after the swap (the final
# wait or the deletion of the old index) are re-raised WITHOUT deleting the
# new index, because at that point the aliases already point to it.
#
# NOTE(review): if the first alias transfer succeeds and the second one
# fails, the new index is still removed by the cleanup; the right recovery
# depends on transfer_alias semantics not visible here.
#
# @yield [new_index] forwarded to perform_optimized_write_on
# @return [Object] result of deleting the old index
# @raise [Elastic::RolloverError] if a rolling index already exists
def rollover(&_block) # rubocop:disable Metrics/MethodLength
  actual_index, rolling_index = resolve_write_indices

  unless rolling_index.nil?
    raise Elastic::RolloverError, 'rollover process already in progress'
  end

  new_index = create_index_w_mapping

  begin
    transfer_alias(write_index_alias, to: new_index)
    wait_for_index_to_stabilize
    perform_optimized_write_on(new_index, &_block)
    transfer_alias(index_name, from: actual_index, to: new_index)
    transfer_alias(write_index_alias, from: actual_index)
  rescue StandardError
    # The swap did not complete: discard the half-built index.
    api.indices.delete index: new_index
    raise
  end

  # Past this point the new index is the one being served; a failure here
  # must not trigger the cleanup above.
  wait_for_index_to_stabilize
  api.indices.delete index: actual_index
end
#status ⇒ Object
15 16 17 18 19 20 21 22 23 |
# File 'lib/elastic/core/connector.rb', line 15
#
# Reports the state of the index behind this connector.
#
# @return [Symbol]
#   :ready            when indexing is disabled in the configuration, or the
#                     actual index exists and mapping_synchronized? is true
#   :not_available    when resolve_actual_index_name returns nil
#   :not_synchronized when the index exists but mapping_synchronized? is false
def status
  return :ready if Elastic.config.disable_indexing

  actual_name = resolve_actual_index_name

  if actual_name.nil?
    :not_available
  elsif mapping_synchronized?(actual_name)
    :ready
  else
    :not_synchronized
  end
end