Module: Sequel::Plugins::Elasticsearch::ClassMethods
Defined in: lib/sequel/plugins/elasticsearch.rb
Overview
The class methods that are added to a Sequel::Model when it loads the elasticsearch plugin.
Instance Attribute Summary
- #elasticsearch_index ⇒ Object
  The Elasticsearch index to which the documents will be written.
- #elasticsearch_opts ⇒ Object
  The extra options that will be passed to the Elasticsearch client.
- #elasticsearch_type ⇒ Object
  The Elasticsearch type to which the documents will be written.
Instance Method Summary
- #alias_index(new_index) ⇒ Object
  Remove previous aliases and point the `elasticsearch_index` alias to the new index.
- #call_es ⇒ Object
  Run the given block, logging and swallowing the common Elasticsearch errors.
- #es(query = '', opts = {}) ⇒ Object
  Execute a search or a scroll on the Model’s Elasticsearch index.
- #es!(query = '', opts = {}) ⇒ Object
  Execute a search on the Model’s Elasticsearch index without catching Errors.
- #es_client ⇒ Object
  Return the Elasticsearch client used to communicate with the cluster.
- #import!(index: nil, dataset: nil, batch_size: 100) ⇒ Object
  Import the whole dataset into Elasticsearch.
- #import_object(idx, row) ⇒ Object
  Build the bulk `update` action for a single row.
- #last_index ⇒ Object
  Find the most recently created index that carries the `elasticsearch_index` alias.
- #reindex!(index: nil, dataset: nil, batch_size: 100) ⇒ Object
  Create a new index in Elasticsearch from the specified dataset, as well as an alias to the new index.
- #scroll!(scroll_id, duration) ⇒ Object
  Fetch the next page in a scroll without catching Errors.
- #timestamped_index ⇒ Object
  Generate a timestamped index name.
Instance Attribute Details
#elasticsearch_index ⇒ Object
The Elasticsearch index to which the documents will be written.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 37
def elasticsearch_index
  @elasticsearch_index
end
```
#elasticsearch_opts ⇒ Object
The extra options that will be passed to the Elasticsearch client.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 35
def elasticsearch_opts
  @elasticsearch_opts
end
```
#elasticsearch_type ⇒ Object
The Elasticsearch type to which the documents will be written.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 39
def elasticsearch_type
  @elasticsearch_type
end
```
Instance Method Details
#alias_index(new_index) ⇒ Object
Remove previous aliases and point the `elasticsearch_index` alias to the new index.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 129
def alias_index(new_index)
  es_client.indices.update_aliases body: {
    actions: [
      { remove: { index: "#{elasticsearch_index}*", alias: elasticsearch_index } },
      { add: { index: new_index, alias: elasticsearch_index } }
    ]
  }
end
```
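The request body built by alias_index can be inspected without a cluster. The sketch below uses a hypothetical helper, `alias_swap_body`, that returns the same Hash; the alias and index names are placeholders. Because both actions travel in a single update_aliases request, Elasticsearch applies them atomically, so searches through the alias do not observe a state where it points at no index.

```ruby
# Hypothetical helper (not part of the plugin): builds the body that
# alias_index passes to es_client.indices.update_aliases.
def alias_swap_body(alias_name, new_index)
  {
    actions: [
      # Detach the alias from every index whose name starts with the alias name
      { remove: { index: "#{alias_name}*", alias: alias_name } },
      # Point the alias at the freshly built index
      { add: { index: new_index, alias: alias_name } }
    ]
  }
end

body = alias_swap_body(:documents, :'documents-20191004.123456')
p body[:actions].flat_map(&:keys) # prints [:remove, :add]
```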
#call_es ⇒ Object
Wrapper method in which error handling is done for Elasticsearch calls.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 71
def call_es
  yield
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound,
       ::Elasticsearch::Transport::Transport::Error,
       Faraday::ConnectionFailed => e
  db.loggers.first.warn e if db.loggers.count.positive?
  nil
end
```
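A minimal, self-contained version of the same rescue, log, return-nil pattern is sketched below. `TransportError` is a stand-in for the client's exception classes and `with_logged_errors` is a hypothetical name; Ruby's standard Logger plays the role of `db.loggers.first`. Errors outside the rescued list still propagate to the caller.

```ruby
require 'logger'
require 'stringio'

# Stand-in for ::Elasticsearch::Transport::Transport::Error (assumption).
class TransportError < StandardError; end

# Hypothetical helper: run the block; on a known error, log a warning
# (only if a logger exists) and return nil instead of raising.
def with_logged_errors(loggers)
  yield
rescue TransportError => e
  loggers.first.warn(e) if loggers.count.positive?
  nil
end

log = StringIO.new
loggers = [Logger.new(log)]

ok = with_logged_errors(loggers) { :hits }
failed = with_logged_errors(loggers) { raise TransportError, 'cluster down' }

# Errors that are not in the rescue list still reach the caller.
propagated = begin
  with_logged_errors(loggers) { raise ArgumentError, 'bug' }
  false
rescue ArgumentError
  true
end
```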
#es(query = '', opts = {}) ⇒ Object
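Execute a search or a scroll on the Model’s Elasticsearch index. If query is a Result, the next page of that scroll is fetched and opts is passed to scroll! as the scroll duration; otherwise the call is delegated to es!. This method is “safe” in that it catches the common transport and connection Errors (see call_es), logs a warning when the database has a logger, and returns nil.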
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 66
def es(query = '', opts = {})
  call_es { query.is_a?(Result) ? scroll!(query, opts) : es!(query, opts) }
end
```
#es!(query = '', opts = {}) ⇒ Object
Execute a search on the Model’s Elasticsearch index without catching Errors.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 47
def es!(query = '', opts = {})
  opts = {
    index: elasticsearch_index,
    type: elasticsearch_type
  }.merge(opts)
  query.is_a?(String) ? opts[:q] = query : opts[:body] = query
  Result.new es_client.search(opts), self
end
```
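The argument handling in es! can be isolated as follows: a String is sent as a Lucene query string (`q`), while anything else, typically a query-DSL Hash, is sent as the request `body`. Caller-supplied opts override the index and type defaults. `search_opts` and the `DEFAULTS` constant are hypothetical names for this sketch, and `:documents` is a placeholder index.

```ruby
# Hypothetical defaults standing in for elasticsearch_index / elasticsearch_type.
DEFAULTS = { index: :documents, type: nil }.freeze

# Mirrors how es! builds the arguments for es_client.search.
def search_opts(query, opts = {})
  opts = DEFAULTS.merge(opts) # caller opts win over the defaults
  if query.is_a?(String)
    opts[:q] = query    # query-string syntax, e.g. 'title:ruby'
  else
    opts[:body] = query # query DSL Hash
  end
  opts
end

string_opts = search_opts('title:ruby')
dsl_opts = search_opts({ query: { match: { title: 'ruby' } } }, size: 5)
```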
#es_client ⇒ Object
Return the Elasticsearch client used to communicate with the cluster.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 42
def es_client
  # Memoize so that every call reuses the same client
  @es_client ||= ::Elasticsearch::Client.new elasticsearch_opts
end
```
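A memoized es_client, using `||=` so that repeated calls share one client instead of constructing a new one each time, can be sketched as below. `FakeClient` is a hypothetical stand-in for `::Elasticsearch::Client`, and `DocumentModel` for a model class using the plugin.

```ruby
# Hypothetical stand-in for ::Elasticsearch::Client: it only keeps its options.
class FakeClient
  attr_reader :opts

  def initialize(opts)
    @opts = opts
  end
end

class DocumentModel
  class << self
    attr_accessor :elasticsearch_opts

    # Build the client on first use, then return the cached instance.
    def es_client
      @es_client ||= FakeClient.new(elasticsearch_opts)
    end
  end
end

DocumentModel.elasticsearch_opts = { url: 'http://localhost:9200' }
first = DocumentModel.es_client
second = DocumentModel.es_client
```

One trade-off of memoizing: the cached client keeps the options it was built with, so later changes to elasticsearch_opts only take effect if `@es_client` is reset.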
#import!(index: nil, dataset: nil, batch_size: 100) ⇒ Object
Import the whole dataset into Elasticsearch.
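This assumes that an index template covering all the possible index names has been created. See timestamped_index for examples of the index names that will be created.
This adds or updates records in the last index created by this utility, falling back to elasticsearch_index when no such index is found. Use the reindex! method to create a completely new index and alias.
Rows are fetched in pages of batch_size with Dataset#each_page, which Sequel provides through its pagination extension, and each page is sent as one bulk request. Progress is printed to standard output: a `.` per row and a `/` per batch.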
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 88
def import!(index: nil, dataset: nil, batch_size: 100)
  dataset ||= self.dataset
  index_name = index || last_index || elasticsearch_index

  # Index all the documents
  body = []
  dataset.each_page(batch_size) do |ds|
    body = []
    ds.all.each do |row|
      print '.'
      body << { update: import_object(index_name, row) }
    end
    puts '/'
    es_client.bulk body: body
    body = nil
  end
end
```
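The batching in import! boils down to turning each page of rows into one bulk payload, so batch_size caps the number of actions per bulk request. In the sketch below the rows are plain Hashes, Array#each_slice stands in for Sequel's Dataset#each_page, and `bulk_batches` is a hypothetical name; nothing is sent to a cluster.

```ruby
# Hypothetical sketch of import!'s batching: one payload per page of rows.
def bulk_batches(rows, index_name, batch_size)
  rows.each_slice(batch_size).map do |batch|
    # One `update` action per row; import! sends each batch with es_client.bulk.
    batch.map do |row|
      { update: { _index: index_name, _id: row[:id],
                  data: { doc: row, doc_as_upsert: true } } }
    end
  end
end

rows = (1..5).map { |i| { id: i, title: "Doc #{i}" } }
batches = bulk_batches(rows, :'documents-20191004.123456', 2)
p batches.map(&:size) # prints [2, 2, 1]
```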
#import_object(idx, row) ⇒ Object
Build the bulk `update` action for a single row.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 106
def import_object(idx, row)
  val = {
    _index: idx,
    _id: row.document_id,
    data: { doc: row.as_indexed_json, doc_as_upsert: true }
  }
  val[:_type] = elasticsearch_type if elasticsearch_type
  val
end
```
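Each action relies on two row methods, document_id and as_indexed_json, and sets `doc_as_upsert: true`, which tells Elasticsearch to insert the document when no document with that `_id` exists yet, so new and changed rows share one code path. The sketch below is a hypothetical standalone variant (`import_action`, with the type passed as an argument instead of read from elasticsearch_type); `Row` is a stand-in for a model instance.

```ruby
# Hypothetical stand-in for a model row exposing the two methods the plugin uses.
Row = Struct.new(:id, :title) do
  def document_id
    id
  end

  def as_indexed_json
    { title: title }
  end
end

# Same shape as import_object; _type is only added when a type is configured
# (mapping types are deprecated in newer Elasticsearch versions).
def import_action(idx, row, type = nil)
  val = { _index: idx, _id: row.document_id,
          data: { doc: row.as_indexed_json, doc_as_upsert: true } }
  val[:_type] = type if type
  val
end

typed = import_action(:documents, Row.new(7, 'Sequel'), :document)
untyped = import_action(:documents, Row.new(7, 'Sequel'))
```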
#last_index ⇒ Object
Find the most recently created index that carries the `elasticsearch_index` alias. Returns nil if no index has that alias.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 139
def last_index
  # Timestamped names sort chronologically, so the newest index is last
  es_client.indices.get_alias(name: elasticsearch_index)&.keys&.sort&.last
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end
```
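Since the suffix produced by timestamped_index is fixed-width and zero-padded, plain string order matches creation order: after sorting, the newest index is the last key and the oldest the first. The sketch uses made-up data shaped like an indices.get_alias reply (index names as keys) and does not call the client.

```ruby
# Assumed sample data shaped like an indices.get_alias response.
response = {
  'documents-20191105.080000' => { 'aliases' => { 'documents' => {} } },
  'documents-20191004.123456' => { 'aliases' => { 'documents' => {} } }
}

# Zero-padded YYYYMMDD.HHMMSS suffixes sort chronologically as strings.
sorted = response.keys.sort
newest = sorted.last
oldest = sorted.first
```

Once alias_index has run, usually only one index holds the alias, so the choice of end matters mainly while several indices carry it.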
#reindex!(index: nil, dataset: nil, batch_size: 100) ⇒ Object
Create a new index in Elasticsearch from the specified dataset, as well as an alias to the new index. See the documentation on import! for more details.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 120
def reindex!(index: nil, dataset: nil, batch_size: 100)
  index_name = index || timestamped_index
  import!(index: index_name, dataset: dataset, batch_size: batch_size)

  # Create an alias to the newly created index
  alias_index(index_name)
end
```
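The order of operations in reindex! matters: the new index is filled first and the alias is moved only afterwards, so searches through the alias keep hitting the old index until the import has finished. The sketch below uses a hypothetical recorder class, `ReindexSketch`, that logs the calls instead of talking to Elasticsearch; its default index name is a placeholder rather than a real timestamp.

```ruby
# Hypothetical recorder standing in for a model class using the plugin.
class ReindexSketch
  attr_reader :calls

  def initialize(base)
    @base = base
    @calls = []
  end

  # Placeholder for the real timestamp-based name.
  def timestamped_index
    :"#{@base}-new"
  end

  def import!(index:, dataset: nil, batch_size: 100)
    @calls << [:import!, index, batch_size]
  end

  def alias_index(new_index)
    @calls << [:alias_index, new_index]
  end

  # Fill the new index first, then swing the alias over to it.
  def reindex!(index: nil, dataset: nil, batch_size: 100)
    index_name = index || timestamped_index
    import!(index: index_name, dataset: dataset, batch_size: batch_size)
    alias_index(index_name)
  end
end

explicit = ReindexSketch.new(:documents)
explicit.reindex!(index: :'documents-test', batch_size: 50)

defaulted = ReindexSketch.new(:documents)
defaulted.reindex!
```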
#scroll!(scroll_id, duration) ⇒ Object
Fetch the next page in a scroll without catching Errors.
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 57
def scroll!(scroll_id, duration)
  scroll_id = scroll_id.scroll_id if scroll_id.is_a? Result
  return nil unless scroll_id

  Result.new es_client.scroll(body: scroll_id, scroll: duration), self
end
```
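scroll! accepts either a previous Result or a raw scroll id, and returns nil instead of issuing a request when there is no id (presumably the case for a Result from a search made without a scroll duration). The sketch isolates that normalization: `Result` here is a hypothetical Struct stand-in, and `next_scroll_args` returns the arguments that would be passed to es_client.scroll.

```ruby
# Hypothetical stand-in for the plugin's Result: only the scroll id matters here.
Result = Struct.new(:scroll_id)

# Mirrors scroll!'s argument handling without calling the client.
def next_scroll_args(scroll_id, duration)
  scroll_id = scroll_id.scroll_id if scroll_id.is_a?(Result)
  return nil unless scroll_id

  { body: scroll_id, scroll: duration }
end

from_result = next_scroll_args(Result.new('abc'), '1m')
from_raw_id = next_scroll_args('xyz', '30s')
no_scroll = next_scroll_args(Result.new(nil), '1m')
```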
#timestamped_index ⇒ Object
Generate a timestamped index name, returned as a Symbol. The current local time is appended to elasticsearch_index, producing index names like this:
base-name-20191004.123456
```ruby
# File 'lib/sequel/plugins/elasticsearch.rb', line 149
def timestamped_index
  time_str = Time.now.strftime('%Y%m%d.%H%M%S') # TODO: Make the format configurable
  "#{elasticsearch_index}-#{time_str}".to_sym
end
```
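To check the naming scheme with a fixed clock, the same format string can be applied to a given Time. `timestamped_name` and its injectable `now` parameter are hypothetical; the plugin method always uses Time.now.

```ruby
# Same format as timestamped_index, with the time passed in for predictability.
def timestamped_name(base, now = Time.now)
  "#{base}-#{now.strftime('%Y%m%d.%H%M%S')}".to_sym
end

name = timestamped_name(:documents, Time.new(2019, 10, 4, 12, 34, 56))
p name # prints :"documents-20191004.123456"
```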