Module: ElasticRecord::Index::Documents

Included in:
ElasticRecord::Index
Defined in:
lib/elastic_record/index/documents.rb

Instance Method Summary collapse

Instance Method Details

#bulk(options = {}) ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/elastic_record/index/documents.rb', line 147

# Collects the bulk actions queued while the given block runs and sends them
# in a single request to the `_bulk` endpoint.
#
# A fresh, empty batch is pushed onto `connection.bulk_stack` before yielding
# and is always popped again afterwards, even if the block raises. Nothing is
# posted when the batch is still empty after the block returns.
#
# @param options [Hash] query-string parameters appended to the `/_bulk` URL
# @yield runs with the new batch as the current bulk batch
# @return [Object, nil] the result of `verify_bulk_results`, or nil when
#   nothing was queued
def bulk(options = {})
  connection.bulk_stack.push []

  yield

  queued = current_bulk_batch
  return unless queued.any?

  # One encoded action per line, each terminated by a newline.
  payload = queued.map { |entry| "#{ElasticRecord::JSON.encode(entry)}\n" }.join
  response = connection.json_post("/_bulk?#{options.to_query}", payload)
  verify_bulk_results(response)
ensure
  connection.bulk_stack.pop
end

#bulk_add(batch, index_name: alias_name) ⇒ Object



161
162
163
164
165
166
167
# File 'lib/elastic_record/index/documents.rb', line 161

# Indexes every record in +batch+ inside a single `bulk` block.
#
# @param batch [#each] the records to hand to `index_record`
# @param index_name [Object] forwarded to `index_record`; defaults to
#   `alias_name`
# @return [Object] whatever `bulk` returns
def bulk_add(batch, index_name: alias_name)
  bulk do
    batch.each { |item| index_record(item, index_name: index_name) }
  end
end

#create_scan_search(elastic_query, options = {}) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
# File 'lib/elastic_record/index/documents.rb', line 130

# Starts a scan-type search and wraps the returned scroll id in a ScanSearch.
#
# Defaults are filled into a copy of +options+, so the caller's hash is never
# modified (and may be frozen).
#
# @param elastic_query [Object] query passed through to `search`
# @param options [Hash] recognised keys:
#   - :batch_size  sent as the search `size` (default 100)
#   - :keep_alive  sent as the search `scroll` value (default
#     `ElasticRecord::Config.scroll_keep_alive`)
#   The completed hash is also handed to `ScanSearch.new`.
# @return [ScanSearch] with `total_hits` set from the search response
def create_scan_search(elastic_query, options = {})
  # Work on a copy: `||=` would otherwise write into the caller's hash.
  options = options.dup
  options[:batch_size] ||= 100
  options[:keep_alive] ||= ElasticRecord::Config.scroll_keep_alive

  search_options = { search_type: 'scan', size: options[:batch_size], scroll: options[:keep_alive] }
  json = search(elastic_query, search_options)

  ScanSearch.new(self, json['_scroll_id'], options).tap do |scan_search|
    scan_search.total_hits = json['hits']['total']
  end
end

#current_bulk_batch ⇒ Object



169
170
171
# File 'lib/elastic_record/index/documents.rb', line 169

# Returns the batch on top of `connection.bulk_stack`, i.e. the one opened by
# the innermost `bulk` call still in progress.
#
# @return [Object, nil] the last element of the stack; nil when it is empty
def current_bulk_batch
  stack = connection.bulk_stack
  stack.last
end

#delete_by_query(query) ⇒ Object



103
104
105
106
107
108
109
110
111
# File 'lib/elastic_record/index/documents.rb', line 103

# Deletes every document matched by +query+.
#
# A scan search is created for the query; each slice of ids it yields is
# deleted through `delete_document` inside its own `bulk` block.
#
# @param query [Object] passed to `create_scan_search`
# @return [Object] whatever the scan search's `each_slice` returns
def delete_by_query(query)
  create_scan_search(query).each_slice do |id_slice|
    bulk do
      id_slice.each { |doc_id| delete_document(doc_id) }
    end
  end
end

#delete_document(id, parent: nil, index_name: alias_name) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/elastic_record/index/documents.rb', line 87

# Deletes a single document, or queues the deletion when a bulk batch is open.
#
# Inside a `bulk` block a `delete` action is appended to the current batch;
# otherwise a DELETE request is issued straight away.
#
# @param id [Object] document id; must not be blank
# @param parent [Object, nil] parent id, added to the bulk instructions or to
#   the request's query string when given
# @param index_name [Object, nil] index to target; a nil value falls back to
#   `alias_name`
# @return [Object] the current batch when queued, otherwise the result of
#   `connection.json_delete`
# @raise [RuntimeError] if +id+ is blank
def delete_document(id, parent: nil, index_name: alias_name)
  raise "Cannot delete document with empty id" if id.blank?
  index_name ||= alias_name

  if batch = current_bulk_batch
    instructions = { _index: index_name, _type: type, _id: id, _retry_on_conflict: 3 }
    instructions[:parent] = parent if parent
    batch << { delete: instructions }
  else
    path = "/#{index_name}/#{type}/#{id}"
    # The path has no query string yet, so the first parameter starts with "?"
    # (the same form index_document uses), not "&".
    path += "?parent=#{parent}" if parent

    connection.json_delete path
  end
end

#explain(id, elastic_query) ⇒ Object



126
127
128
# File 'lib/elastic_record/index/documents.rb', line 126

# Asks Elasticsearch to explain how +elastic_query+ scores the document +id+.
#
# The explain endpoint is addressed per document, so the id is part of the
# path handed to `get` (which is defined outside this listing).
# NOTE(review): assumes `get` prefixes the index/type, as its use in
# `record_exists?` (`get(id)`) suggests — confirm against its definition.
#
# @param id [Object] id of the document to explain
# @param elastic_query [Object] query body sent with the request
# @return [Object] whatever `get` returns
def explain(id, elastic_query)
  get "#{id}/_explain", elastic_query
end

#index_document(id, document, parent: nil, index_name: alias_name) ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/elastic_record/index/documents.rb', line 55

# Sends +document+ to the index under +id+, or queues it when a bulk batch is
# open.
#
# Inside a `bulk` block two entries are appended to the current batch: the
# `index` instruction and then the document itself. Otherwise the document is
# PUT to "/<index_name>/<type>/<id>".
#
# @param id [Object] document id
# @param document [Object] document body
# @param parent [Object, nil] parent id, added to the instruction or to the
#   request's query string when given
# @param index_name [Object] index to target; defaults to `alias_name`
# @return [Object] the current batch when queued, otherwise the result of
#   `connection.json_put`
def index_document(id, document, parent: nil, index_name: alias_name)
  batch = current_bulk_batch

  unless batch
    url = "/#{index_name}/#{type}/#{id}"
    url += "?parent=#{parent}" if parent
    return connection.json_put(url, document)
  end

  meta = { _index: index_name, _type: type, _id: id }
  meta[:parent] = parent if parent
  batch << { index: meta } << document
end

#index_record(record, index_name: alias_name) ⇒ Object



43
44
45
46
47
# File 'lib/elastic_record/index/documents.rb', line 43

# Indexes +record+ unless indexing is `disabled`.
#
# The document id is read from the record via its class's `primary_key`, and
# the body comes from `record.as_search`.
#
# @param record [Object] responds to `as_search`; its class responds to
#   `primary_key`
# @param index_name [Object] forwarded to `index_document`; defaults to
#   `alias_name`
# @return [Object, nil] the result of `index_document`, or nil when disabled
def index_record(record, index_name: alias_name)
  return if disabled

  key_attribute = record.class.primary_key
  index_document(record.send(key_attribute), record.as_search, index_name: index_name)
end

#record_exists?(id) ⇒ Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/elastic_record/index/documents.rb', line 113

# Reports whether a document with the given id is present.
#
# @param id [Object] document id, passed to `get`
# @return [Object] the value under the 'found' key of the `get` response
def record_exists?(id)
  response = get(id)
  response['found']
end

#scroll(scroll_id, scroll_keep_alive) ⇒ Object



142
143
144
145
# File 'lib/elastic_record/index/documents.rb', line 142

# Fetches the next page of a scroll.
#
# @param scroll_id [Object] sent as the `scroll_id` query parameter
# @param scroll_keep_alive [Object] sent as the `scroll` query parameter
# @return [Object] the result of `connection.json_get`
def scroll(scroll_id, scroll_keep_alive)
  query_string = { scroll_id: scroll_id, scroll: scroll_keep_alive }.to_query
  connection.json_get("/_search/scroll?#{query_string}")
end

#search(elastic_query, options = {}) ⇒ Object



117
118
119
120
121
122
123
124
# File 'lib/elastic_record/index/documents.rb', line 117

# Runs +elastic_query+ against the `_search` endpoint.
#
# @param elastic_query [Object] query body handed to `get`
# @param options [Hash] query-string parameters; when non-empty they are
#   appended to the endpoint as "?<options.to_query>"
# @return [Object] whatever `get` returns
def search(elastic_query, options = {})
  endpoint = options.any? ? "_search?#{options.to_query}" : "_search"
  get endpoint, elastic_query
end

#update_document(id, document, parent: nil, index_name: alias_name) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/elastic_record/index/documents.rb', line 70

# Sends a partial update for the document +id+, or queues it when a bulk
# batch is open.
#
# The request body is `{doc: document, doc_as_upsert: true}`. Inside a `bulk`
# block an `update` instruction followed by that body is appended to the
# current batch; otherwise the body is POSTed to the document's `_update`
# endpoint. Both forms ask for 3 retries on conflict.
#
# @param id [Object] document id
# @param document [Object] the fields to update
# @param parent [Object, nil] parent id, added to the instruction or to the
#   request's query string when given
# @param index_name [Object] index to target; defaults to `alias_name`
# @return [Object] the current batch when queued, otherwise the result of
#   `connection.json_post`
def update_document(id, document, parent: nil, index_name: alias_name)
  payload = { doc: document, doc_as_upsert: true }
  batch = current_bulk_batch

  unless batch
    url = "/#{index_name}/#{type}/#{id}/_update?retry_on_conflict=3"
    url += "&parent=#{parent}" if parent
    return connection.json_post(url, payload)
  end

  meta = { _index: index_name, _type: type, _id: id, _retry_on_conflict: 3 }
  meta[:parent] = parent if parent
  batch << { update: meta } << payload
end

#update_record(record, index_name: alias_name) ⇒ Object



49
50
51
52
53
# File 'lib/elastic_record/index/documents.rb', line 49

# Sends a partial update for +record+ unless indexing is `disabled`.
#
# The document id is read from the record via its class's `primary_key`, and
# the body comes from `record.as_partial_update_document`.
#
# @param record [Object] responds to `as_partial_update_document`; its class
#   responds to `primary_key`
# @param index_name [Object] forwarded to `update_document`; defaults to
#   `alias_name`
# @return [Object, nil] the result of `update_document`, or nil when disabled
def update_record(record, index_name: alias_name)
  return if disabled

  key_attribute = record.class.primary_key
  update_document(record.send(key_attribute), record.as_partial_update_document, index_name: index_name)
end