Module: Sequel::Plugins::Elasticsearch::ClassMethods

Defined in:
lib/sequel/plugins/elasticsearch.rb

Overview

The class methods that will be added to the Sequel::Model

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#elasticsearch_index ⇒ Object

The Elasticsearch index to which the documents will be written.



37
38
39
# File 'lib/sequel/plugins/elasticsearch.rb', line 37

# The Elasticsearch index to which the documents will be written.
#
# @return [Object] the current value of +@elasticsearch_index+
#   (+nil+ if it was never assigned)
def elasticsearch_index
  @elasticsearch_index
end

#elasticsearch_opts ⇒ Object

The extra options that will be passed to the Elasticsearch client.



35
36
37
# File 'lib/sequel/plugins/elasticsearch.rb', line 35

# The extra options that will be passed to the Elasticsearch client.
#
# @return [Object] the current value of +@elasticsearch_opts+
#   (+nil+ if it was never assigned)
def elasticsearch_opts
  @elasticsearch_opts
end

#elasticsearch_type ⇒ Object

The Elasticsearch type to which the documents will be written.



39
40
41
# File 'lib/sequel/plugins/elasticsearch.rb', line 39

# The Elasticsearch type to which the documents will be written.
#
# @return [Object] the current value of +@elasticsearch_type+
#   (+nil+ if it was never assigned)
def elasticsearch_type
  @elasticsearch_type
end

Instance Method Details

#alias_index(new_index) ⇒ Object

Remove previous aliases and point the `elasticsearch_index` alias to the new index.



129
130
131
132
133
134
135
136
# File 'lib/sequel/plugins/elasticsearch.rb', line 129

# Repoint the +elasticsearch_index+ alias at +new_index+.
#
# Issues one +update_aliases+ request with two actions: the alias is removed
# from every index matching "<elasticsearch_index>*" and then added to
# +new_index+.
#
# @param new_index [Symbol, String] name of the index the alias should target
# @return [Object] whatever the client's +update_aliases+ call returns
def alias_index(new_index)
  alias_name = elasticsearch_index
  drop_previous = { remove: { index: "#{alias_name}*", alias: alias_name } }
  attach_new = { add: { index: new_index, alias: alias_name } }

  es_client.indices.update_aliases(body: { actions: [drop_previous, attach_new] })
end

#call_es ⇒ Object

Wrapper method in which error handling is done for Elasticsearch calls.



71
72
73
74
75
76
77
78
# File 'lib/sequel/plugins/elasticsearch.rb', line 71

# Run the given block and convert the listed Elasticsearch/Faraday failures
# into a +nil+ result instead of letting them propagate.
#
# When a rescued error occurs and the database has at least one logger, the
# error is sent to the first logger at +warn+ level.
#
# @yield the Elasticsearch call to protect
# @return [Object, nil] the block's value, or +nil+ if a rescued error was raised
def call_es
  yield
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound,
       ::Elasticsearch::Transport::Transport::Error,
       Faraday::ConnectionFailed => failure
  sinks = db.loggers
  sinks.first.warn(failure) unless sinks.empty?
  nil
end

#es(query = '', opts = {}) ⇒ Object

Execute a search or a scroll on the Model’s Elasticsearch index. This method is “safe” in that it will catch the more common Errors.



66
67
68
# File 'lib/sequel/plugins/elasticsearch.rb', line 66

# Execute a search or a scroll on the Model's Elasticsearch index, with the
# error handling of +call_es+ wrapped around it.
#
# @param query [String, Hash, Result] a +Result+ continues a scroll via
#   +scroll!+; anything else is handed to +es!+
# @param opts [Object] forwarded unchanged as the second argument of
#   +scroll!+ or +es!+
# @return [Object, nil] whatever +call_es+ returns for the wrapped call
def es(query = '', opts = {})
  call_es do
    if query.is_a?(Result)
      scroll!(query, opts)
    else
      es!(query, opts)
    end
  end
end

#es!(query = '', opts = {}) ⇒ Object

Execute a search on the Model’s Elasticsearch index without catching Errors.



47
48
49
50
51
52
53
54
# File 'lib/sequel/plugins/elasticsearch.rb', line 47

# Execute a search on the Model's Elasticsearch index without catching Errors.
#
# The request targets +elasticsearch_index+ and, only when one is configured,
# +elasticsearch_type+ (an unset type is left out of the request, matching
# how +import_object+ treats it). Entries in +opts+ override these defaults.
#
# @param query [String, Object] a String is sent as the +:q+ parameter; any
#   other value is sent as the request +:body+
# @param opts [Hash] extra search parameters; the caller's hash is not modified
# @return [Result] the client's search response wrapped together with +self+
def es!(query = '', opts = {})
  defaults = { index: elasticsearch_index }
  # Do not send a nil type to the client when no type is configured.
  defaults[:type] = elasticsearch_type if elasticsearch_type

  params = defaults.merge(opts)
  if query.is_a?(String)
    params[:q] = query
  else
    params[:body] = query
  end

  Result.new(es_client.search(params), self)
end

#es_client ⇒ Object

Return the Elasticsearch client used to communicate with the cluster.



42
43
44
# File 'lib/sequel/plugins/elasticsearch.rb', line 42

# Return the Elasticsearch client used to communicate with the cluster.
#
# The client is created from +elasticsearch_opts+ on first use and cached in
# +@es_client+, so repeated calls reuse one client instead of constructing a
# new one each time. If +elasticsearch_opts+ later differs from the options
# the cached client was built with, a new client is created.
#
# @return [::Elasticsearch::Client] the cached (or freshly built) client
def es_client
  opts = elasticsearch_opts
  if @es_client.nil? || @es_client_opts != opts
    # Keep a copy so an in-place change to the options hash is also detected.
    @es_client_opts = opts.dup
    @es_client = ::Elasticsearch::Client.new(opts)
  end
  @es_client
end

#import!(index: nil, dataset: nil, batch_size: 100) ⇒ Object

Import the whole dataset into Elasticsearch.

This assumes that a template that covers all the possible index names has been created. See timestamped_index for examples of the indices that will be created.

This adds or updates records to the last index created by this utility. Use the reindex! method to create a completely new index and alias.



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/sequel/plugins/elasticsearch.rb', line 88

# Import the whole dataset into Elasticsearch using bulk +update+ actions.
#
# The dataset is walked one page at a time via +each_page+; each page becomes
# one bulk request. Progress is written to standard output: a "." per row and
# a "/" line per page.
#
# @param index [Symbol, String, nil] target index; when nil, falls back to
#   +last_index+ and then to +elasticsearch_index+
# @param dataset [Object, nil] dataset to import; defaults to +self.dataset+
# @param batch_size [Integer] page size passed to +each_page+
# @return [Object] whatever +each_page+ returns
def import!(index: nil, dataset: nil, batch_size: 100)
  dataset ||= self.dataset
  index_name = index || last_index || elasticsearch_index

  # Index all the documents, one bulk request per page
  dataset.each_page(batch_size) do |page|
    actions = page.all.map do |row|
      print '.'
      { update: import_object(index_name, row) }
    end
    puts '/'
    # Skip the request when the page produced no rows.
    next if actions.empty?

    es_client.bulk body: actions
  end
end

#import_object(idx, row) ⇒ Object



106
107
108
109
110
111
112
113
114
# File 'lib/sequel/plugins/elasticsearch.rb', line 106

# Build the payload of one bulk +update+ action for a single row.
#
# The document is taken from +row.as_indexed_json+ and flagged with
# +doc_as_upsert+. A +_type+ entry is added only when +elasticsearch_type+
# is set.
#
# @param idx [Symbol, String] name of the index to write to
# @param row [Object] record responding to +document_id+ and +as_indexed_json+
# @return [Hash] the action metadata plus its +:data+ section
def import_object(idx, row)
  doc_id = row.document_id
  payload = { doc: row.as_indexed_json, doc_as_upsert: true }
  action = { _index: idx, _id: doc_id, data: payload }

  type = elasticsearch_type
  type ? action.merge(_type: type) : action
end

#last_index ⇒ Object

Find the last created index that matches the specified index name.



139
140
141
142
143
# File 'lib/sequel/plugins/elasticsearch.rb', line 139

# Find the last created index that the +elasticsearch_index+ alias points to.
#
# Index names are compared as plain values and the greatest one is returned.
# With names of the form produced by +timestamped_index+
# ("<base>-YYYYMMDD.HHMMSS") the greatest name is the most recent one.
#
# @return [Object, nil] the greatest index name, or +nil+ when the alias
#   lookup returns nothing, has no entries, or raises +NotFound+
def last_index
  aliased = es_client.indices.get_alias(name: elasticsearch_index)
  # Use the maximum, not the minimum: the newest timestamp sorts last.
  aliased&.keys&.max
rescue ::Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end

#reindex!(index: nil, dataset: nil, batch_size: 100) ⇒ Object

Creates a new index in Elasticsearch from the specified dataset, as well as an alias to the new index.

See the documentation on import! for more details.



120
121
122
123
124
125
126
# File 'lib/sequel/plugins/elasticsearch.rb', line 120

# Import the dataset into a new index and then point the alias at it.
#
# @param index [Symbol, String, nil] name of the index to fill; when nil a
#   name is generated with +timestamped_index+
# @param dataset [Object, nil] forwarded to +import!+
# @param batch_size [Integer] forwarded to +import!+
# @return [Object] whatever +alias_index+ returns
def reindex!(index: nil, dataset: nil, batch_size: 100)
  target = index || timestamped_index

  # Fill the target index first ...
  import!(index: target, dataset: dataset, batch_size: batch_size)

  # ... then switch the alias over to it
  alias_index(target)
end

#scroll!(scroll_id, duration) ⇒ Object

Fetch the next page in a scroll without catching Errors.



57
58
59
60
61
62
# File 'lib/sequel/plugins/elasticsearch.rb', line 57

# Fetch the next page in a scroll without catching Errors.
#
# @param scroll_id [Result, Object, nil] a scroll id, or a +Result+ whose
#   +scroll_id+ is used instead
# @param duration [Object] passed to the client as the +:scroll+ parameter
# @return [Result, nil] the wrapped scroll response, or +nil+ when no scroll
#   id is available
def scroll!(scroll_id, duration)
  cursor = scroll_id.is_a?(Result) ? scroll_id.scroll_id : scroll_id
  return nil unless cursor

  response = es_client.scroll(body: cursor, scroll: duration)
  Result.new(response, self)
end

#timestamped_index ⇒ Object

Generate a timestamped index name. This will use the current timestamp to construct index names like this:

base-name-20191004.123456


149
150
151
152
# File 'lib/sequel/plugins/elasticsearch.rb', line 149

# Generate an index name of the form "<elasticsearch_index>-YYYYMMDD.HHMMSS"
# from the current time.
#
# @return [Symbol] the timestamped index name
def timestamped_index
  # TODO: Make the format configurable
  stamp = Time.now.strftime('%Y%m%d.%H%M%S')
  :"#{elasticsearch_index}-#{stamp}"
end