Class: LogStash::Inputs::Elasticsearch::SearchAfter

Inherits:
PaginatedSearch
Defined in:
lib/logstash/inputs/elasticsearch/paginated_search.rb

Constant Summary

PIT_JOB =
"create point in time (PIT)"
SEARCH_AFTER_JOB =
"search_after paginated search"

Instance Method Summary

Methods inherited from PaginatedSearch

#do_run, #initialize, #retryable

Constructor Details

This class inherits a constructor from LogStash::Inputs::Elasticsearch::PaginatedSearch

Instance Method Details

#clear(pit_id) ⇒ Object



221
222
223
224
225
226
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 221

# Closes the point in time (PIT) identified by +pit_id+, best-effort.
#
# When +pit_id+ is not a PIT id according to #pit? (for instance because PIT
# creation did not yield one), nothing is logged and no request is sent, so the
# "Closing" message only appears when a close request is actually issued.
#
# @param pit_id [Object] the value obtained when opening the PIT
# @return [Object, nil] the client's close response, or nil when skipped
def clear(pit_id)
  return unless pit?(pit_id)

  logger.info("Closing point in time (PIT)")
  @client.close_point_in_time(:body => {:id => pit_id})
rescue => e
  # Closing is best-effort: failures are only logged at debug level and swallowed.
  logger.debug("Ignoring close_point_in_time exception", message: e.message, exception: e.class)
end

#create_pit ⇒ Object



129
130
131
132
133
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 129

# Opens a point in time (PIT) on the configured index.
#
# The PIT is kept alive for the configured scroll duration.
#
# @return [Object] the 'id' entry of the open_point_in_time response
def create_pit
  logger.info("Create point in time (PIT)")
  @client.open_point_in_time(index: @index, keep_alive: @scroll)['id']
end

#next_page(pit_id:, search_after: nil, slice_id: nil) ⇒ Object



157
158
159
160
161
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 157

# Requests one page of results from the open PIT.
#
# @param pit_id [String] id of the point in time to search against
# @param search_after [Object, nil] sort values of the previous page's last hit
# @param slice_id [Integer, nil] slice to query when running a sliced search
# @return [Object] the raw response of the client's search call
def next_page(pit_id:, search_after: nil, slice_id: nil)
  request = search_options(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
              .tap { |opts| logger.trace("search options", opts) }
  @client.search(request)
end

#pit?(id) ⇒ Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 125

# Tells whether +id+ looks like a usable point-in-time id.
#
# @param id [Object] candidate id
# @return [Boolean] true for any String (including ""), false otherwise (nil included,
#   since nil.is_a?(String) is false)
def pit?(id)
  id.is_a?(String)
end

#process_page(output_queue) ⇒ Object



163
164
165
166
167
168
169
170
171
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 163

# Fetches one page and pushes every hit to the plugin.
#
# The block performs the request. After the hits are pushed, this method
# extracts the cursor for the next page.
#
# @param output_queue [Object] queue handed through to @plugin.push_hit
# @yieldreturn [Hash] a search response with the hits under ['hits']['hits']
# @return [Array(Boolean, Object)] [has_hits, search_after]. search_after is the
#   'sort' value of the page's last hit, or nil when the page is empty or that hit
#   carries no sort value.
def process_page(output_queue)
  response = yield
  hits = response['hits']['hits']
  hits.each { |hit| @plugin.push_hit(hit, output_queue) }

  has_hits = hits.any?
  # The last hit's sort values are the search_after cursor. An empty page has no
  # last hit, so safe navigation yields nil. This replaces an inline `rescue nil`
  # that would also have hidden unrelated errors.
  search_after = hits.last&.dig('sort')
  logger.warn("Query got data but the sort value is empty") if has_hits && search_after.nil?
  [has_hits, search_after]
end

#retryable_search(output_queue) ⇒ Object



198
199
200
201
202
203
204
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 198

# Runs a single (non-sliced) search_after pass inside a freshly opened PIT.
#
# The search is wrapped in retryable(SEARCH_AFTER_JOB).
#
# @param output_queue [Object] destination for the emitted events
# @return [Object] whatever with_pit returns for the retried search
def retryable_search(output_queue)
  with_pit { |pit| retryable(SEARCH_AFTER_JOB) { search(pit_id: pit, output_queue: output_queue) } }
end

#retryable_slice_search(output_queue) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 206

# Runs a sliced search_after pass: one thread per slice, all sharing a single PIT.
#
# Each thread names itself after the pipeline and slice, then runs a retried
# search for its slice. The PIT is released by with_pit only after every thread
# has been joined. Thread#join re-raises an exception that terminated a thread.
#
# @param output_queue [Object] destination for the emitted events
def retryable_slice_search(output_queue)
  with_pit do |pit_id|
    slice_threads = @slices.times.map do |slice_id|
      Thread.new do
        thread_name = "[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}"
        LogStash::Util.set_thread_name(thread_name)
        retryable(SEARCH_AFTER_JOB) { search(output_queue: output_queue, slice_id: slice_id, pit_id: pit_id) }
      end
    end
    slice_threads.each(&:join)
  end

  logger.trace("#{@slices} slices completed")
end

#search(output_queue:, slice_id: nil, pit_id:) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 180

# Pages through the PIT with search_after until results run out or the plugin stops.
#
# Paging also stops when a page has hits but no sort values. Without a cursor,
# the next request would be sent with search_after: nil and would re-fetch the
# first page forever, re-emitting the same events.
#
# @param output_queue [Object] destination for the emitted events
# @param slice_id [Integer, nil] slice to query; nil for a non-sliced search
# @param pit_id [String] id of the point in time to search against
def search(output_queue:, slice_id: nil, pit_id:)
  log_details = slice_id.nil? ? {} : { slice_id: slice_id, slices: @slices }
  logger.info("Query start", log_details)

  search_after = nil
  until @plugin.stop?
    logger.debug("Query progress", log_details)
    has_hits, search_after = process_page(output_queue) do
      next_page(pit_id: pit_id, search_after: search_after, slice_id: slice_id)
    end
    break unless has_hits
    # No cursor despite hits: continuing would restart from the first page.
    break if search_after.nil?
  end

  logger.info("Query completed", log_details)
end

#search_options(pit_id:, search_after: nil, slice_id: nil) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 135

# Builds the client.search arguments for one search_after page.
#
# The request body is the configured query plus the PIT reference. It also gets
# a default sort when the query has none, and optionally the search_after
# cursor and slice selector.
#
# @param pit_id [String] id of the point in time, kept alive for @scroll
# @param search_after [Object, nil] cursor from the previous page, omitted when nil
# @param slice_id [Integer, nil] slice to query out of @slices, omitted when nil
# @return [Hash] { :size => @size, :body => request body }
def search_options(pit_id:, search_after: nil, slice_id: nil)
  # merge returns a new Hash, so the assignments below never touch @query itself.
  body = @query.merge(:pit => { :id => pit_id, :keep_alive => @scroll })

  # search_after needs at least one explicit sort field. Without a user-supplied
  # "sort" we use "_shard_doc": "asc"; ES adds that same tiebreaker implicitly on
  # top of any provided "sort".
  # https://www.elastic.co/guide/en/elasticsearch/reference/8.10/paginate-search-results.html#CO201-2
  body[:sort] = { "_shard_doc": "asc" } if @query&.dig("sort").nil?
  body[:search_after] = search_after unless search_after.nil?
  body[:slice] = { :id => slice_id, :max => @slices } unless slice_id.nil?

  { :size => @size, :body => body }
end

#with_pit ⇒ Object



173
174
175
176
177
178
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 173

# Opens a point in time (PIT), yields its id, and always attempts to close it.
#
# The PIT is obtained with retryable(PIT_JOB) { create_pit }. The block runs only
# when that value passes #pit?; otherwise nothing is yielded and nil is returned.
# NOTE(review): this relies on retryable returning a non-String (rather than
# raising) when PIT creation keeps failing. Confirm against PaginatedSearch#retryable.
#
# @yieldparam pit_id [String] the id of the opened PIT
# @return [Object, nil] the block's result, or nil when no PIT was obtained
def with_pit
  pit_id = retryable(PIT_JOB) { create_pit }
  return unless pit?(pit_id)

  yield pit_id
ensure
  # Runs after success, early return, or an exception. pit_id is nil if retryable itself raised.
  clear(pit_id)
end