Class: LogStash::Inputs::Elasticsearch::Scroll
Constant Summary
collapse
- SCROLL_JOB =
"scroll paginated search"
Instance Method Summary
collapse
#do_run, #initialize, #retryable
Instance Method Details
#clear(scroll_id) ⇒ Object
113
114
115
116
117
118
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 113
def clear(scroll_id)
@client.clear_scroll(:body => { :scroll_id => scroll_id }) if scroll_id
rescue => e
logger.debug("Ignoring clear_scroll exception", message: e.message, exception: e.class)
end
|
#initial_search(slice_id) ⇒ Object
62
63
64
65
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 62
def initial_search(slice_id)
options = search_options(slice_id)
@client.search(options)
end
|
#next_page(scroll_id) ⇒ Object
67
68
69
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 67
def next_page(scroll_id)
@client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
|
#process_page(output_queue) ⇒ Object
71
72
73
74
75
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 71
def process_page(output_queue)
r = yield
r['hits']['hits'].each { |hit| @plugin.push_hit(hit, output_queue) }
[r['hits']['hits'].any?, r['_scroll_id']]
end
|
#retryable_search(output_queue, slice_id = nil) ⇒ Object
94
95
96
97
98
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 94
def retryable_search(output_queue, slice_id=nil)
retryable(SCROLL_JOB) do
search(output_queue, slice_id)
end
end
|
#retryable_slice_search(output_queue) ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 100
def retryable_slice_search(output_queue)
logger.warn("managed slices for query is very large (#{@slices}); consider reducing") if @slices > 8
@slices.times.map do |slice_id|
Thread.new do
LogStash::Util::set_thread_name("[#{@pipeline_id}]|input|elasticsearch|slice_#{slice_id}")
retryable_search(output_queue, slice_id)
end
end.map(&:join)
logger.trace("#{@slices} slices completed")
end
|
#search(output_queue, slice_id = nil) ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 77
def search(output_queue, slice_id=nil)
log_details = {}
log_details = log_details.merge({ slice_id: slice_id, slices: @slices }) unless slice_id.nil?
logger.info("Query start", log_details)
has_hits, scroll_id = process_page(output_queue) { initial_search(slice_id) }
while has_hits && scroll_id && !@plugin.stop?
logger.debug("Query progress", log_details)
has_hits, scroll_id = process_page(output_queue) { next_page(scroll_id) }
end
logger.info("Query completed", log_details)
ensure
clear(scroll_id)
end
|
#search_options(slice_id) ⇒ Object
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/logstash/inputs/elasticsearch/paginated_search.rb', line 51
def search_options(slice_id)
query = @query
query = @query.merge('slice' => { 'id' => slice_id, 'max' => @slices}) unless slice_id.nil?
{
:index => @index,
:scroll => @scroll,
:size => @size,
:body => LogStash::Json.dump(query)
}
end
|