Module: ElasticRecord::Index::Documents

Included in:
ElasticRecord::Index
Defined in:
lib/elastic_record/index/documents.rb

Instance Method Summary collapse

Instance Method Details

#bulk(options = {}) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/elastic_record/index/documents.rb', line 140

# Collects the bulk actions queued while the given block runs and sends them
# to the `/_bulk` endpoint in a single request.
#
# A fresh batch array is pushed onto the connection's bulk stack before the
# block is yielded to, and popped again afterwards (also when the block
# raises), so nested calls each get their own batch.
#
# @param options [Hash] turned into the query string of the `/_bulk` request
# @return the result of `verify_bulk_results`, or nil when nothing was queued
def bulk(options = {})
  connection.bulk_stack.push([])

  yield

  # Nothing was queued inside the block: skip the HTTP round trip.
  return unless current_bulk_batch.any?

  # The bulk body is newline-delimited JSON, one line per queued entry.
  encoded_lines = current_bulk_batch.map do |action|
    "#{ElasticRecord::JSON.encode(action)}\n"
  end
  response = connection.json_post("/_bulk?#{options.to_query}", encoded_lines.join)
  verify_bulk_results(response)
ensure
  connection.bulk_stack.pop
end

#bulk_add(batch, index_name: alias_name) ⇒ Object



154
155
156
157
158
159
160
# File 'lib/elastic_record/index/documents.rb', line 154

# Indexes every record of +batch+ inside a single `bulk` block.
#
# @param batch [#each] the records to index
# @param index_name the index passed through to `index_record`
#   (defaults to `alias_name`)
# @return whatever `bulk` returns
def bulk_add(batch, index_name: alias_name)
  bulk do
    batch.each { |record| index_record(record, index_name: index_name) }
  end
end

#create_scan_search(elastic_query, options = {}) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
# File 'lib/elastic_record/index/documents.rb', line 123

# Starts a scan/scroll search and wraps it in a ScanSearch object.
#
# The caller's +options+ hash is left untouched: defaults are filled in on a
# copy, so frozen hashes are accepted and no defaults leak back to the caller.
#
# @param elastic_query the query handed to `search`
# @param options [Hash] recognised keys:
#   :batch_size - sent as the search `size` (defaults to 100)
#   :keep_alive - sent as the `scroll` value (defaults to
#                 ElasticRecord::Config.scroll_keep_alive)
#   The completed hash is also passed on to ScanSearch.new.
# @return [ScanSearch] with `total_hits` set from the response's hits total
def create_scan_search(elastic_query, options = {})
  options = options.dup
  options[:batch_size] ||= 100
  options[:keep_alive] ||= ElasticRecord::Config.scroll_keep_alive

  search_options = {search_type: 'scan', size: options[:batch_size], scroll: options[:keep_alive]}
  json = search(elastic_query, search_options)

  ScanSearch.new(self, json['_scroll_id'], options).tap do |scan_search|
    scan_search.total_hits = json['hits']['total']
  end
end

#current_bulk_batch ⇒ Object



162
163
164
# File 'lib/elastic_record/index/documents.rb', line 162

# Returns the batch on top of the connection's bulk stack, i.e. the last
# element of `connection.bulk_stack` (nil when the stack is empty).
def current_bulk_batch
  stack = connection.bulk_stack
  stack.last
end

#delete_by_query(query) ⇒ Object



96
97
98
99
100
101
102
103
104
# File 'lib/elastic_record/index/documents.rb', line 96

# Deletes every document matched by +query+.
#
# A scan search is created for the query and walked slice by slice; each
# slice of ids is deleted inside its own `bulk` block.
#
# @param query the query passed to `create_scan_search`
# @return whatever the scan search's `each_slice` returns
def delete_by_query(query)
  create_scan_search(query).each_slice do |ids|
    bulk { ids.each { |id| delete_document(id) } }
  end
end

#delete_document(id, parent: nil, index_name: alias_name) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/elastic_record/index/documents.rb', line 80

# Deletes the document with the given id.
#
# Inside a `bulk` block (i.e. when `current_bulk_batch` is set) a delete
# action is queued on the batch; otherwise a DELETE request is issued
# immediately.
#
# @param id the document id; must not be blank
# @param parent optional parent id, added to the bulk instructions or to the
#   request's query string
# @param index_name the index to delete from; nil falls back to `alias_name`
# @raise [RuntimeError] when +id+ is blank
# @return the batch when queued, otherwise the result of `json_delete`
def delete_document(id, parent: nil, index_name: alias_name)
  raise "Cannot delete document with empty id" if id.blank?
  index_name ||= alias_name

  if batch = current_bulk_batch
    instructions = { _index: index_name, _type: type, _id: id, _retry_on_conflict: 3 }
    instructions[:parent] = parent if parent
    batch << { delete: instructions }
  else
    path = "/#{index_name}/#{type}/#{id}"
    # The path has no query string yet, so the first parameter must be
    # introduced with "?" (an "&" here would become part of the document id).
    path << "?parent=#{parent}" if parent

    connection.json_delete path
  end
end

#explain(id, elastic_query) ⇒ Object



119
120
121
# File 'lib/elastic_record/index/documents.rb', line 119

# Asks Elasticsearch to explain how +elastic_query+ scores the document
# with the given id.
#
# The explain endpoint is per document, so the id is part of the path.
# NOTE(review): assumes `get` resolves its first argument relative to this
# index/type — confirm against its definition.
#
# @param id the id of the document to explain
# @param elastic_query the query body sent with the request
# @return the result of `get`
def explain(id, elastic_query)
  get "#{id}/_explain", elastic_query
end

#index_document(id, document, parent: nil, index_name: alias_name) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/elastic_record/index/documents.rb', line 48

# Indexes +document+ under the given id.
#
# Inside a `bulk` block (i.e. when `current_bulk_batch` is set) an index
# action followed by the document is queued on the batch; otherwise the
# document is PUT immediately.
#
# @param id the document id
# @param document the document body
# @param parent optional parent id, added to the bulk instructions or to the
#   request's query string
# @param index_name the target index (defaults to `alias_name`)
# @return the batch when queued, otherwise the result of `json_put`
def index_document(id, document, parent: nil, index_name: alias_name)
  pending = current_bulk_batch

  unless pending
    base = "/#{index_name}/#{type}/#{id}"
    url = parent ? "#{base}?parent=#{parent}" : base
    return connection.json_put(url, document)
  end

  meta = { _index: index_name, _type: type, _id: id }
  meta[:parent] = parent if parent

  # A bulk index entry is two lines: the action metadata, then the source.
  pending << { index: meta }
  pending << document
end

#index_record(record, index_name: alias_name) ⇒ Object



36
37
38
39
40
# File 'lib/elastic_record/index/documents.rb', line 36

# Indexes +record+ unless indexing is `disabled`.
#
# The document id is read from the record via its class's `primary_key`,
# and the document body is the record's `as_search` representation.
#
# @param record the record to index
# @param index_name the target index (defaults to `alias_name`)
# @return the result of `index_document`, or nil when disabled
def index_record(record, index_name: alias_name)
  return if disabled

  record_id = record.send(record.class.primary_key)
  index_document(record_id, record.as_search, index_name: index_name)
end

#record_exists?(id) ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
# File 'lib/elastic_record/index/documents.rb', line 106

# Reports whether a document with the given id exists, by fetching it with
# `get` and reading the response's 'found' entry.
#
# @param id the document id
# @return the value stored under 'found' in the response
def record_exists?(id)
  response = get(id)
  response['found']
end

#scroll(scroll_id, scroll_keep_alive) ⇒ Object



135
136
137
138
# File 'lib/elastic_record/index/documents.rb', line 135

# Fetches the next page of a scroll search from `/_search/scroll`.
#
# @param scroll_id sent as the `scroll_id` query parameter
# @param scroll_keep_alive sent as the `scroll` query parameter
# @return the result of `json_get`
def scroll(scroll_id, scroll_keep_alive)
  query_string = { scroll_id: scroll_id, scroll: scroll_keep_alive }.to_query
  connection.json_get("/_search/scroll?#{query_string}")
end

#search(elastic_query, options = {}) ⇒ Object



110
111
112
113
114
115
116
117
# File 'lib/elastic_record/index/documents.rb', line 110

# Runs +elastic_query+ against the `_search` endpoint via `get`.
#
# @param elastic_query the query body
# @param options [Hash] when non-empty, appended to the URL as a query string
# @return the result of `get`
def search(elastic_query, options = {})
  endpoint = options.any? ? "_search?#{options.to_query}" : "_search"
  get(endpoint, elastic_query)
end

#update_document(id, document, parent: nil, index_name: alias_name) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/elastic_record/index/documents.rb', line 63

# Partially updates the document with the given id, creating it when it does
# not exist yet (the payload sets `doc_as_upsert: true`).
#
# Inside a `bulk` block (i.e. when `current_bulk_batch` is set) an update
# action followed by the payload is queued on the batch; otherwise the
# update is POSTed immediately. Both paths ask for 3 retries on conflict.
#
# @param id the document id
# @param document the partial document, sent under the `doc` key
# @param parent optional parent id, added to the bulk instructions or to the
#   request's query string
# @param index_name the target index (defaults to `alias_name`)
# @return the batch when queued, otherwise the result of `json_post`
def update_document(id, document, parent: nil, index_name: alias_name)
  payload = { doc: document, doc_as_upsert: true }
  pending = current_bulk_batch

  unless pending
    url = "/#{index_name}/#{type}/#{id}/_update?retry_on_conflict=3"
    url = "#{url}&parent=#{parent}" if parent
    return connection.json_post(url, payload)
  end

  meta = { _index: index_name, _type: type, _id: id, _retry_on_conflict: 3 }
  meta[:parent] = parent if parent

  # A bulk update entry is two lines: the action metadata, then the payload.
  pending << { update: meta }
  pending << payload
end

#update_record(record, index_name: alias_name) ⇒ Object



42
43
44
45
46
# File 'lib/elastic_record/index/documents.rb', line 42

# Sends a partial update for +record+ unless indexing is `disabled`.
#
# The document id is read from the record via its class's `primary_key`,
# and the body is the record's `as_partial_update_document`.
#
# @param record the record to update
# @param index_name the target index (defaults to `alias_name`)
# @return the result of `update_document`, or nil when disabled
def update_record(record, index_name: alias_name)
  return if disabled

  record_id = record.send(record.class.primary_key)
  update_document(record_id, record.as_partial_update_document, index_name: index_name)
end