Class: Core::OutputSink::EsSink
- Defined in: lib/core/output_sink/es_sink.rb
Instance Method Summary
- #delete(doc_id) ⇒ Object
- #delete_multiple(ids) ⇒ Object
- #flush(size: nil) ⇒ Object
- #ingest(document) ⇒ Object
- #ingest_multiple(documents) ⇒ Object
- #initialize(index_name, request_pipeline, flush_threshold = 50) ⇒ EsSink (constructor): A new instance of EsSink.
Constructor Details
#initialize(index_name, request_pipeline, flush_threshold = 50) ⇒ EsSink
Returns a new instance of EsSink.
17 18 19 20 21 22 23 24 |
# File 'lib/core/output_sink/es_sink.rb', line 17

# Sets up the sink: creates the Elasticsearch client from the
# :elasticsearch entry of App::Config, remembers the target index and
# request pipeline, and starts with an empty operation queue.
#
# @param index_name stored in @index_name (presumably the index name as a
#   String; confirm against callers)
# @param request_pipeline stored in @request_pipeline; not otherwise used here
# @param flush_threshold stored in @flush_threshold; defaults to 50
def initialize(index_name, request_pipeline, flush_threshold = 50)
  super()

  es_config = App::Config[:elasticsearch]
  @client = Utility::EsClient.new(es_config)

  @operation_queue = []
  @flush_threshold = flush_threshold
  @index_name = index_name
  @request_pipeline = request_pipeline
end
Instance Method Details
#delete(doc_id) ⇒ Object
33 34 35 36 37 38 |
# File 'lib/core/output_sink/es_sink.rb', line 33

# Queues a delete operation for the given document id and flushes when
# ready_to_flush? reports true. A nil id is ignored.
#
# @param doc_id id placed in the :_id field of the queued delete action
def delete(doc_id)
  return if doc_id.nil?

  delete_action = { :_index => index_name, :_id => doc_id }
  @operation_queue.push({ :delete => delete_action })

  flush if ready_to_flush?
end
#delete_multiple(ids) ⇒ Object
54 55 56 57 |
# File 'lib/core/output_sink/es_sink.rb', line 54

# Queues a delete operation for every id in the collection by calling
# #delete on each one.
#
# A nil collection is treated as "nothing to delete". The log line already
# tolerated nil via safe navigation, but the following iteration raised
# NoMethodError on nil.
#
# @param ids [#each, #size, nil] ids to delete
# @return whatever ids.each returns, or nil when ids is nil
def delete_multiple(ids)
  return if ids.nil?

  Utility::Logger.debug "Enqueueing #{ids.size} ids to delete from the index #{index_name}."
  ids.each { |id| delete(id) }
end
#flush(size: nil) ⇒ Object
40 41 42 43 44 45 46 47 |
# File 'lib/core/output_sink/es_sink.rb', line 40

# Drains the operation queue, handing it to send_data one batch at a time.
#
# Batches are taken from the FRONT of the queue so operations are sent in
# the order they were enqueued. Taking them from the tail (Array#pop) sent
# the newest batch first whenever the queue held more than one batch, which
# could reorder an index and a delete for the same document id.
#
# NOTE(review): a batch size of 0 removes nothing per iteration, so the
# loop would never finish on a non-empty queue; callers should pass a
# positive size.
#
# @param size [Integer, nil] maximum number of operations per batch; falls
#   back to @flush_threshold when nil
def flush(size: nil)
  flush_size = size || @flush_threshold

  until @operation_queue.empty?
    batch = @operation_queue.shift(flush_size)
    send_data(batch)
  end
end
#ingest(document) ⇒ Object
26 27 28 29 30 31 |
# File 'lib/core/output_sink/es_sink.rb', line 26

# Queues an index operation for the document and flushes when
# ready_to_flush? reports true. Blank documents are skipped.
#
# @param document object responding to blank? and [] (presumably a Hash with
#   an :id key; confirm against callers); it is queued as the action's :data
#   and its [:id] value becomes the action's :_id
def ingest(document)
  return if document.blank?

  index_action = { :_index => index_name, :_id => document[:id], :data => document }
  @operation_queue.push({ :index => index_action })

  flush if ready_to_flush?
end