Class: LogStash::Inputs::Elasticsearch::Scroll

Inherits:
PaginatedSearch show all
Defined in:
lib/logstash/inputs/elasticsearch/paginated_search.rb

Constant Summary collapse

SCROLL_JOB =
"scroll paginated search"

Instance Method Summary collapse

Methods inherited from PaginatedSearch

#do_run, #initialize, #retryable

Constructor Details

This class inherits a constructor from LogStash::Inputs::Elasticsearch::PaginatedSearch

Instance Method Details

#clear(scroll_id) ⇒ Object



113
114
115
116
117
118
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 113

# Best-effort release of a scroll context through the client.
#
# Does nothing (and returns nil) when +scroll_id+ is nil or false. Any
# StandardError raised by the clear_scroll call is swallowed and reported
# at debug level instead of being propagated to the caller.
#
# @param scroll_id [Object, nil] identifier sent as the request body's :scroll_id
# @return [Object, nil] the client's response, the logger's return value
#   after a failure, or nil when nothing was cleared
def clear(scroll_id)
  return unless scroll_id

  @client.clear_scroll(body: { scroll_id: scroll_id })
rescue StandardError => e
  # Deliberately non-fatal: a failed clear_scroll is only logged.
  logger.debug("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end

#initial_search(slice_id) ⇒ Object



62
63
64
65
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 62

# Issues the first search request of a scroll.
#
# The request options are built by #search_options for the given slice
# and handed to the client's search call unchanged.
#
# @param slice_id [Object, nil] forwarded as-is to #search_options
# @return [Object] whatever the client's search call returns
def initial_search(slice_id)
  @client.search(search_options(slice_id))
end

#next_page(scroll_id) ⇒ Object



67
68
69
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 67

# Requests the next page of an already-open scroll.
#
# Sends +scroll_id+ in the request body together with the configured
# @scroll value as the :scroll option.
#
# @param scroll_id [Object] identifier placed in the request body
# @return [Object] whatever the client's scroll call returns
def next_page(scroll_id)
  @client.scroll(
    body: { scroll_id: scroll_id },
    scroll: @scroll
  )
end

#process_page(output_queue) ⇒ Object



71
72
73
74
75
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 71

# Runs the given block to obtain one page of results and pushes every hit
# of that page to the plugin.
#
# The block must return a response indexable as response['hits']['hits']
# and response['_scroll_id'].
#
# @param output_queue [Object] passed through to @plugin.push_hit with each hit
# @yieldreturn [Hash] the search response for this page
# @return [Array] two elements: whether the page's hit list reported any?,
#   and the response's '_scroll_id' value
def process_page(output_queue)
  response = yield
  hits = response['hits']['hits']

  hits.each do |hit|
    @plugin.push_hit(hit, output_queue)
  end

  [hits.any?, response['_scroll_id']]
end

#retryable_search(output_queue, slice_id = nil) ⇒ Object



94
95
96
97
98
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 94

# Runs #search inside the inherited #retryable wrapper, labelled with
# SCROLL_JOB.
#
# @param output_queue [Object] forwarded to #search
# @param slice_id [Object, nil] forwarded to #search; nil by default
# @return [Object] whatever #retryable returns for the block
def retryable_search(output_queue, slice_id = nil)
  retryable(SCROLL_JOB) { search(output_queue, slice_id) }
end

#retryable_slice_search(output_queue) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 100

# Runs one #retryable_search per slice, each in its own thread, and waits
# for all of them to finish.
#
# A warning is logged first when more than 8 slices are configured. Each
# worker thread names itself from @pipeline_id and its slice id before it
# starts searching.
#
# @param output_queue [Object] forwarded to every #retryable_search call
# @return [Object] the return value of the final trace log call
def retryable_slice_search(output_queue)
  if @slices > 8
    logger.warn("managed slices for query is very large (#{@slices}); consider reducing")
  end

  workers = @slices.times.map do |slice_id|
    Thread.new do
      LogStash::Util.set_thread_name("[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
      retryable_search(output_queue, slice_id)
    end
  end

  # Block until every slice worker has finished.
  workers.each(&:join)

  logger.trace("#{@slices} slices completed")
end

#search(output_queue, slice_id = nil) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 77

# Drives one complete scroll: the initial search, then follow-up pages
# until a page has no hits, no scroll id comes back, or the plugin reports
# that it is stopping.
#
# The scroll is always handed to #clear on the way out, including when an
# error escapes; the id passed is the most recent one successfully
# assigned, or nil if none was.
#
# @param output_queue [Object] forwarded to #process_page for every page
# @param slice_id [Object, nil] when non-nil, used for the initial search
#   and added (with @slices) to the details attached to each log line
# @return [Object] the return value of the final "Query completed" log call
def search(output_queue, slice_id = nil)
  log_details = slice_id.nil? ? {} : { slice_id: slice_id, slices: @slices }

  logger.info("Query start", log_details)
  has_hits, scroll_id = process_page(output_queue) { initial_search(slice_id) }

  # The stop check comes last so it is only consulted while there is
  # still something left to fetch.
  until !has_hits || !scroll_id || @plugin.stop?
    logger.debug("Query progress", log_details)
    has_hits, scroll_id = process_page(output_queue) { next_page(scroll_id) }
  end

  logger.info("Query completed", log_details)
ensure
  clear(scroll_id)
end

#search_options(slice_id) ⇒ Object



51
52
53
54
55
56
57
58
59
60
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 51

# Builds the options hash for the initial scroll search request.
#
# When +slice_id+ is non-nil the body is a copy of @query with a 'slice'
# entry ('id' => slice_id, 'max' => @slices) merged in; otherwise @query
# is used as-is. The body is serialized with LogStash::Json.dump.
#
# @param slice_id [Object, nil] slice to restrict the query to, or nil for none
# @return [Hash] options with :index, :scroll, :size and :body keys
def search_options(slice_id)
  body = slice_id.nil? ? @query : @query.merge('slice' => { 'id' => slice_id, 'max' => @slices })

  { index: @index, scroll: @scroll, size: @size, body: LogStash::Json.dump(body) }
end