Module: Sequel::Plugins::Elasticsearch::ClassMethods
- Defined in: lib/sequel/plugins/elasticsearch.rb
Overview
The class methods that will be added to the Sequel::Model
Instance Attribute Summary
- #elasticsearch_environment_scoped ⇒ Object
  Whether generated indices should include the environment name.
- #elasticsearch_index ⇒ Object
- #elasticsearch_opts ⇒ Object
  The extra options that will be passed to the Elasticsearch client.
- #elasticsearch_type ⇒ Object
  The Elasticsearch type to which the documents will be written.
Instance Method Summary
- #alias_index(new_index) ⇒ Object
- #call_es ⇒ Object
  Wrapper method in which error handling is done for Elasticsearch calls.
- #environment ⇒ Object
- #es(query = '', opts = {}) ⇒ Object
  Execute a search or a scroll on the Model’s Elasticsearch index.
- #es!(query = '', opts = {}) ⇒ Object
  Execute a search on the Model’s Elasticsearch index without catching Errors.
- #es_client ⇒ Object
  Return the Elasticsearch client used to communicate with the cluster.
- #import!(index: nil, dataset: nil, batch_size: 100) ⇒ Object
  Import the whole dataset into Elasticsearch.
- #last_index ⇒ Object
  Find the last created index that matches the specified index name.
- #reindex!(index: nil, dataset: nil, batch_size: 100) ⇒ Object
  Creates a new index in Elasticsearch from the specified dataset, as well as an alias to the new index.
- #scroll!(scroll_id, duration) ⇒ Object
  Fetch the next page in a scroll without catching Errors.
- #timestamped_index ⇒ Object
  Generate a timestamped index name according to the environment.
Instance Attribute Details
#elasticsearch_environment_scoped ⇒ Object
Whether generated indices should include the environment name.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 46
    def elasticsearch_environment_scoped
      @elasticsearch_environment_scoped
    end
#elasticsearch_index ⇒ Object
    # File 'lib/sequel/plugins/elasticsearch.rb', line 146
    def elasticsearch_index
      return @elasticsearch_index unless elasticsearch_environment_scoped

      "#{@elasticsearch_index}-#{environment}".to_sym
    end
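The naming rule in the listing above can be tried in isolation. The following is a minimal sketch, not the plugin's code: `scoped_index_name` is a hypothetical helper, and the `env:` parameter is an assumption added so the environment lookup (the same `APP_ENV`, `RACK_ENV`, `'development'` fallback chain used by #environment) can be exercised without touching the real `ENV`.

```ruby
# Hypothetical helper mirroring the index naming rule from the listing.
# `env` defaults to the process environment but accepts any Hash-like object.
def scoped_index_name(base, scoped:, env: ENV)
  # Without environment scoping the base name is returned unchanged.
  return base unless scoped

  # Same fallback chain as the #environment method.
  environment = env['APP_ENV'] || env['RACK_ENV'] || 'development'
  "#{base}-#{environment}".to_sym
end

p scoped_index_name(:documents, scoped: false, env: {})
# prints :documents
p scoped_index_name(:documents, scoped: true, env: { 'RACK_ENV' => 'staging' })
# prints :"documents-staging"
```

Note that the scoped form always returns a Symbol, while the unscoped form returns whatever was configured.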
#elasticsearch_opts ⇒ Object
The extra options that will be passed to the Elasticsearch client.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 40
    def elasticsearch_opts
      @elasticsearch_opts
    end
#elasticsearch_type ⇒ Object
The Elasticsearch type to which the documents will be written.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 44
    def elasticsearch_type
      @elasticsearch_type
    end
Instance Method Details
#alias_index(new_index) ⇒ Object
    # File 'lib/sequel/plugins/elasticsearch.rb', line 132
    def alias_index(new_index)
      es_client.indices.update_aliases body: {
        actions: [
          { remove: { index: "#{elasticsearch_index}*", alias: elasticsearch_index } },
          { add: { index: new_index, alias: elasticsearch_index } }
        ]
      }
    end
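To make the request body easier to inspect, the sketch below builds the same two actions as a plain Hash without contacting a cluster. `alias_actions` is a hypothetical helper name and the index names are made-up examples; only the structure is taken from the listing above.

```ruby
# Hypothetical helper: builds the update_aliases body shown in the listing.
def alias_actions(alias_name, new_index)
  {
    actions: [
      # Detach the alias from every index whose name starts with the alias name.
      { remove: { index: "#{alias_name}*", alias: alias_name } },
      # Attach the alias to the newly created index.
      { add: { index: new_index, alias: alias_name } }
    ]
  }
end

body = alias_actions(:'documents-staging', :'documents-staging-20191004.123456')
p body[:actions].map { |action| action.keys.first }
# prints [:remove, :add]
```

Both actions travel in one request, so the alias moves from the old indices to the new one in a single call.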
#call_es ⇒ Object
Wrapper method in which error handling is done for Elasticsearch calls.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 78
    def call_es
      yield
    rescue ::Elasticsearch::Transport::Transport::Errors::NotFound,
           ::Elasticsearch::Transport::Transport::Error,
           Faraday::ConnectionFailed => e
      db.loggers.first.warn e if db.loggers.count.positive?
      nil
    end
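The rescue-log-return-nil pattern used by call_es can be illustrated without Elasticsearch or Faraday installed. In this sketch `FakeTransportError` and `safe_call` are hypothetical stand-ins (assumptions, not part of the plugin), and a standard library `Logger` takes the place of the first Sequel database logger.

```ruby
require 'logger'
require 'stringio'

# Hypothetical stand-in for the transport errors rescued in the listing.
class FakeTransportError < StandardError; end

# Yields to the block; on a known error, logs a warning and returns nil.
def safe_call(logger = nil)
  yield
rescue FakeTransportError => e
  logger&.warn(e)
  nil
end

log = StringIO.new
logger = Logger.new(log)

p safe_call(logger) { :result }
# prints :result
p safe_call(logger) { raise FakeTransportError, 'index missing' }
# prints nil
```

Callers of a wrapper like this need to treat a nil result as "the call failed", since the exception itself is only visible in the log.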
#environment ⇒ Object
    # File 'lib/sequel/plugins/elasticsearch.rb', line 166
    def environment
      ENV['APP_ENV'] || ENV['RACK_ENV'] || 'development'
    end
#es(query = '', opts = {}) ⇒ Object
Execute a search or a scroll on the Model’s Elasticsearch index. This method is “safe” in that it will catch the more common Errors.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 73
    def es(query = '', opts = {})
      call_es { query.is_a?(Result) ? scroll!(query, opts) : es!(query, opts) }
    end
#es!(query = '', opts = {}) ⇒ Object
Execute a search on the Model’s Elasticsearch index without catching Errors.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 54
    def es!(query = '', opts = {})
      opts = {
        index: elasticsearch_index,
        type: elasticsearch_type
      }.merge(opts)
      query.is_a?(String) ? opts[:q] = query : opts[:body] = query
      Result.new es_client.search(opts), self
    end
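How the query argument ends up in the search options can be seen with a small stand-alone sketch. `build_search_opts` is a hypothetical helper, and the index and type values are example assumptions; the merge order and the String check follow the listing above.

```ruby
# Hypothetical helper reproducing the option handling from the listing.
def build_search_opts(query, defaults, overrides = {})
  # Caller-supplied options win over the model defaults.
  opts = defaults.merge(overrides)
  if query.is_a?(String)
    opts[:q] = query      # String: passed as a query-string search
  else
    opts[:body] = query   # anything else: passed as the request body
  end
  opts
end

defaults = { index: :documents, type: :_doc }

p build_search_opts('title:ruby', defaults)
# prints {:index=>:documents, :type=>:_doc, :q=>"title:ruby"} (Ruby 3.4+ prints {index: :documents, ...})
p build_search_opts({ query: { match_all: {} } }, defaults, { size: 5 }).keys
# prints [:index, :type, :size, :body]
```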
#es_client ⇒ Object
Return the Elasticsearch client used to communicate with the cluster.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 49
    def es_client
      @es_client = ::Elasticsearch::Client.new elasticsearch_opts
    end
#import!(index: nil, dataset: nil, batch_size: 100) ⇒ Object
Import the whole dataset into Elasticsearch.
This assumes that a template covering all the possible index names has been created. See timestamped_index for examples of the indices that will be created.
This adds or updates records in the last index created by this utility. Use the reindex! method to create a completely new index and alias.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 95
    def import!(index: nil, dataset: nil, batch_size: 100)
      dataset ||= self.dataset
      index_name = index || last_index

      # Index all the documents
      body = []
      dataset.each_page(batch_size) do |ds|
        body = []
        ds.all.each do |row|
          print '.'
          body << {
            update: {
              _index: index_name,
              _type: elasticsearch_type,
              _id: row.document_id,
              data: { doc: row.indexed_values, doc_as_upsert: true }
            }
          }
        end
        puts '/'
        es_client.bulk body: body
        body = nil
      end
    end
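The shape of one bulk request can be examined without a database or a cluster. The sketch below is an illustration under assumptions: `Row` is a hypothetical Struct standing in for a model instance (it only provides the `document_id` and `indexed_values` methods used in the listing), `bulk_update_actions` is a made-up helper, and `each_slice` takes the place of Sequel's `each_page`.

```ruby
# Hypothetical stand-in for a model row.
Row = Struct.new(:document_id, :indexed_values)

# Builds one upsert action per row, in the same shape as the listing.
def bulk_update_actions(rows, index_name, type)
  rows.map do |row|
    {
      update: {
        _index: index_name,
        _type: type,
        _id: row.document_id,
        # doc_as_upsert creates the document when it does not exist yet.
        data: { doc: row.indexed_values, doc_as_upsert: true }
      }
    }
  end
end

rows = (1..5).map { |id| Row.new(id, { title: "Document #{id}" }) }

# One array of actions per batch, analogous to one bulk call per page.
batches = rows.each_slice(2).map do |batch|
  bulk_update_actions(batch, :'documents-staging-20191004.123456', :_doc)
end

p batches.map(&:size)
# prints [2, 2, 1]
```

Because every action is an upsert keyed on `_id`, re-running an import over the same rows updates the existing documents rather than duplicating them.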
#last_index ⇒ Object
Find the last created index that matches the specified index name.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 142
    def last_index
      es_client.indices.get_alias(name: elasticsearch_index)&.keys&.sort&.first
    end
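The key selection in the listing can be followed on a sample response. This sketch assumes the alias lookup returns a Hash keyed by concrete index name; the sample data and the helper name `first_sorted_index` are hypothetical.

```ruby
# Hypothetical helper applying the same safe-navigation chain as the listing.
def first_sorted_index(alias_response)
  alias_response&.keys&.sort&.first
end

# Assumed response shape: one entry per index that carries the alias.
response = {
  'documents-staging-20191005.171213' => { 'aliases' => { 'documents-staging' => {} } },
  'documents-staging-20191004.123456' => { 'aliases' => { 'documents-staging' => {} } }
}

p first_sorted_index(response)
# prints "documents-staging-20191004.123456"
p first_sorted_index(nil)
# prints nil
```

An ascending sort puts the lexicographically smallest name first, which for timestamped names is the oldest. Since alias_index removes the alias from all matching indices before adding it to the new one, the response would normally contain a single key, in which case the ordering does not matter.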
#reindex!(index: nil, dataset: nil, batch_size: 100) ⇒ Object
Creates a new index in Elasticsearch from the specified dataset, as well as an alias to the new index.
See the documentation on import! for more details.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 124
    def reindex!(index: nil, dataset: nil, batch_size: 100)
      index_name = index || timestamped_index
      import!(index: index_name, dataset: dataset, batch_size: batch_size)

      # Create an alias to the newly created index
      alias_index(index_name)
    end
#scroll!(scroll_id, duration) ⇒ Object
Fetch the next page in a scroll without catching Errors.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 64
    def scroll!(scroll_id, duration)
      scroll_id = scroll_id.scroll_id if scroll_id.is_a? Result
      return nil unless scroll_id

      Result.new es_client.scroll(scroll_id: scroll_id, scroll: duration), self
    end
#timestamped_index ⇒ Object
Generate a timestamped index name according to the environment. This will use the APP_ENV or RACK_ENV ENV variable and a timestamp to construct index names like this:
base-name-staging-20191004.123456 # This is a staging index
base-name-production-20191005.171213 # This is a production index
Adding the environment name to the index can be turned off by setting elasticsearch_environment_scoped to false.
    # File 'lib/sequel/plugins/elasticsearch.rb', line 161
    def timestamped_index
      time_str = Time.now.strftime('%Y%m%d.%H%M%S')
      "#{elasticsearch_index}-#{time_str}".to_sym
    end
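The timestamp format can be checked against the example names above with a fixed time. This is a small sketch: `timestamped_name` is a hypothetical helper, and the `now` parameter is an assumption added so the output is reproducible.

```ruby
# Hypothetical helper using the same strftime pattern as the listing.
def timestamped_name(base, now = Time.now)
  "#{base}-#{now.strftime('%Y%m%d.%H%M%S')}".to_sym
end

p timestamped_name('base-name-staging', Time.new(2019, 10, 4, 12, 34, 56))
# prints :"base-name-staging-20191004.123456"
```

The timestamp has one-second resolution, so two names generated within the same second for the same base would be identical.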