Class: Elasticity::Strategies::AliasIndex

Inherits:
Object
  • Object
show all
Defined in:
lib/elasticity/strategies/alias_index.rb

Overview

This strategy keeps two aliases that may be mapped to the same index or to different indexes, allowing runtime changes by atomically updating the aliases. For an example, see the implementation of the remap method.

Constant Summary collapse

# Status symbols declared for this strategy.
# NOTE(review): #status can also return :inconsistent, which is not listed here — confirm whether
# that omission is intentional.
STATUSES =
[:missing, :ok]

Instance Method Summary collapse

Constructor Details

#initialize(client, index_base_name, document_type) ⇒ AliasIndex

Returns a new instance of AliasIndex.



9
10
11
12
13
14
# File 'lib/elasticity/strategies/alias_index.rb', line 9

# Stores the client and document type, and derives the two alias names from the base name:
# the main alias is the base name itself, the update alias is the base name suffixed with "_update".
def initialize(client, index_base_name, document_type)
  update_alias_name = "#{index_base_name}_update"

  @client        = client
  @document_type = document_type
  @main_alias    = index_base_name
  @update_alias  = update_alias_name
end

Instance Method Details

#bulk {|b| ... } ⇒ Object

Yields:

  • (b)


224
225
226
227
228
# File 'lib/elasticity/strategies/alias_index.rb', line 224

# Builds a Bulk::Alias from the client, the update alias and the current main indexes,
# yields it to the caller's block so operations can be queued, then executes it and
# returns the result of #execute.
def bulk
  Bulk::Alias.new(@client, @update_alias, main_indexes).tap { |batch| yield batch }.execute
end

#create(index_def) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/elasticity/strategies/alias_index.rb', line 163

# Creates a new index from index_def and adds both the main alias and the update alias to it.
# Raises IndexError when #missing? is false.
def create(index_def)
  raise IndexError.new(@main_alias, "index already exists") unless missing?

  index_name = create_index(index_def)
  alias_actions = [@main_alias, @update_alias].map do |alias_name|
    { add: { index: index_name, alias: alias_name } }
  end

  @client.index_update_aliases(body: { actions: alias_actions })
end

#create_if_undefined(index_def) ⇒ Object



177
178
179
# File 'lib/elasticity/strategies/alias_index.rb', line 177

# Calls #create with index_def only when #missing? is true; otherwise returns nil.
def create_if_undefined(index_def)
  return unless missing?

  create(index_def)
end

#delete ⇒ Object



181
182
183
# File 'lib/elasticity/strategies/alias_index.rb', line 181

# Deletes every index whose name matches "<main alias>-*".
def delete
  index_pattern = "#{@main_alias}-*"
  @client.index_delete(index: index_pattern)
end

#delete_by_query(type, body) ⇒ Object



220
221
222
# File 'lib/elasticity/strategies/alias_index.rb', line 220

# Runs a delete-by-query through the main alias for the given type and query body.
def delete_by_query(type, body)
  query_args = { index: @main_alias, type: type, body: body }
  @client.delete_by_query(**query_args)
end

#delete_document(type, id) ⇒ Object



204
205
206
207
208
209
210
# File 'lib/elasticity/strategies/alias_index.rb', line 204

# Sends one bulk request that deletes the document from every index returned by
# #main_indexes or #update_indexes (each index only once).
def delete_document(type, id)
  target_indexes = main_indexes | update_indexes
  delete_ops = target_indexes.map { |index_name| { delete: { _index: index_name, _type: type, _id: id } } }

  @client.bulk(body: delete_ops)
end

#delete_if_defined ⇒ Object



185
186
187
# File 'lib/elasticity/strategies/alias_index.rb', line 185

# Calls #delete unless #missing? is true; returns nil when nothing was deleted.
def delete_if_defined
  return if missing?

  delete
end

#flush ⇒ Object



230
231
232
# File 'lib/elasticity/strategies/alias_index.rb', line 230

# Flushes whatever the update alias currently points to.
def flush
  flush_target = @update_alias
  @client.index_flush(index: flush_target)
end

#get_document(type, id) ⇒ Object



212
213
214
# File 'lib/elasticity/strategies/alias_index.rb', line 212

# Fetches the document with the given type and id through the main alias.
def get_document(type, id)
  lookup = { index: @main_alias, type: type, id: id }
  @client.get(**lookup)
end

#index_document(type, id, attributes) ⇒ Object



194
195
196
197
198
199
200
201
202
# File 'lib/elasticity/strategies/alias_index.rb', line 194

# Indexes the attributes as a document through the update alias.
# Returns [id, created] taken from the response's "_id" and "created" fields;
# raises IndexError when the response carries no "_id".
def index_document(type, id, attributes)
  response   = @client.index(index: @update_alias, type: type, id: id, body: attributes)
  indexed_id = response["_id"]
  raise IndexError.new(@update_alias, "failed to index document") unless indexed_id

  [indexed_id, response["created"]]
end

#main_indexes ⇒ Object



151
152
153
154
155
# File 'lib/elasticity/strategies/alias_index.rb', line 151

# Names of the indexes matching "<main alias>-*" that carry the main alias.
# Returns an empty array when the lookup raises NotFound.
def main_indexes
  index_pattern    = "#{@main_alias}-*"
  aliases_by_index = @client.index_get_aliases(index: index_pattern, name: @main_alias)
  aliases_by_index.keys
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  []
end

#mappings ⇒ Object



240
241
242
243
244
# File 'lib/elasticity/strategies/alias_index.rb', line 240

# Looks up the mapping for the document type through the main alias and returns the first
# value of the response hash, or nil when the lookup raises NotFound.
def mappings
  mapping_by_index = @client.index_get_mapping(index: @main_alias, type: @document_type)
  mapping_by_index.values.first
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end

#missing? ⇒ Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/elasticity/strategies/alias_index.rb', line 147

# True when #status reports :missing.
def missing?
  current_status = status
  current_status == :missing
end

#recreate(index_def) ⇒ Object



189
190
191
192
# File 'lib/elasticity/strategies/alias_index.rb', line 189

# Drops the existing indexes (when there are any) and then creates a fresh index from index_def.
# Returns whatever #create returns.
def recreate(index_def)
  # Must run first: #create raises when the index is not missing.
  delete_if_defined

  create(index_def)
end

#ref_index_name ⇒ Object



16
17
18
# File 'lib/elasticity/strategies/alias_index.rb', line 16

# Name used to refer to this index: the main alias.
def ref_index_name
  @main_alias
end

#remap(index_def) ⇒ Object

Remap allows zero-downtime/zero-dataloss remap of elasticsearch indexes. Here is the overview of how it works:

  1. Creates a new index with the new mapping.

  2. Updates the aliases so that writes go to the new index and reads go to both indexes.

  3. Uses scan and scroll to iterate over all the documents in the old index, moving them to the new index.

  4. Updates the aliases so that all operations go to the new index.

  5. Deletes the old index.

It does a little bit more to ensure consistency and to handle race-conditions. For more details look at the implementation.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/elasticity/strategies/alias_index.rb', line 32

# Moves all documents to a newly created index built from index_def, switching the aliases
# around the copy so reads and writes keep working while it runs.
#
# Steps, as implemented below:
#   1. Create a new index from index_def.
#   2. Point the update alias at the new index only and add the new index to the main alias,
#      so writes go to the new index while searches see both indexes.
#   3. Scroll over the original index, copy every document that still exists there into the
#      new index, then remove from the new index any document that was deleted from the
#      original index while it was being copied.
#   4. Remove the original index from the main alias and delete it.
#
# If anything raises after the new index was created, the update alias is pointed back at the
# original index, the documents found in the new index are copied back, the new index is
# removed from the main alias and deleted, and the error is re-raised.
#
# Raises a RuntimeError when the main and update aliases do not both resolve to the same
# single index.
def remap(index_def)
  current_main   = main_indexes
  current_update = update_indexes

  if current_main.size != 1 || current_update.size != 1 || current_main != current_update
    raise "Index can't be remapped right now, check if another remapping is already happening"
  end

  new_index      = create_index(index_def)
  original_index = current_main[0]

  begin
    # Configure aliases so that search includes the old index and the new index, and writes are made to
    # the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @update_alias } },
        { add:    { index: new_index, alias: @update_alias } },
        { add:    { index: new_index, alias: @main_alias } },
      ]
    })

    @client.index_flush(index: original_index)
    cursor = @client.search index: original_index, search_type: 'scan', scroll: '10m', _source: false, size: 100
    loop do
      cursor = @client.scroll(scroll_id: cursor['_scroll_id'], scroll: '1m')
      hits   = cursor['hits']['hits']
      break if hits.empty?

      # Fetch documents based on the ids that existed when the migration started, to make sure we only migrate
      # documents that haven't been deleted.
      id_docs = hits.map do |hit|
        { _index: original_index, _type: hit["_type"], _id: hit["_id"] }
      end

      docs = @client.mget(body: { docs: id_docs }, refresh: true)["docs"]
      break if docs.empty?

      # Move only documents that still exist on the old index into the new index.
      ops = docs.select { |doc| doc["found"] }.map do |doc|
        { index: { _index: new_index, _type: doc["_type"], _id: doc["_id"], data: doc["_source"] } }
      end

      # Every document of this page may have been deleted in the meantime; skip the request
      # instead of sending an empty bulk body, which would abort the remap.
      @client.bulk(body: ops) unless ops.empty?

      # Deal with race conditions by removing from the new index any document that doesn't exist in the old
      # index anymore. The second mget re-reads the same ids from the old index.
      ops = []
      @client.mget(body: { docs: id_docs }, refresh: true)["docs"].each_with_index do |current_doc, idx|
        if docs[idx]["found"] && !current_doc["found"]
          ops << { delete: { _index: new_index, _type: current_doc["_type"], _id: current_doc["_id"] } }
        end
      end

      @client.bulk(body: ops) unless ops.empty?
    end

    # Update aliases to only point to the new index.
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: original_index, alias: @main_alias } },
      ]
    })
    @client.index_delete(index: original_index)

  rescue
    # Roll back: send writes to the original index again before copying anything back.
    @client.index_update_aliases(body: {
      actions: [
        { add:    { index: original_index, alias: @update_alias } },
        { remove: { index: new_index, alias: @update_alias } },
      ]
    })

    @client.index_flush(index: new_index)
    cursor = @client.search index: new_index, search_type: 'scan', scroll: '1m', size: 100
    loop do
      cursor = @client.scroll(scroll_id: cursor['_scroll_id'], scroll: '1m')
      hits   = cursor['hits']['hits']
      break if hits.empty?

      # Move all the documents that exist on the new index back to the old index.
      ops = hits.map do |doc|
        { index: { _index: original_index, _type: doc["_type"], _id: doc["_id"], data: doc["_source"] } }
      end

      @client.bulk(body: ops)
    end

    @client.index_flush(index: original_index)
    @client.index_update_aliases(body: {
      actions: [
        { remove: { index: new_index, alias: @main_alias } },
      ]
    })
    @client.index_delete(index: new_index)

    # Re-raise the error that triggered the rollback.
    raise
  end
end

#search_index ⇒ Object



216
217
218
# File 'lib/elasticity/strategies/alias_index.rb', line 216

# Index name to run searches against: the main alias.
def search_index
  @main_alias
end

#settings ⇒ Object



234
235
236
237
238
# File 'lib/elasticity/strategies/alias_index.rb', line 234

# Looks up the settings through the main alias (passing the document type along) and returns
# the first value of the response hash, or nil when the lookup raises NotFound.
def settings
  settings_by_index = @client.index_get_settings(index: @main_alias, type: @document_type)
  settings_by_index.values.first
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  nil
end

#status ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/elasticity/strategies/alias_index.rb', line 133

# Reports the state of the two aliases:
#   :ok           - both the main alias and the update alias exist
#   :missing      - neither alias exists
#   :inconsistent - exactly one of them exists
def status
  main_present   = @client.index_exists_alias(name: @main_alias)
  update_present = @client.index_exists_alias(name: @update_alias)

  return :ok if main_present && update_present
  return :missing unless main_present || update_present

  :inconsistent
end

#update_indexes ⇒ Object



157
158
159
160
161
# File 'lib/elasticity/strategies/alias_index.rb', line 157

# Names of the indexes matching "<main alias>-*" that carry the update alias.
# Returns an empty array when the lookup raises NotFound.
def update_indexes
  index_pattern    = "#{@main_alias}-*"
  aliases_by_index = @client.index_get_aliases(index: index_pattern, name: @update_alias)
  aliases_by_index.keys
rescue Elasticsearch::Transport::Transport::Errors::NotFound
  []
end