Class: Elastic::Core::Connector

Inherits:
Object
  • Object
show all
Defined in:
lib/elastic/core/connector.rb

Constant Summary collapse

DEFAULT_TYPE =
'_doc'

Instance Method Summary collapse

Constructor Details

#initialize(_name, _mapping, settling_time: 10.seconds) ⇒ Connector

Returns a new instance of Connector.



5
6
7
8
9
# File 'lib/elastic/core/connector.rb', line 5

def initialize(_name, _mapping, settling_time: 10.seconds)
  @name = _name
  @mapping = _mapping
  @settling_time = settling_time
end

Instance Method Details

#bulk_index(_documents) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/elastic/core/connector.rb', line 65

def bulk_index(_documents)
  return if Elastic.config.disable_indexing

  # TODO: validate documents type
  body = _documents.map { |doc| { 'index' => doc.merge('_type' => DEFAULT_TYPE) } }

  write_indices.each do |write_index|
    retry_on_temporary_error('bulk indexing') do
      api.bulk(index: write_index, body: body)
    end
  end
end

#copy_to(_to, batch_size: nil) ⇒ Object

rubocop:disable Metrics/AbcSize



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/elastic/core/connector.rb', line 137

def copy_to(_to, batch_size: nil) # rubocop:disable Metrics/AbcSize
  api.indices.refresh index: index_name

  r = api.search(
    index: index_name,
    body: { sort: ['_doc'] },
    scroll: '5m',
    size: batch_size || default_batch_size
  )

  count = 0
  while !r['hits']['hits'].empty?
    count += r['hits']['hits'].count
    Elastic.logger.info "Copied #{count} docs"

    body = r['hits']['hits'].map { |h| transform_hit_to_create(h) }
    api.bulk(index: _to, body: body)

    r = api.scroll scroll: '5m', scroll_id: r['_scroll_id']
  end
end

#count(query: nil) ⇒ Object



106
107
108
# File 'lib/elastic/core/connector.rb', line 106

def count(query: nil)
  api.count(index: index_name, body: query)['count']
end

#delete(_document) ⇒ Object

Raises:

  • (ArgumentError)


78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/elastic/core/connector.rb', line 78

def delete(_document)
  raise ArgumentError, 'document must provide an id' unless _document['_id']

  return if Elastic.config.disable_indexing

  write_index, rolling_index = write_indices

  operations = [{
    'delete' => _document.merge('_index' => write_index, '_type' => DEFAULT_TYPE)
  }]

  if rolling_index
    operations << {
      'delete' => _document.merge('_index' => rolling_index, '_type' => DEFAULT_TYPE)
    }
  end

  api.bulk(body: operations)
end

#dropObject



25
26
27
28
# File 'lib/elastic/core/connector.rb', line 25

def drop
  api.indices.delete index: "#{index_name}:*"
  nil
end

#find(_id) ⇒ Object



102
103
104
# File 'lib/elastic/core/connector.rb', line 102

def find(_id)
  api.get(index: index_name, id: _id)
end

#index(_document) ⇒ Object



54
55
56
57
58
59
60
61
62
63
# File 'lib/elastic/core/connector.rb', line 54

def index(_document)
  return if Elastic.config.disable_indexing

  # TODO: validate document type
  operations = write_indices.map do |write_index|
    { 'index' => _document.merge('_index' => write_index, '_type' => DEFAULT_TYPE) }
  end

  api.bulk(body: operations)
end

#index_nameObject



11
12
13
# File 'lib/elastic/core/connector.rb', line 11

def index_name
  @index_name ||= "#{Elastic.config.index}_#{@name}"
end

#migrate(batch_size: nil) ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/elastic/core/connector.rb', line 45

def migrate(batch_size: nil)
  unless remap
    rollover do |new_index|
      copy_to new_index, batch_size: batch_size
    end
  end
  nil
end

#query(query: nil) ⇒ Object



110
111
112
# File 'lib/elastic/core/connector.rb', line 110

def query(query: nil)
  api.search(index: index_name, body: query)
end

#refreshObject



98
99
100
# File 'lib/elastic/core/connector.rb', line 98

def refresh
  api.indices.refresh index: index_name
end

#remapObject



30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/elastic/core/connector.rb', line 30

def remap
  case status
  when :not_available
    create_from_scratch
  when :not_synchronized
    begin
      setup_index_mapping resolve_actual_index_name
    rescue Elasticsearch::Transport::Transport::Errors::BadRequest
      return false
    end
  end

  true
end

#remove_orphaned_indicesObject



159
160
161
162
163
164
165
166
# File 'lib/elastic/core/connector.rb', line 159

def remove_orphaned_indices
  _, rolling_index = resolve_write_indices

  unless rolling_index.nil?
    Elastic.logger.info "Removing orphan index #{rolling_index}"
    api.indices.delete index: rolling_index
  end
end

#rollover(&_block) ⇒ Object

rubocop:disable Metrics/MethodLength



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/elastic/core/connector.rb', line 114

def rollover(&_block) # rubocop:disable Metrics/MethodLength
  actual_index, rolling_index = resolve_write_indices

  unless rolling_index.nil?
    raise Elastic::RolloverError, 'rollover process already in progress'
  end

  new_index = create_index_w_mapping

  begin
    transfer_alias(write_index_alias, to: new_index)
    wait_for_index_to_stabilize
    perform_optimized_write_on(new_index, &_block)
    transfer_alias(index_name, from: actual_index, to: new_index)
    transfer_alias(write_index_alias, from: actual_index)
    wait_for_index_to_stabilize
    api.indices.delete index: actual_index
  rescue StandardError
    api.indices.delete index: new_index
    raise
  end
end

#statusObject



15
16
17
18
19
20
21
22
23
# File 'lib/elastic/core/connector.rb', line 15

def status
  return :ready if Elastic.config.disable_indexing

  actual_name = resolve_actual_index_name
  return :not_available if actual_name.nil?
  return :not_synchronized unless mapping_synchronized? actual_name

  :ready
end