Class: RotatingEsLoader

Inherits:
EsClient show all
Extended by:
Memoist
Defined in:
lib/rotating_es_loader.rb

Overview

:nodoc

Constant Summary collapse

MAX_INDEX_AGE =

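Age limit in days: for index generations whose datestamp is at most this many days old, the newest index from each day is kept; all other generations are deleted.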

3
DEFAULT_SLICE_SIZE =
50

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from EsClient

#client, #method_missing

Constructor Details

#initialize(opts) ⇒ RotatingEsLoader

Returns a new instance of RotatingEsLoader.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/rotating_es_loader.rb', line 15

# Builds a loader that rotates the dated indices described by
# +opts[:index_definitions]+ and records the server's major version.
#
# @param opts [Hash]
# @option opts [Object] :credentials required; passed on to EsClient
# @option opts [String] :url required; must parse as a URI
# @option opts [Hash] :index_definitions required; per index key a hash whose
#   :datasource, :mappings, :settings and :type entries are read by this class
# @option opts [Integer] :slice_size documents per bulk request
#   (defaults to DEFAULT_SLICE_SIZE)
# @option opts [Object] :datasources stored as-is
# @raise [RuntimeError] when a required option is missing or a definition's
#   :datasource does not respond to #each
def initialize(opts)
  raise('no credentials provided') unless opts[:credentials]
  raise('no url provided') unless opts[:url]
  raise('no definitions provided') unless opts[:index_definitions].is_a?(Hash)
  # Parsed only to fail fast on a malformed URL (URI::InvalidURIError);
  # the parsed value itself is not needed.
  URI.parse(opts[:url])

  super(
    url: opts[:url],
    credentials: opts[:credentials]
  )

  @index_definitions = opts[:index_definitions]
  @slice_size = opts[:slice_size] || DEFAULT_SLICE_SIZE

  # NOTE(review): @logger is presumably initialised by EsClient#initialize.
  @logger.debug("index keys: #{index_keys}")
  @datasources = opts[:datasources]

  index_keys.each do |key|
    raise("No datasource for #{key}") unless datasource_for(key).respond_to?(:each)
  end

  # Request shapes (mapping types, bulk _type) depend on the server's major version.
  es_info = client.info
  @es_major_version = es_info['version']['number'].split('.').first.to_i
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method in the class EsClient

Instance Attribute Details

#es_major_version ⇒ Object

Returns the value of attribute es_major_version.



13
14
15
# File 'lib/rotating_es_loader.rb', line 13

# Integer major version of the connected Elasticsearch server, captured by
# #initialize from client.info; nil until it has been set.
def es_major_version
  @es_major_version if defined?(@es_major_version)
end

#slice_size ⇒ Object

Returns the value of attribute slice_size.



13
14
15
# File 'lib/rotating_es_loader.rb', line 13

# Number of records sent per bulk request: opts[:slice_size], or
# DEFAULT_SLICE_SIZE when none was given; nil until #initialize sets it.
def slice_size
  @slice_size if defined?(@slice_size)
end

Instance Method Details

#create_documents ⇒ Object



73
74
75
76
77
78
79
80
81
# File 'lib/rotating_es_loader.rb', line 73

# Bulk-loads every configured datasource into the new index for its key.
#
# The document type is only looked up when the server supports mapping
# types (multitype_support?). document_type_for raises on newer servers, so
# calling it unconditionally aborted every load against ES 6+.
def create_documents
  index_keys.each do |k|
    create_documents_for_type(
      name: get_index_name(k),
      data: datasource_for(k),
      type: (document_type_for(k) if multitype_support?)
    )
  end
end

#create_documents_for_type(name:, data:, type: nil) ⇒ Object



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/rotating_es_loader.rb', line 83

# Streams +data+ into index +name+ with the bulk API, @slice_size records per
# request. Each record's :id becomes the document _id.
#
# @param name [String] target index name
# @param data [#lazy] records to index (each one a Hash with an :id)
# @param type [String, nil] mapping type; sent as _type only when one is given
#   and the server supports mapping types (multitype_support?, i.e. ES <= 5)
#
# A bulk response reporting errors is logged as a warning; it does not raise.
def create_documents_for_type(name:, data:, type: nil)
  @logger.info("Creating documents in index #{name} in batches of #{@slice_size}")
  # Same version rule as document_type_for / multitype_support?; previously
  # only ES 5 exactly got _type, and a nil type was sent as `_type: nil`.
  send_type = type && multitype_support?

  data.lazy.each_slice(@slice_size).each_with_index do |slice, slice_num|
    @logger.debug("batch #{slice_num}: #{slice.size} docs")
    body = slice.flat_map do |rec|
      action = { _index: name, _id: rec[:id] }
      action[:_type] = type if send_type
      [{ index: action }, rec]
    end
    result = client.bulk(body: body)

    @logger.warn("ERRORS: #{JSON.pretty_generate(result)}") if result['errors']
  end
end

#create_index(name:, key:) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/rotating_es_loader.rb', line 188

# Creates index +name+ with the settings and version-adjusted mappings
# configured for +key+, returning the client's create response.
#
# The full request is logged at debug level. It used to be written to stdout
# with `puts`, which library users could not silence, and "creating index"
# was logged twice.
def create_index(name:, key:)
  @logger.debug("creating index #{name}")

  mappings = mappings_adjusted_for_es_version(key)
  @logger.debug("mappings: #{mappings.to_json}")

  request = {
    index: name,
    body: {
      settings: settings_for(key),
      mappings: mappings
    }
  }
  @logger.debug("create index request: #{JSON.pretty_generate(request)}")

  client.indices.create(request)
end

#create_indices ⇒ Object



103
104
105
106
107
# File 'lib/rotating_es_loader.rb', line 103

# Creates the new dated index for every configured key, in definition order.
# Returns the list of keys.
def create_indices
  keys = index_keys
  keys.each do |index_key|
    target = get_index_name(index_key)
    create_index(name: target, key: index_key)
  end
end

#datasource_for(key) ⇒ Object



58
59
60
# File 'lib/rotating_es_loader.rb', line 58

# The :datasource entry configured for +key+: the enumerable of records that
# create_documents streams into the index.
def datasource_for(key)
  definition = @index_definitions[key]
  definition[:datasource]
end

#delete_old_indices ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rotating_es_loader.rb', line 126

# Prunes old generations of each configured index.
#
# Only indices named like the ones get_index_name produces
# ("<key>-YYYYMMDD…") belong to a key. Matching by substring, as before,
# also swept up unrelated indices such as "user_profile-…" for key :user and
# deleted them.
#
# Candidates are grouped by age in days (key_age). For ages up to
# MAX_INDEX_AGE the newest index of each day is kept; everything else is
# deleted in a single request per key.
def delete_old_indices
  existing_indices = client.indices.get(index: '_all')

  @logger.debug("Existing indexes: #{existing_indices.keys}")

  index_keys.each do |index|
    own_index = /\A#{Regexp.escape(index.to_s)}-\d{8}(?:-|\z)/
    keys = existing_indices.keys.grep(own_index).sort

    keys_to_delete = keys.group_by { |k| key_age(k) }.flat_map do |age, key_list|
      # Within a day the epoch-seconds segment orders the names, so after the
      # sort the last one is the newest.
      age <= MAX_INDEX_AGE ? key_list[0...-1] : key_list
    end

    next if keys_to_delete.empty?

    @logger.debug("Deleting indexes #{keys_to_delete.join(', ')}")
    client.indices.delete index: keys_to_delete
  end
end

#document_type_for(key) ⇒ Object



40
41
42
43
44
# File 'lib/rotating_es_loader.rb', line 40

# The mapping type configured under :type for +key+. Raises RuntimeError on
# servers newer than ES 5, where this class does not use document types.
def document_type_for(key)
  raise "document type not supported for ES #{es_major_version}" if es_major_version > 5

  definition = @index_definitions[key]
  definition[:type]
end

#execute ⇒ Object



62
63
64
65
66
67
# File 'lib/rotating_es_loader.rb', line 62

# Runs one full rotation: create the new indices, load documents into them,
# repoint the aliases, then prune old generations. The steps run strictly in
# that order, an exception in one skips the rest, and the result of the last
# step is returned.
def execute
  %i[create_indices create_documents swap_aliases delete_old_indices]
    .inject(nil) { |_previous, step| send(step) }
end

#get_index_name(key) ⇒ Object



118
119
120
121
122
123
# File 'lib/rotating_es_loader.rb', line 118

# Name of the new index generation for +key+:
# "<key>-YYYYMMDD-<epoch seconds>-<pid>".
#
# The name is computed once per key and cached on the instance.
# create_indices, create_documents and swap_aliases each call this method
# separately. Recomputing from Time.now could give them different names if
# a second boundary passed in between, so documents and the alias would
# target an index other than the one created.
#
# @raise [RuntimeError] if +key+ is not one of index_keys
def get_index_name(key)
  raise("provided key #{key} is not a valid index") unless index_keys.include?(key)

  @index_names ||= {}
  # TODO: make it more sequential, so that it sorts correctly
  @index_names[key] ||= "#{key}-#{Date.today.strftime('%Y%m%d')}-#{Time.now.to_i}-#{Process.pid}"
end

#index_keys ⇒ Object



46
47
48
# File 'lib/rotating_es_loader.rb', line 46

# Keys of the configured index definitions, in definition order. Each key is
# used as its alias name and as the prefix of its dated index names.
def index_keys
  @index_definitions.each_key.to_a
end

#key_age(key) ⇒ Object



109
110
111
112
113
114
115
116
# File 'lib/rotating_es_loader.rb', line 109

# Age in whole days of a dated index name, read from its YYYYMMDD segment.
#
# Accepts "<prefix>-YYYYMMDD" optionally followed by numeric segments (such
# as the "-<epoch>-<pid>" that get_index_name appends). The match is anchored
# at the end of the name, so prefixes that themselves contain '-' (e.g.
# "my-index") are handled. Names without a valid date yield 0.
def key_age(key)
  date_str = key.to_s[/-(\d{8})(?:-\d+)*\z/, 1]
  return 0 unless date_str

  (Date.today - Date.strptime(date_str, '%Y%m%d')).to_i
rescue ArgumentError
  # Eight digits that are not a calendar date, e.g. "20201340"
  # (Date::Error is an ArgumentError).
  0
end

#mappings_adjusted_for_es_version(key) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
# File 'lib/rotating_es_loader.rb', line 176

# Wraps the configured field mappings for +key+ in the shape the connected
# server expects for index creation:
#   ES < 6:  { <type> => { properties: ... } }
#   ES >= 6: { properties: ... }
#
# A missing mapping is logged and treated as {}. Previously the logger's
# return value (true) was used as the mapping.
#
# For ES < 6 the mapping is keyed by the definition's :type, which is the
# _type documents are bulk-indexed with. It falls back to the index key when
# no :type is configured.
def mappings_adjusted_for_es_version(key)
  mapping_for_key = mappings_for(key)
  unless mapping_for_key
    @logger.warn("mappings does not contain a mapping for #{key}")
    mapping_for_key = {}
  end

  if es_major_version < 6
    type_name = @index_definitions[key][:type] || key
    { type_name => { properties: mapping_for_key } }
  else
    { properties: mapping_for_key }
  end
end

#mappings_for(key) ⇒ Object



50
51
52
# File 'lib/rotating_es_loader.rb', line 50

# The raw :mappings entry configured for +key+, before
# mappings_adjusted_for_es_version wraps it for the server version.
def mappings_for(key)
  definition = @index_definitions[key]
  definition[:mappings]
end

#multitype_support? ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/rotating_es_loader.rb', line 69

# True when the connected server's major version is 5 or lower, i.e. when
# this class still sends document types.
def multitype_support?
  es_major_version < 6
end

#settings_for(key) ⇒ Object



54
55
56
# File 'lib/rotating_es_loader.rb', line 54

# The :settings entry configured for +key+, sent as the settings body when
# create_index creates the index.
def settings_for(key)
  definition = @index_definitions[key]
  definition[:settings]
end

#swap_aliases ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/rotating_es_loader.rb', line 149

# Points each alias (one per index key) at that key's newly built index.
#
# The current holders of the alias are looked up first, and their removal is
# sent in the same update_aliases request that adds the new index. If the
# lookup raises (presumably because the alias does not exist yet), the error
# is logged as a warning and only the add action is sent.
def swap_aliases
  index_keys.each do |alias_name|
    index_name = get_index_name(alias_name)
    addition = { add: { index: index_name, alias: alias_name } }

    @logger.debug("fetching any indices attached to alias #{alias_name}")
    removals =
      begin
        holders = client.indices.get_alias(name: alias_name).keys
        holders.reverse.map do |old_index|
          { remove: { index: old_index, alias: alias_name } }
        end
      rescue StandardError => e
        @logger.warn(e)
        []
      end

    actions = removals + [addition]
    @logger.debug('update_aliases actions: ' + actions.to_json)

    client.indices.update_aliases body: {
      actions: actions
    }
  end
end