Module: Sequel::Plugins::Elasticsearch::ClassMethods

Defined in:
lib/sequel/plugins/elasticsearch.rb

Overview

The class methods that will be added to a Sequel::Model class that loads this plugin.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#elasticsearch_environment_scoped ⇒ Object

If generated indices should include the environment name



46
47
48
# File 'lib/sequel/plugins/elasticsearch.rb', line 46

# Flag controlling whether generated index names get the environment name
# appended (see elasticsearch_index). Yields nil until it has been set.
# The defined? guard avoids the uninitialized-instance-variable warning that
# older Rubies emit in verbose mode.
def elasticsearch_environment_scoped
  @elasticsearch_environment_scoped if defined?(@elasticsearch_environment_scoped)
end

#elasticsearch_index ⇒ Object



146
147
148
149
150
# File 'lib/sequel/plugins/elasticsearch.rb', line 146

# Name of the index (or alias) the model searches and writes to.
# When elasticsearch_environment_scoped is truthy, the current environment
# name is appended and a Symbol is returned, e.g. :docs -> :"docs-production";
# otherwise the configured value is returned unchanged.
def elasticsearch_index
  if elasticsearch_environment_scoped
    :"#{@elasticsearch_index}-#{environment}"
  else
    @elasticsearch_index
  end
end

#elasticsearch_opts ⇒ Object

The extra options that will be passed to the Elasticsearch client.



40
41
42
# File 'lib/sequel/plugins/elasticsearch.rb', line 40

# Extra options handed to ::Elasticsearch::Client.new (see es_client).
# Yields nil until they have been set; the defined? guard avoids the
# uninitialized-instance-variable warning older Rubies emit in verbose mode.
def elasticsearch_opts
  @elasticsearch_opts if defined?(@elasticsearch_opts)
end

#elasticsearch_type ⇒ Object

The Elasticsearch type to which the documents will be written.



44
45
46
# File 'lib/sequel/plugins/elasticsearch.rb', line 44

# Elasticsearch document type used for searches and bulk writes.
# Yields nil until it has been set; the defined? guard avoids the
# uninitialized-instance-variable warning older Rubies emit in verbose mode.
def elasticsearch_type
  @elasticsearch_type if defined?(@elasticsearch_type)
end

Instance Method Details

#alias_index(new_index) ⇒ Object



132
133
134
135
136
137
138
139
# File 'lib/sequel/plugins/elasticsearch.rb', line 132

# Point the elasticsearch_index alias at +new_index+.
#
# A single update_aliases request removes the alias from every index whose
# name starts with the alias name, then adds it to +new_index+.
def alias_index(new_index)
  alias_name = elasticsearch_index
  actions = [
    { remove: { index: "#{alias_name}*", alias: alias_name } },
    { add: { index: new_index, alias: alias_name } }
  ]
  es_client.indices.update_aliases body: { actions: actions }
end

#call_es ⇒ Object

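Wrapper for Elasticsearch calls: it runs the given block and, on common Elasticsearch and connection errors, logs the error as a warning and returns nil instead of raising.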



78
79
80
81
82
83
84
85
# File 'lib/sequel/plugins/elasticsearch.rb', line 78

# Run the given block and return its value.
#
# Not-found responses, other Elasticsearch transport errors and Faraday
# connection failures are swallowed: the exception is passed to #warn on the
# database's first logger (when any logger is configured) and nil is returned.
# Any other exception propagates unchanged.
def call_es
  yield
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound,
       ::Elasticsearch::Transport::Transport::Error,
       Faraday::ConnectionFailed => error
  loggers = db.loggers
  loggers.first.warn(error) unless loggers.empty?
  nil
end

#environment ⇒ Object



166
167
168
# File 'lib/sequel/plugins/elasticsearch.rb', line 166

# Name of the running environment: APP_ENV if set, else RACK_ENV if set,
# else 'development'. A variable that is set to an empty string still wins.
def environment
  ENV.fetch('APP_ENV') { ENV.fetch('RACK_ENV', 'development') }
end

#es(query = '', opts = {}) ⇒ Object

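Execute a search or a scroll on the Model’s Elasticsearch index. When query is a Result from an earlier scrolling search, the next page of that scroll is fetched instead of running a new search. This method is “safe” in that it catches the more common errors (not found, transport and connection failures) and returns nil.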



73
74
75
# File 'lib/sequel/plugins/elasticsearch.rb', line 73

# Error-tolerant entry point for searching (see call_es for which errors
# are caught).
#
# A Result argument means "fetch the next scroll page" and is handed to
# scroll!; anything else (query string or request-body Hash) goes to es!.
# NOTE(review): in the scroll case +opts+ is passed through as the scroll
# duration, so callers are expected to supply it (e.g. '1m') — the {} default
# looks unsuitable there; confirm against callers.
def es(query = '', opts = {})
  call_es do
    case query
    when Result then scroll!(query, opts)
    else es!(query, opts)
    end
  end
end

#es!(query = '', opts = {}) ⇒ Object

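Execute a search on the Model’s Elasticsearch index without catching errors. A String query is sent as a query-string search (the q parameter); any other query is sent as the request body.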



54
55
56
57
58
59
60
61
# File 'lib/sequel/plugins/elasticsearch.rb', line 54

# Search the model's index and wrap the response in a Result.
#
# +query+ - a String becomes the query-string parameter (:q); anything else
#           (typically a Hash) is sent as the request :body.
# +opts+  - extra search parameters; they override the default :index and
#           :type. The caller's Hash is not modified (a merged copy is used).
# Errors from the client are not rescued here; use es for that.
def es!(query = '', opts = {})
  params = { index: elasticsearch_index, type: elasticsearch_type }.merge(opts)
  if query.is_a?(String)
    params[:q] = query
  else
    params[:body] = query
  end
  Result.new es_client.search(params), self
end

#es_client ⇒ Object

Return the Elasticsearch client used to communicate with the cluster.



49
50
51
# File 'lib/sequel/plugins/elasticsearch.rb', line 49

# Return the Elasticsearch client used to communicate with the cluster.
#
# The client is built from elasticsearch_opts and then cached, so repeated
# calls (import! calls this once per batch) reuse one client instead of
# constructing a new one every time. A fresh client is built whenever
# elasticsearch_opts no longer equals the options the cached client was made
# from, so reconfiguring the model still takes effect.
def es_client
  opts = elasticsearch_opts
  unless @es_client && @es_client_opts == opts
    # Keep a shallow copy so in-place edits to the options Hash are detected.
    @es_client_opts = opts.dup
    @es_client = ::Elasticsearch::Client.new opts
  end
  @es_client
end

#import!(index: nil, dataset: nil, batch_size: 100) ⇒ Object

Import the whole dataset into Elasticsearch.

This assumes that an index template covering all possible index names has been created. See timestamped_index for examples of the indices that will be created.

This adds or updates records in the last index created by this utility. Use the reindex! method to create a completely new index and alias.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/sequel/plugins/elasticsearch.rb', line 95

# Bulk-upsert every row of a dataset into an Elasticsearch index.
#
# index:      - target index name; defaults to last_index.
# dataset:    - rows to index; defaults to the model's dataset. It must respond
#               to each_page (Sequel's pagination extension).
# batch_size: - number of rows sent per bulk request.
#
# Each row contributes one "update" action with doc_as_upsert, keyed by
# row.document_id with row.indexed_values as the document. Progress is printed
# to stdout: one '.' per row and a '/' per batch.
#
# Raises ArgumentError when no index is given and last_index returns nil,
# rather than sending every document with a nil _index.
def import!(index: nil, dataset: nil, batch_size: 100)
  dataset ||= self.dataset
  index_name = index || last_index
  raise ArgumentError, 'no target index given and none found via last_index' unless index_name

  type = elasticsearch_type
  dataset.each_page(batch_size) do |page|
    body = page.all.map do |row|
      print '.'
      {
        update: {
          _index: index_name,
          _type: type,
          _id: row.document_id,
          data: { doc: row.indexed_values, doc_as_upsert: true }
        }
      }
    end
    puts '/'
    es_client.bulk body: body
  end
end

#last_index ⇒ Object

Find the most recently created index that the elasticsearch_index alias points to.



142
143
144
# File 'lib/sequel/plugins/elasticsearch.rb', line 142

# Name of the most recently created index behind the elasticsearch_index
# alias, or nil when the alias lookup returns nothing.
#
# Names produced by timestamped_index end in a fixed-width
# YYYYMMDD.HHMMSS stamp, so the lexicographically greatest name is the
# newest. The previous `sort.first` picked the oldest one instead.
def last_index
  es_client.indices.get_alias(name: elasticsearch_index)&.keys&.max
end

#reindex!(index: nil, dataset: nil, batch_size: 100) ⇒ Object

Creates a new index in Elasticsearch from the specified dataset, as well as an alias to the new index.

See the documentation on import! for more details.



124
125
126
127
128
129
130
# File 'lib/sequel/plugins/elasticsearch.rb', line 124

# Build a fresh index from +dataset+ and move the elasticsearch_index alias
# onto it.
#
# index: defaults to a new timestamped_index name; dataset: and batch_size:
# are forwarded to import!. Returns the result of alias_index.
def reindex!(index: nil, dataset: nil, batch_size: 100)
  target = index || timestamped_index
  import!(index: target, dataset: dataset, batch_size: batch_size)
  alias_index(target)
end

#scroll!(scroll_id, duration) ⇒ Object

Fetch the next page in a scroll without catching Errors.



64
65
66
67
68
69
# File 'lib/sequel/plugins/elasticsearch.rb', line 64

# Fetch the next page of a scroll, without rescuing client errors.
#
# +scroll_id+ may be a raw scroll id or a Result (whose scroll_id is used).
# +duration+ is passed as the scroll keep-alive. Returns nil when there is
# no scroll id to continue from; otherwise the response wrapped in a Result.
def scroll!(scroll_id, duration)
  id = scroll_id.is_a?(Result) ? scroll_id.scroll_id : scroll_id
  return unless id

  Result.new es_client.scroll(scroll_id: id, scroll: duration), self
end

#timestamped_index ⇒ Object

Generate a timestamped index name according to the environment. This will use the APP_ENV or RACK_ENV environment variable and a timestamp to construct index names like this:

base-name-staging-20191004.123456 # This is a staging index
base-name-production-20191005.171213 # This is a production index

The adding of the environment name to the index can be turned off by setting elasticsearch_environment_scoped to false.



161
162
163
164
# File 'lib/sequel/plugins/elasticsearch.rb', line 161

# New index name of the form "<elasticsearch_index>-YYYYMMDD.HHMMSS"
# (local time), returned as a Symbol.
def timestamped_index
  stamp = Time.now.strftime('%Y%m%d.%H%M%S')
  [elasticsearch_index, stamp].join('-').to_sym
end